From: Valery Khamenya
Date: Sun, 3 Aug 2014 16:01:41 +0200
Subject: asyncio with map&reduce flavor and without flooding the event loop
To: python-list@python.org, Maxime Steisel
Newsgroups: comp.lang.python

Hi all

I am trying to use asyncio in real applications and it isn't going that smoothly; I badly need help from asyncio gurus.

Consider a task like crawling the web, starting from some websites. Each site leads to the generation of new download tasks in exponential(!) progression. However, we want neither to flood the event loop nor to overload our network; we'd like to control the task flow. I already achieve this well with a modification of Maxime's nice solution proposed here:
https://mail.python.org/pipermail/python-list/2014-July/675048.html
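The following is not Maxime's code, which is in the linked thread. It is a generic sketch of the same idea: a fixed number of long-lived worker tasks pull URLs from an asyncio.Queue. The exponential fan-out therefore grows the queue, not the number of running tasks or in-flight downloads. `fake_links`, `worker`, `crawl` and the site names are hypothetical stand-ins for real downloads. The sketch uses modern async/await syntax (Python 3.7+) rather than the 2014-era @asyncio.coroutine style.

```python
import asyncio


def fake_links(url, depth, max_depth=4):
    # Hypothetical link graph: each page links to two new pages until max_depth,
    # so the number of discovered URLs grows exponentially with depth.
    if depth >= max_depth:
        return []
    return [url + "/a", url + "/b"]


async def worker(queue, seen, stats):
    # Long-lived worker: handles one URL at a time, so at most `workers`
    # downloads are in flight no matter how many URLs wait in the queue.
    while True:
        url, depth = await queue.get()
        try:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
            await asyncio.sleep(0.001)  # simulated download of `url`
            stats["active"] -= 1
            for child in fake_links(url, depth):
                if child not in seen:
                    seen.add(child)
                    queue.put_nowait((child, depth + 1))
        finally:
            queue.task_done()


async def crawl(start_urls, workers=3):
    queue = asyncio.Queue()
    seen = set(start_urls)
    stats = {"active": 0, "peak": 0}
    for url in start_urls:
        queue.put_nowait((url, 0))
    tasks = [asyncio.create_task(worker(queue, seen, stats)) for _ in range(workers)]
    await queue.join()  # returns once every queued URL has been processed
    for t in tasks:
        t.cancel()  # workers loop forever; stop them explicitly
    await asyncio.gather(*tasks, return_exceptions=True)
    return seen, stats["peak"]


seen, peak = asyncio.run(crawl(["site1", "site2"], workers=3))
print(len(seen), peak)
```

Children are queued before `task_done()` is called. This keeps `queue.join()` from returning while new links are still being discovered.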

However, I also need something very natural: a kind of map() & reduce() (or functools.reduce(), if we are already on Python 3). That is, I need to call a "summarizing" function once all the download tasks for the links on a page have completed. This is where I fail :(
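Without any throttling, the map & reduce step for a single page can be sketched with asyncio.gather(), followed by functools.reduce(). gather() waits for all awaitables and returns their results in argument order. `download` and `summarize_page` are hypothetical names, and the "page size" is faked as len(url). This sketch does not limit concurrency at all. Combining it with a bounded worker pool is exactly the open question of this post.

```python
import asyncio
import functools
import operator


async def download(url):
    # Hypothetical download: pretends the page size is len(url).
    await asyncio.sleep(0.001)
    return len(url)


async def summarize_page(links):
    # "map": download every link on the page concurrently;
    # gather() returns the results in the same order as `links`.
    sizes = await asyncio.gather(*(download(u) for u in links))
    # "reduce": fold all completed results into one summary value.
    return functools.reduce(operator.add, sizes, 0)


total = asyncio.run(summarize_page(["a", "bb", "ccc"]))
print(total)  # 6
```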

I'd propose an oversimplified but still useful test to model the use case:
let's use the Fibonacci function in its inefficient (naive recursive) form.
That is, let coro_sum() be our reduce() and coro_fib() our map().
Something like this:

@asyncio.coroutine
def coro_sum(x):
    return sum(x)

@asyncio.coroutine
def coro_fib(x):
    if x < 2:
        return 1
    res_coro = executor_pool.spawn_task_when_arg_list_of_coros_ready(
        coro=coro_sum,
        arg_coro_list=[coro_fib(x - 1), coro_fib(x - 2)])
    return res_coro

Then we could run the following tests.

Test #1 on one worker:

  executor_pool = ExecutorPool(workers=1)
  executor_pool.as_completed(coro_fib(x) for x in range(20))

Test #2 on two workers:

  executor_pool = ExecutorPool(workers=2)
  executor_pool.as_completed(coro_fib(x) for x in range(20))

It is very important that every coro_fib() and coro_sum() invocation runs as a Task on some worker, not just spawned implicitly and left unmanaged!
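Here is one possible shape for such a pool, offered as a sketch under several assumptions:

- `ExecutorPool`, `submit()`, `submit_when_ready()` and `run()` are hypothetical names, not asyncio API.
- Each coro_fib()/coro_sum() call runs as a job on one of `workers` long-lived worker Tasks, rather than as its own Task.
- Jobs take the pool as their first argument.
- Errors are not propagated.
- It uses Python 3.7+ and range(15) instead of range(20) to keep the run short.

The key design choice is continuation-style reduction, close to the proposed spawn_task_when_arg_list_of_coros_ready(). A map job never awaits its children inside a worker, because with one worker that would deadlock. Instead, it hands back a future that a later reduce job resolves.

```python
import asyncio


class ExecutorPool:
    """Hypothetical pool: a fixed number of worker tasks drain one job queue."""

    def __init__(self, workers):
        self.workers = workers
        self.jobs_run = 0
        self.queue = None  # created inside run(), on the running loop

    def submit(self, func, *args):
        # Queue the job func(pool, *args); return a future for its final result.
        fut = asyncio.get_running_loop().create_future()
        self.queue.put_nowait((func, args, fut))
        return fut

    def submit_when_ready(self, func, arg_futures):
        # Once every future in arg_futures is done, queue func(pool, results)
        # as a new job and return a future for its result. No worker waits.
        result = asyncio.get_running_loop().create_future()

        def on_ready(gathered):
            job = self.submit(func, gathered.result())
            job.add_done_callback(lambda j: result.set_result(j.result()))

        asyncio.gather(*arg_futures).add_done_callback(on_ready)
        return result

    async def _worker(self):
        while True:
            func, args, fut = await self.queue.get()
            try:
                self.jobs_run += 1
                value = await func(self, *args)
                if isinstance(value, asyncio.Future):
                    # The job returned a pending result: resolve fut when it lands.
                    value.add_done_callback(
                        lambda v, fut=fut: fut.set_result(v.result()))
                else:
                    fut.set_result(value)
            finally:
                self.queue.task_done()
            await asyncio.sleep(0)  # let other workers and callbacks run

    async def run(self, func, args_list):
        self.queue = asyncio.Queue()
        workers = [asyncio.create_task(self._worker()) for _ in range(self.workers)]
        results = await asyncio.gather(*(self.submit(func, a) for a in args_list))
        for w in workers:
            w.cancel()  # workers loop forever; stop them once all results are in
        await asyncio.gather(*workers, return_exceptions=True)
        return results


async def coro_sum(pool, values):  # the "reduce" job
    return sum(values)


async def coro_fib(pool, x):  # the "map" job: deliberately inefficient recursion
    if x < 2:
        return 1
    children = [pool.submit(coro_fib, x - 1), pool.submit(coro_fib, x - 2)]
    # Awaiting children here would pin this worker (a 1-worker pool would
    # deadlock), so hand back a future that a later coro_sum job resolves.
    return pool.submit_when_ready(coro_sum, children)


pool1 = ExecutorPool(workers=1)
results1 = asyncio.run(pool1.run(coro_fib, range(15)))
pool2 = ExecutorPool(workers=2)
results2 = asyncio.run(pool2.run(coro_fib, range(15)))
print(results1 == results2, pool1.jobs_run, pool2.jobs_run)
```

Both pools run exactly the same jobs and return the same list. Because these coroutines are CPU-bound and never wait on I/O, the second worker only interleaves jobs with the first; it does not speed anything up.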

It would be cool to find asyncio gurus interested in this very natural goal.
Your help and ideas would be very much appreciated.

best regards
--
Valery