Newsgroup: comp.lang.python

multiprocessing, pool, queue length

Started by: Michael Welle <mwe012008@gmx.net>
First post: 2016-03-21 11:25 +0100
Last post: 2016-03-22 07:19 +0100
Articles: 5 (2 participants)


Contents

  multiprocessing, pool, queue length Michael Welle <mwe012008@gmx.net> - 2016-03-21 11:25 +0100
    Re: multiprocessing, pool, queue length Ian Kelly <ian.g.kelly@gmail.com> - 2016-03-21 13:12 -0600
      Re: multiprocessing, pool, queue length Michael Welle <mwe012008@gmx.net> - 2016-03-21 20:46 +0100
        Re: multiprocessing, pool, queue length Ian Kelly <ian.g.kelly@gmail.com> - 2016-03-21 15:24 -0600
          Re: multiprocessing, pool, queue length Michael Welle <mwe012008@gmx.net> - 2016-03-22 07:19 +0100

#105346 — multiprocessing, pool, queue length

From: Michael Welle <mwe012008@gmx.net>
Date: 2016-03-21 11:25 +0100
Subject: multiprocessing, pool, queue length
Message-ID: <0qu4scx89u.ln2@news.c0t0d0s0.de>

Hello,

I use a multiprocessing pool. My producer calls pool.map_async()
to fill the pool's job queue. It can do that quite fast, while the
consumer processes need much more time to empty the job queue. Since the
producer can create a lot of jobs, I thought about asking the pool for
the amount of jobs it has in its queue and then only produce more jobs
if the current value is below a threshold. It seems like the pool
doesn't want to tell me the level of the queue, does it? What is a
better strategy to solve this problem? Implementing a pool around
multiprocessing's Process and Queue?

Regards
hmw


#105393

From: Ian Kelly <ian.g.kelly@gmail.com>
Date: 2016-03-21 13:12 -0600
Message-ID: <mailman.456.1458587577.12893.python-list@python.org>
In reply to: #105346

On Mon, Mar 21, 2016 at 4:25 AM, Michael Welle <mwe012008@gmx.net> wrote:
> Hello,
>
> I use a multiprocessing pool. My producer calls pool.map_async()
> to fill the pool's job queue. It can do that quite fast, while the
> consumer processes need much more time to empty the job queue. Since the
> producer can create a lot of jobs, I thought about asking the pool for
> the amount of jobs it has in its queue and then only produce more jobs
> if the current value is below a threshold. It seems like the pool
> doesn't want to tell me the level of the queue, does it? What is a
> better strategy to solve this problem? Implementing a pool around
> multiprocessing's Process and Queue?

A simple solution would be to have a shared multiprocessing.Value that
tracks how many items are in the pool. Whenever the producer produces
items it increments the Value, and whenever a consumer finishes a job
it decrements the Value.
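
A minimal sketch of the shared-counter idea, offered as one possible reading rather than a definitive implementation. The names `work`, `run`, `_init_worker`, `threshold` and `batch_size` are hypothetical and not from the thread. The counter is handed to the workers through the pool's `initializer`, because a `multiprocessing.Value` cannot be passed as an ordinary task argument.

```python
import multiprocessing as mp
import time

_pending = None  # per-worker reference to the shared counter, set by _init_worker


def _init_worker(counter):
    """Runs once in each worker process and stores the shared counter."""
    global _pending
    _pending = counter


def work(x):
    """Hypothetical job: does some slow work, then decrements the counter."""
    try:
        time.sleep(0.01)  # stand-in for real work
        return x * x
    finally:
        with _pending.get_lock():
            _pending.value -= 1


def run(items, threshold=8, batch_size=4, processes=2):
    """Submit items in batches, pausing while too many jobs are outstanding."""
    pending = mp.Value("i", 0)
    async_results = []
    with mp.Pool(processes, initializer=_init_worker, initargs=(pending,)) as pool:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            # Throttle: wait until the consumers have caught up.
            while pending.value >= threshold:
                time.sleep(0.005)
            with pending.get_lock():
                pending.value += len(batch)
            async_results.append(pool.map_async(work, batch))
        out = [y for r in async_results for y in r.get()]
    return out, pending.value


if __name__ == "__main__":
    squares, left = run(list(range(20)))
    print(squares, left)
```

The counter here includes jobs that are currently running, not only those still queued, so the threshold should be chosen with the number of worker processes in mind.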

An alternative solution that doesn't require adding a small amount of
work to every job would be to have the producer add a sentinel task
that does nothing at or near the end of the batch, and either wait on
the result or check it periodically. When it's done, then the pool is
low enough to add more jobs.
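
A rough sketch of the sentinel approach with periodic checking, under the assumption that the pool hands out tasks in submission order. The functions `work`, `sentinel` and `produce` and the `poll` interval are hypothetical names chosen for illustration.

```python
import multiprocessing as mp
import time


def work(x):
    """Hypothetical slow job."""
    time.sleep(0.01)
    return x * 2


def sentinel():
    """Do-nothing task queued right after a batch."""
    return None


def produce(batches, processes=2, poll=0.005):
    """Submit one batch at a time; poll the sentinel before adding more."""
    results = []
    with mp.Pool(processes) as pool:
        for batch in batches:
            results.append(pool.map_async(work, batch))
            marker = pool.apply_async(sentinel)
            # Check periodically; the producer could do other work here.
            while not marker.ready():
                time.sleep(poll)
        return [y for r in results for y in r.get()]


if __name__ == "__main__":
    print(produce([[1, 2, 3], [4, 5], [6]]))
```

A finished sentinel only indicates that the jobs queued before it have been picked up by workers. Some of them may still be running, which matches the "low enough" wording above rather than "empty".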


#105398

From: Michael Welle <mwe012008@gmx.net>
Date: 2016-03-21 20:46 +0100
Message-ID: <olv5scxgnt.ln2@news.c0t0d0s0.de>
In reply to: #105393

Hello,

Ian Kelly <ian.g.kelly@gmail.com> writes:

> On Mon, Mar 21, 2016 at 4:25 AM, Michael Welle <mwe012008@gmx.net> wrote:
>> Hello,
>>
>> I use a multiprocessing pool. My producer calls pool.map_async()
>> to fill the pool's job queue. It can do that quite fast, while the
>> consumer processes need much more time to empty the job queue. Since the
>> producer can create a lot of jobs, I thought about asking the pool for
>> the amount of jobs it has in its queue and then only produce more jobs
>> if the current value is below a threshold. It seems like the pool
>> doesn't want to tell me the level of the queue, does it? What is a
>> better strategy to solve this problem? Implementing a pool around
>> multiprocessing's Process and Queue?
>
> A simple solution would be to have a shared multiprocessing.Value that
> tracks how many items are in the pool. Whenever the producer produces
> items it increments the Value, and whenever a consumer finishes a job
> it decrements the Value.

I thought about that, but it doesn't feel 'right'.


> An alternative solution that doesn't require adding a small amount of
> work to every job would be to have the producer add a sentinel task
> that does nothing at or near the end of the batch, and either wait on
> the result or check it periodically. When it's done, then the pool is
> low enough to add more jobs.

Wait on the result means to set a multiprocessing.Event if one of the
consumers finds the sentinel task and wait for it on the producer? Hmm,
that might be better than incrementing a counter. But still, it couples
the consumers and the producer more than I like.

Another idea that I had is to use map() instead of map_async() and then
put the producer in its own process. That should work if job creation is
fast. 

Regards
hmw


#105409

From: Ian Kelly <ian.g.kelly@gmail.com>
Date: 2016-03-21 15:24 -0600
Message-ID: <mailman.465.1458595524.12893.python-list@python.org>
In reply to: #105398

On Mon, Mar 21, 2016 at 1:46 PM, Michael Welle <mwe012008@gmx.net> wrote:
> Wait on the result means to set a multiprocessing.Event if one of the
> consumers finds the sentinel task and wait for it on the producer? Hmm,
> that might be better than incrementing a counter. But still, it couples
> the consumers and the producer more than I like.

No, I mean calling AsyncResult.wait() on the result of the sentinel
task (or just calling Pool.apply instead of Pool.apply_async in the
first place).
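
Both blocking variants mentioned here can be sketched as follows. This is an illustration only: `work`, `sentinel`, `produce_blocking` and the `use_apply` flag are hypothetical names, and no Event or other explicit coupling between producer and consumers is involved.

```python
import multiprocessing as mp


def work(x):
    """Hypothetical job."""
    return x + 1


def sentinel():
    """Do-nothing task queued right after a batch."""
    return None


def produce_blocking(batches, processes=2, use_apply=False):
    """Submit a batch, then block on a sentinel before submitting the next."""
    results = []
    with mp.Pool(processes) as pool:
        for batch in batches:
            results.append(pool.map_async(work, batch))
            if use_apply:
                # Pool.apply blocks until the sentinel has run.
                pool.apply(sentinel)
            else:
                # Same effect through the AsyncResult returned by apply_async.
                pool.apply_async(sentinel).wait()
        return [y for r in results for y in r.get()]


if __name__ == "__main__":
    print(produce_blocking([[1, 2], [3, 4]]))
    print(produce_blocking([[1, 2], [3, 4]], use_apply=True))
```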

> Another idea that I had is to use map() instead of map_async() and then
> put the producer in its own process. That should work if job creation is
> fast.

Essentially the same thing.


#105445

From: Michael Welle <mwe012008@gmx.net>
Date: 2016-03-22 07:19 +0100
Message-ID: <mo47scx1sv.ln2@news.c0t0d0s0.de>
In reply to: #105409

Hello,

Ian Kelly <ian.g.kelly@gmail.com> writes:

> On Mon, Mar 21, 2016 at 1:46 PM, Michael Welle <mwe012008@gmx.net> wrote:
>> Wait on the result means to set a multiprocessing.Event if one of the
>> consumers finds the sentinel task and wait for it on the producer? Hmm,
>> that might be better than incrementing a counter. But still, it couples
>> the consumers and the producer more than I like.
>
> No, I mean calling AsyncResult.wait() on the result of the sentinel
> task (or just calling Pool.apply instead of Pool.apply_async in the
> first place).

Ah, I see.

Thanks
hmw
