Path: csiph.com!v102.xanadu-bbs.net!xanadu-bbs.net!feeder.erje.net!eu.feeder.erje.net!feeds.phibee-telecom.net!newsfeed.xs4all.nl!newsfeed2.news.xs4all.nl!xs4all!newsgate.cistron.nl!newsgate.news.xs4all.nl!post.news.xs4all.nl!not-for-mail Return-Path: X-Original-To: python-list@python.org Delivered-To: python-list@mail.python.org X-Spam-Status: OK 0.046 X-Spam-Evidence: '*H*': 0.91; '*S*': 0.00; 'explicit': 0.07; 'returned.': 0.07; 'iterate': 0.09; 'subject:skip:c 10': 0.09; 'def': 0.12; '12:57': 0.16; 'elsewhere.': 0.16; 'executor': 0.16; 'from:addr:andrew': 0.16; 'generator.': 0.16; 'guessing': 0.16; 'iterable': 0.16; 'iterable,': 0.16; 'itertools': 0.16; 'messy': 0.16; 'proceeds': 0.16; 'uncommon': 0.16; 'discussions': 0.16; 'sender:addr:gmail.com': 0.17; 'wrote:': 0.18; 'seems': 0.21; 'memory': 0.22; 'example': 0.22; 'import': 0.22; 'header:User- Agent:1': 0.23; 'helpful': 0.24; '>': 0.26; 'header:In-Reply- To:1': 0.27; 'tried': 0.27; 'testing': 0.29; 'andrew': 0.30; 'involving': 0.30; "i'm": 0.30; 'code': 0.31; 'submitting': 0.31; '100000': 0.31; 'complete,': 0.31; 'fine,': 0.31; 'skip:c 30': 0.32; 'run': 0.32; 'worked': 0.33; 'could': 0.34; 'problem': 0.35; "can't": 0.35; 'offered': 0.35; 'something': 0.35; 'good.': 0.35; 'but': 0.35; 'received:google.com': 0.35; 'there': 0.35; 'done,': 0.36; 'yield': 0.36; 'done': 0.36; 'should': 0.36; 'message- id:@gmail.com': 0.38; 'thank': 0.38; 'problems': 0.38; 'follows:': 0.38; 'tasks': 0.38; 'handle': 0.38; 'to:addr:python-list': 0.38; 'pm,': 0.38; 'though,': 0.39; 'to:addr:python.org': 0.39; 'how': 0.40; 'ian': 0.60; 'results.': 0.60; 'most': 0.60; 'entire': 0.61; 'first': 0.61; 'you.': 0.62; 'show': 0.63; 'such': 0.63; 'more': 0.64; 'bridge': 0.65; 'between': 0.67; 'benefit': 0.68; 'tasks.': 0.68; 'results': 0.69; 'futures?': 0.84; 'gap': 0.84; 'think:': 0.84; 'toy': 0.84; 'trick,': 0.84; 'workers,': 0.84; 'lazy': 0.91 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=sender:message-id:date:from:user-agent:mime-version:to:subject :references:in-reply-to:content-type; bh=G2D7YNJXIz0GxipnanZh9DJzpwhHpV7D1Tkr5uPsP6E=; b=qaebk2w6eROlpRIDR1bU2rRNARrFInFZ2INv0BzNYmeQxBai3PeOSiCzEogfHj9az1 XETMCap8TIefmxuwdk/HIRcnen+kDcEHyS4yyNdnauT/Tyae4DQoaXdJwajt0Bpv+ofp y9i1npJJtnl7/+NOHOAgk70+I+baq6FNG3rovwT0kbSFaOLRXGeSRp7yJMOx7KBhOpyC DKR/HVxT3wPGnMzcSjQy87PRpceDx7eDwcMbqBYk7mtGL4i+dOVdYdmtcppRVYFkSeQE DRTaocltfEimGhBG8+hG5Cil99QuGnOHvKLNsA8neehnE7PCICNDL3R7IAGqQyyZhLg6 7bvQ== X-Received: by 10.194.57.38 with SMTP id f6mr4233790wjq.59.1400011767342; Tue, 13 May 2014 13:09:27 -0700 (PDT) Sender: Andrew McLean X-Google-Original-From: Andrew McLean Date: Tue, 13 May 2014 21:09:28 +0100 From: Andrew McLean User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:24.0) Gecko/20100101 Thunderbird/24.5.0 MIME-Version: 1.0 To: python-list@python.org Subject: Re: Real-world use of concurrent.futures References: <536BD338.4070004@andros.org.uk> In-Reply-To: Content-Type: multipart/alternative; boundary="------------000406050508010900070909" X-BeenThere: python-list@python.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: General discussion list for the Python programming language List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Newsgroups: comp.lang.python Message-ID: Lines: 165 NNTP-Posting-Host: 2001:888:2000:d::a6 X-Trace: 1400011769 news.xs4all.nl 2976 [2001:888:2000:d::a6]:48764 X-Complaints-To: abuse@xs4all.nl Xref: csiph.com comp.lang.python:71510 This is a multi-part message in MIME format. --------------000406050508010900070909 Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: 7bit On 08/05/2014 21:44, Ian Kelly wrote: > On May 8, 2014 12:57 PM, "Andrew McLean" > wrote: > > So far so good. However, I thought this would be an opportunity to > > explore concurrent.futures and to see whether it offered any benefits > > over the more explicit approach discussed above. The problem I am having > > is that all the discussions I can find of the use of concurrent.futures > > show use with toy problems involving just a few tasks. The url > > downloader in the documentation is typical, it proceeds as follows: > > > > 1. Get an instance of concurrent.futuresThreadPoolExecutor > > 2. Submit a few tasks to the executer > > 3. Iterate over the results using concurrent.futures.as_completed > > > > That's fine, but I suspect that isn't a helpful pattern if I have a very > > large number of tasks. In my case I could run out of memory if I tried > > submitting all of the tasks to the executor before processing any of the > > results. > > I thought that ThreadPoolExecutor.map would handle this transparently > if you passed it a lazy iterable such as a generator. From my testing > though, that seems not to be the case; with a generator of 100000 > items and a pool of 2 workers, the entire generator was consumed > before any results were returned. > > > I'm guessing what I want to do is, submit tasks in batches of perhaps a > > few hundred, iterate over the results until most are complete, then > > submit some more tasks and so on. I'm struggling to see how to do this > > elegantly without a lot of messy code just there to do "bookkeeping". > > This can't be an uncommon scenario. Am I missing something, or is this > > just not a job suitable for futures? > > I don't think it needs to be "messy". Something like this should do > the trick, I think: > > from concurrent.futures import * > from itertools import islice > > def batched_pool_runner(f, iterable, pool, batch_size): > it = iter(iterable) > # Submit the first batch of tasks. > futures = set(pool.submit(f, x) for x in islice(it, batch_size)) > while futures: > done, futures = wait(futures, return_when=FIRST_COMPLETED) > # Replenish submitted tasks up to the number that completed. > futures.update(pool.submit(f, x) for x in islice(it, len(done))) > yield from done That worked very nicely, thank you. I think that would make a good recipe, whether for the documentation or elsewhere. I suspect I'm not the only person that would benefit from something to bridge the gap between a toy example and something practical. Andrew --------------000406050508010900070909 Content-Type: text/html; charset=ISO-8859-1 Content-Transfer-Encoding: 7bit
On 08/05/2014 21:44, Ian Kelly wrote:
On May 8, 2014 12:57 PM, "Andrew McLean" <lists@andros.org.uk> wrote:
> So far so good. However, I thought this would be an opportunity to
> explore concurrent.futures and to see whether it offered any benefits
> over the more explicit approach discussed above. The problem I am having
> is that all the discussions I can find of the use of concurrent.futures
> show use with toy problems involving just a few tasks. The url
> downloader in the documentation is typical, it proceeds as follows:
>
> 1. Get an instance of concurrent.futuresThreadPoolExecutor
> 2. Submit a few tasks to the executer
> 3. Iterate over the results using concurrent.futures.as_completed
>
> That's fine, but I suspect that isn't a helpful pattern if I have a very
> large number of tasks. In my case I could run out of memory if I tried
> submitting all of the tasks to the executor before processing any of the
> results.

I thought that ThreadPoolExecutor.map would handle this transparently if you passed it a lazy iterable such as a generator.  From my testing though, that seems not to be the case; with a generator of 100000 items and a pool of 2 workers, the entire generator was consumed before any results were returned.

> I'm guessing what I want to do is, submit tasks in batches of perhaps a
> few hundred, iterate over the results until most are complete, then
> submit some more tasks and so on. I'm struggling to see how to do this
> elegantly without a lot of messy code just there to do "bookkeeping".
> This can't be an uncommon scenario. Am I missing something, or is this
> just not a job suitable for futures?

I don't think it needs to be "messy". Something like this should do the trick, I think:

from concurrent.futures import *
from itertools import islice

def batched_pool_runner(f, iterable, pool, batch_size):
  it = iter(iterable)
  # Submit the first batch of tasks.
  futures = set(pool.submit(f, x) for x in islice(it, batch_size))
  while futures:
    done, futures = wait(futures, return_when=FIRST_COMPLETED)
    # Replenish submitted tasks up to the number that completed.
    futures.update(pool.submit(f, x) for x in islice(it, len(done)))
    yield from done

That worked very nicely, thank you.  I think that would make a good recipe, whether for the documentation or elsewhere. I suspect I'm not the only person that would benefit from something to bridge the gap between a toy example and something practical.

Andrew



--------------000406050508010900070909--