ThreadPool Wait For All Tasks To Finish in Python - Super Fast Python

You can wait for tasks issued to the ThreadPool to complete by calling wait() on the AsyncResult object or calling join() on the ThreadPool.

In this tutorial, you will discover how to wait for tasks to complete in the ThreadPool in Python.

Let’s get started.

The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.

"A thread pool object which controls a pool of worker threads to which jobs can be submitted."

Source: multiprocessing — Process-based parallelism (Python documentation)

The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.

Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.

A ThreadPool can be configured when it is created, which will prepare the new threads.

We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
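As a rough sketch of the difference, the snippet below issues the same work with the blocking methods and with their asynchronous counterparts. The square() function, the pool size of 4, and the run_demo() helper are hypothetical names chosen only for illustration.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(value):
    return value * value

def run_demo():
    with ThreadPool(4) as pool:
        # synchronous: blocks until all results are available
        sync_results = pool.map(square, range(5))
        # asynchronous: returns an AsyncResult immediately
        async_result = pool.map_async(square, range(5))
        # get() blocks until the results are ready, then returns them
        async_results = async_result.get()
        # one-off task, issued synchronously and asynchronously
        single = pool.apply(square, (7,))
        single_async = pool.apply_async(square, (7,)).get()
    return sync_results, async_results, single, single_async

if __name__ == '__main__':
    print(run_demo())
```

Both styles produce the same values. The asynchronous versions simply let the caller decide when to block.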

When using the ThreadPool, we may need to wait for all tasks to complete.

This may be for many reasons, such as:

  • Waiting for all tasks to complete before issuing follow-up tasks.
  • Waiting for all task results so that they can be combined or used.
  • Waiting for all tasks to complete before continuing on with the program.

How can we wait for all tasks to complete in the ThreadPool?

How to Wait For All Tasks to Finish

There are two ways that we can wait for tasks to finish in the ThreadPool.

They are:

  1. Wait for an asynchronous set of tasks to complete with the wait() method.
  2. Wait for all issued tasks to complete after shutdown with the join() method.

Let’s take a closer look at each approach.

How to Wait For All Tasks in a Batch

Tasks may be issued asynchronously to the ThreadPool.

This can be achieved using a method such as apply_async(), map_async(), or starmap_async(). Each of these methods returns an AsyncResult object.

We can wait for a single batch of tasks issued asynchronously to the ThreadPool to complete by calling the wait() method on the returned AsyncResult object.

For example:

...
# issue tasks
result = pool.map_async(...)
# wait for issued tasks to complete
result.wait()
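The wait() method also accepts an optional timeout in seconds. It returns None either way, so a separate call to ready() is needed to find out whether the tasks actually finished. The sketch below assumes a hypothetical slow_task() function and timings picked only to make the effect visible.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes about half a second
def slow_task(identifier):
    sleep(0.5)
    return identifier

def wait_with_timeout():
    with ThreadPool(2) as pool:
        result = pool.map_async(slow_task, range(2))
        # wait at most 0.1 seconds; the tasks need about 0.5 seconds
        result.wait(timeout=0.1)
        finished_early = result.ready()
        # now wait without a timeout until the batch is complete
        result.wait()
        finished_late = result.ready()
    return finished_early, finished_late

if __name__ == '__main__':
    print(wait_with_timeout())
```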

If multiple batches of asynchronous tasks are issued to the ThreadPool, we can collect the AsyncResult objects that are returned and wait on each in turn.
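A minimal sketch of waiting on several batches in turn is given below. The batch sizes and the wait_for_batches() helper are assumptions made for illustration.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    sleep(0.1)
    return identifier

def wait_for_batches():
    with ThreadPool() as pool:
        # issue three batches and keep every AsyncResult
        batches = [pool.map_async(task, range(start, start + 3))
                   for start in (0, 3, 6)]
        # wait on each batch in turn
        for batch in batches:
            batch.wait()
        # every batch has finished by this point
        return [batch.ready() for batch in batches]

if __name__ == '__main__':
    print(wait_for_batches())
```

Because the pool stays open, further tasks can still be issued after the loop, which is the main advantage over closing and joining the pool.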

How to Wait For All Tasks After Shutdown

We may issue many batches of asynchronous tasks to the ThreadPool and not hang onto the AsyncResult objects that are returned.

Instead, we can wait for all tasks in the ThreadPool to complete by first shutting down the ThreadPool, then joining it to wait for all issued tasks to be completed.

This can be achieved by first calling the close() method, which prevents any further tasks from being issued to the ThreadPool and shuts down the worker threads once all tasks are complete.

We can then call the join() method. This will block the caller until all tasks in the ThreadPool are completed and the worker threads in the ThreadPool have closed.

For example:

...
# close the thread pool
pool.close()
# block until all tasks are complete and threads close
pool.join()

The downside of this approach is that we cannot issue tasks to the pool after it is closed. This approach can only be used once you know that you have no further tasks to issue to the ThreadPool.
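The sketch below illustrates this limitation. It assumes a recent version of CPython, where issuing a task to a closed pool raises a ValueError. The issue_after_close() helper is a hypothetical name.

```python
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    return identifier

def issue_after_close():
    pool = ThreadPool(2)
    # no further tasks may be issued after this call
    pool.close()
    try:
        pool.apply_async(task, (1,))
    except ValueError as error:
        outcome = f'rejected: {error}'
    else:
        outcome = 'accepted'
    # wait for the worker threads to exit
    pool.join()
    return outcome

if __name__ == '__main__':
    print(issue_after_close())
```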

Now that we know how to wait for all tasks in the ThreadPool to complete, let’s look at some worked examples.

Example of Waiting for All Tasks in a Batch

We can explore how to wait for a batch of issued tasks to complete in the ThreadPool.

In this example, we will define a task that blocks for a moment and then reports a message. From the main thread, we will issue a batch of tasks to the ThreadPool asynchronously. We will then explicitly wait on the batch of tasks to complete by waiting on the returned AsyncResult object.

Firstly, we can define a task to execute in the ThreadPool.

The custom function takes an integer argument, blocks for half a second to simulate work, then reports a message that the task is done.

The task() function below implements this.

# task executed in a worker thread
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done')

Next, in the main thread, we will create the ThreadPool with the default configuration.

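We will use the context manager interface to ensure that the ThreadPool is shut down automatically once we are finished with it. Note that exiting the block calls terminate() rather than close(), so we must wait for our tasks inside the block.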

...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...

We will then issue 10 tasks to the ThreadPool by calling our custom task() function with integers from 0 to 9. This can be achieved via the map_async() method.

...
# issue tasks into the thread pool
result = pool.map_async(task, range(10))

This returns an AsyncResult object on which we can call the wait() method.

This method call will block until all 10 tasks issued asynchronously are completed.

...
# wait for tasks to complete
result.wait()

We can then report a message that the tasks are completed.

...
# report all tasks done
print('All tasks are done')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of waiting for all tasks in a batch to finish
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done')

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks into the thread pool
        result = pool.map_async(task, range(10))
        # wait for tasks to complete
        result.wait()
        # report all tasks done
        print('All tasks are done')
    # thread pool is shut down automatically

Running the example first creates the ThreadPool.

The ten tasks are then issued to the ThreadPool asynchronously. An AsyncResult object is returned and the main thread then blocks until the issued tasks are completed.

Each task is executed by a worker thread in the ThreadPool, first blocking for a fraction of a second, then printing a message.

Once all ten tasks issued as a batch to the ThreadPool are completed, the wait() method returns and the main thread continues on.

A final message is reported, then the ThreadPool is shut down automatically via the context manager interface. Note that the order of the task messages may differ each time the program is run.

Task 0 done
Task 5 done
Task 2 done
Task 3 done
Task 6 done
Task 7 done
Task 1 done
Task 4 done
Task 8 done
Task 9 done
All tasks are done
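If the tasks return values rather than printing, the same AsyncResult can be used to combine the results once wait() returns. The variation below is only a sketch: the doubled return value and the collect_results() helper are assumptions, not part of the example above.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical variation of the task that returns a value
def task(identifier):
    sleep(0.1)
    return identifier * 2

def collect_results():
    with ThreadPool() as pool:
        result = pool.map_async(task, range(10))
        # block until the whole batch is complete
        result.wait()
        # successful() may only be called once the result is ready
        if result.successful():
            # get() returns the results in the order the tasks were issued
            return sum(result.get())
        return None

if __name__ == '__main__':
    print(collect_results())
```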

Next, let’s look at how we might wait for all tasks in the ThreadPool to be completed when shutting down the pool.


Example of Waiting for All Tasks After Shutdown

We can wait for all tasks issued to the ThreadPool to complete by shutting down the pool, then joining it.

In this example, we can update the previous example to issue two batches of tasks to the ThreadPool and not hang onto the AsyncResult objects that are returned. We can then close the ThreadPool and join it, blocking until all tasks issued to the ThreadPool have been completed.

This can be achieved by first issuing two batches of tasks to the pool via two calls to the map_async() method.

...
# issue tasks into the thread pool
_ = pool.map_async(task, range(5))
# issue more tasks into the thread pool
_ = pool.map_async(task, range(5, 10))

Next, we can close the ThreadPool to prevent any further tasks from being issued to the pool and to shut down the worker threads once they are done.

...
# shutdown the thread pool
pool.close()

We can then join the thread pool which will block until all tasks issued to the pool have been completed.

...
# wait for tasks to complete
pool.join()
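The order of these two calls matters. As a small sketch, assuming a recent version of CPython, calling join() on a pool that has not been closed or terminated raises a ValueError instead of blocking. The join_without_close() helper is a hypothetical name.

```python
from multiprocessing.pool import ThreadPool

def join_without_close():
    pool = ThreadPool(2)
    try:
        # joining a pool that is still running is an error
        pool.join()
    except ValueError as error:
        outcome = f'error: {error}'
    else:
        outcome = 'joined'
    finally:
        # shut the pool down properly: close first, then join
        pool.close()
        pool.join()
    return outcome

if __name__ == '__main__':
    print(join_without_close())
```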

Finally, we can report a message that all tasks are completed.

...
# report all tasks done
print('All tasks are done')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of waiting for all tasks after shutdown
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done')

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks into the thread pool
        _ = pool.map_async(task, range(5))
        # issue more tasks into the thread pool
        _ = pool.map_async(task, range(5, 10))
        # shutdown the thread pool
        pool.close()
        # wait for tasks to complete
        pool.join()
        # report all tasks done
        print('All tasks are done')
    # thread pool is closed automatically

Running the example first creates the ThreadPool.

The first five tasks are then issued to the ThreadPool asynchronously. An AsyncResult object is returned and ignored.

A further five tasks are then issued to the ThreadPool asynchronously, and again an AsyncResult object is returned and ignored.

The ThreadPool is then closed, preventing any further tasks from being issued to the pool and closing the worker threads once all tasks are completed. The main thread then blocks waiting for the ThreadPool to close completely.

Each task is executed by a worker thread in the ThreadPool, first blocking for a fraction of a second, then printing a message.

All ten tasks issued to the ThreadPool are completed and the worker threads in the pool terminate. The blocking call to the join() method returns and the main thread continues on.

A final message is reported.

Task 1 done
Task 2 done
Task 0 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 3 done
Task 9 done
Task 8 done
All tasks are done

Further Reading

This section provides additional resources that you may find helpful.

Books

I also recommend specific chapters from the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter 14: Threads and Processes

Takeaways

You now know how to wait for tasks to complete in the ThreadPool in Python.
