How to Use ThreadPool map() in Python - Super Fast Python

You can execute multiple tasks in the ThreadPool using the map() method.

In this tutorial you will discover how to use the map() method with the ThreadPool in Python.

Let’s get started.

Need a Concurrent Version of map()

The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.

A thread pool object which controls a pool of worker threads to which jobs can be submitted.

multiprocessing — Process-based parallelism

The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.

Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.

A ThreadPool can be configured when it is created, which will prepare the new threads.

We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().

The built-in map() function allows you to apply a function to each item in an iterable.

The ThreadPool provides a map() method for executing multiple tasks in the pool.

How can we use the ThreadPool map() method?

The ThreadPool provides the map() method for executing multiple tasks.

Recall that the built-in map() function will apply a given function to each item in a given iterable.

Return an iterator that applies function to every item of iterable, yielding the results.

Built-in Functions

It yields one result returned from the given target function called with one item from the given iterable. It is common to call map() and iterate over the results in a for-loop.

For example:

...
# iterates results from map
for result in map(task, items):
    # ...

The ThreadPool provides a version of the map() function where the target function is called for each item in the provided iterable with separate worker threads.

A parallel equivalent of the map() built-in function […]. It blocks until the result is ready.

multiprocessing — Process-based parallelism

For example:

...
# iterates results from map
for result in pool.map(task, items):
    # ...

Each item in the iterable is taken as a separate task in the ThreadPool.

Like the built-in map() function, the returned results will be in the order of the provided iterable. This means that results are returned in the same order that tasks are issued, even though the tasks may complete in a different order.

Unlike the built-in map() function, the ThreadPool map() method only takes one iterable as an argument. This means that the target function executed in the thread can only take a single argument.
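If the target function needs more than one argument, there are common workarounds. The sketch below (not part of this tutorial's examples) shows two: the ThreadPool's starmap() method, which unpacks each item as a tuple of arguments, and functools.partial(), which fixes the extra arguments so that map() can be used. The task() function here is a hypothetical two-argument example.

```python
from functools import partial
from multiprocessing.pool import ThreadPool

# a hypothetical task that takes two arguments
def task(identifier, scale):
    return identifier * scale

if __name__ == '__main__':
    with ThreadPool(4) as pool:
        # option 1: starmap() unpacks each tuple into arguments
        results = pool.starmap(task, [(i, 10) for i in range(5)])
        print(results)
        # option 2: partial() fixes the second argument, so map() works
        results = pool.map(partial(task, scale=10), range(5))
        print(results)
```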

The iterable of items that is passed is traversed immediately in order to issue all tasks to the ThreadPool. Therefore, if the iterable is very long, it may result in many tasks waiting in memory to execute, e.g. one task per item in the iterable.

It is possible to split up the items in the iterable evenly among the worker threads.

For example, if we had a ThreadPool with 4 worker threads and an iterable with 40 items, we can split up the items into 4 chunks of 10 items, with one chunk allocated to each worker thread.

The effect is less overhead in transmitting tasks to worker threads and collecting results.

This can be achieved via the “chunksize” argument to map().

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

multiprocessing — Process-based parallelism

For example:

...
# iterates results from map with chunksize
for result in pool.map(task, items, chunksize=10):
    # ...

Difference Between map() and map_async()

How does the map() method compare to the map_async() method for issuing tasks to the ThreadPool?

Both the map() and map_async() methods may be used to issue tasks that call a function for each item in an iterable via the ThreadPool.

The following summarizes the key differences between these two methods:

  • The map() method blocks, whereas the map_async() method does not block.
  • The map() method returns an iterable of return values from the target function, whereas the map_async() method returns an AsyncResult.
  • The map() method does not support callback functions, whereas the map_async() method can execute callback functions on return values and errors.

The map() method should be used for issuing target task functions to the ThreadPool where the caller can or must block until all function calls are complete.

The map_async() method should be used for issuing target task functions to the ThreadPool where the caller cannot or must not block while the task is executing.
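These differences can be sketched in code. Below, map() blocks until all results are ready, whereas map_async() returns an AsyncResult immediately; the on_done() callback is a hypothetical example, called once with the full list of return values.

```python
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    return identifier * 2

# a hypothetical callback, called once with the full list of results
def on_done(results):
    print(f'All done: {results}')

if __name__ == '__main__':
    with ThreadPool(4) as pool:
        # map() blocks until all results are ready
        results = pool.map(task, range(5))
        # map_async() returns an AsyncResult immediately, without blocking
        async_result = pool.map_async(task, range(5), callback=on_done)
        # ...the caller is free to do other work here...
        # get() blocks until the results are ready
        print(results == async_result.get())
```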

Now that we know how to use the map() method to execute tasks in the ThreadPool, let’s look at some worked examples.



Example of ThreadPool map()

We can explore how to use the map() method on the ThreadPool.

In this example, we can define a target task function that takes an integer as an argument, generates a random number, reports the value then returns the value that was generated. We can then call this function for each integer between 0 and 9 using the ThreadPool map() method.

This will apply the function to each integer concurrently, using one worker thread per logical CPU available in the system.

Firstly, we can define the target task function.

The function takes an argument, generates a random number between 0 and 1, reports the integer and generated number. It then blocks for a fraction of a second to simulate computational effort, then returns the number that was generated.

The task() function below implements this.

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

We can then create and configure a ThreadPool.

We will use the context manager interface to ensure the pool is shutdown automatically once we are finished with it.

...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...

We can then call the map() method on the ThreadPool to apply our task() function to each value in the range 0 to 9.

This returns the results from the task() function in the order that the tasks were issued. We will iterate over the results and report each in turn.

This can all be achieved in a for-loop.

...
# execute tasks in order
for result in pool.map(task, range(10)):
    print(f'Got result: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of parallel map() with the thread pool
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # execute tasks in order
        for result in pool.map(task, range(10)):
            print(f'Got result: {result}')
    # thread pool is closed automatically

Running the example first creates the ThreadPool with a default configuration.

It will have one worker thread for each logical CPU in your system.
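We can confirm this default via os.cpu_count(), which reports the number of logical CPUs; this is a small sketch for checking and setting the pool size, not part of the example above.

```python
import os
from multiprocessing.pool import ThreadPool

if __name__ == '__main__':
    # the default ThreadPool size matches the number of logical CPUs
    print(f'Logical CPUs: {os.cpu_count()}')
    # the number of worker threads can also be set explicitly
    with ThreadPool(os.cpu_count()) as pool:
        print(pool.map(len, ['a', 'bb', 'ccc']))
```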

The map() function is then called for the range.

This issues ten calls to the task() function, one for each integer between 0 and 9. The results for the function calls are returned, in order.

Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.

The main thread iterates over the values returned from the calls to the task() function and reports the generated values, matching those generated in each worker thread.

Importantly, all task() function calls are issued and executed before the results are returned. The caller cannot iterate over results as they are completed.

Task 0 executing with 0.9939823945701354
Task 1 executing with 0.47729468910203754
Task 2 executing with 0.8758836044357526
Task 3 executing with 0.5022561566448386
Task 4 executing with 0.9057454582446577
Task 5 executing with 0.2211189207208677
Task 6 executing with 0.13548680901347598
Task 7 executing with 0.12991297482843456
Task 8 executing with 0.3186675013898512
Task 9 executing with 0.6515046375113036
Got result: 0.9939823945701354
Got result: 0.47729468910203754
Got result: 0.8758836044357526
Got result: 0.5022561566448386
Got result: 0.9057454582446577
Got result: 0.2211189207208677
Got result: 0.13548680901347598
Got result: 0.12991297482843456
Got result: 0.3186675013898512
Got result: 0.6515046375113036
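If we do want to handle results before every task has finished, the ThreadPool also provides lazier alternatives: imap() yields results one by one in issue order, and imap_unordered() yields results in the order tasks complete. This is a sketch for comparison, using a task that sleeps for a short random fraction of a second.

```python
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # block for a short random moment
    sleep(random() / 10)
    return identifier

if __name__ == '__main__':
    with ThreadPool(4) as pool:
        # imap() yields results lazily, in issue order
        for result in pool.imap(task, range(10)):
            print(f'Got result: {result}')
        # imap_unordered() yields results as tasks complete
        for result in pool.imap_unordered(task, range(10)):
            print(f'Got result: {result}')
```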

Next, let’s look at an example where we might call map() for a function with no return value.

Example of ThreadPool map() with No Return Value

We can explore using the map() function to call a function for each item in an iterable that does not have a return value.

This means that we are not interested in the iterable of results returned by the call to map() and instead are only interested that all issued tasks get executed.

This can be achieved by updating the previous example so that the task() function does not return a value.

The updated task() function with this change is listed below.

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)

Then, in the main thread, we can call map() with our task() function and the range, and not iterate the results.

...
# execute tasks, block until all completed
pool.map(task, range(10))

Importantly, the call to map() on the ThreadPool will block the main thread until all issued tasks are completed.

Once completed, the call will return and the ThreadPool will be closed by the context manager.

This is a helpful pattern to issue many tasks to the ThreadPool with a single function call.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of parallel map() with the thread pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # execute tasks, block until all completed
        pool.map(task, range(10))
    # thread pool is closed automatically

Running the example first creates the ThreadPool with a default configuration.

The map() function is then called for the range. This issues ten calls to the task() function, one for each integer between 0 and 9. The results for the function calls are returned, but are ignored in this case.

Each call to the task function generates a random number between 0 and 1, reports a message, then blocks.

The main thread blocks until the map() function returns.

The tasks finish, map() returns, then the ThreadPool is closed.

This example again highlights that the call to map() blocks until all issued tasks are completed.

Task 0 executing with 0.43501726038642363
Task 1 executing with 0.06737546043581677
Task 2 executing with 0.7243486114586718
Task 3 executing with 0.8507370154883928
Task 4 executing with 0.419007224196702
Task 5 executing with 0.8363778406379463
Task 6 executing with 0.5561916282608673
Task 7 executing with 0.6785705226822631
Task 8 executing with 0.5750485172990072
Task 9 executing with 0.23184915901578418

Next, let’s explore the chunksize argument to the map() function.




Example of ThreadPool map() with chunksize

The map() function will apply a function to each item in an iterable.

If the iterable has a large number of items, it may be inefficient to issue function calls to the target function for each item.

A more efficient approach is to divide the items in the iterable into chunks and issue each chunk to a worker thread as a single unit of work, where the target function is then applied to each item in the chunk.

This can be achieved with the “chunksize” argument to the map() function.

We can demonstrate this with an example, first without using the “chunksize” argument, then using the “chunksize” to speed up the division of work.

Example Without Chunks

Before we demonstrate the “chunksize” argument, we can devise an example that has a reasonably large iterable.

In this example, we can update the previous example to use a task() function that returns a value as before, but that blocks for a fixed one second, and to have 4 worker threads and call the task() function 40 times, for integers 0 to 39.

This number of worker threads and calls to task() was chosen so that we can test the “chunksize” argument in the next section. If you have fewer than 4 logical CPU cores in your system, change the example accordingly, e.g. 2 worker threads and 20 tasks.

...
# create and configure the thread pool
with ThreadPool(4) as pool:
    # execute tasks, block until all complete
    pool.map(task, range(40))

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of parallel map() with the thread pool with a larger iterable
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool(4) as pool:
        # execute tasks, block until all complete
        pool.map(task, range(40))
    # thread pool is closed automatically

Running the example first creates the ThreadPool with 4 worker threads.

The map() function is then called for the range.

This issues 40 calls to the task() function, one for each integer between 0 and 39.

Each call to the task function generates a random number between 0 and 1, reports a message, blocks for one second, then returns a value.

The main thread blocks until all tasks have completed; the return values are ignored in this example.

On my system, the example took about 12.2 seconds to complete.

Task 0 executing with 0.05101144919690426
Task 3 executing with 0.41536115411459507
Task 6 executing with 0.18432973765312977
Task 9 executing with 0.4121165717166093
Task 1 executing with 0.6526851129540462
Task 4 executing with 0.3633527093870562
Task 7 executing with 0.03836331451131314
Task 10 executing with 0.5025071772499283
Task 2 executing with 0.5917107837535422
Task 5 executing with 0.24927356845428306
Task 11 executing with 0.029055109351983965
Task 8 executing with 0.7944756165805348
Task 12 executing with 0.953161547946998
Task 15 executing with 0.9633620949523187
Task 18 executing with 0.5630690381245529
Task 21 executing with 0.7917587608087215
Task 13 executing with 0.8886629299919615
Task 22 executing with 0.7678136381876737
Task 19 executing with 0.20375196689255326
Task 16 executing with 0.5825942559782655
Task 14 executing with 0.7629464546697364
Task 17 executing with 0.6080522573065245
Task 20 executing with 0.15562873549515743
Task 23 executing with 0.3991570170366793
Task 24 executing with 0.23485439519262563
Task 27 executing with 0.060887602001631014
Task 30 executing with 0.014004149328884608
Task 33 executing with 0.8863013572225961
Task 25 executing with 0.6254970384413928
Task 28 executing with 0.06705618072486286
Task 34 executing with 0.25579916690630056
Task 31 executing with 0.1140059202602185
Task 35 executing with 0.12945687883162815
Task 29 executing with 0.8168323186884446
Task 32 executing with 0.4232992703694398
Task 26 executing with 0.7958862148609922
Task 36 executing with 0.3210403570430992
Task 39 executing with 0.43617001646541165
Task 37 executing with 0.2931899057063859
Task 38 executing with 0.2332844283750951

Next, let’s look at the same example using a chunksize.

Example With Chunks

We can update the previous example to use a chunksize.

This can be achieved by adding a “chunksize” argument to the call to map().

Given that we are issuing 40 calls to the task() function to the ThreadPool and that we have 4 worker threads, we can divide the work evenly into 4 groups of ten function calls.

This even division is a good starting point, but it is always a good idea to test different values of the chunksize and optimize it for the performance of your specific application with some trial and error.
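One rough way to run such a trial (a benchmarking sketch using time.perf_counter(), with a hypothetical very fast task where per-call overhead dominates) is to time the same map() call with a few candidate chunksize values:

```python
from time import perf_counter
from multiprocessing.pool import ThreadPool

# a hypothetical very fast task, where per-call overhead dominates
def task(identifier):
    return identifier * identifier

if __name__ == '__main__':
    with ThreadPool(4) as pool:
        # time the same workload with different chunksize values
        for chunksize in [1, 10, 100]:
            start = perf_counter()
            pool.map(task, range(10000), chunksize=chunksize)
            duration = perf_counter() - start
            print(f'chunksize={chunksize}: {duration:.4f} seconds')
```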

This means we can set the chunksize to 10.

...
# execute tasks in chunks, block until all complete
pool.map(task, range(40), chunksize=10)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of parallel map() with the thread pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool(4) as pool:
        # execute tasks in chunks, block until all complete
        pool.map(task, range(40), chunksize=10)
    # thread pool is closed automatically

Running the example first creates the ThreadPool with 4 worker threads.

The map() function is then called for the range with a chunksize of 10.

This issues 4 units of work to the ThreadPool, one for each worker thread, each composed of 10 calls to the task() function.

Each call to the task function generates a random number between 0 and 1, reports a message, blocks for one second, then returns a value.

The main thread blocks until all tasks have completed; the return values are ignored in this example.

On my system, the example took about 10.2 seconds to complete. This is about 1.20x faster.

Task 0 executing with 0.4292359227954552
Task 10 executing with 0.8035383645900721
Task 20 executing with 0.8055058403391574
Task 30 executing with 0.5716898589126458
Task 11 executing with 0.7301860867528934
Task 1 executing with 0.9427468480637557
Task 31 executing with 0.3120724424695628
Task 21 executing with 0.8669744851178738
Task 2 executing with 0.8822181175850636
Task 22 executing with 0.5356334224776518
Task 12 executing with 0.36804922820385
Task 32 executing with 0.37973678217433093
Task 23 executing with 0.9655524667616698
Task 3 executing with 0.1994058828293016
Task 33 executing with 0.49890604894446444
Task 13 executing with 0.9088708711374434
Task 4 executing with 0.9044398148839545
Task 34 executing with 0.7193553257167641
Task 24 executing with 0.02503920599590137
Task 14 executing with 0.42626836352563635
Task 35 executing with 0.6325508905206192
Task 5 executing with 0.564840401593125
Task 25 executing with 0.004884008120288219
Task 15 executing with 0.37919302159091073
Task 6 executing with 0.2471211041819601
Task 26 executing with 0.7081653903136206
Task 36 executing with 0.4169043153310472
Task 16 executing with 0.7107127096350152
Task 37 executing with 0.8624703449972991
Task 17 executing with 0.5654652526893504
Task 27 executing with 0.5867574922576663
Task 7 executing with 0.06293018780599402
Task 18 executing with 0.524178579044118
Task 38 executing with 0.4080694402854004
Task 8 executing with 0.5836109732516527
Task 28 executing with 0.5069538921285582
Task 9 executing with 0.8417207064383568
Task 39 executing with 0.9516992338401458
Task 29 executing with 0.8145430811197704
Task 19 executing with 0.6213181769812072

Further Reading

This section provides additional resources that you may find helpful.

Books

I also recommend specific chapters from the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter: 14: Threads and Processes


Takeaways

You now know how to use the map() method to issue many tasks to the ThreadPool.

Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.

Photo by Kevin Schmid on Unsplash