Feat: Concurrent batch processing by ukpratik · Pull Request #795 · samber/lo

  • This is not batch processing
  • The done chan is redundant — it can simply be wrapped with Async0
  • processor should be func(ctx context.Context, index int, item T)
  • Spawning a go func() per element is overkill

That is, something like

func Process[T any](ctx context.Context, maxConcurrency int, collections []T, processor func(ctx context.Context, index int, item T)) {
	numInput := len(collections)

	if maxConcurrency <= 0 || maxConcurrency > numInput {
		maxConcurrency = numInput
	}

	var wg sync.WaitGroup
	var idx atomic.Int64

	wg.Add(maxConcurrency)

	for i := 0; i < maxConcurrency; i++ {
		go func() {
			defer wg.Done()
			i := int(idx.Add(1) - 1)

			for ; i < numInput; i = int(idx.Add(1) - 1) {
				select {
				case <-ctx.Done():
					return

				default:
					processor(ctx, i, collections[i])
				}
			}
		}()
	}

	wg.Wait()
}

But I probably wouldn't add it