How to Use ThreadPool imap() in Python - Super Fast Python

Last Updated on October 29, 2022

You can issue tasks to the ThreadPool one-by-one and execute them in worker threads via the imap() method.

In this tutorial you will discover how to use the imap() method to issue tasks to the ThreadPool in Python.

Let’s get started.

Need a Lazy and Parallel Version of map()

The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.

"A thread pool object which controls a pool of worker threads to which jobs can be submitted."

Source: multiprocessing — Process-based parallelism

The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.

Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.

A ThreadPool can be configured when it is created, which will prepare the new threads.

We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
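As a rough illustration of the methods named above, the sketch below issues the same task with apply(), apply_async(), map() and map_async(). The square() function and the pool size of 4 are assumptions made up for this example and are not part of the tutorial's own code.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(x):
    return x * x

def run_examples():
    # create a pool with 4 worker threads (an arbitrary choice)
    with ThreadPool(4) as pool:
        # one-off task, blocks until the result is ready
        single = pool.apply(square, (3,))
        # one-off task, returns an AsyncResult immediately
        single_async = pool.apply_async(square, (4,)).get()
        # apply to every item, blocks and returns a list
        many = pool.map(square, range(5))
        # apply to every item, returns an AsyncResult immediately
        many_async = pool.map_async(square, range(5)).get()
    return single, single_async, many, many_async

# protect the entry point
if __name__ == '__main__':
    print(run_examples())
```

The asynchronous variants return a handle straight away, and the call to get() is where the caller waits for the result.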

The ThreadPool map() method allows you to apply a function to each item in an iterable. A problem with this method is that it converts the provided iterable of items into a list, submits all items as tasks to the ThreadPool, and then blocks until all tasks are complete.

The imap() method is a lazier version of the map() method where we submit tasks one-by-one to the ThreadPool and retrieve results for tasks as they complete.

How can we use the imap() method for the ThreadPool?

The ThreadPool provides a lazy parallel version of the map() function via the imap() method.

Recall that the built-in map() function will apply a given function to each item in a given iterable.

"Return an iterator that applies function to every item of iterable, yielding the results."

Source: Built-in Functions

It yields one result returned from the given target function called with one item from a given iterable. It is common to call map and iterate the results in a for-loop.

For example:

...
# iterates results from map
for result in map(task, items):
    # ...

Also, recall that the ThreadPool provides a version of the map() function where the target function is called for each item in the provided iterable in a new worker thread.

The problem with the ThreadPool map() method is that it converts the provided iterable into a list and issues a task for each item all at once. This can be a problem if the iterable contains many hundreds or thousands of items as it may use a lot of main memory.

As an alternative, the ThreadPool provides the imap() method, which is a lazy version of map() for applying a target function to each item in an iterable.

"A lazier version of map()."

Source: multiprocessing — Process-based parallelism

Specifically:

  • Items are yielded from the provided iterable one at a time instead of all at once.
  • Results are yielded in order as they are completed rather than after all tasks are completed.
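One way to observe the lazy consumption of the input is sketched below. It assumes a hypothetical generator, gated_items(), that refuses to produce its second item until the consumer has already received the first result. The names double(), gated_items() and demo_lazy_input(), and the 5 second safety timeout, are all inventions for this illustration.

```python
from threading import Event
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def double(x):
    return x * 2

# generator that pauses between items until the gate is opened
def gated_items(gate, observed):
    # the first item is available straight away
    yield 1
    # wait until a result has been received (or give up after 5 seconds)
    observed.append(gate.wait(timeout=5))
    yield 2

def demo_lazy_input():
    gate = Event()
    observed = []
    results = []
    with ThreadPool(2) as pool:
        for result in pool.imap(double, gated_items(gate, observed)):
            results.append(result)
            # signal that a result has been received
            gate.set()
    return results, observed

# protect the entry point
if __name__ == '__main__':
    print(demo_lazy_input())
```

If the first result arrives before the second item is requested, the wait on the gate succeeds and True is recorded. With map() the whole generator would be exhausted before any task is issued, so the same wait would be expected to time out instead.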

We can use the imap() method on the ThreadPool just like the map() method.

For example:

...
# apply function to each item in the iterable in threads
for result in pool.imap(task, items):
    # ...

Each item in the iterable is taken as a separate task in the ThreadPool.

Like the built-in map() function, the returned iterator of results will be in the order of the provided iterable. This means that tasks are issued in the same order as the results are returned.

Unlike the built-in map() function, the imap() method only takes one iterable as an argument. This means that the target function executed in the ThreadPool can only take a single argument.
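A common workaround for the single-argument limit, sketched here under the assumption that the arguments can be paired up with zip(), is to pass one tuple per task and unpack it inside a small wrapper. The power() and power_task() functions are hypothetical names chosen for this example.

```python
from multiprocessing.pool import ThreadPool

# hypothetical two-argument function we would like to run in the pool
def power(base, exponent):
    return base ** exponent

# single-argument wrapper that unpacks a tuple of arguments
def power_task(args):
    base, exponent = args
    return power(base, exponent)

def demo_multiple_args():
    bases = [2, 3, 4]
    exponents = [3, 2, 1]
    with ThreadPool(2) as pool:
        # zip() pairs the arguments lazily, one tuple per task
        return list(pool.imap(power_task, zip(bases, exponents)))

# protect the entry point
if __name__ == '__main__':
    print(demo_multiple_args())  # [8, 9, 4]
```

The ThreadPool also offers starmap() for functions with multiple arguments but, like map(), it is not lazy, which is why a wrapper is used here.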

Unlike the ThreadPool map() method, the imap() method will iterate the provided iterable one item at a time and issue tasks to the ThreadPool. It will also yield return values as tasks are completed rather than all at once after all tasks are completed.

Like the ThreadPool map() method, it is possible to split up the items in the iterable evenly to worker threads.

For example, if we had a ThreadPool with 4 worker threads and an iterable with 40 items, we can split up the items into 4 chunks of 10 items, with one chunk allocated to each worker thread.

The effect is less overhead in transmitting tasks to worker threads and collecting results.

This can be achieved via the “chunksize” argument to imap().

"The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1."

Source: multiprocessing — Process-based parallelism

For example:

...
# iterates results from imap() with chunksize
for result in pool.imap(task, items, chunksize=10):
    # ...
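The following is a minimal sketch of what chunking looks like from the caller's side. It assumes a hypothetical task() that returns the name of the worker thread alongside each item, so we can check that results still arrive in order and that each chunk of 10 items was handled by a single worker.

```python
from threading import current_thread
from multiprocessing.pool import ThreadPool

# hypothetical task that reports which worker thread handled the item
def task(item):
    return item, current_thread().name

def demo_chunksize():
    with ThreadPool(4) as pool:
        results = list(pool.imap(task, range(40), chunksize=10))
    # the items in the order that results were yielded
    items = [item for item, _ in results]
    # group results into the 4 chunks of 10 consecutive items
    chunks = [results[i:i + 10] for i in range(0, 40, 10)]
    # collect the distinct worker names seen within each chunk
    workers_per_chunk = [{name for _, name in chunk} for chunk in chunks]
    return items, workers_per_chunk

# protect the entry point
if __name__ == '__main__':
    items, workers_per_chunk = demo_chunksize()
    print(items)
    print(workers_per_chunk)
```

Because these tasks finish almost instantly, one worker thread may end up processing more than one chunk, but every item within a chunk is processed by the same thread.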

Next, let’s take a closer look at how the imap() method compares to other methods on the ThreadPool.

Difference Between imap() and map()

How does the imap() method compare to the map() method for issuing tasks to the ThreadPool?

Both the imap() and map() methods may be used to issue tasks that call a function to all items in an iterable via the ThreadPool.

The following summarizes the key differences between these two methods:

  • The imap() method issues one task at a time to the ThreadPool, whereas the map() method issues all tasks to the pool at once.
  • The imap() method returns immediately and blocks only while waiting for the next result during iteration, whereas the map() method blocks until all tasks are complete before returning any results.

The imap() method should be used for issuing tasks one-by-one and handling the results in order as they become available.

The map() method should be used for issuing all tasks at once and handling results in order only once all issued tasks have completed.
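To make the difference concrete, the sketch below times how long the caller waits for the first result from each method. The slow() function, the 0.2 second sleep and the single worker thread are assumptions chosen to make the gap easy to see, and the exact timings will vary from system to system.

```python
from time import sleep, perf_counter
from multiprocessing.pool import ThreadPool

# hypothetical task that blocks for a fixed interval
def slow(x):
    sleep(0.2)
    return x

def time_to_first_result():
    # a single worker thread forces the tasks to run one after another
    with ThreadPool(1) as pool:
        # map() only returns once all four tasks are done
        start = perf_counter()
        first_from_map = pool.map(slow, range(4))[0]
        map_wait = perf_counter() - start
        # imap() yields the first result as soon as the first task is done
        start = perf_counter()
        iterator = pool.imap(slow, range(4))
        first_from_imap = next(iterator)
        imap_wait = perf_counter() - start
        # consume the remaining results before the pool is closed
        list(iterator)
    return first_from_map, map_wait, first_from_imap, imap_wait

# protect the entry point
if __name__ == '__main__':
    _, map_wait, _, imap_wait = time_to_first_result()
    print(f'map() first result after {map_wait:.2f} seconds')
    print(f'imap() first result after {imap_wait:.2f} seconds')
```

With four tasks and one worker, the wait for map() should be roughly four times the wait for the first result from imap().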



Difference Between imap() and imap_unordered()

How does the imap() method compare to the imap_unordered() method for issuing tasks to the ThreadPool?

The imap() and imap_unordered() methods have a lot in common, such as:

  • Both the imap() and imap_unordered() may be used to issue tasks that call a function to all items in an iterable via the ThreadPool.
  • Both the imap() and imap_unordered() are lazy versions of the map() method.
  • Both the imap() and imap_unordered() methods return an iterable over the return values immediately.

Nevertheless, there is one key difference between the two methods:

  • The iterable returned from imap() yields results in order as they are completed, whereas the imap_unordered() method yields results in an arbitrary order in which tasks are completed.

The imap() method should be used when the caller needs to iterate return values in the order that they were submitted from tasks as they are completed.

The imap_unordered() method should be used when the caller needs to iterate return values in any arbitrary order (not the order that they were submitted) from tasks as they are completed.
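The difference in ordering can be seen with a small sketch in which later items finish sooner than earlier ones. The task() function and its sleep durations are assumptions for illustration only.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task where later items sleep for less time
def task(item):
    sleep(0.3 - 0.1 * item)
    return item

def compare_ordering():
    with ThreadPool(3) as pool:
        # results follow the order of the input items
        ordered = list(pool.imap(task, range(3)))
        # results follow the order in which tasks happened to finish
        unordered = list(pool.imap_unordered(task, range(3)))
    return ordered, unordered

# protect the entry point
if __name__ == '__main__':
    ordered, unordered = compare_ordering()
    print(f'imap(): {ordered}')
    print(f'imap_unordered(): {unordered}')
```

The results from imap() are always in input order. The results from imap_unordered() will typically be reversed here because the last item sleeps for the shortest time, although the exact order depends on thread scheduling.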

Now that we know how to use the imap() method to execute tasks in the ThreadPool, let’s look at some worked examples.

Example of ThreadPool imap()

We can explore how to use the imap() method on the ThreadPool.

In this example, we can define a target task function that takes an integer as an argument, generates a random number, reports the value then returns the value that was generated. We can then call this function for each integer between 0 and 49 using the ThreadPool imap().

This will apply the function to each integer concurrently using the pool's worker threads, which by default number one per logical CPU in the system.

Firstly, we can define the target task function.

The function takes an argument, generates a random number between 0 and 1, reports the integer and generated number. It then blocks for a fraction of a second to simulate computational effort, then returns the number that was generated.

The task() function below implements this.

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

We can then create and configure a ThreadPool.

We will use the context manager interface to ensure the pool is shutdown automatically once we are finished with it.

If you are new to the context manager interface of the ThreadPool, you can learn more in the tutorial:

...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...

We can then call the imap() method on the thread pool to apply our task() function to each value in a range between 0 and 49.

This returns an iterator over return values that will yield results in the order that the tasks were submitted to the pool, as the tasks are completed.

We can then report the return values directly.

This can all be achieved in a for-loop.

...
# execute tasks in order
for result in pool.imap(task, range(50)):
    print(f'Got result: {result}')

Tying this together, the complete example is listed below.


# SuperFastPython.com
# example of parallel imap() with the thread pool
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # execute tasks in order
        for result in pool.imap(task, range(50)):
            print(f'Got result: {result}')
    # thread pool is closed automatically

Running the example first creates the ThreadPool with a default configuration.

It will have one worker thread for each logical CPU in your system.

The imap() method is then called for the range.

This issues 50 calls to the task() function, one for each integer between 0 and 49. An iterator is returned with the result for each function call, in order.

Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.

The main thread iterates over the values returned from the calls to the task() function and reports the generated values, matching those generated in each worker thread.

Importantly, tasks are taken from the iterable and issued to the ThreadPool one-by-one, rather than all being converted to a list first.

Just as importantly, results in the main thread are reported as tasks are completed.

A truncated listing of results is provided below. We can see that tasks are running and reporting their generated results while the main thread is receiving and reporting return values.

This is unlike the map() method that must wait for all tasks to complete before reporting return values.

Note, results will differ each time the program is run given the use of random numbers.


...

Task 44 executing with 0.8059601935689584

Task 45 executing with 0.48598799703863815

Task 46 executing with 0.6945569960834632

Got result: 0.9593996566896341

Got result: 0.25931911278772

Got result: 0.3721250020408787

Task 47 executing with 0.6740637574801822

Task 48 executing with 0.9586533625326895

Task 49 executing with 0.7386243215735206

Got result: 0.7520985511403526

Got result: 0.7103006703637886

Got result: 0.08098899257293801

Got result: 0.304245428921023

Got result: 0.5993997250046631

Got result: 0.30305971250819985

Got result: 0.8825095508946762

Got result: 0.7362612054796109

Got result: 0.8059601935689584

Got result: 0.48598799703863815

Got result: 0.6945569960834632

Got result: 0.6740637574801822

Got result: 0.9586533625326895

Got result: 0.7386243215735206

Next, let’s look at an example of issuing tasks that do not have a return value.




Example of ThreadPool imap() with No Return Value

We can explore using the imap() method to call a function for each item in an iterable that does not have a return value.

This means that we are not interested in the iterable of results returned by the call to imap() and instead are only interested that all issued tasks get executed.

This can be achieved by updating the previous example so that the task() function does not return a value.

The updated task() function with this change is listed below.

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)

Then, in the main thread, we can call imap() with our task() function and the range, and not iterate the results.

...
# issue tasks to the thread pool
pool.imap(task, range(50))

The call to imap() will return immediately.

Therefore, we must explicitly wait for all tasks in the ThreadPool to complete. Otherwise, the context manager for the ThreadPool would exit and forcefully terminate the ThreadPool and all running tasks in the pool.

This can be achieved by first closing the ThreadPool so no further tasks can be submitted to the pool, then joining the pool to wait for all tasks to complete and all worker threads to close.

...
# shutdown the thread pool
pool.close()
# wait for all issued tasks to complete
pool.join()

Tying this together, the complete example is listed below.


# SuperFastPython.com
# example of parallel imap() with the thread pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        pool.imap(task, range(50))
        # shutdown the thread pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()

Running the example first creates the ThreadPool with a default configuration.

The imap() method is then called for the range. This issues 50 calls to the task() function, one for each integer between 0 and 49. An iterator is returned immediately with the result for each function call, but is ignored in this case.

The main thread carries on, first closing the ThreadPool then joining it to wait for all tasks to complete.

Each call to the task function generates a random number between 0 and 1, reports a message, then blocks.

The tasks in the pool finish then the ThreadPool is closed.

This example again highlights that the call to imap() does not block. It only blocks when we loop over the returned iterator of return values.
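An alternative to closing and joining the pool, sketched here as one possible approach rather than the method used in this tutorial, is to exhaust the returned iterator and throw the results away. The demo_drain() helper and the executed list are hypothetical names used only to confirm that every task ran.

```python
from collections import deque
from multiprocessing.pool import ThreadPool

def demo_drain():
    executed = []

    # task with no return value, it only records that it ran
    def task(item):
        executed.append(item)

    with ThreadPool(4) as pool:
        # a deque with maxlen=0 consumes the iterator and keeps nothing
        deque(pool.imap(task, range(50)), maxlen=0)
    return sorted(executed)

# protect the entry point
if __name__ == '__main__':
    print(len(demo_drain()))
```

Iterating the results blocks until the last task has finished, so the pool is not shut down early. It also means that an exception raised inside a task is re-raised in the caller, rather than being silently ignored.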

Below is a truncated listing of results.

Note, results will differ each time the program is run given the use of random numbers.

...

Task 35 executing with 0.15058518381693187

Task 36 executing with 0.6882759301894497

Task 37 executing with 0.6637150877497019

Task 38 executing with 0.5046947164962725

Task 39 executing with 0.1044639581962642

Task 40 executing with 0.9042527260866252

Task 41 executing with 0.1471442225236529

Task 42 executing with 0.19755851252004908

Task 43 executing with 0.6121490393416081

Task 44 executing with 0.07121766629938453

Task 45 executing with 0.626422947133471

Task 46 executing with 0.9121666402285978

Task 47 executing with 0.8571597016507869

Task 48 executing with 0.7737306867971672

Task 49 executing with 0.6323037952640108

Next, let’s explore the chunksize argument to the imap() method.

Example of ThreadPool imap() with chunksize

The imap() method will apply a function to each item in an iterable one-by-one.

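If the iterable has a large number of items, this may be inefficient, as each item must be retrieved from the provided iterable and passed to a worker thread as a separate task. Unlike the process-based Pool, no serialization is needed because threads share memory, but there is still some overhead for each task issued.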

A more efficient approach for very large iterables might be to divide the items in the iterable into chunks and issue chunks of items to each worker thread to which the target function can be applied.

This can be achieved with the “chunksize” argument to the imap() method.

The example below updates the example to first configure the ThreadPool with 4 worker threads, then to issue 40 tasks with 10 tasks per worker thread.

Firstly, we can configure the ThreadPool.

# create and configure the thread pool
with ThreadPool(4) as pool:
    # ...

Next, we can issue 40 tasks, with 10 tasks assigned to each worker via the “chunksize” argument.

...
# issue tasks to the thread pool
pool.imap(task, range(40), chunksize=10)

The main thread then closes the pool and waits for all tasks to complete.

...
# shutdown the thread pool
pool.close()
# wait for all issued tasks to complete
pool.join()

Tying this together, the complete example is listed below.


# SuperFastPython.com
# example of parallel imap() with the thread pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool(4) as pool:
        # issue tasks to the thread pool
        pool.imap(task, range(40), chunksize=10)
        # shutdown the thread pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()

Running the example first creates the ThreadPool with 4 thread workers.

The imap() method is then called for the range with a chunksize of 10.

This issues 4 units of work to the ThreadPool, one for each worker thread and each composed of 10 calls to the task() function.

Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.

The main thread carries on, first closing the ThreadPool then joining it to wait for all tasks to complete.

The tasks in the pool finish then the ThreadPool is closed.

Below is a truncated listing of results.

Note, results will differ each time the program is run given the use of random numbers.

...

Task 37 executing with 0.49549256276698295

Task 17 executing with 0.08008359398329978

Task 8 executing with 0.25164685520569763

Task 28 executing with 0.6803821338729011

Task 38 executing with 0.7779208876305378

Task 18 executing with 0.4143919947947172

Task 19 executing with 0.9621104252302982

Task 39 executing with 0.28760436925830335

Task 9 executing with 0.08510357215537578

Task 29 executing with 0.307586021173349

Further Reading

This section provides additional resources that you may find helpful.

Books

I also recommend specific chapters from the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter: 14: Threads and Processes


Takeaways

You now know how to use the imap() method to issue tasks to the ThreadPool in Python.


    Photo by Yoshiki 787 on Unsplash