| Started by | Michael Welle <mwe012008@gmx.net> |
|---|---|
| First post | 2015-05-08 12:08 +0200 |
| Last post | 2015-05-09 01:05 +1000 |
| Articles | 4 — 2 participants |
multiprocessing, queue Michael Welle <mwe012008@gmx.net> - 2015-05-08 12:08 +0200
Re: multiprocessing, queue Chris Angelico <rosuav@gmail.com> - 2015-05-08 23:34 +1000
Re: multiprocessing, queue Michael Welle <mwe012008@gmx.net> - 2015-05-08 16:31 +0200
Re: multiprocessing, queue Chris Angelico <rosuav@gmail.com> - 2015-05-09 01:05 +1000
| From | Michael Welle <mwe012008@gmx.net> |
|---|---|
| Date | 2015-05-08 12:08 +0200 |
| Subject | multiprocessing, queue |
| Message-ID | <gheu1cx2pd.ln2@news.c0t0d0s0.de> |
Hello,
what's wrong with [0]? As num_tasks gets higher proc.join() seems to
block forever. First I thought the magical frontier is around 32k tasks,
but then it seemed to work with 40k tasks. Now I'm stuck around 7k
tasks. I think I do something fundamentally wrong, but I can't find it.
Regards
hmw
[0] http://pastebin.com/adfBYgY9
--
biff4emacsen - A biff-like tool for (X)Emacs
http://www.c0t0d0s0.de/biff4emacsen/biff4emacsen.html
Flood - Your friendly network packet generator
http://www.c0t0d0s0.de/flood/flood.html
| From | Chris Angelico <rosuav@gmail.com> |
|---|---|
| Date | 2015-05-08 23:34 +1000 |
| Message-ID | <mailman.251.1431092090.12865.python-list@python.org> |
| In reply to | #90133 |
On Fri, May 8, 2015 at 8:08 PM, Michael Welle <mwe012008@gmx.net> wrote:
> Hello,
>
> what's wrong with [0]? As num_tasks gets higher proc.join() seems to
> block forever. First I thought the magical frontier is around 32k tasks,
> but then it seemed to work with 40k tasks. Now I'm stuck around 7k
> tasks. I think I do something fundamentally wrong, but I can't find it.
>
> Regards
> hmw
>
> [0] http://pastebin.com/adfBYgY9
Your code's small enough to include inline, so I'm doing that:
#!/usr/bin/python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue

class Foo(Process):
    def __init__(self, task_queue, result_queue):
        Process.__init__(self)
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        while True:
            n = self.task_queue.get()
            if n is None:
                break
            self.result_queue.put(1)
        return

def main():
    results = Queue()
    tasks = Queue()
    procs = []
    num_procs = 8
    num_tasks = 8000

    for i in range(num_procs):
        proc = Foo(tasks, results)
        procs.append(proc)
    for proc in procs:
        proc.start()
    for i in range(num_tasks):
        tasks.put(i)
    for i in range(num_procs):
        tasks.put(None)
    for proc in procs:
        print("join")
        proc.join()
    while not results.empty():
        result = results.get()
        print('Result: {}'.format(result))

if __name__ == '__main__':
    main()

# -- end of code --
First thing I'd look at is the default queue size. If your result
queue fills up, all processes will block until something starts
retrieving results. If you really want to have all your results stay
in the queue like that, you may need to specify a huge queue size,
which may cost you a lot of memory; much better would be to have each
job post something on the result queue when it's done, and then you
wait till they're all done:
from multiprocessing import Process, Queue

def foo(task_queue, result_queue):
    while True:
        n = task_queue.get()
        if n is None: break
        result_queue.put(1)
    # Make sure None is not a possible actual result
    # Otherwise, create an object() to use as a flag.
    result_queue.put(None)

def feed_tasks(num_tasks, num_procs, tasks):
    for i in range(num_tasks):
        tasks.put(i)
    for i in range(num_procs):
        tasks.put(None)

def main():
    results = Queue()
    tasks = Queue()
    num_procs = 8
    num_tasks = 8000
    procs = [Process(target=foo, args=(tasks, results)) for i in
        range(num_procs)]
    for proc in procs: proc.start()
    Process(target=feed_tasks, args=(num_tasks, num_procs, tasks)).start()
    while num_procs:
        result = results.get()
        if result is None: num_procs -= 1
        else: print('Result: {}'.format(result))
    for proc in procs:
        print("join")
        proc.join()

if __name__ == '__main__':
    main()
I've also made a few other changes (for instance, no need to subclass
Process just to pass args), but the most important parts are a
result_queue.put() just before the process ends, and switching the
order of the result-queue-pump and process-join loops.
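One caveat on the object() flag mentioned in the code comment: items sent through a multiprocessing Queue are pickled, so a bare object() arrives on the other side as a different object, and an `is` check against the original will not match. None works because it is a singleton. A minimal sketch of the difference, using pickle directly as a stand-in for the queue; the DONE string is a hypothetical marker, not something from the thread:

```python
import pickle

# A fresh object() does not keep its identity across pickling.
flag = object()
copy = pickle.loads(pickle.dumps(flag))
same_object = copy is flag  # False: the copy is a new object

# A plain value compared with == survives the round trip.
DONE = "__done__"  # hypothetical marker; must never be a real result
received = pickle.loads(pickle.dumps(DONE))
is_done = received == DONE  # True

print(same_object, is_done)
```

So if None can be a legitimate result, a marker that is compared by value (or by type) is probably the safer choice than an identity check.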
That still might block, though, at the point where the tasks are being
put onto the queue; so I've spun that off into its own process. (It
might not be necessary, depending on how your tasks work.) But I
tested this on 200,000 tasks (with the printing of results replaced
with a simple counter), and it worked fine, churning through the work
in about ten seconds.
As a general rule, queues need to have both ends operating
simultaneously, otherwise you're likely to have them blocking. In
theory, your code should all work with ridiculously low queue sizes;
the only cost will be concurrency (since you'd forever be waiting for
the queue, so your tasks will all be taking turns). I tested this by
changing the Queue() calls to Queue(1), and the code took about twice
as long to complete. :)
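To see the blocking behaviour of a small queue in isolation, here is a rough sketch, assuming nothing else is consuming: with Queue(1) a second put() cannot complete until something calls get(). The helper name second_put_blocks and the 0.1 s timeout are made up for illustration.

```python
import queue
from multiprocessing import Queue


def second_put_blocks(maxsize=1):
    """Return True if a second put() cannot complete while the queue is full."""
    q = Queue(maxsize)
    q.put("first")
    pending = 1
    try:
        # Nothing is consuming, so on a full queue this waits, then gives up.
        q.put("second", timeout=0.1)
        pending += 1
        blocked = False
    except queue.Full:
        blocked = True
    # Drain what was queued so the queue's feeder thread can finish cleanly.
    for _ in range(pending):
        q.get()
    return blocked


if __name__ == "__main__":
    print(second_put_blocks(1))  # True
    print(second_put_blocks(2))  # False
```

Without the timeout the second put() would simply wait, which is the lock-step effect described above.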
Hope that helps!
ChrisA
| From | Michael Welle <mwe012008@gmx.net> |
|---|---|
| Date | 2015-05-08 16:31 +0200 |
| Message-ID | <autu1cxa2m.ln2@news.c0t0d0s0.de> |
| In reply to | #90169 |
Hello,
Chris Angelico <rosuav@gmail.com> writes:
> On Fri, May 8, 2015 at 8:08 PM, Michael Welle <mwe012008@gmx.net> wrote:
>> Hello,
>>
>> what's wrong with [0]? As num_tasks gets higher proc.join() seems to
>> block forever. First I thought the magical frontier is around 32k tasks,
>> but then it seemed to work with 40k tasks. Now I'm stuck around 7k
>> tasks. I think I do something fundamentally wrong, but I can't find it.
>>
>> Regards
>> hmw
>>
>> [0] http://pastebin.com/adfBYgY9
>
> Your code's small enough to include inline, so I'm doing that:
[...]
> First thing I'd look at is the default queue size. If your result
> queue fills up, all processes will block until something starts
> retrieving results.
I tried to create the queues with a size of 100k, did not change the
behaviour.
> If you really want to have all your results stay
> in the queue like that, you may need to specify a huge queue size,
> which may cost you a lot of memory; much better would be to have each
> job post something on the result queue when it's done, and then you
> wait till they're all done:
>
> from multiprocessing import Process, Queue
>
> def foo(task_queue, result_queue):
>     while True:
>         n = task_queue.get()
>         if n is None: break
>         result_queue.put(1)
>     # Make sure None is not a possible actual result
>     # Otherwise, create an object() to use as a flag.
>     result_queue.put(None)
[...]
>     while num_procs:
>         result = results.get()
>         if result is None: num_procs -= 1
>         else: print('Result: {}'.format(result))
>
>     for proc in procs:
>         print("join")
>         proc.join()
>
> if __name__ == '__main__':
>     main()
>
>
> I've also made a few other changes (for instance, no need to subclass
> Process just to pass args), but the most important parts are a
> result_queue.put() just before the process ends,
In general there is no inherent reason for the last put() operation,
it's just the way you evaluate the results in the while loop?
> and switching the
> order of the result-queue-pump and process-join loops.
That seems to decide if my code blocks or not. Why do you do it that
way ;)? In the Queue's documentation one can find the following:
| Warning
|
| As mentioned above, if a child process has put items on a queue (and
| it has not used JoinableQueue.cancel_join_thread), then that process
| will not terminate until all buffered items have been flushed to the
| pipe.
|
| This means that if you try joining that process you may get a deadlock
| unless you are sure that all items which have been put on the queue
| have been consumed. Similarly, if the child process is non-daemonic
| then the parent process may hang on exit when it tries to join all its
| non-daemonic children.
|
| Note that a queue created using a manager does not have this
| issue. See Programming guidelines.
I guess that's what's biting me.
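The last line of the warning points at one way to keep the join-then-read order: a queue obtained from a Manager is held by the manager's server process, so the workers are not sitting on buffered items when they exit. A minimal sketch under that assumption; worker, run and the doubling of each task are illustrative only, and manager queues are typically slower than plain ones because every operation is a round trip to the manager process.

```python
from multiprocessing import Manager, Process


def worker(task_queue, result_queue):
    """Consume tasks until the None marker, posting one result per task."""
    while True:
        n = task_queue.get()
        if n is None:
            break
        result_queue.put(n * 2)


def run(num_procs=4, num_tasks=1000):
    """Feed tasks, join the workers first, then collect the results."""
    with Manager() as manager:
        tasks = manager.Queue()
        results = manager.Queue()
        procs = [Process(target=worker, args=(tasks, results))
                 for _ in range(num_procs)]
        for proc in procs:
            proc.start()
        for i in range(num_tasks):
            tasks.put(i)
        for _ in range(num_procs):
            tasks.put(None)
        # Joining before reading results: the items are held by the manager
        # process, not in a buffer inside the exiting workers.
        for proc in procs:
            proc.join()
        return sorted(results.get() for _ in range(num_tasks))
```

Calling run() from under an `if __name__ == '__main__':` guard should return the doubled task numbers in order. With a plain multiprocessing Queue the same ordering is the one that hangs, so there the results have to be read before the join.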
[...]
> As a general rule, queues need to have both ends operating
> simultaneously, otherwise you're likely to have them blocking. In
> theory, your code should all work with ridiculously low queue sizes;
> the only cost will be concurrency (since you'd forever be waiting for
> the queue, so your tasks will all be taking turns). I tested this by
> changing the Queue() calls to Queue(1), and the code took about twice
> as long to complete. :)
;) I know, as you might guess it's not a real world example. It's just
to explore the multiprocessing module.
Regards
hmw
--
biff4emacsen - A biff-like tool for (X)Emacs
http://www.c0t0d0s0.de/biff4emacsen/biff4emacsen.html
Flood - Your friendly network packet generator
http://www.c0t0d0s0.de/flood/flood.html
| From | Chris Angelico <rosuav@gmail.com> |
|---|---|
| Date | 2015-05-09 01:05 +1000 |
| Message-ID | <mailman.252.1431097527.12865.python-list@python.org> |
| In reply to | #90170 |
On Sat, May 9, 2015 at 12:31 AM, Michael Welle <mwe012008@gmx.net> wrote:
>> As a general rule, queues need to have both ends operating
>> simultaneously, otherwise you're likely to have them blocking. In
>> theory, your code should all work with ridiculously low queue sizes;
>> the only cost will be concurrency (since you'd forever be waiting for
>> the queue, so your tasks will all be taking turns). I tested this by
>> changing the Queue() calls to Queue(1), and the code took about twice
>> as long to complete. :)
> ;) I know, as you might guess it's not a real world example. It's just
> to explore the multiprocessing module.
Sure, but even in a real-world example, it shouldn't ever be necessary
to create huge queues. Larger queues allow for inconsistent
performance of producer and/or consumer (eg if your consumer takes 1s
to do each of 500 jobs, then 500s to do one job, it's capable of
coping with a producer that puts one job on the queue every 2s, but
only if the queue can handle ~250 jobs), but otherwise, the only
effect of shrinking the queues is to force the processes into
lock-step. Bigger queues mean that an over-performing producer will
run you out of memory rather than get blocked.
At very least, it should be a safe way to debug your logic - cut the
queues down to just 2-3 elements each, and keep on printing out what's
in the queue.
ChrisA
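The ~250 figure can be checked with a small single-consumer simulation. This is only a sketch of the arithmetic, not multiprocessing code; peak_backlog and the job pattern are made up for illustration, and it assumes the slow job comes after the 500 fast ones, as in the example.

```python
def peak_backlog(service_times, arrival_interval):
    """Largest number of jobs waiting (arrived, not yet started) at any arrival."""
    arrivals = [i * arrival_interval for i in range(len(service_times))]
    starts = []
    free_at = 0
    for arrive, service in zip(arrivals, service_times):
        start = max(arrive, free_at)  # wait if the consumer is still busy
        starts.append(start)
        free_at = start + service
    peak = 0
    for k, now in enumerate(arrivals):
        # Jobs that have arrived by `now` but whose processing starts later.
        waiting = sum(1 for j in range(k + 1) if starts[j] > now)
        peak = max(peak, waiting)
    return peak


# 500 one-second jobs, one 500-second job, then more one-second jobs,
# with the producer adding a job every 2 seconds.
pattern = [1] * 500 + [500] + [1] * 300
print(peak_backlog(pattern, 2))
```

The printed peak comes out close to 250: the backlog builds up entirely during the one slow job, and a queue smaller than that would stall the producer instead.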