Ordered Concurrently
A library for parallel processing with ordered output in Go. This module processes work concurrently / in parallel and returns output in a channel in the order of input. It is useful in concurrently / parallelly processing items in a queue, and get output in the order provided by the queue.
Usage
Get Module
go get github.com/tejzpr/ordered-concurrently/v3
Import Module in your source code
import concurrently "github.com/tejzpr/ordered-concurrently/v3"
Create a work function by implementing WorkFunction interface
// Create a type based on your input to the work function type loadWorker int // The work that needs to be performed // The input type should implement the WorkFunction interface func (w loadWorker) Run(ctx context.Context) interface{} { time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) return w * 2 }
Demo
Run
Example - 1
func main() { max := 10 inputChan := make(chan concurrently.WorkFunction) ctx := context.Background() output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10}) go func() { for work := 0; work < max; work++ { inputChan <- loadWorker(work) } close(inputChan) }() for out := range output { log.Println(out.Value) } }
Example - 2 - Process unknown number of inputs
func main() { inputChan := make(chan concurrently.WorkFunction, 10) ctx := context.Background() output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10}) ticker := time.NewTicker(100 * time.Millisecond) done := make(chan bool) wg := &sync.WaitGroup{} go func() { input := 0 for { select { case <-done: return case <-ticker.C: inputChan <- loadWorker(input) wg.Add(1) input++ default: } } }() var res []loadWorker go func() { for out := range output { res = append(res, out.Value.(loadWorker)) wg.Done() } }() time.Sleep(1600 * time.Millisecond) ticker.Stop() done <- true close(inputChan) wg.Wait() // Check if output is sorted isSorted := sort.SliceIsSorted(res, func(i, j int) bool { return res[i] < res[j] }) if !isSorted { log.Println("output is not sorted") } }
Credits
- u/justinisrael for inputs on improving resource usage.
- mh-cbon for identifying potential deadlocks.