Feat: Concurrent batch processing by ukpratik · Pull Request #795 · samber/lo
- This is not
Batchprocessing donechan is redundant - can simply wrap it with theAsync0processorshould befunc(ctx context.Context, index int, item T)go func()for each element is overkill
That is, something like
func Process[T any](ctx context.Context, maxConcurrency int, collections []T, processor func(ctx context.Context, index int, item T)) { numInput := len(collections) if maxConcurrency <= 0 || maxConcurrency > numInput { maxConcurrency = numInput } var wg sync.WaitGroup var idx atomic.Int64 wg.Add(maxConcurrency) for i := 0; i < maxConcurrency; i++ { go func() { defer wg.Done() i := int(idx.Add(1) - 1) for ; i < numInput; i = int(idx.Add(1) - 1) { select { case <-ctx.Done(): return default: processor(ctx, i, collections[i]) } } }() } wg.Wait() }
But I probably wouldn't add it