Message 332812 - Python tracker

Message332812

Author June Kim
Recipients June Kim
Date 2018-12-31.11:49:30
SpamBayes Score -1.0
Marked as misclassified Yes
Message-id <1546256970.74.0.591831290501.issue35627@roundup.psfhosted.org>
In-reply-to
Content
## Test code ##
## Modified a bit from the original written by Doug Hellmann
## https://pymotw.com/3/multiprocessing/communication.html

import multiprocessing
import time


class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            print('Getting task')
            next_task = self.task_queue.get()
            print(f'task got: {next_task}')
            if next_task is None:
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


def test():
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    [w.start() for w in consumers]
    num_jobs = 10
    print('Putting')
    [tasks.put(Task(i, i)) for i in range(num_jobs)]
    print('Poisoning')
    [tasks.put(None) for i in range(num_consumers)]
    print('Joining')
    tasks.join()
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

###
1. This code works perfectly in 3.7.1 but halts the main process in 3.7.2
2. It seems the JoinableQueue is empty when it is accessed by processes.
3. IMHO, resource sharing mechanism in multiprocessing.queue seems not working properly.
History
Date User Action Args
2018-12-31 11:49:31June Kimsetrecipients: + June Kim
2018-12-31 11:49:30June Kimsetmessageid: <1546256970.74.0.591831290501.issue35627@roundup.psfhosted.org>
2018-12-31 11:49:30June Kimlinkissue35627 messages
2018-12-31 11:49:30June Kimcreate