Groups | Search | Server Info | Keyboard shortcuts | Login | Register [http] [https] [nntp] [nntps]


Groups > comp.lang.python > #90133 > unrolled thread

multiprocessing, queue

Started byMichael Welle <mwe012008@gmx.net>
First post2015-05-08 12:08 +0200
Last post2015-05-09 01:05 +1000
Articles 4 — 2 participants

Back to article view | Back to comp.lang.python


Contents

  multiprocessing, queue Michael Welle <mwe012008@gmx.net> - 2015-05-08 12:08 +0200
    Re: multiprocessing, queue Chris Angelico <rosuav@gmail.com> - 2015-05-08 23:34 +1000
      Re: multiprocessing, queue Michael Welle <mwe012008@gmx.net> - 2015-05-08 16:31 +0200
        Re: multiprocessing, queue Chris Angelico <rosuav@gmail.com> - 2015-05-09 01:05 +1000

#90133 — multiprocessing, queue

FromMichael Welle <mwe012008@gmx.net>
Date2015-05-08 12:08 +0200
Subjectmultiprocessing, queue
Message-ID<gheu1cx2pd.ln2@news.c0t0d0s0.de>
Hello,

what's wrong with [0]? As num_tasks gets higher proc.join() seems to
block forever. First I thought the magical frontier is around 32k tasks,
but then it seemed to work with 40k tasks. Now I'm stuck around 7k
tasks. I think I do something fundamentally wrong, but I can't find it.

Regards
hmw

[0] http://pastebin.com/adfBYgY9

-- 
biff4emacsen - A biff-like tool for (X)Emacs
http://www.c0t0d0s0.de/biff4emacsen/biff4emacsen.html
Flood - Your friendly network packet generator
http://www.c0t0d0s0.de/flood/flood.html

[toc] | [next] | [standalone]


#90169

FromChris Angelico <rosuav@gmail.com>
Date2015-05-08 23:34 +1000
Message-ID<mailman.251.1431092090.12865.python-list@python.org>
In reply to#90133
On Fri, May 8, 2015 at 8:08 PM, Michael Welle <mwe012008@gmx.net> wrote:
> Hello,
>
> what's wrong with [0]? As num_tasks gets higher proc.join() seems to
> block forever. First I thought the magical frontier is around 32k tasks,
> but then it seemed to work with 40k tasks. Now I'm stuck around 7k
> tasks. I think I do something fundamentally wrong, but I can't find it.
>
> Regards
> hmw
>
> [0] http://pastebin.com/adfBYgY9

Your code's small enough to include inline, so I'm doing that:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue

class Foo(Process):

    def __init__(self, task_queue, result_queue):
        Process.__init__(self)
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        while True:
            n = self.task_queue.get()
            if n is None:
                break

            self.result_queue.put(1)

        return


def main():
    results = Queue()
    tasks = Queue()
    procs = []
    num_procs = 8
    num_tasks = 8000

    for i in range(num_procs):
        proc = Foo(tasks, results)
        procs.append(proc)

    for proc in procs:
        proc.start()

    for i in range(num_tasks):
        tasks.put(i)

    for i in range(num_procs):
        tasks.put(None)

    for proc in procs:
        print("join")
        proc.join()

    while not results.empty():
        result = results.get()
        print('Result: {}'.format(result))


if __name__ == '__main__':
    main()

# -- end of code --


First thing I'd look at is the default queue size. If your result
queue fills up, all processes will block until something starts
retrieving results. If you really want to have all your results stay
in the queue like that, you may need to specify a huge queue size,
which may cost you a lot of memory; much better would be to have each
job post something on the result queue when it's done, and then you
wait till they're all done:

from multiprocessing import Process, Queue

def foo(task_queue, result_queue):
    while True:
        n = task_queue.get()
        if n is None: break
        result_queue.put(1)
    # Make sure None is not a possible actual result
    # Otherwise, create an object() to use as a flag.
    result_queue.put(None)

def feed_tasks(num_tasks, num_procs, tasks):
    for i in range(num_tasks):
        tasks.put(i)

    for i in range(num_procs):
        tasks.put(None)

def main():
    results = Queue()
    tasks = Queue()
    num_procs = 8
    num_tasks = 8000
    procs = [Process(target=foo, args=(tasks, results)) for i in
range(num_procs)]

    for proc in procs: proc.start()

    Process(target=feed_tasks, args=(num_tasks, num_procs, tasks)).start()

    while num_procs:
        result = results.get()
        if result is None: num_procs -= 1
        else: print('Result: {}'.format(result))

    for proc in procs:
        print("join")
        proc.join()

if __name__ == '__main__':
    main()


I've also made a few other changes (for instance, no need to subclass
Process just to pass args), but the most important parts are a
result_queue.put() just before the process ends, and switching the
order of the result-queue-pump and process-join loops.

That still might block, though, at the point where the tasks are being
put onto the queue; so I've spun that off into its own process. (It
might not be necessary, depending on how your tasks work.) But I
tested this on 200,000 tasks (with the printing of results replaced
with a simple counter), and it worked fine, churning through the work
in about ten seconds.

As a general rule, queues need to have both ends operating
simultaneously, otherwise you're likely to have them blocking. In
theory, your code should all work with ridiculously low queue sizes;
the only cost will be concurrency (since you'd forever be waiting for
the queue, so your tasks will all be taking turns). I tested this by
changing the Queue() calls to Queue(1), and the code took about twice
as long to complete. :)

Hope that helps!

ChrisA

[toc] | [prev] | [next] | [standalone]


#90170

FromMichael Welle <mwe012008@gmx.net>
Date2015-05-08 16:31 +0200
Message-ID<autu1cxa2m.ln2@news.c0t0d0s0.de>
In reply to#90169
Hello,

Chris Angelico <rosuav@gmail.com> writes:

> On Fri, May 8, 2015 at 8:08 PM, Michael Welle <mwe012008@gmx.net> wrote:
>> Hello,
>>
>> what's wrong with [0]? As num_tasks gets higher proc.join() seems to
>> block forever. First I thought the magical frontier is around 32k tasks,
>> but then it seemed to work with 40k tasks. Now I'm stuck around 7k
>> tasks. I think I do something fundamentally wrong, but I can't find it.
>>
>> Regards
>> hmw
>>
>> [0] http://pastebin.com/adfBYgY9
>
> Your code's small enough to include inline, so I'm doing that:
[...]
> First thing I'd look at is the default queue size. If your result
> queue fills up, all processes will block until something starts
> retrieving results.
I tried to create the queues with a size of 100k, did not change the
behaviour.


> If you really want to have all your results stay
> in the queue like that, you may need to specify a huge queue size,
> which may cost you a lot of memory; much better would be to have each
> job post something on the result queue when it's done, and then you
> wait till they're all done:
>
> from multiprocessing import Process, Queue
>
> def foo(task_queue, result_queue):
>     while True:
>         n = task_queue.get()
>         if n is None: break
>         result_queue.put(1)
>     # Make sure None is not a possible actual result
>     # Otherwise, create an object() to use as a flag.
>     result_queue.put(None)
[...]
>     while num_procs:
>         result = results.get()
>         if result is None: num_procs -= 1
>         else: print('Result: {}'.format(result))
>
>     for proc in procs:
>         print("join")
>         proc.join()
>
> if __name__ == '__main__':
>     main()
>
>
> I've also made a few other changes (for instance, no need to subclass
> Process just to pass args), but the most important parts are a
> result_queue.put() just before the process ends,
In general there is no inherent reason for the last put() operation,
it's just the way you evaluate the results in the while loop?


> and switching the
> order of the result-queue-pump and process-join loops.
That seems to decide if my code blocks or not. Why do you do it that
way ;)? In the Queue's documentation one can find the following:


| Warning
| 
| As mentioned above, if a child process has put items on a queue (and
| it has not used JoinableQueue.cancel_join_thread), then that process
| will not terminate until all buffered items have been flushed to the
| pipe. 
| 
| This means that if you try joining that process you may get a deadlock
| unless you are sure that all items which have been put on the queue
| have been consumed. Similarly, if the child process is non-daemonic
| then the parent process may hang on exit when it tries to join all its
| non-daemonic children. 
| 
| Note that a queue created using a manager does not have this
| issue. See Programming guidelines. 

I guess that's what's biting me.


[...]
> As a general rule, queues need to have both ends operating
> simultaneously, otherwise you're likely to have them blocking. In
> theory, your code should all work with ridiculously low queue sizes;
> the only cost will be concurrency (since you'd forever be waiting for
> the queue, so your tasks will all be taking turns). I tested this by
> changing the Queue() calls to Queue(1), and the code took about twice
> as long to complete. :)
;) I know, as you might guess it's not a real world example. It's just
to explore the multiprocessing module.


Regards
hmw

-- 
biff4emacsen - A biff-like tool for (X)Emacs
http://www.c0t0d0s0.de/biff4emacsen/biff4emacsen.html
Flood - Your friendly network packet generator
http://www.c0t0d0s0.de/flood/flood.html

[toc] | [prev] | [next] | [standalone]


#90176

FromChris Angelico <rosuav@gmail.com>
Date2015-05-09 01:05 +1000
Message-ID<mailman.252.1431097527.12865.python-list@python.org>
In reply to#90170
On Sat, May 9, 2015 at 12:31 AM, Michael Welle <mwe012008@gmx.net> wrote:
>> As a general rule, queues need to have both ends operating
>> simultaneously, otherwise you're likely to have them blocking. In
>> theory, your code should all work with ridiculously low queue sizes;
>> the only cost will be concurrency (since you'd forever be waiting for
>> the queue, so your tasks will all be taking turns). I tested this by
>> changing the Queue() calls to Queue(1), and the code took about twice
>> as long to complete. :)
> ;) I know, as you might guess it's not a real world example. It's just
> to explore the multiprocessing module.

Sure, but even in a real-world example, it shouldn't ever be necessary
to create huge queues. Larger queues allow for inconsistent
performance of producer and/or consumer (eg if your consumer takes 1s
to do each of 500 jobs, then 500s to do one job, it's capable of
coping with a producer that puts one job on the queue every 2s, but
only if the queue can handle ~250 jobs), but otherwise, the only
effect of shrinking the queues is to force the processes into
lock-step. Bigger queues mean that an over-performing producer will
run you out of memory rather than get blocked. At very least, it
should be a safe way to debug your logic - cut the queues down to just
2-3 elements each, and keep on printing out what's in the queue.

ChrisA

[toc] | [prev] | [standalone]


Back to top | Article view | comp.lang.python


csiph-web