From: Chris Angelico
Subject: Re: multiprocessing, queue
Date: Fri, 8 May 2015 23:34:47 +1000
Newsgroups: comp.lang.python

On Fri, May 8, 2015 at 8:08 PM, Michael Welle wrote:
> Hello,
>
> what's wrong with [0]? As num_tasks gets higher proc.join() seems to
> block forever. First I thought the magical frontier is around 32k tasks,
> but then it seemed to work with 40k tasks. Now I'm stuck around 7k
> tasks. I think I do something fundamentally wrong, but I can't find it.
>
> Regards
> hmw
>
> [0] http://pastebin.com/adfBYgY9

Your code's small enough to include inline, so I'm doing that:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue

class Foo(Process):

    def __init__(self, task_queue, result_queue):
        Process.__init__(self)
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        while True:
            n = self.task_queue.get()
            if n is None:
                break

            self.result_queue.put(1)
        return

def main():
    results = Queue()
    tasks = Queue()
    procs = []
    num_procs = 8
    num_tasks = 8000

    for i in range(num_procs):
        proc = Foo(tasks, results)
        procs.append(proc)

    for proc in procs:
        proc.start()

    for i in range(num_tasks):
        tasks.put(i)

    for i in range(num_procs):
        tasks.put(None)

    for proc in procs:
        print("join")
        proc.join()

    while not results.empty():
        result = results.get()
        print('Result: {}'.format(result))

if __name__ == '__main__':
    main()
# -- end of code --

The first thing I'd look at is the default queue size. If your result queue fills up, all processes will block until something starts retrieving results. If you really want all your results to stay in the queue like that, you may need to specify a huge queue size, which may cost you a lot of memory. Much better would be to have each worker process post something on the result queue when it's done, and then wait until they've all reported in:

from multiprocessing import Process, Queue

def foo(task_queue, result_queue):
    while True:
        n = task_queue.get()
        if n is None:
            break
        result_queue.put(1)
    # Make sure None is not a possible actual result.
    # Otherwise, pick some other marker value and compare it with ==;
    # a bare object() won't do, since queued items arrive as pickled copies.
    result_queue.put(None)

def feed_tasks(num_tasks, num_procs, tasks):
    for i in range(num_tasks):
        tasks.put(i)
    for i in range(num_procs):
        tasks.put(None)

def main():
    results = Queue()
    tasks = Queue()
    num_procs = 8
    num_tasks = 8000
    procs = [Process(target=foo, args=(tasks, results))
             for i in range(num_procs)]
    for proc in procs:
        proc.start()
    Process(target=feed_tasks, args=(num_tasks, num_procs, tasks)).start()
    while num_procs:
        result = results.get()
        if result is None:
            num_procs -= 1
        else:
            print('Result: {}'.format(result))
    for proc in procs:
        print("join")
        proc.join()

if __name__ == '__main__':
    main()

I've also made a few other changes (for instance, there's no need to subclass Process just to pass args), but the most important parts are the result_queue.put(None) just before each worker ends, and swapping the order of the result-queue-pump and process-join loops.

That could still block, though, at the point where the tasks are put onto the queue, so I've spun that off into its own process. (It might not be necessary, depending on how your tasks work.) I tested this on 200,000 tasks (with the printing of results replaced by a simple counter), and it worked fine, churning through the work in about ten seconds.

As a general rule, a queue needs both ends operating simultaneously; otherwise it's likely to block. In theory, your code should work even with ridiculously low queue sizes; the only cost is concurrency (you'd constantly be waiting on the queue, so your processes would end up taking turns). I tested this by changing the Queue() calls to Queue(1), and the code took about twice as long to complete. :)

Hope that helps!

ChrisA
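The drain-before-join pattern can be condensed into a small self-checking sketch. This is an illustration under stated assumptions, not code from the thread: the names worker, run and DONE, the squaring "work", and the task counts are all hypothetical; feed_tasks mirrors the helper of the same name in the reply. Two points are made explicit. First, the parent empties the result queue before calling join(); the multiprocessing documentation's programming guidelines ("Joining processes that use queues") note that a process which has put items on a queue won't terminate until those items have been flushed to the underlying pipe, so joining first can deadlock. Second, the end-of-work marker is a plain string compared with ==, because queued items are pickled and arrive as copies, so an identity test against a module-level object() would never match. Both queues use maxsize=1, mirroring the Queue(1) experiment: it still finishes, just with less concurrency.

```python
from multiprocessing import Process, Queue

# Hypothetical end-of-work marker. Items on a multiprocessing Queue are
# pickled, so the parent receives a copy: compare with ==, not `is`.
DONE = "__worker_done__"


def worker(task_queue, result_queue):
    """Square each task until a None task arrives, then report DONE."""
    for n in iter(task_queue.get, None):  # call get() until it returns None
        result_queue.put(n * n)
    result_queue.put(DONE)  # tell the parent this worker has finished


def feed_tasks(num_tasks, num_procs, task_queue):
    """Enqueue the tasks, then one None per worker so each loop ends."""
    for i in range(num_tasks):
        task_queue.put(i)
    for _ in range(num_procs):
        task_queue.put(None)


def run(num_tasks=1000, num_procs=4, maxsize=1):
    # Deliberately tiny queues: this still completes because both ends of
    # each queue are serviced at the same time.
    tasks = Queue(maxsize)
    results = Queue(maxsize)
    procs = [Process(target=worker, args=(tasks, results))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    feeder = Process(target=feed_tasks, args=(num_tasks, num_procs, tasks))
    feeder.start()

    # Drain the result queue *before* joining anything.
    collected = []
    remaining = num_procs
    while remaining:
        item = results.get()
        if item == DONE:
            remaining -= 1
        else:
            collected.append(item)

    # Every queued item has now been consumed, so these joins can't hang.
    feeder.join()
    for p in procs:
        p.join()
    return collected


if __name__ == "__main__":
    out = run()
    print(len(out), sum(out))  # prints: 1000 332833500
```

Keeping worker and feed_tasks at module level matters on platforms that use the spawn start method, where each child re-imports the main module to find its target; that's also why the entry point sits under the __name__ guard.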