Thread-Safe SimpleQueue in Python - Super Fast Python

You can use a fast and simple thread-safe queue via the queue.SimpleQueue class.

In this tutorial you will discover how to use the thread-safe SimpleQueue class in Python.

Let’s get started.

Need for a Thread-Safe Simple Queue

A thread is a thread of execution in a computer program.

Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.

Sometimes we may need to create additional threads in our program in order to execute code concurrently.

Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.

In concurrent programming, we often need to share data between threads.

One approach to sharing data is to use a queue.

Python provides a number of thread-safe queues in the queue module, such as the queue.SimpleQueue class.

What is the SimpleQueue and how can we use it in Python?

How to Use the SimpleQueue

Python provides a simple thread-safe queue in the queue.SimpleQueue class.

Thread-safe means that it can be used by multiple threads to put and get items concurrently without a race condition.

The “Simple” in the name of the class refers to it providing a minimal interface compared to other queues in the “queue” module.

This minimal interface includes:

  • Check the size via qsize().
  • Check if empty via empty().
  • Add an item to the queue via put() and put_nowait().
  • Get an item from the queue via get() and get_nowait().

Unlike the queue.Queue class, the queue.SimpleQueue does not provide the ability to limit the capacity of the queue and as such does not allow threads to block while putting items on the queue. It also does not provide the ability for consumer threads to mark tasks as done and have threads join the queue until all tasks are marked done.
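The difference in interface can be checked directly. The following is a minimal sketch that compares the two classes by looking up method names; the helper has_method() is a hypothetical name used only for this illustration.

```python
from queue import Queue, SimpleQueue

# hypothetical helper: report whether an object exposes a callable with this name
def has_method(obj, name):
    return callable(getattr(obj, name, None))

bounded = Queue(maxsize=2)  # queue.Queue supports a capacity limit
simple = SimpleQueue()      # queue.SimpleQueue is created without a capacity

# compare the methods offered by each class
for name in ('put', 'get', 'qsize', 'empty', 'full', 'task_done', 'join'):
    print(f'{name:10} Queue={has_method(bounded, name)} SimpleQueue={has_method(simple, name)}')
```

The methods related to capacity and task tracking (full(), task_done() and join()) should be reported only for queue.Queue.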

Next, let’s take a closer look at this interface.

SimpleQueue Interface

We can create a new instance of the queue via the constructor. The queue is always unbounded in size.

...

# create a new queue

queue = SimpleQueue()

Items are added to the end of the queue and retrieved from the front of the queue in a first-in, first-out or FIFO order.
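The FIFO ordering can be seen with a small single-threaded sketch. The variable names q and received are chosen only for this illustration.

```python
from queue import SimpleQueue

q = SimpleQueue()

# add three items to the end of the queue
for item in ('first', 'second', 'third'):
    q.put(item)

# retrieve the items from the front of the queue
received = [q.get() for _ in range(3)]
print(received)
```

The items come back in the same order in which they were added.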

We can add items to the queue by calling the put() function or the put_nowait() function.

For example:

...

# add an item to the queue (without blocking)

queue.put(item)

Adding items to the queue never blocks. The put() function accepts “block” and “timeout” arguments to match the queue.Queue interface, but these arguments have no effect.

Put item into the queue. The method never blocks and always succeeds (except for potential low-level errors such as failure to allocate memory). The optional args block and timeout are ignored and only provided for compatibility with Queue.put().

SimpleQueue Objects

Therefore, put_nowait() is equivalent to put().

...

# add an item to the queue without blocking

queue.put_nowait(item)
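As a quick check of this behavior, the sketch below adds items with several combinations of the “block” and “timeout” arguments. The variable name q is chosen only for this illustration.

```python
from queue import SimpleQueue

q = SimpleQueue()

q.put('a')                         # plain put
q.put('b', block=False)            # "block" is accepted but ignored
q.put('c', block=True, timeout=0)  # "timeout" is accepted but ignored
q.put_nowait('d')                  # same effect as put()

# all four items were added
print(q.qsize())
```

Each call returns immediately and adds its item, regardless of the arguments.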

Items can be retrieved from the queue by calling the get() function.

...

# get an item from the queue

item = queue.get()

By default, the call to get() will block until an item is available to be returned. Therefore, the above is equivalent to the following:

...

# get an item from the queue

item = queue.get(block=True, timeout=None)

A timeout in seconds can be used when waiting for an item on the queue via the “timeout” argument. If the timeout expires before an item is available, then a queue.Empty exception is raised.

...

# get an item from the queue with a timeout

try:

    item = queue.get(timeout=1)

except Empty:

    # no item...

    pass
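The timeout behavior can be observed on an empty queue. This is a minimal sketch; the names q, timed_out and elapsed are chosen for illustration, and the measured time will vary slightly between runs.

```python
from queue import SimpleQueue, Empty
from time import monotonic

q = SimpleQueue()

# wait for an item that never arrives
start = monotonic()
try:
    item = q.get(timeout=0.2)
    timed_out = False
except Empty:
    timed_out = True
elapsed = monotonic() - start

print(f'timed out: {timed_out}, waited about {elapsed:.2f} seconds')
```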

The “block” argument can be set to False, which means the function will return immediately if there is an item to retrieve; otherwise, it will raise a queue.Empty exception.

...

# get an item from the queue without blocking

try:

    item = queue.get(block=False)

except Empty:

    # no item...

    pass

This is equivalent to calling the get_nowait() function, for example:

...

# get an item from the queue without blocking

try:

    item = queue.get_nowait()

except Empty:

    # no item...

    pass
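One use of the non-blocking form is to take everything that is currently on the queue without waiting for more. The sketch below assumes a hypothetical helper named drain() that is not part of the queue module.

```python
from queue import SimpleQueue, Empty

# hypothetical helper: remove and return all items currently on the queue
def drain(q):
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except Empty:
            # the queue has no more items right now
            return items

q = SimpleQueue()
for i in range(5):
    q.put(i)

drained = drain(q)
print(drained)
```

Calling drain() again on the now-empty queue returns an empty list immediately rather than blocking.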

The number of items in the queue can be queried via the qsize() function.

For example:

...

# check the size of the queue

size = queue.qsize()

We can check if the queue has no values via the empty() function.

For example:

...

# check if the queue is empty

if queue.empty():

    # ...

    pass
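The two query functions can be tried together in a single thread, where the reported values are exact. When other threads are adding or removing items at the same time, the result of qsize() is best treated as approximate, because the queue may change immediately after the call returns. The names q and sizes below are chosen for illustration.

```python
from queue import SimpleQueue

q = SimpleQueue()
sizes = []

# a new queue has no items
sizes.append(q.qsize())
was_empty = q.empty()

# add two items
q.put('x')
q.put('y')
sizes.append(q.qsize())

# remove one item
q.get()
sizes.append(q.qsize())

print(was_empty, sizes, q.empty())
```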

Next, let’s take a look at the reentrant capability of the SimpleQueue class.

SimpleQueue is Reentrant

Unlike the queue.Queue class, the queue.SimpleQueue class supports reentrant calls to put().

Reentrancy in concurrent programming refers to the capability for a function call to be interrupted in the middle of execution and for the function to be safely called again before the first call has finished.

For example, a reentrant lock provided by threading.RLock class can be acquired by a thread, then acquired again by the same thread.

The queue.SimpleQueue is reentrant.

This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.

SimpleQueue Objects

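In the queue.SimpleQueue, reentrant refers to the ability for a call to put() to start in a thread while another put() or get() call is still in progress in that same thread.

This can happen in obscure situations, such as when a __del__ method or weakref callback that calls put() is triggered in the middle of another queue operation.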
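The usage pattern that this reentrancy makes safe can be sketched as follows. The Resource class and the released queue are hypothetical names for this illustration. The sketch does not force an interruption; it only shows a destructor that calls put(), and it relies on CPython running __del__ as soon as the last reference is removed.

```python
import gc
from queue import SimpleQueue

# queue used to record the names of objects that have been destroyed
released = SimpleQueue()

# hypothetical class whose destructor reports to the queue
class Resource:
    def __init__(self, name):
        self.name = name

    def __del__(self):
        # may run at an arbitrary point, e.g. during garbage collection
        released.put(self.name)

r = Resource('connection-1')
# remove the last reference, which triggers __del__ in CPython
del r
gc.collect()

name = released.get(timeout=1)
print(name)
```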

Now that we know how to use the queue.SimpleQueue class, let’s look at some worked examples.

Example of Using the SimpleQueue

We can explore how to use the queue.SimpleQueue class with a worked example.

In this example, we will create a producer thread that will generate ten random numbers and put them on the queue. We will also create a consumer thread that will get numbers from the queue and report their values.

The queue.SimpleQueue provides a simple way to allow these simple producer and consumer threads to communicate data with each other.

First, we can define the function to be executed by the producer thread.

The task will iterate ten times in a loop. Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then block for that fraction of a second, then put the value on the queue.

Once the task is complete it will put the value None on the queue to signal to the consumer that there is no further work.

The producer() function below implements this by taking the queue instance as an argument.

# generate work

def producer(queue):

    print('Producer: Running')

    # generate work

    for i in range(10):

        # generate a value

        value = random()

        # block

        sleep(value)

        # add to the queue

        queue.put(value)

    # all done

    queue.put(None)

    print('Producer: Done')

Next, we can define the function to be executed by the consumer thread.

The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.

If the item retrieved from the queue is the value None, then the task will break the loop and terminate the thread. Otherwise, the value is reported.

The consumer() function below implements this and takes the queue instance as an argument.

# consume work

def consumer(queue):

    print('Consumer: Running')

    # consume work

    while True:

        # get a unit of work

        item = queue.get()

        # check for stop

        if item is None:

            break

        # report

        print(f'>got {item}')

    # all done

    print('Consumer: Done')
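As an aside, the same sentinel loop can be sketched more compactly with the two-argument form of the built-in iter() function, which calls get() repeatedly until it returns the sentinel value. This is an alternative to the loop above, not the approach used in the rest of this tutorial, and the results list is a hypothetical addition so the consumed values can be inspected.

```python
from queue import SimpleQueue
from threading import Thread

# generate work, then signal that there is no more
def producer(q):
    for i in range(3):
        q.put(i)
    q.put(None)

# consume work until the None sentinel arrives
def consumer(q, results):
    # iter(callable, sentinel) calls q.get() until it returns None
    for item in iter(q.get, None):
        results.append(item)

q = SimpleQueue()
results = []
threads = [
    Thread(target=consumer, args=(q, results)),
    Thread(target=producer, args=(q,)),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(results)
```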

Finally, in the main thread we can create the shared queue instance.

...

# create the shared queue

queue = SimpleQueue()

We can then configure and start the consumer thread, which will patiently wait for work to arrive on the queue.

...

# start the consumer

consumer = Thread(target=consumer, args=(queue,))

consumer.start()

Then we can configure and start the producer thread, which will generate work and add it to the queue for the consumer to retrieve.

...

# start the producer

producer = Thread(target=producer, args=(queue,))

producer.start()

The main thread will then block until both the producer and consumer threads terminate, then terminate itself.

...

# wait for all threads to finish

producer.join()

consumer.join()

Tying this together, the complete example of using the queue.SimpleQueue is listed below.

# SuperFastPython.com

# example of using the simple queue

from time import sleep

from random import random

from threading import Thread

from queue import SimpleQueue

# generate work

def producer(queue):

    print('Producer: Running')

    # generate work

    for i in range(10):

        # generate a value

        value = random()

        # block

        sleep(value)

        # add to the queue

        queue.put(value)

    # all done

    queue.put(None)

    print('Producer: Done')

# consume work

def consumer(queue):

    print('Consumer: Running')

    # consume work

    while True:

        # get a unit of work

        item = queue.get()

        # check for stop

        if item is None:

            break

        # report

        print(f'>got {item}')

    # all done

    print('Consumer: Done')

# create the shared queue

queue = SimpleQueue()

# start the consumer

consumer = Thread(target=consumer, args=(queue,))

consumer.start()

# start the producer

producer = Thread(target=producer, args=(queue,))

producer.start()

# wait for all threads to finish

producer.join()

consumer.join()

Running the example first creates the shared queue instance.

Next, the consumer thread is started and passed the queue instance. Then the producer thread is started and the main thread blocks until the worker threads terminate.

The producer thread generates a new random value each iteration of the task, blocks and adds it to the queue. The consumer thread waits on the queue for items to arrive, then consumes them one at a time, reporting their value.

Finally, the producer task finishes, a None value is put on the queue and the thread terminates. The consumer thread gets the None value, breaks its loop and also terminates.

This highlights how the queue.SimpleQueue can be used to share data easily between producer and consumer threads.

A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.

Consumer: Running

Producer: Running

>got 0.49138927575578184

>got 0.9089938160636776

>got 0.9000414387422191

>got 0.69580688995332

>got 0.7816543465569435

>got 0.2853295879765214

>got 0.6840369305347521

>got 0.2025555428328133

>got 0.5096460791785361

Producer: Done

>got 0.3847119182144745

Consumer: Done

Next, let’s look at how we might get values from the queue without blocking.


Using the SimpleQueue Without Blocking

We can get values from the queue without blocking.

This might be useful if we wish to use busy waiting in the consumer task to check other state or perform other tasks while waiting for data to arrive on the queue.

We can update the example from the previous section to get items from the queue without blocking.

This can be achieved by setting the “block” argument to False when calling get().

For example:

...

# get a value from the queue without blocking

item = queue.get(block=False)

The get() function will return immediately.

If there is a value in the queue to retrieve, then it is returned. Otherwise, if the queue is empty, then a queue.Empty exception is raised, which can be handled.

In this case, if there is no value to get from the queue, we report a message and sleep for a fraction of a second. We then use continue to jump back to the start of the consumer's busy-waiting loop.

For example:

...

# get a unit of work

try:

    item = queue.get(block=False)

except Empty:

    print('Consumer: got nothing, waiting a while...')

    sleep(0.5)

    continue

The updated version of the consumer() function with this change is listed below.

# consume work

def consumer(queue):

    print('Consumer: Running')

    # consume work

    while True:

        # get a unit of work

        try:

            item = queue.get(block=False)

        except Empty:

            print('Consumer: got nothing, waiting a while...')

            sleep(0.5)

            continue

        # check for stop

        if item is None:

            break

        # report

        print(f'>got {item}')

    # all done

    print('Consumer: Done')

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of using the simple queue without blocking

from time import sleep

from random import random

from threading import Thread

from queue import SimpleQueue

from queue import Empty

# generate work

def producer(queue):

    print('Producer: Running')

    # generate work

    for i in range(10):

        # generate a value

        value = random()

        # block

        sleep(value)

        # add to the queue

        queue.put(value)

    # all done

    queue.put(None)

    print('Producer: Done')

# consume work

def consumer(queue):

    print('Consumer: Running')

    # consume work

    while True:

        # get a unit of work

        try:

            item = queue.get(block=False)

        except Empty:

            print('Consumer: got nothing, waiting a while...')

            sleep(0.5)

            continue

        # check for stop

        if item is None:

            break

        # report

        print(f'>got {item}')

    # all done

    print('Consumer: Done')

# create the shared queue

queue = SimpleQueue()

# start the consumer

consumer = Thread(target=consumer, args=(queue,))

consumer.start()

# start the producer

producer = Thread(target=producer, args=(queue,))

producer.start()

# wait for all threads to finish

producer.join()

consumer.join()

Running the example creates the shared queue, then starts the consumer and producer threads, as before.

The producer thread will generate, block and add items to the queue.

The consumer thread will attempt to get a value from the queue. If there is no value to retrieve, a queue.Empty exception is raised and handled by reporting a message, sleeping for a fraction of a second then starting the busy waiting loop again.

Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.

A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.

We can see the messages from the consumer thread busy waiting for new data to arrive in the queue.

This highlights how to get items from the queue.SimpleQueue without blocking.

Consumer: Running

Consumer: got nothing, waiting a while...

Producer: Running

Consumer: got nothing, waiting a while...

>got 0.8315210847454647

Consumer: got nothing, waiting a while...

Consumer: got nothing, waiting a while...

>got 0.9040040774166886

Consumer: got nothing, waiting a while...

Consumer: got nothing, waiting a while...

>got 0.9654033549252191

Consumer: got nothing, waiting a while...

>got 0.7305006936148903

>got 0.06463532037048014

Consumer: got nothing, waiting a while...

Consumer: got nothing, waiting a while...

>got 0.8309731213746061

Consumer: got nothing, waiting a while...

Consumer: got nothing, waiting a while...

>got 0.9414631757891674

Consumer: got nothing, waiting a while...

Consumer: got nothing, waiting a while...

Producer: Done

>got 0.7431005827399254

>got 0.0005807148605703194

>got 0.1393792628777406

Consumer: Done

Next, let’s look at how we might get values from the queue with a timeout.

Using the SimpleQueue With a Timeout

We can block when getting values from the queue.SimpleQueue, but limit the wait with a timeout.

This allows a consumer thread to block while waiting for values to arrive in the queue, and still execute other tasks between waits. It may be more efficient than busy waiting without any blocking calls.

We can update the above example to use a timeout when getting items from the queue in the consumer thread.

This can be achieved by calling the get() function and specifying a timeout value in seconds.

For example:

...

# get a value and block for a timeout

item = queue.get(timeout=0.5)

If a value is available, the function will return immediately with the value.

Otherwise, if no value is available within the timeout, then a queue.Empty exception is raised which may be handled. In this case, we will handle the exception by reporting a message and starting the busy-wait loop again.

...

# get a unit of work

try:

    item = queue.get(timeout=0.5)

except Empty:

    print('Consumer: gave up waiting...')

    continue

Tying this together, the updated version of the consumer() function with this change is listed below.

# consume work

def consumer(queue):

    print('Consumer: Running')

    # consume work

    while True:

        # get a unit of work

        try:

            item = queue.get(timeout=0.5)

        except Empty:

            print('Consumer: gave up waiting...')

            continue

        # check for stop

        if item is None:

            break

        # report

        print(f'>got {item}')

    # all done

    print('Consumer: Done')

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of using the simple queue with a timeout

from time import sleep

from random import random

from threading import Thread

from queue import SimpleQueue

from queue import Empty

# generate work

def producer(queue):

    print('Producer: Running')

    # generate work

    for i in range(10):

        # generate a value

        value = random()

        # block

        sleep(value)

        # add to the queue

        queue.put(value)

    # all done

    queue.put(None)

    print('Producer: Done')

# consume work

def consumer(queue):

    print('Consumer: Running')

    # consume work

    while True:

        # get a unit of work

        try:

            item = queue.get(timeout=0.5)

        except Empty:

            print('Consumer: gave up waiting...')

            continue

        # check for stop

        if item is None:

            break

        # report

        print(f'>got {item}')

    # all done

    print('Consumer: Done')

# create the shared queue

queue = SimpleQueue()

# start the consumer

consumer = Thread(target=consumer, args=(queue,))

consumer.start()

# start the producer

producer = Thread(target=producer, args=(queue,))

producer.start()

# wait for all threads to finish

producer.join()

consumer.join()

Running the example creates the shared queue, then starts the consumer and producer threads, as before.

The producer thread will generate, block and add items to the queue.

The consumer thread will attempt to get a value from the queue. The thread will block for at most the timeout.

If no value is available before the timeout expires, then a queue.Empty exception is raised and handled by reporting a message then starting the busy wait loop again. Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.

A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.

We can see the messages from the consumer thread busy waiting for new data to arrive in the queue.

This highlights how to get items from the queue.SimpleQueue by blocking with a timeout.

Consumer: Running

Producer: Running

>got 0.38573432597027735

>got 0.4636838697288195

Consumer: gave up waiting...

>got 0.7517639069916314

>got 0.4420844820641848

>got 0.08157221351989186

Consumer: gave up waiting...

>got 0.7285944241750508

Consumer: gave up waiting...

>got 0.6143809725593375

>got 0.3434913262962024

Consumer: gave up waiting...

>got 0.6376752385413578

Consumer: gave up waiting...

Producer: Done

>got 0.568278446546111

Consumer: Done
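As a variation on the example above, the time between timeouts can be used to check other state. The sketch below is one possible approach, not part of the original example: it assumes a threading.Event named stop as a shutdown flag in place of the None sentinel, and a results list so the consumed values can be inspected. The consumer only exits once the flag is set and the queue is empty, so no items are left behind.

```python
from queue import SimpleQueue, Empty
from threading import Thread, Event
from time import sleep

# consume work until asked to stop and the queue is empty
def consumer(q, stop, results):
    while not (stop.is_set() and q.empty()):
        try:
            item = q.get(timeout=0.1)
        except Empty:
            # nothing arrived in time, loop back and re-check the stop flag
            continue
        results.append(item)

q = SimpleQueue()
stop = Event()
results = []

worker = Thread(target=consumer, args=(q, stop, results))
worker.start()

# generate some work
for i in range(3):
    q.put(i)
    sleep(0.05)

# request shutdown and wait for the consumer to finish
stop.set()
worker.join()
print(results)
```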


Further Reading

This section provides additional resources that you may find helpful.

Python Threading Books

I also recommend specific chapters in the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter 14: Threads and Processes

Takeaways

You now know how to use the queue.SimpleQueue in Python.
