Path: csiph.com!v102.xanadu-bbs.net!xanadu-bbs.net!goblin2!goblin.stu.neva.ru!newsfeed.xs4all.nl!newsfeed3.news.xs4all.nl!xs4all!post.news.xs4all.nl!not-for-mail Return-Path: X-Original-To: python-list@python.org Delivered-To: python-list@mail.python.org X-Spam-Status: OK 0.044 X-Spam-Evidence: '*H*': 0.91; '*S*': 0.00; 'explicit': 0.07; 'returned.': 0.07; 'iterate': 0.09; 'subject:skip:c 10': 0.09; 'def': 0.12; '12:57': 0.16; 'executor': 0.16; 'generator.': 0.16; 'guessing': 0.16; 'iterable': 0.16; 'iterable,': 0.16; 'itertools': 0.16; 'messy': 0.16; 'proceeds': 0.16; 'uncommon': 0.16; 'discussions': 0.16; 'wrote:': 0.18; 'seems': 0.21; 'memory': 0.22; 'import': 0.22; 'helpful': 0.24; '>': 0.26; 'header:In-Reply-To:1': 0.27; 'tried': 0.27; 'testing': 0.29; 'involving': 0.30; 'message-id:@mail.gmail.com': 0.30; "i'm": 0.30; 'code': 0.31; 'submitting': 0.31; '100000': 0.31; 'complete,': 0.31; 'fine,': 0.31; 'skip:c 30': 0.32; 'run': 0.32; 'could': 0.34; 'problem': 0.35; "can't": 0.35; 'offered': 0.35; 'something': 0.35; 'good.': 0.35; 'but': 0.35; 'received:google.com': 0.35; 'there': 0.35; 'done,': 0.36; 'yield': 0.36; 'done': 0.36; 'should': 0.36; 'skip:& 10': 0.38; 'problems': 0.38; 'follows:': 0.38; 'tasks': 0.38; 'handle': 0.38; 'to:addr:python-list': 0.38; 'pm,': 0.38; 'skip:& 20': 0.39; 'though,': 0.39; 'to:addr:python.org': 0.39; 'how': 0.40; 'results.': 0.60; 'most': 0.60; 'entire': 0.61; 'first': 0.61; 'show': 0.63; 'such': 0.63; 'more': 0.64; 'tasks.': 0.68; 'results': 0.69; 'skip:r 30': 0.69; 'futures?': 0.84; 'think:': 0.84; 'toy': 0.84; 'trick,': 0.84; 'workers,': 0.84; 'lazy': 0.91 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:from:date:message-id:subject:to :content-type; bh=yuseXlByqxgrOB/B7plXp0cAIWY5phUEq5nVwjUXGZs=; b=TD53JmkI88HX3KJM6CaAOFEUBxyeZKxyCApTPQMfkzkmR5hiMa2XEioDcWiA5K3Vxq Ybki51SiB0WeCoXeAGXkEcY+FRgxMnwUql1oZb6iyUGq33uTllV+7tUwUOPFtywD/JC1 UIb9shOdWGAdz47vQu467bNKkHKLM8NCkDeQAKSZ2e1UnVhNjHugbhR+wJRQkP1xl8KV 6SPT4mnGkdcHvJbcdeLNp0/F0dSaVBnTdfpjrtIKtiYT/PYP4BnklpFNzFXSEtTmCGcF PXcR9zBS9kicdruMmQ7cEoovhTEz9OHWWaUcvs3qqozSWDBC1vCZuAKlPA+0dAMvzll6 D/Lw== X-Received: by 10.66.240.70 with SMTP id vy6mr11760573pac.80.1399581908907; Thu, 08 May 2014 13:45:08 -0700 (PDT) MIME-Version: 1.0 In-Reply-To: <536BD338.4070004@andros.org.uk> References: <536BD338.4070004@andros.org.uk> From: Ian Kelly Date: Thu, 8 May 2014 14:44:28 -0600 Subject: Re: Real-world use of concurrent.futures To: Python Content-Type: multipart/alternative; boundary=047d7b15aba983be7d04f8e98e4c X-BeenThere: python-list@python.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: General discussion list for the Python programming language List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Newsgroups: comp.lang.python Message-ID: Lines: 104 NNTP-Posting-Host: 2001:888:2000:d::a6 X-Trace: 1399581918 news.xs4all.nl 2878 [2001:888:2000:d::a6]:38541 X-Complaints-To: abuse@xs4all.nl Xref: csiph.com comp.lang.python:71129 --047d7b15aba983be7d04f8e98e4c Content-Type: text/plain; charset=UTF-8 On May 8, 2014 12:57 PM, "Andrew McLean" wrote: > So far so good. However, I thought this would be an opportunity to > explore concurrent.futures and to see whether it offered any benefits > over the more explicit approach discussed above. The problem I am having > is that all the discussions I can find of the use of concurrent.futures > show use with toy problems involving just a few tasks. The url > downloader in the documentation is typical, it proceeds as follows: > > 1. Get an instance of concurrent.futuresThreadPoolExecutor > 2. Submit a few tasks to the executer > 3. Iterate over the results using concurrent.futures.as_completed > > That's fine, but I suspect that isn't a helpful pattern if I have a very > large number of tasks. In my case I could run out of memory if I tried > submitting all of the tasks to the executor before processing any of the > results. I thought that ThreadPoolExecutor.map would handle this transparently if you passed it a lazy iterable such as a generator. From my testing though, that seems not to be the case; with a generator of 100000 items and a pool of 2 workers, the entire generator was consumed before any results were returned. > I'm guessing what I want to do is, submit tasks in batches of perhaps a > few hundred, iterate over the results until most are complete, then > submit some more tasks and so on. I'm struggling to see how to do this > elegantly without a lot of messy code just there to do "bookkeeping". > This can't be an uncommon scenario. Am I missing something, or is this > just not a job suitable for futures? I don't think it needs to be "messy". Something like this should do the trick, I think: from concurrent.futures import * from itertools import islice def batched_pool_runner(f, iterable, pool, batch_size): it = iter(iterable) # Submit the first batch of tasks. futures = set(pool.submit(f, x) for x in islice(it, batch_size)) while futures: done, futures = wait(futures, return_when=FIRST_COMPLETED) # Replenish submitted tasks up to the number that completed. futures.update(pool.submit(f, x) for x in islice(it, len(done))) yield from done --047d7b15aba983be7d04f8e98e4c Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
On May 8, 2014 12:57 PM, "Andrew McLean" <lists@andros.org.uk> wrote:
&= gt; So far so good. However, I thought this would be an opportunity to
&= gt; explore concurrent.futures and to see whether it offered any benefits > over the more explicit approach discussed above. The problem I am havi= ng
> is that all the discussions I can find of the use of concurrent.= futures
> show use with toy problems involving just a few tasks. The = url
> downloader in the documentation is typical, it proceeds as follows:>
> 1. Get an instance of concurrent.futuresThreadPoolExecutor> 2. Submit a few tasks to the executer
> 3. Iterate over the res= ults using concurrent.futures.as_completed
>
> That's fine, but I suspect that isn't a helpful patter= n if I have a very
> large number of tasks. In my case I could run ou= t of memory if I tried
> submitting all of the tasks to the executor = before processing any of the
> results.

I thought that ThreadPoolExecutor.map would handle thi= s transparently if you passed it a lazy iterable such as a generator. =C2= =A0From my testing though, that seems not to be the case; with a generator = of 100000 items and a pool of 2 workers, the entire generator was consumed = before any results were returned.

> I'm guessing what I want to do is, submit tasks in batches of = perhaps a
> few hundred, iterate over the results until most are comp= lete, then
> submit some more tasks and so on. I'm struggling to = see how to do this
> elegantly without a lot of messy code just there to do "bookkeepi= ng".
> This can't be an uncommon scenario. Am I missing some= thing, or is this
> just not a job suitable for futures?

I don't think it needs to be "messy". Something like this sho= uld do the trick, I think:

from concurrent.futures import *
from itertools import islice

def batched_pool_runner(f, iterable, pool, batch_size= ):
=C2=A0 it =3D iter(iterable)
=C2=A0 # Submit the fir= st batch of tasks.
=C2=A0 futures =3D set(pool.submit(f, x) for x in islice(it, batch_size))=C2=A0 while futures:
=C2=A0 =C2=A0 done, futures =3D wait(futures, re= turn_when=3DFIRST_COMPLETED)
=C2=A0 =C2=A0 # Replenish submitted = tasks up to the number that completed.
=C2=A0 =C2=A0 futures.update(pool.submit(f, x) for x in islice(it, len(done= )))
=C2=A0 =C2=A0 yield from done

--047d7b15aba983be7d04f8e98e4c--