晚上看了会 Medium 上的博客,发现了一篇关于使用 Go 来实现并发的获取数据。
我在作者原有的基础上改造了一下,限制了最大的数量,不知道有没有什么疏漏,请大家指教指教。
另外我也在思考关于 GC 的问题,如果代码中在某一次循环中走了<-ctx.Done()的分支,直接返回了结果,productsChan 会被怎么回收?我应该如何关闭相关的 channel ?还是让程序自己处理?
Medium 原文地址
我的改造:
package mainimport "sync"import "runtime"import "fmt"var LIST_PRODUCT_TYPE = [100000]string{"food", "electronics", "clothing","...more"} // ......非常多的数据需要查询type GetListProductResponse struct { Data []ProductListResponse `json:"data"`}type ProductListResponse struct { Code string `json:"code"` Name string `json:"name"` Price string `json:"price"` Status bool `json:"status"`}func getProducts(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) { // calling endpoint 3rd party // parse to response // and return the data return &productList, nil}func main() { ctx, cancelFunc := context.WithCancel(ctx)defer cancelFunc() wg := sync.WaitGroup{} doneChan := make(chan struct{}, 1) productsChan := make(chan *GetListProductResponse) errChan := make(chan error) // LIST_PRODUCT_TYPE 数量非常大,需要限制最大的并发数量 maxConcurrency := 5 semaphore := make(chan struct{}, maxConcurrency) wg.Add(len(LIST_PRODUCT_TYPE)) for key := range LIST_PRODUCT_TYPE { req := &GetProductListRequest{ ProductType: LIST_PRODUCT_TYPE[key], } select { case <-ctx.Done(): return nil, ctx.Err() case semaphore<-struct{}{}: go func() { defer wg.Done() defer func() { <-semaphore }() products, err := getProductList(ctx, req) if err != nil { errChan <- err return } productsChan <- products }() } } go func() { wg.Wait() doneChan <- struct{}{} }() var ( catalogues GetListProductResponse data []ProductListResponse ) for { select { case <-ctx.Done(): return nil, ctx.Err() case err := <-errChan: return nil, err case products := <-productsChan: data = append(data, products.Data...) catalogues.Data = data case <-doneChan: return &catalogues, nil } }}