How to Use Map With the ProcessPoolExecutor in Python

You can execute tasks asynchronously with the ProcessPoolExecutor by calling the map() function.

In this tutorial you will discover how to use the map() function to execute tasks with the process pool in Python.

Let’s get started.

Need to Call Functions in Separate Processes

You may have a for-loop that calls a function for each item in an iterable like a list.

...

# apply a function to each item in a list
for item in items:
    result = task(item)

How can you make each function call in a separate process?

Or put another way:

How can you make the for-loop concurrent?

Alternatively, you may be using the built-in map() function to apply the function to each item in an iterable for you.

...

# apply the function to each element in the collection

results = map(task, items)

This does not apply the task() function to each item until we iterate the results, so-called lazy evaluation:

...

# iterate the results from map
for result in results:
    print(result)

Therefore, it is common to see this operation consolidated to the following:

...

# iterate the results from map
for result in map(task, items):
    print(result)

There are a number of other common patterns when using the map() function.

For example, you can use the built-in enumerate() function on the results of map() to enumerate the returned values from the function with an index, helpful if you need to access a parallel list of data.

...

# map function to items
results = map(task, items)
# enumerate the results from map
for i, result in enumerate(results):
    # do something...

Finally, it is also common to want to iterate the results of map() as well as the list of items passed to map.

This can be achieved using the zip() built-in function that takes a number of iterables and yields results from each per iteration.

For example:

...

# map function to items
results = map(task, items)
# iterate items and results together
for item, result in zip(items, results):
    # do something...

Now that we are familiar with map(), how can we execute each call by map() concurrently using processes?

How to Call map() With Separate Processes

The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.

You can specify the number of processes to create in the process pool as an argument, which defaults to the number of logical CPU cores in your system.

...

# create a process pool

executor = ProcessPoolExecutor()
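
If you prefer to control the pool size explicitly, you can pass the number of workers directly; a minimal sketch, where the value 4 is an arbitrary choice for illustration:

...

# create a process pool with an explicit number of worker processes
executor = ProcessPoolExecutor(max_workers=4)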

You can also submit tasks by calling the map() function, specifying the target function to execute and the iterable of items to which your function will be applied.

...

# execute each function call in a separate process

results = executor.map(task, items)

Each call to the target function with one item from the iterable will be executed in a separate process.

We can then iterate over the results as we would with the built-in map(), except that we may have to wait for each result until its task completes in a separate process. This waiting is called “blocking” and happens automatically.

...

# iterate the results from map
for result in results:
    print(result)

Although the tasks are executed concurrently, the results are iterated in the order of the iterable provided to the map() function, the same as the built-in map() function.

In this way, we can think of the process pool version of map() as a concurrent version of the built-in map() function, which is ideal if you are looking to update your for-loop to use processes.

We can call the map() function to execute the functions in separate processes and process the results using the common for-loop idiom, as follows:

...

# iterate the results from map performed in separate processes
for result in executor.map(task, items):
    print(result)

Map Submitted Tasks to Internal Tasks with Chunksize

The map() function on the ProcessPoolExecutor takes a parameter called “chunksize” which defaults to 1.

...

# apply a function to each item in an iterable with a chunksize
for result in executor.map(task, items, chunksize=1):
    # ...

The “chunksize” argument controls how items in the iterable passed to map() are grouped into the tasks executed by the ProcessPoolExecutor.

A value of one means that one item is mapped to one task.

Recall that the data for each task in terms of arguments sent to the target task function and values that are returned must be serialized by pickle. This happens automatically, but incurs some computational and memory cost, adding overhead to each task processed by the process pool.
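
One practical consequence of this pickling is that the target function itself must be picklable, which in practice means it must be defined at the top level of a module. A minimal illustration, where the double() function is a hypothetical example:

...

# a module-level function can be pickled and sent to worker processes
def double(value):
    return value * 2

# a lambda cannot be pickled, so the following would fail:
# executor.map(lambda v: v * 2, items)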

When there is a vast number of tasks, or tasks are relatively quick to compute, the chunksize should be tuned to maximize the grouping of items into process pool tasks, in order to minimize the overhead per task and in turn reduce the overall compute time.

This will likely require some tuning of the chunksize that you may be able to perform with real task data, or perhaps a test harness with mock data and task processes.

Some values to try might include:

  • 1: The default mapping of one item per task.
  • len(items) / max_workers: Splits all items into max_workers groups, e.g. one batch of items per worker process.

Note, the (len(items) / max_workers) division may need to be rounded, as the “chunksize” argument must be a positive integer.

For example:

...

# estimate the chunksize
size = round(len(items) / executor._max_workers)
# apply a function to each item in an iterable with a chunksize
for result in executor.map(task, items, chunksize=size):
    # ...
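
Note that _max_workers is a private attribute of the executor and is not part of the public API, so it may change between Python versions. A safer sketch is to choose and remember the worker count yourself, rounding up with math.ceil() so the chunksize is never zero:

...

from math import ceil
# choose and remember the number of workers ourselves
n_workers = 4
executor = ProcessPoolExecutor(n_workers)
# one chunk of items per worker, rounded up
size = ceil(len(items) / n_workers)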

Compare the performance to see if one configuration is better than another, and if so, use it as a starting point for similar values to evaluate.

Wait For Results With Timeout

We may want to limit how long we are willing to wait for a task to complete and return a result when iterating the return value from the map() function.

This can be achieved by setting the “timeout” argument when calling map() and specifying how long we are willing to wait in seconds.

...

# iterate the results from map performed in separate processes, wait a limited time
for result in executor.map(task, items, timeout=5):
    print(result)

If more than “timeout” seconds elapse before a result is available, then a TimeoutError is raised, which may need to be handled.

...

# handle a timeout error when getting results
try:
    # iterate the results from map performed in separate processes, wait a limited time
    for result in executor.map(task, items, timeout=5):
        print(result)
except TimeoutError:
    print('Waited too long')

Wait For All Results to Complete

We don’t have to wait for the results from the call to the map() function in order to continue.

For example, we can call map() with a target task function and an iterable and then carry on with other operations.

The call to map() does not block, meaning that we do not have to wait for the results unless we iterate the results returned from the call to the map() function explicitly.

This might be helpful if our target task function does not return a result, although you may want to use the submit() function on the ProcessPoolExecutor instead.

...

# perform the tasks in separate processes
executor.map(task, items) # does not block
# do other things...

Alternatively, we may want the map() function to call the target function in separate processes and wait for all tasks to complete.

This can be achieved by calling the shutdown() function of the ProcessPoolExecutor which by default will wait for all scheduled and running tasks to complete before returning.

...

# wait for all tasks to complete

executor.shutdown()

Alternatively, we can use the context manager of the process pool, which will automatically shut down the process pool for us when we are finished and will block until all tasks have completed.

...

# start the process pool
with ProcessPoolExecutor() as executor:
    # perform the tasks in separate processes
    executor.map(task, items) # does not block
# shutdown automatically, wait for all tasks to complete

Call map() With Multiple Arguments

The ProcessPoolExecutor map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map().

For example, we can define a target function for map() that takes two arguments, then provide two iterables to the call to map(). The map() function will then call the target function with one argument taken from each iterable, stopping when the shortest iterable is exhausted.

...

# call map with a target function that takes more than one argument

executor.map(task, items1, items2)

If you want to provide ad hoc arguments to your target task function, e.g. variable numbers of arguments or the same argument to multiple function calls, then you may be better off using the submit() function on the ProcessPoolExecutor.
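
For example, a minimal sketch of the submit() alternative, assuming a task() function that accepts two arguments and using as_completed() to handle results as tasks finish; the 'extra' argument is illustrative:

...

from concurrent.futures import as_completed
# submit each call with its own ad hoc arguments
futures = [executor.submit(task, item, 'extra') for item in items]
# process results in completion order rather than submission order
for future in as_completed(futures):
    print(future.result())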

Now that we are familiar with how to use the map() function to call a function using processes in Python, let’s look at a worked example.

Example of Using map() With the ProcessPoolExecutor

Let’s explore how to use the map() function on the ProcessPoolExecutor with a worked example.

First, we can define a simple task that will block for a variable amount of time less than one second.

The task takes a unique name and will return a message that it has completed.

from time import sleep
from random import random

# custom task that will block for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return f'Task: {name} done.'

This task is a good candidate for calling with the built-in map() function and the ProcessPoolExecutor map() function because it is self-contained: it has no side effects and does not access or change state outside of the function itself.

Next, we can define a process pool with ten worker processes using the context manager.

...

# start the process pool
with ProcessPoolExecutor(10) as executor:
    # ...

Next, we can call the map() function to apply the task() function to a range of integers from 0 to 9.

Finally, we can report the results as they are made available in the order that the tasks were submitted to the process pool for execution.

...

# execute tasks concurrently and process results in order

for result in executor.map(task, range(10)):

    # report the result

    print(result)

Tying this together, the complete example of calling map() to apply a function to a range of data using worker processes in the ProcessPoolExecutor is listed below.


# SuperFastPython.com
# example of calling map and processing results
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return f'Task: {name} done.'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # report the result
            print(result)

Running the example first creates the process pool, then submits the tasks to the process pool for execution.

The tasks are completed and results are retrieved and reported as they become available in the order that tasks were submitted.

Task: 0 done.

Task: 1 done.

Task: 2 done.

Task: 3 done.

Task: 4 done.

Task: 5 done.

Task: 6 done.

Task: 7 done.

Task: 8 done.

Task: 9 done.




Example of Setting Chunksize For Many Small Tasks

When we have thousands or millions of short duration tasks to complete, it is a good idea to set the “chunksize” argument when calling the map() function.

This will group calls to our target function into chunks that are then sent to worker processes.

Each function call and its arguments must be serialized and transmitted to a worker process using inter-process communication.

Although fast, it does incur some computational cost for each task transmitted. This adds up when we have a large number of short duration tasks.

Let’s demonstrate this with an example of a target function that squares a provided number.

# square a provided number

def task(value):

    return value**2

We can then call this 10,000 times with integers from 0 to 9,999.

# SuperFastPython.com
# example of calling map with a chunksize of 1
from concurrent.futures import ProcessPoolExecutor

# square a provided number
def task(value):
    return value**2

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(4) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10000), chunksize=1):
            # report the result
            print(result)

Running the example will square 10,000 numbers and will complete reasonably quickly.

It takes about 2.3 seconds on my system.

...

99860049

99880036

99900025

99920016

99940009

99960004

99980001

It does not take this long to square some numbers in Python.

It’s slow because the process pool must manage 10,000 very short duration tasks.

We can dramatically speed up the execution of these tasks by grouping them into large chunks for each process to work on.

Setting the chunksize is a bit of an art and will depend on the number of CPU cores, the number of worker processes, the duration of each task and the amount of data transmitted for each task.

As such, it can be a good idea to run some tests to see what works well for your system.

A good starting point is to divide the number of tasks by the number of processes.

For example, if we have 10,000 tasks and 4 worker processes (and at least 4 CPU cores), then we might set the chunksize to 2,500 as a first test.

  • chunksize = number of tasks / number of worker processes
  • chunksize = 10,000 / 4
  • chunksize = 2,500

The example below demonstrates the same batch of tasks with the chunksize of 2,500.

# SuperFastPython.com
# example of calling map with a chunksize of 2500
from concurrent.futures import ProcessPoolExecutor

# square a provided number
def task(value):
    return value**2

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(4) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10000), chunksize=2500):
            # report the result
            print(result)

Running the example calculates the same results, but is much faster.

It takes about 150 milliseconds on my system, compared to the 2.3 seconds with a chunksize of 1.

...

99860049

99880036

99900025

99920016

99940009

99960004

99980001
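
To find a good value for your own system, you can time the same workload across several candidate chunksizes. A minimal sketch using time.perf_counter(), where the candidate values are illustrative; like the examples above, it should run under the __main__ guard:

...

from time import perf_counter
# compare candidate chunksize values on the same workload
for size in [1, 100, 1000, 2500]:
    start = perf_counter()
    with ProcessPoolExecutor(4) as executor:
        # consume all results so the full run is measured
        _ = list(executor.map(task, range(10000), chunksize=size))
    duration = perf_counter() - start
    print(f'chunksize={size}: {duration:.3f} seconds')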

Example of Calling map() With a Timeout

We may want the results from the function calls, but are unwilling to wait an extended period.

This can be achieved by setting the “timeout” argument when calling the map() function.

As we process the results by iterating the value returned from the call to map(), if a task takes more than the timeout to return a value, then a TimeoutError is raised, which we can choose to handle.

The listing below updates the example to use a timeout of a tiny fraction of a second; if the timeout elapses, a message is reported.


# SuperFastPython.com
# example of calling map and processing results with a timeout
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import TimeoutError

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return f'Task: {name} done.'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # handle a timeout error when getting results
        try:
            # iterate the results from map performed in separate processes, wait a limited time
            for result in executor.map(task, range(10), timeout=0.05):
                print(result)
        except TimeoutError:
            print('Waited too long')

Running the example starts the process pool then submits ten function call tasks to the pool.

We wait a fraction of a second for the first result, before a TimeoutError is raised and caught, reporting a message that we gave up waiting.

Note that if a TimeoutError is raised, it does not impact the tasks executing in the process pool. For example, although we were impatient for a result, the tasks will continue to run and the process pool will not close until all tasks have completed.




Example of Waiting for All Tasks To Complete

We may have a target task function that does not return a value and yet we may want to call the function for an iterable of data and wait for all tasks to complete.

We can do this by calling the map() function without iterating the results, then using the context manager to close the process pool, which will wait for all tasks to complete.

First, let’s update our target task function to not return a value, but instead report a value directly via a print statement.

# custom task that will sleep for a variable amount of time

def task(value):

    # sleep for less than a second

    sleep(random())

    print(f'Done: {value}')

We can then submit the tasks to the process pool via a call to map() without iterating the results.

...

# submit all tasks

executor.map(task, range(5))

The call to map() does not block, so we are able to carry on with other tasks in our program.

We can then wait for all tasks in the process pool to complete by allowing the context manager to call shutdown() for us and return once all tasks have finished.

To make this clear, we will add a print statement that we are waiting for tasks to complete.

...

# shutdown, wait for all tasks to complete

print('Waiting...')

Once all tasks have completed, we can carry on with our program.

Tying this together, the complete example of waiting for all map() tasks to complete in the ProcessPoolExecutor is listed below.


# SuperFastPython.com
# example of calling map and waiting for all tasks to complete
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {value}')

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        executor.map(task, range(5))
        # shutdown, wait for all tasks to complete
        print('Waiting...')
    print('All done.')

Running the example starts the process pool and submits five tasks by calling the map() function.

We do not iterate the results of the tasks, instead we start waiting for the tasks to complete immediately with an implicit call to shutdown() on the process pool by the context manager.

The tasks complete one by one and we then carry on with our program once all tasks have completed.

Waiting...

Done: 1

Done: 3

Done: 4

Done: 0

Done: 2

All done.

Example of Calling map() With Multiple Arguments

We may want to call a target task function that takes more than one argument.

This can be done with the built-in map() function and with the ProcessPoolExecutor map() function.

Both map() functions take one or more iterables and the map function will stop calling the target task function with values from each iterable when the shortest iterable is exhausted.

We can update our target task function to take two values and to return a tuple that combines the values after a short sleep.

# custom task that will sleep for a variable amount of time

def task(value1, value2):

    # sleep for less than a second

    sleep(random())

    return (value1, value2)

We can define two lists of data that we will use as iterables for our call to map.

...

# define our data

data1 = ['1', '2', '3']

data2 = ['a', 'b', 'c']

Finally, we can update our call to map() to take both iterables of data.

...

# submit all tasks

for result in executor.map(task, data1, data2):

    print(result)

Tying this together, the complete example of calling map to execute function calls with two arguments asynchronously with the ProcessPoolExecutor is listed below.


# SuperFastPython.com
# example of calling map with multiple arguments
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value1, value2):
    # sleep for less than a second
    sleep(random())
    return (value1, value2)

# entry point
if __name__ == '__main__':
    # define our data
    data1 = ['1', '2', '3']
    data2 = ['a', 'b', 'c']
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task, data1, data2):
            print(result)

Running the example first starts the process pool, then submits three function calls to the worker processes, one for each pair of values in the two iterables passed to map.

Each task returns a tuple that is printed.

('1', 'a')

('2', 'b')

('3', 'c')
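
As noted above, map() stops when the shortest iterable is exhausted. A minimal sketch with iterables of different lengths, where the data values are illustrative:

...

# the second iterable has only two items, so only two tasks are submitted
for result in executor.map(task, ['1', '2', '3'], ['a', 'b']):
    print(result)
# prints ('1', 'a') then ('2', 'b')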


Takeaways

You now know how to use the map() function to execute tasks asynchronously with the ProcessPoolExecutor.

Do you have any questions?
Ask your question in the comments below and I will do my best to answer.
