## Test code ##
## Modified a bit from the original written by Doug Hellmann
## https://pymotw.com/3/multiprocessing/communication.html
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
print('Getting task')
next_task = self.task_queue.get()
print(f'task got: {next_task}')
if next_task is None:
print('{}: Exiting'.format(proc_name))
self.task_queue.task_done()
break
print('{}: {}'.format(proc_name, next_task))
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task:
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1)
return '{self.a} * {self.b} = {product}'.format(
self=self, product=self.a * self.b)
def __str__(self):
return '{self.a} * {self.b}'.format(self=self)
def test():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count() * 2
print('Creating {} consumers'.format(num_consumers))
consumers = [Consumer(tasks, results) for i in range(num_consumers)]
[w.start() for w in consumers]
num_jobs = 10
print('Putting')
[tasks.put(Task(i, i)) for i in range(num_jobs)]
print('Poisoning')
[tasks.put(None) for i in range(num_consumers)]
print('Joining')
tasks.join()
while num_jobs:
result = results.get()
print('Result:', result)
num_jobs -= 1
###
1. This code works perfectly in 3.7.1 but halts the main process in 3.7.2
2. It seems the JoinableQueue is empty when it is accessed by processes.
3. IMHO, resource sharing mechanism in multiprocessing.queue seems not working properly. |