How to Show Progress for Tasks With the ThreadPoolExecutor in Python - Super Fast Python

You can show progress of tasks in the ThreadPoolExecutor by using a callback function.

In this tutorial, you will discover how to show progress of tasks in a Python thread pool.

Let’s get started.

The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.

You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.

Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
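As a quick illustration of the Future returned by submit(), here is a minimal sketch. The square() task is a made-up function used only for this example.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def square(value):
    return value * value

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns immediately with a Future for the task
    future = executor.submit(square, 4)
    # result() blocks until the task has finished, then returns its value
    result = future.result()

# report the status of the task and its result
print(future.done(), result)
```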

You may need to show progress of all or a subset of tasks submitted to the thread pool with the submit() function.

This may be for many reasons, such as reporting progress to the user or internally keeping track of how much time or work remains to be completed.

How can we show progress of completed tasks in the ThreadPoolExecutor in a standardized way?

Use add_done_callback() to Show Progress

You can add a callback function to provide a standard way to track progress for completed tasks.

This can be achieved by calling the add_done_callback() function on the Future object for each task and specifying a custom function to track or report progress.

...

# add a callback function to a task

future.add_done_callback(custom_callback)

Recall that we get a Future object when calling the submit() function on the ThreadPoolExecutor when submitting a task.

The callback function must have a single argument, which is the Future object on which it was called.

# task callback function

def custom_callback(future):

    # do something...

    pass
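Putting the two snippets together, a small runnable sketch might look like the following. The work() task and the completed list are assumptions made for this example and are not part of the ThreadPoolExecutor API.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical list used to record what the callback saw
completed = []

# task callback function, called with the Future of the finished task
def custom_callback(future):
    completed.append(future.result())

# hypothetical task used only for illustration
def work(value):
    return value + 1

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(work, 1)
    # register the callback on the Future returned by submit()
    future.add_done_callback(custom_callback)

# the pool has shut down, so the callback has already run
print(completed)
```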

Using a callback is not the only way to show progress. For example, you can enumerate tasks as they are completed in the main thread via the as_completed() function.

...

# report progress as tasks are completed

for future in as_completed(futures):

    print('Another task has completed!')

The problem with this approach is that it requires that you mix both result processing and task progress indication code together, which may be less clean than using a callback function.
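To make the comparison concrete, here is a minimal runnable sketch of the as_completed() approach. The work() task and the results list are assumptions for illustration. Notice how the result handling and the progress message end up in the same loop body.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

# hypothetical task used only for illustration
def work(value):
    sleep(0.01)
    return value * 2

results = []
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(work, i) for i in range(5)]
    # futures are yielded in the order they finish, not the order submitted
    for count, future in enumerate(as_completed(futures), start=1):
        # result processing
        results.append(future.result())
        # progress reporting, mixed into the same loop
        print(f'{count}/{len(futures)} tasks done')
```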

The callback function is only called once the task is completed, i.e. “done”.

A task may be completed in one of three ways:

  • The task finishes successfully.
  • The task is cancelled.
  • The task raises an exception that is not handled.

As such, you may want the callback function to check whether the task was cancelled or raised an exception before reporting progress.

# task callback function

def custom_callback(future):

    # check if task was cancelled

    if future.cancelled():

        # the task was cancelled

        pass

    elif future.exception():

        # the task raised an exception

        pass

    else:

        # the task finished successfully

        pass
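The three outcomes can be exercised with a small sketch. Everything here other than the Future methods is an assumption made for the example: the outcomes list, the blocker() and broken() tasks, and the Event used to hold the single worker busy so that a queued task can still be cancelled. Checking cancelled() first matters, because calling exception() on a cancelled Future raises a CancelledError.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

# hypothetical list recording how each task ended
outcomes = []

# task callback function
def custom_callback(future):
    if future.cancelled():
        outcomes.append('cancelled')
    elif future.exception() is not None:
        outcomes.append('failed')
    else:
        outcomes.append('ok')

# used to keep the only worker busy until we release it
gate = Event()

# hypothetical task that waits until the gate is opened
def blocker():
    gate.wait()
    return 'finished'

# hypothetical task that always fails
def broken():
    raise ValueError('something went wrong')

with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocker)
    second = executor.submit(broken)
    third = executor.submit(blocker)
    for future in (first, second, third):
        future.add_done_callback(custom_callback)
    # the third task is still queued, so it can be cancelled
    third.cancel()
    # let the worker finish the remaining tasks
    gate.set()

print(outcomes)
```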

There are many ways to report progress in the callback function.

For example, you can print a character, one for each task that is completed.

...

# show progress for one task

print('.', end='', flush=True)

Now that we know how to show the progress of tasks in a standard way, let’s look at a worked example.

Example of Showing Progress of Tasks

Let’s look at how we might show the progress of tasks completed in the ThreadPoolExecutor.

First, let’s define a mock task that sleeps for a fraction of a second.

# mock task that works for a moment

def task(name):

    sleep(random())

Next, we can define our callback function that will take a Future object and report progress by printing one character for each task that completes.

This is a generic and scalable way of showing progress for tasks, although it does not provide an indication of how many tasks remain to be executed.

# simple progress indicator callback function

def progress_indicator(future):

    print('.', end='', flush=True)

Next, we can create a thread pool with two threads and submit many tasks.

...

# start the thread pool

with ThreadPoolExecutor(2) as executor:

    # send in the tasks

    futures = [executor.submit(task, i) for i in range(20)]

We can then register the callback with each task, which will be executed after each task has completed.

...

# register the progress indicator callback

for future in futures:

    future.add_done_callback(progress_indicator)

That’s it.
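One detail worth knowing when callbacks are registered after the tasks are submitted: if a task has already finished by the time add_done_callback() is called, the callback is called immediately in the thread doing the registering, so no progress update is lost. The sketch below checks this; the record_thread() callback and the called_in list are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

# the thread that will register the callback
caller = current_thread()
# hypothetical list recording whether the callback ran in the caller's thread
called_in = []

def record_thread(future):
    called_in.append(current_thread() is caller)

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 2, 3)
    # wait until the task is done before registering the callback
    future.result()
    # the task is already done, so the callback runs right away
    future.add_done_callback(record_thread)
    ran_immediately = len(called_in) == 1

print(ran_immediately, called_in)
```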

Tying this together, the complete example of showing a progress indicator of completed tasks is listed below.

# SuperFastPython.com

# example of a simple progress indicator for tasks

from time import sleep

from random import random

from concurrent.futures import ThreadPoolExecutor

# simple progress indicator callback function

def progress_indicator(future):

    print('.', end='', flush=True)

# mock task that works for a moment

def task(name):

    sleep(random())

# start the thread pool

with ThreadPoolExecutor(2) as executor:

    # send in the tasks

    futures = [executor.submit(task, i) for i in range(20)]

    # register the progress indicator callback

    for future in futures:

        future.add_done_callback(progress_indicator)

    # leaving the with block waits for all tasks to complete

print('\nDone!')

Running the example creates the thread pool and submits twenty tasks for execution.

The tasks complete one-by-one, reporting the progress of completed tasks with printed dots.

Once all 20 tasks have completed with all 20 dots printed, we can carry on with our program.

....................

Done!



How to Count Tasks Completed

We may want to report the total number of tasks that have completed and/or the total tasks that remain to be executed.

This requires a counter of the total number of tasks completed so far.

Multiple threads may try and update or report values from the counter at the same time, which may make the counter inconsistent or report incorrect values.

...

# update the counter

tasks_completed += 1

# report progress

print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')

Reading and writing the counter must be made thread-safe, i.e. an operation that only one thread can perform at a time.

This can be achieved using a threading.Lock object, which must be acquired before the counter can be updated or reported.

Once the lock is held by one thread, any other thread must wait to acquire the lock before it can update the counter. This waiting is performed automatically. As soon as the lock is released, another thread may acquire the lock and update the counter.

...

# acquire the lock

lock.acquire()

# update the counter

tasks_completed += 1

# report progress

print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')

# release the lock

lock.release()

The lock can be acquired using a context manager, ensuring it is released automatically once we are finished with it.

...

# obtain the lock

with lock:

    # update the counter

    tasks_completed += 1

    # report progress

    print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')
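The snippet above can be tried in isolation with plain threads. This is only a sketch: the mark_done() function and the choice of four threads are assumptions for illustration, standing in for the callbacks that the thread pool would call.

```python
from threading import Lock, Thread

# lock protecting the shared counter
lock = Lock()
# total completed tasks
tasks_completed = 0

# hypothetical stand-in for a callback that updates the counter many times
def mark_done(times):
    global tasks_completed
    for _ in range(times):
        # obtain the lock before touching the counter
        with lock:
            tasks_completed += 1

# several threads update the same counter at once
threads = [Thread(target=mark_done, args=(1000,)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# every update was applied because each one was made under the lock
print(tasks_completed)
```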

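The callback function receives only the Future object as an argument, so the lock and counters must reach it some other way. Global variables are the simplest option, although extra arguments could also be bound to the callback in advance, e.g. with functools.partial. Alternatively, the counting could occur within a thread that keeps track of tasks as they are completed via the as_completed() function.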
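One way around this restriction, offered as a sketch rather than the approach used in the rest of this tutorial, is to bind extra arguments to the callback ahead of time with functools.partial. The ProgressTracker class below is a hypothetical helper written for this example; it is not part of the concurrent.futures API.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from threading import Lock

# hypothetical helper class that owns the lock and the counters
class ProgressTracker:
    def __init__(self, total):
        self.total = total
        self.completed = 0
        self.lock = Lock()

    def task_done(self):
        # update and report under the lock
        with self.lock:
            self.completed += 1
            print(f'{self.completed}/{self.total} completed')

# the tracker is bound in advance; the Future is supplied when the task is done
def progress_indicator(tracker, future):
    tracker.task_done()

# hypothetical task used only for illustration
def task(name):
    return name

tracker = ProgressTracker(total=10)
with ThreadPoolExecutor(2) as executor:
    futures = [executor.submit(task, i) for i in range(tracker.total)]
    for future in futures:
        # partial() produces a callable that takes just the Future
        future.add_done_callback(partial(progress_indicator, tracker))
```

This keeps the state in one object instead of three global names, at the cost of a little more code.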

Let’s use global variables for the lock, the counter, and the total number of tasks and access them from the callback function called by the ThreadPoolExecutor as each task is completed.

The updated callback function to report the total complete and total remaining tasks in the thread pool is listed below.

# simple progress indicator callback function

def progress_indicator(future):

    global lock, tasks_total, tasks_completed

    # obtain the lock

    with lock:

        # update the counter

        tasks_completed += 1

        # report progress

        print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')

We then create the Lock and counters at the beginning of our program.

...

# create a lock for the counter

lock = Lock()

# total tasks we will execute

tasks_total = 20

# total completed tasks

tasks_completed = 0

Tying this together, the complete example of keeping track of the number of complete and remaining tasks is listed below.

# SuperFastPython.com

# example of a simple progress indicator for tasks

from time import sleep

from random import random

from threading import Lock

from concurrent.futures import ThreadPoolExecutor

# simple progress indicator callback function

def progress_indicator(future):

    global lock, tasks_total, tasks_completed

    # obtain the lock

    with lock:

        # update the counter

        tasks_completed += 1

        # report progress

        print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')

# mock task that works for a moment

def task(name):

    sleep(random())

# create a lock for the counter

lock = Lock()

# total tasks we will execute

tasks_total = 20

# total completed tasks

tasks_completed = 0

# start the thread pool

with ThreadPoolExecutor(2) as executor:

    # send in the tasks

    futures = [executor.submit(task, i) for i in range(tasks_total)]

    # register the progress indicator callback

    for future in futures:

        future.add_done_callback(progress_indicator)

    # leaving the with block waits for all tasks to complete

print('Done!')

Running the example creates the thread pool, submits the tasks, and registers the callback function as before.

As each task completes, the updated callback function is called. It first updates the counter, then reports the total complete and total remaining tasks, all in a thread-safe manner governed by the lock.

1/20 completed, 19 remain.

2/20 completed, 18 remain.

3/20 completed, 17 remain.

4/20 completed, 16 remain.

5/20 completed, 15 remain.

6/20 completed, 14 remain.

7/20 completed, 13 remain.

8/20 completed, 12 remain.

9/20 completed, 11 remain.

10/20 completed, 10 remain.

11/20 completed, 9 remain.

12/20 completed, 8 remain.

13/20 completed, 7 remain.

14/20 completed, 6 remain.

15/20 completed, 5 remain.

16/20 completed, 4 remain.

17/20 completed, 3 remain.

18/20 completed, 2 remain.

19/20 completed, 1 remain.

20/20 completed, 0 remain.

Done!
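For comparison, the same report can be produced without a callback by counting in the main thread with as_completed(), the alternative mentioned earlier. This is a sketch under the same mock task, with shorter sleeps so it finishes quickly. Because only the main thread touches the counter, no lock is needed here.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import random
from time import sleep

# mock task that works for a moment
def task(name):
    sleep(random() / 100)

# total tasks we will execute
tasks_total = 20
# total completed tasks, only ever updated by the main thread
tasks_completed = 0

with ThreadPoolExecutor(2) as executor:
    futures = [executor.submit(task, i) for i in range(tasks_total)]
    # count tasks in the main thread as they finish
    for future in as_completed(futures):
        tasks_completed += 1
        print(f'{tasks_completed}/{tasks_total} completed, {tasks_total - tasks_completed} remain.')
print('Done!')
```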


Takeaways

You now know how to show progress of tasks in the ThreadPoolExecutor.

Do you have any questions about how to show progress of tasks?
Ask your questions in the comments below and I will do my best to answer.
