

Groups > comp.lang.python > #75598 > unrolled thread

asyncio with map&reduce flavor and without flooding the event loop

Started by: Valery Khamenya <khamenya@gmail.com>
First post: 2014-08-03 16:01 +0200
Last post: 2014-08-03 16:01 +0200
Articles: 1 (1 participant)




#75598 — asyncio with map&reduce flavor and without flooding the event loop

From: Valery Khamenya <khamenya@gmail.com>
Date: 2014-08-03 16:01 +0200
Subject: asyncio with map&reduce flavor and without flooding the event loop
Message-ID: <mailman.12584.1407074531.18130.python-list@python.org>


Hi all

I am trying to use asyncio in real applications and it is not going that
easily; help from asyncio gurus is badly needed.

Consider a task like crawling the web starting from some web sites. Each
site leads to the generation of new downloading tasks in exponential(!)
progression. However, we want neither to flood the event loop nor to
overload our network. We'd like to control the task flow. I achieve this
well with a modification of Maxime's nice solution proposed here:
https://mail.python.org/pipermail/python-list/2014-July/675048.html
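
For illustration only, here is a minimal sketch of one common way to keep the task flow under control: a fixed number of worker tasks pulling URLs from an asyncio.Queue. It is not Maxime's code from the linked post. The link graph LINKS and the names fetch_links(), crawl() and run_crawl() are made up for this example, the "download" is simulated with asyncio.sleep(0), and the async/await syntax of newer Python (3.7+) is assumed.

```python
import asyncio

# Hypothetical link graph standing in for real web pages.
LINKS = {
    "a": ["b", "c"],
    "b": ["d", "e"],
    "c": ["e", "f"],
    "d": [], "e": [], "f": [],
}


async def fetch_links(url):
    """Stand-in for a real download; returns the links found on the page."""
    await asyncio.sleep(0)  # pretend to wait on the network
    return LINKS.get(url, [])


async def crawl(start_urls, workers=2):
    queue = asyncio.Queue()
    seen = set(start_urls)
    for url in start_urls:
        queue.put_nowait(url)

    async def worker():
        while True:
            url = await queue.get()
            try:
                # New links are queued, not turned into tasks right away,
                # so at most `workers` downloads are ever in flight.
                for link in await fetch_links(url):
                    if link not in seen:
                        seen.add(link)
                        queue.put_nowait(link)
            finally:
                queue.task_done()

    tasks = [asyncio.ensure_future(worker()) for _ in range(workers)]
    await queue.join()  # returns once every queued URL has been processed
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return seen


def run_crawl(start_urls, workers=2):
    return asyncio.run(crawl(start_urls, workers))
```

Calling run_crawl(["a"], workers=1) visits the same pages as workers=2, only with less overlap. The number of live tasks stays equal to the number of workers no matter how fast the set of discovered links grows.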

But I also need a very natural thing: a kind of map() and reduce()
(functools.reduce() if we are on Python 3 already). That is, I need to
call a "summarizing" function over all the completed downloading tasks for
the links from a page. This is where I fail :(
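
To make the map-and-reduce shape concrete, here is a minimal sketch using asyncio.gather(). The names download(), summarize(), map_reduce() and run_map_reduce() are hypothetical, the download is simulated, and Python 3.7+ syntax is assumed. Note that this sketch does not limit concurrency by itself, which is exactly the part that still has to be combined with the controlled task flow.

```python
import asyncio


async def download(link):
    """Hypothetical download: returns the 'size' of a page."""
    await asyncio.sleep(0)  # pretend to wait on the network
    return len(link)


def summarize(sizes):
    """Hypothetical reduce step: total size over all pages."""
    return sum(sizes)


async def map_reduce(links):
    # "map": one download per link, run concurrently
    sizes = await asyncio.gather(*(download(link) for link in links))
    # "reduce": runs only after every download has completed
    return summarize(sizes)


def run_map_reduce(links):
    return asyncio.run(map_reduce(links))
```

asyncio.gather() returns the results in the order of its arguments, so a summarizing function that depends on link order would also work here.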

I'd propose an oversimplified but still nice test to model the use case:
let's use the Fibonacci function in its naive, inefficient recursive form.
That is, let coro_sum() be our reduce() function and coro_fib() be our
map().
Something like this:

@asyncio.coroutine
def coro_sum(x):
    return sum(x)

@asyncio.coroutine
def coro_fib(x):
    if x < 2:
        return 1
    res_coro = executor_pool.spawn_task_when_arg_list_of_coros_ready(
        coro=coro_sum,
        arg_coro_list=[coro_fib(x - 1), coro_fib(x - 2)])
    return res_coro

Then we could run the following tests.

Test #1 on one worker:

  executor_pool = ExecutorPool(workers=1)
  executor_pool.as_completed( coro_fib(x) for x in range(20) )

Test #2 on two workers:

  executor_pool = ExecutorPool(workers=2)
  executor_pool.as_completed( coro_fib(x) for x in range(20) )

It is very important that every coro_fib() and coro_sum() invocation
runs via a Task on some worker, not just spawned implicitly
and unmanaged!
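
One possible direction, given here only as a hedged sketch and not as a finished answer: let a job hand back a future for follow-up jobs instead of waiting for them, so a worker is never blocked by its own children (otherwise a pool with a single worker would deadlock on the recursion). The ExecutorPool below is hypothetical. Its methods submit(), submit_when_ready() and run(), the helper _forward() and the wrapper fib_on_pool() are invented for this illustration; submit_when_ready() only approximates the spawn_task_when_arg_list_of_coros_ready() idea, and as_completed() is not covered. Python 3.7+ syntax is assumed.

```python
import asyncio


def _forward(src, dst):
    """Copy the outcome of future `src` into future `dst` once src is done."""
    def copy(f):
        if f.exception() is not None:
            dst.set_exception(f.exception())
        else:
            dst.set_result(f.result())
    src.add_done_callback(copy)


class ExecutorPool:
    """Hypothetical sketch: every job runs inside one of `workers` tasks."""

    def __init__(self, workers):
        self.workers = workers
        self.queue = asyncio.Queue()
        self.jobs_run = 0  # how many jobs the workers have picked up

    def submit(self, func, *args):
        """Queue func(pool, *args) for a worker; return a future for it."""
        fut = asyncio.get_running_loop().create_future()
        self.queue.put_nowait((func, args, fut))
        return fut

    def submit_when_ready(self, func, arg_futures):
        """Queue func(pool, results) once all arg_futures are done.

        No worker waits in the meantime. Assumes arg_futures is non-empty.
        """
        out = asyncio.get_running_loop().create_future()
        pending = len(arg_futures)

        def on_done(_):
            nonlocal pending
            pending -= 1
            if pending == 0:
                try:
                    values = [f.result() for f in arg_futures]
                except Exception as exc:
                    out.set_exception(exc)
                else:
                    _forward(self.submit(func, values), out)

        for f in arg_futures:
            f.add_done_callback(on_done)
        return out

    async def _worker(self):
        while True:
            func, args, fut = await self.queue.get()
            self.jobs_run += 1
            try:
                result = await func(self, *args)
            except Exception as exc:
                fut.set_exception(exc)
                continue
            if asyncio.isfuture(result):
                # The job delegated to follow-up jobs; pass their result on.
                _forward(result, fut)
            else:
                fut.set_result(result)

    async def run(self, func, *args):
        tasks = [asyncio.ensure_future(self._worker())
                 for _ in range(self.workers)]
        try:
            return await self.submit(func, *args)
        finally:
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)


async def coro_sum(pool, xs):
    return sum(xs)


async def coro_fib(pool, x):
    if x < 2:
        return 1
    children = [pool.submit(coro_fib, x - 1), pool.submit(coro_fib, x - 2)]
    return pool.submit_when_ready(coro_sum, children)


def fib_on_pool(x, workers):
    """Return (result, number of jobs executed by the workers)."""
    async def main():
        pool = ExecutorPool(workers)
        return await pool.run(coro_fib, x), pool.jobs_run
    return asyncio.run(main())
```

With this sketch, fib_on_pool(10, workers=1) and fib_on_pool(10, workers=2) give the same result, and the jobs_run counter shows that each coro_fib() and coro_sum() call went through a worker. Cancellation and back-pressure on the queue are left out to keep the example short.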

It would be cool to find asyncio gurus interested in this very natural goal.
Your help and ideas would be very much appreciated.

best regards
--
Valery
