ThreadPool Get Results from Asynchronous Tasks - Super Fast Python

Last Updated on October 29, 2022

You can get results from tasks in the ThreadPool using a callback or by calling AsyncResult.get().

In this tutorial, you will discover how to get results from tasks issued asynchronously to the ThreadPool in Python.

Let’s get started.

We can execute tasks asynchronously using the ThreadPool.

The ThreadPool class provides three functions for issuing tasks asynchronously, they are:

  • apply_async(): For executing one-off tasks.
  • map_async(): For calling the same function multiple times with different arguments.
  • starmap_async(): For calling the same function many times with more than one argument.

Each of these approaches for executing an asynchronous task in the ThreadPool returns immediately with an AsyncResult object.

This object provides a handle on the task or tasks and allows us to check on the status of the tasks and to get results.

How can we get results from asynchronous tasks?

How to Get Results from Asynchronous Tasks

There are two ways that we can get results from asynchronous tasks executed with the ThreadPool.

They are:

  1. Use a result callback.
  2. Get results from the AsyncResult object.

Let’s take a closer look at each approach.

How to Get Results Via a Callback

We can get results from asynchronous tasks via a result callback.

A result callback can be specified via the “callback” argument.

The argument specifies the name of a custom function to call with the result from an asynchronous task or tasks.

The function may have any name you like, as long as it does not conflict with a function name already in use.

For example, if apply_async() is configured with a callback, then the callback function will be called with the return value of the task function that was executed.

# result callback function

def result_callback(result):

print(result)

...

# issue a single task

result = apply_async(..., callback=result_callback)

Alternatively, if map_async() or starmap_async() are configured with a callback, then the callback function will be called with an iterable of return values from all tasks issued to the ThreadPool.

# result callback function

def result_callback(result):

# iterate all results

for value in result:

print(value)

...

# issue a single task

result = map_async(..., callback=result_callback)

How to Get Results Via AsyncResult

We can get results from asynchronous tasks via the AsyncResult object.

An AsyncResult provides a handle on one or more issued tasks.

It allows the caller to check on the status of the issued tasks, wait for the tasks to complete, and get the results once tasks are completed.

We can get the result of an issued task by calling the AsyncResult.get() function.

This will return the result of the specific function called to issue the task.

  • apply_async(): Returns the return value of the target function.
  • map_async(): Returns an iterable over the return values of the target function.
  • starmap_async(): Returns an iterable over the return values of the target function.

For example:

...

# get the result of the task or tasks

value = result.get()

If the issued tasks have not yet been completed, then get() will block until the tasks are finished.

A “timeout” argument can be specified. If the tasks are still running and do not completed within the specified number of seconds, a multiprocessing.TimeoutError is raised.

For example:

...

try:

# get the task result with a timeout

value = result.get(timeout=10)

except multiprocessing.TimeoutError as e:

# ...

If an issued task raises an exception, the exception will be re-raised once the issued tasks are completed.

We may need to handle this case explicitly if we expect a task to raise an exception on failure.

For example:

...

try:

# get the task result that might raise an exception

value = result.get()

except Exception as e:

# ...

You can learn more about the AsyncResult object in the tutorial:

Now that we know how to get results from asynchronous tasks issued to the ThreadPool, let’s look at some worked examples.

How to Get Results From apply_async()

We can explore how to get results from tasks issued asynchronously with apply_async().

In this example, we define a simple task that generates and returns a number. We will then issue a task to execute this function and then get the result from the issued task.

Firstly, we can define a target task function.

The function will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the number that was generated.

The task() function below implements this.

# task executed by thread worker

def task():

    # generate a random value between 0 and 1

    value = random()

    # block for a moment

    sleep(value)

    # return the generated value

    return value

Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.

...

# create the thread pool

with ThreadPool() as pool:

# ...

We will then issue a call to the task() function to be executed by the ThreadPool asynchronously via the apply_async() function.

...

# issue a task asynchronously

async_result = pool.apply_async(task)

You can learn more about how to use the apply_async() function in the tutorial:

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the task.

This will block until the task has been completed and then returns the return value from the task.

...

# wait for the task to complete and get the return value

print('Waiting for result...')

result = async_result.get()

Finally, we report the value retrieved from the task.

...

# report the result

print(f'Got: {result}')

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

# SuperFastPython.com

# example of getting a result from an asynchronous task issued with apply_async

from time import sleep

from random import random

from multiprocessing.pool import ThreadPool

# task executed by thread worker

def task():

    # generate a random value between 0 and 1

    value = random()

    # block for a moment

    sleep(value)

    # return the generated value

    return value

# protect the entry point

if __name__ == '__main__':

    # create the thread pool

    with ThreadPool() as pool:

        # issue a task asynchronously

        async_result = pool.apply_async(task)

        # wait for the task to complete and get the return value

        print('Waiting for result...')

        result = async_result.get()

        # report the result

        print(f'Got: {result}')

Running the example first creates the ThreadPool with a default configuration.

Next, the task is issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main thread blocks attempting to get the result.

The task executes, first generating a random number, blocking for a fraction of a second, then returning the number.

The main thread receives the result from the asynchronous task and reports the value.

Note, that the output will differ each time the program is run given the use of random numbers.

Waiting for result...

Got: 0.45583546811729037

This example demonstrated how to get the result from a task executed asynchronously with apply_async().

Next, let’s look at how we might get the result from tasks executed with map_async().


Free Python ThreadPool Course

Download your FREE ThreadPool PDF cheat sheet and get BONUS access to my free 7-day crash course on the ThreadPool API.

Discover how to use the ThreadPool including how to configure the number of worker threads and how to execute tasks asynchronously

Learn more
 


How to Get Results From map_async()

We can explore how to get results from tasks issued asynchronously with map_async().

In this example, we define a simple task that takes an integer as an argument, generates a random number, then returns a combination of the argument with the generated number. We will then have the ThreadPool execute the function multiple times asynchronously with different arguments.

Firstly, we can define a target task function.

The function will take an integer as an argument. It will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the generated number added to the input argument.

The task() function below implements this.

# task executed in a thread worker

def task(arg):

    # generate a random value between 0 and 1

    value = random()

    # block for a moment

    sleep(value)

    # return the generated value combined with the argument

    return arg + value

Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.

...

# create the thread pool

with ThreadPool() as pool:

# ...

We will then issue a call to the task() function multiple times with values between 0 and 4. The tasks are issued using map_async() to be executed asynchronously.

...

# issue tasks asynchronously

async_result = pool.map_async(task, range(5))

You can learn more about how to use the map_async() function in the tutorial:

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the issued tasks, specifically an iterator over return values from the five function calls.

This call will block until all tasks have been completed.

...

# wait for the task to complete and get the iterable of return values

print('Waiting for results...')

results = async_result.get()

Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.

...

# iterate return values and report

for result in results:

    # report the result

    print(f'Got: {result}')

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

# SuperFastPython.com

# example of getting a result from an asynchronous task issued with map_async

from time import sleep

from random import random

from multiprocessing.pool import ThreadPool

# task executed in a thread worker

def task(arg):

    # generate a random value between 0 and 1

    value = random()

    # block for a moment

    sleep(value)

    # return the generated value combined with the argument

    return arg + value

# protect the entry point

if __name__ == '__main__':

    # create the thread pool

    with ThreadPool() as pool:

        # issue tasks asynchronously

        async_result = pool.map_async(task, range(5))

        # wait for the task to complete and get the iterable of return values

        print('Waiting for results...')

        results = async_result.get()

        # iterate return values and report

        for result in results:

            # report the result

            print(f'Got: {result}')

Running the example first creates the ThreadPool with a default configuration.

Next, five tasks are issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main thread blocks attempting to get the iterable of return values.

The tasks execute in parallel, each task taking an integer argument, generating a random number, blocking for a fraction of a second, then returning the generated number added to the argument.

The main thread receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.

Note, that the output will differ each time the program is run given the use of random numbers.

Waiting for results...

Got: 0.8595136493881548

Got: 1.8697912864375383

Got: 2.5849814054642755

Got: 3.9306465445033094

Got: 4.084973998527668

This example demonstrates how to get results from tasks executed asynchronously with map_async().

Next, let’s look at how we might get the result from tasks executed with starmap_async().

How to Get Result From starmap_async()

We can explore how to get results from tasks issued asynchronously with starmap_async().

In this example, we define a simple task that takes three integers as an argument, generates a random number, then returns a combination of the input arguments with the generated number. We will then have the ThreadPool execute the function multiple times asynchronously with different arguments.

Firstly, we can define a target task function.

The function will take three integers as arguments. It will generate a random number between 0 and 1. It then blocks for a fraction of a second to simulate computational effort, then adds the generated number and all arguments and returns the sum.

The task() function below implements this.

# task executed in a thread worker

def task(arg1, arg2, arg3):

    # generate a random value between 0 and 1

    value = random()

    # block for a moment

    sleep(value)

    # return the generated value combined with the argument

    return arg1 + arg2 + arg3 + value

Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.

...

# create the thread pool

with ThreadPool() as pool:

# ...

Next, we prepare arguments for the starmap() function.

In this case, we create a list of five tuples. Each tuple has an integer argument between 0 and 4, then the square and the cube of each integer.

...

# prepare arguments

args = [(i,i*2, i*3) for i in range(5)]

We will then issue a call to the task() function multiple times with the prepared arguments. The tasks are issued using starmap_async() to be executed asynchronously.

...

# issue tasks asynchronously

async_result = pool.starmap_async(task, args)

You can learn more about how to use the starmap_async() function in the tutorial:

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the issued tasks, specifically an iterator over return values from the five function calls.

This call will block until all tasks have been completed.

...

# wait for the task to complete and get the iterable of return values

print('Waiting for results...')

results = async_result.get()

Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.

...

# iterate return values and report

for result in results:

    # report the result

    print(f'Got: {result}')

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

# SuperFastPython.com

# example of getting a result from an asynchronous task issued with starmap_async

from time import sleep

from random import random

from multiprocessing.pool import ThreadPool

# task executed in a thread worker

def task(arg1, arg2, arg3):

    # generate a random value between 0 and 1

    value = random()

    # block for a moment

    sleep(value)

    # return the generated value combined with the argument

    return arg1 + arg2 + arg3 + value

# protect the entry point

if __name__ == '__main__':

    # create the thread pool

    with ThreadPool() as pool:

        # prepare arguments

        args = [(i,i*2, i*3) for i in range(5)]

        # issue tasks asynchronously

        async_result = pool.starmap_async(task, args)

        # wait for the task to complete and get the iterable of return values

        print('Waiting for results...')

        results = async_result.get()

        # iterate return values and report

        for result in results:

            # report the result

            print(f'Got: {result}')

Running the example first creates the ThreadPool with a default configuration.

Next, arguments are prepared and five tasks are issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main thread blocks attempting to get the iterable of return values.

The tasks execute in parallel, each task taking an integer argument, generating a random number, blocking for a fraction of a second, then returning the generated number added to the argument.

The main thread receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.

Note, the output will differ each time the program is run given the use of random numbers.

Waiting for results...

Got: 0.68185718775623

Got: 6.432202267794842

Got: 12.449108841533565

Got: 18.551857634870373

Got: 24.583687905402517

This example demonstrates how to get results from tasks executed asynchronously with starmap_async().


Python ThreadPool Jump-Start

Loving The Tutorials?

Why not take the next step? Get the book.

Learn more
 


Further Reading

This section provides additional resources that you may find helpful.

Books

I also recommend specific chapters from the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter: 14: Threads and Processes

Guides

APIs

References

    Takeaways

    You now know how to get results from asynchronous tasks executed with the ThreadPool.

    Do you have any questions?
    Ask your questions in the comments below and I will do my best to answer.

    Photo by Alec Douglas on Unsplash