Groups | Search | Server Info | Keyboard shortcuts | Login | Register [http] [https] [nntp] [nntps]


Groups > comp.lang.python > #92883 > unrolled thread

Catching exceptions with multi-processing

Started byFabien <fabien.maussion@gmail.com>
First post2015-06-19 16:01 +0200
Last post2015-06-21 13:27 -0700
Articles 11 — 7 participants

Back to article view | Back to comp.lang.python


Contents

  Catching exceptions with multi-processing Fabien <fabien.maussion@gmail.com> - 2015-06-19 16:01 +0200
    Re: Catching exceptions with multi-processing Andres Riancho <andres.riancho@gmail.com> - 2015-06-19 11:25 -0300
      Re: Catching exceptions with multi-processing Fabien <fabien.maussion@gmail.com> - 2015-06-19 18:16 +0200
        Re: Catching exceptions with multi-processing Cameron Simpson <cs@zip.com.au> - 2015-06-20 13:14 +1000
          Re: Catching exceptions with multi-processing Fabien <fabien.maussion@gmail.com> - 2015-06-20 11:03 +0200
    Re: Catching exceptions with multi-processing Oscar Benjamin <oscar.j.benjamin@gmail.com> - 2015-06-19 16:01 +0100
    Re: Catching exceptions with multi-processing Steven D'Aprano <steve@pearwood.info> - 2015-06-20 01:41 +1000
      Re: Catching exceptions with multi-processing Fabien <fabien.maussion@gmail.com> - 2015-06-19 18:07 +0200
      Re: Catching exceptions with multi-processing Chris Angelico <rosuav@gmail.com> - 2015-06-20 06:58 +1000
        Re: Catching exceptions with multi-processing Fabien <fabien.maussion@gmail.com> - 2015-06-20 11:01 +0200
    Re: Catching exceptions with multi-processing Paul Rubin <no.email@nospam.invalid> - 2015-06-21 13:27 -0700

#92883 — Catching exceptions with multi-processing

FromFabien <fabien.maussion@gmail.com>
Date2015-06-19 16:01 +0200
SubjectCatching exceptions with multi-processing
Message-ID<mm17as$i8m$1@speranza.aioe.org>
Folks,

I am developing a tool which works on individual entities (glaciers) and 
do a lot of operations on them. There are many tasks to do, one after 
each other, and each task follows the same interface:

def task_1(path_to_glacier_dir):
     open file1 in path_to_glacier_dir
     do stuff
     if dont_work:
         raise RuntimeError("didnt work")
     write file2 in path_to_glacier_dir

This way, the tasks can be run in parallel very easily:

import multiprocessing as mp
pool = mp.Pool(4)

dirs = [list_of_dirs]
pool.map(task1, dirs, chunksize=1)
pool.map(task2, dirs, chunksize=1)
pool.map(task3, dirs, chunksize=1)

... and so forth. I tested the tool for about a hundred glaciers but now 
it has to run for thousands of them. There are going to be errors, some 
of them are even expected for special outliers. What I would like the 
tool to do is that in case of error, it writes the identifier of the 
problematic glacier somewhere, the error encountered and more info if 
possible. Because of multiprocessing, I can't write in a shared file, so 
I thought that the individual processes should write a unique "error 
file" in a dedicated directory.

What I don't know how to, however, is how to do this at minimal cost and 
in a generic way for all tasks. Also, the task2 should not be run if 
task1 threw an error. Sometimes (for debugging), I'd rather keep the 
normal behavior of raising an error and stopping the program.

Do I have to wrap all tasks with a "try: exept:" block? How to switch 
between behaviors? All the solutions I could think about look quite ugly 
to me. And it seems that this is a general problem that someone cleverer 
than me had solved before ;-)

Thanks,

Fabien






[toc] | [next] | [standalone]


#92885

FromAndres Riancho <andres.riancho@gmail.com>
Date2015-06-19 11:25 -0300
Message-ID<mailman.637.1434723958.13271.python-list@python.org>
In reply to#92883
Fabien,

   My recommendation is that you should pass some extra arguments to the task:
    * A unique task id
    * A result multiprocessing.Queue

    When an exception is raised you put (unique_id, exception) to the
queue. When it succeeds you put (unique_id, None). In the main process
you consume the queue and do your error handling.

    Note that some exceptions can't be serialized, there is where
tblib [0] comes handy.

[0] https://pypi.python.org/pypi/tblib

Regards,

On Fri, Jun 19, 2015 at 11:01 AM, Fabien <fabien.maussion@gmail.com> wrote:
> Folks,
>
> I am developing a tool which works on individual entities (glaciers) and do
> a lot of operations on them. There are many tasks to do, one after each
> other, and each task follows the same interface:
>
> def task_1(path_to_glacier_dir):
>     open file1 in path_to_glacier_dir
>     do stuff
>     if dont_work:
>         raise RuntimeError("didnt work")
>     write file2 in path_to_glacier_dir
>
> This way, the tasks can be run in parallel very easily:
>
> import multiprocessing as mp
> pool = mp.Pool(4)
>
> dirs = [list_of_dirs]
> pool.map(task1, dirs, chunksize=1)
> pool.map(task2, dirs, chunksize=1)
> pool.map(task3, dirs, chunksize=1)
>
> ... and so forth. I tested the tool for about a hundred glaciers but now it
> has to run for thousands of them. There are going to be errors, some of them
> are even expected for special outliers. What I would like the tool to do is
> that in case of error, it writes the identifier of the problematic glacier
> somewhere, the error encountered and more info if possible. Because of
> multiprocessing, I can't write in a shared file, so I thought that the
> individual processes should write a unique "error file" in a dedicated
> directory.
>
> What I don't know how to, however, is how to do this at minimal cost and in
> a generic way for all tasks. Also, the task2 should not be run if task1
> threw an error. Sometimes (for debugging), I'd rather keep the normal
> behavior of raising an error and stopping the program.
>
> Do I have to wrap all tasks with a "try: exept:" block? How to switch
> between behaviors? All the solutions I could think about look quite ugly to
> me. And it seems that this is a general problem that someone cleverer than
> me had solved before ;-)
>
> Thanks,
>
> Fabien
>
>
>
>
>
>
>
> --
> https://mail.python.org/mailman/listinfo/python-list



-- 
Andrés Riancho
Project Leader at w3af - http://w3af.org/
Web Application Attack and Audit Framework
Twitter: @w3af
GPG: 0x93C344F3

[toc] | [prev] | [next] | [standalone]


#92891

FromFabien <fabien.maussion@gmail.com>
Date2015-06-19 18:16 +0200
Message-ID<mm1f9d$751$1@speranza.aioe.org>
In reply to#92885
On 06/19/2015 04:25 PM, Andres Riancho wrote:
> Fabien,
>
>     My recommendation is that you should pass some extra arguments to the task:
>      * A unique task id
>      * A result multiprocessing.Queue
>
>      When an exception is raised you put (unique_id, exception) to the
> queue. When it succeeds you put (unique_id, None). In the main process
> you consume the queue and do your error handling.
>
>      Note that some exceptions can't be serialized, there is where
> tblib [0] comes handy.
>
> [0]https://pypi.python.org/pypi/tblib
>
> Regards,

Thanks, I wasn't aware of the multiprocessing.Queue workflow. It seems 
like its going to require some changes in the actual code of the tasks 
though. Did I get it right that I should stop raising exceptions then?

Something like:

def task_1(path, q):
     # Do stuffs
     if dont_work:
         q.put(RuntimeError("didnt work"))
	return
     # finished
     q.put(None)
     return


Fabien




[toc] | [prev] | [next] | [standalone]


#92900

FromCameron Simpson <cs@zip.com.au>
Date2015-06-20 13:14 +1000
Message-ID<mailman.649.1434771749.13271.python-list@python.org>
In reply to#92891
On 19Jun2015 18:16, Fabien <fabien.maussion@gmail.com> wrote:
>On 06/19/2015 04:25 PM, Andres Riancho wrote:
>>    My recommendation is that you should pass some extra arguments to the task:
>>     * A unique task id
>>     * A result multiprocessing.Queue
>>
>>     When an exception is raised you put (unique_id, exception) to the
>>queue. When it succeeds you put (unique_id, None). In the main process
>>you consume the queue and do your error handling.
>>
>>     Note that some exceptions can't be serialized, there is where
>>tblib [0] comes handy.
>>
>>[0]https://pypi.python.org/pypi/tblib
>>
>>Regards,
>
>Thanks, I wasn't aware of the multiprocessing.Queue workflow. It seems 
>like its going to require some changes in the actual code of the tasks 
>though. Did I get it right that I should stop raising exceptions then?
>
>Something like:
>
>def task_1(path, q):
>    # Do stuffs
>    if dont_work:
>        q.put(RuntimeError("didnt work"))
>	return
>    # finished
>    q.put(None)
>    return

I would keep your core logic Pythonic, raise exceptions. But I would wrap each 
task in something to catch any Exception subclass and report back to the queue.  
Untested example:

  def subwrapper(q, callable, *args, **kwargs):
    try:
      q.put( ('COMPLETED', callable(*args, **kwargs)) )
    except Exception as e:
      q.put( ('FAILED', e, callable, args, kwargs) )

then dispatch tasks like this:

  pool.map(subwrapper, q, task1, dirs, chunksize=1)

and have a thread (or main program) collect things from the queue for logging 
and other handling. Obviously you might return something more sophisticated 
that my simple tuple above, but I'm sure you get the idea.

Cheers,
Cameron Simpson <cs@zip.com.au>

He's silly and he's ignorant, but he's got guts, and guts is enough.
        - Sgt. Hartmann

[toc] | [prev] | [next] | [standalone]


#92907

FromFabien <fabien.maussion@gmail.com>
Date2015-06-20 11:03 +0200
Message-ID<mm3a9l$eb$2@speranza.aioe.org>
In reply to#92900
On 06/20/2015 05:14 AM, Cameron Simpson wrote:
> I would keep your core logic Pythonic, raise exceptions. But I would
> wrap each task in something to catch any Exception subclass and report
> back to the queue. Untested example:
>
>   def subwrapper(q, callable, *args, **kwargs):
>     try:
>       q.put( ('COMPLETED', callable(*args, **kwargs)) )
>     except Exception as e:
>       q.put( ('FAILED', e, callable, args, kwargs) )
>
> then dispatch tasks like this:
>
>   pool.map(subwrapper, q, task1, dirs, chunksize=1)
>
> and have a thread (or main program) collect things from the queue for
> logging and other handling. Obviously you might return something more
> sophisticated that my simple tuple above, but I'm sure you get the idea.
>
> Cheers,
> Cameron Simpson

Perfect! Much more elegant and easier to implement on top of my existing 
workflow based on raising exceptions.

thanks to all responses,

Fabien


[toc] | [prev] | [next] | [standalone]


#92887

FromOscar Benjamin <oscar.j.benjamin@gmail.com>
Date2015-06-19 16:01 +0100
Message-ID<mailman.639.1434726105.13271.python-list@python.org>
In reply to#92883
On 19 June 2015 at 15:01, Fabien <fabien.maussion@gmail.com> wrote:
> Folks,
>
> I am developing a tool which works on individual entities (glaciers) and do
> a lot of operations on them. There are many tasks to do, one after each
> other, and each task follows the same interface:
>
> def task_1(path_to_glacier_dir):
>     open file1 in path_to_glacier_dir
>     do stuff
>     if dont_work:
>         raise RuntimeError("didnt work")
>     write file2 in path_to_glacier_dir
>
> This way, the tasks can be run in parallel very easily:
>
> import multiprocessing as mp
> pool = mp.Pool(4)
>
> dirs = [list_of_dirs]
> pool.map(task1, dirs, chunksize=1)
> pool.map(task2, dirs, chunksize=1)
> pool.map(task3, dirs, chunksize=1)
>
> ... and so forth. I tested the tool for about a hundred glaciers but now it
> has to run for thousands of them. There are going to be errors, some of them
> are even expected for special outliers. What I would like the tool to do is
> that in case of error, it writes the identifier of the problematic glacier
> somewhere, the error encountered and more info if possible. Because of
> multiprocessing, I can't write in a shared file, so I thought that the
> individual processes should write a unique "error file" in a dedicated
> directory.
>
> What I don't know how to, however, is how to do this at minimal cost and in
> a generic way for all tasks. Also, the task2 should not be run if task1
> threw an error. Sometimes (for debugging), I'd rather keep the normal
> behavior of raising an error and stopping the program.
>
> Do I have to wrap all tasks with a "try: exept:" block? How to switch
> between behaviors? All the solutions I could think about look quite ugly to
> me. And it seems that this is a general problem that someone cleverer than
> me had solved before ;-)

A simple way to approach this could be something like:

#!/usr/bin/env python3

import math
import multiprocessing

def sqrt(x):
    if x < 0:
        return 'error', x
    else:
        return 'success', math.sqrt(x)

if __name__ == "__main__":
    numbers = [1, 2, 3, -1, -3]
    pool = multiprocessing.Pool()
    for ret, val in pool.imap(sqrt, numbers):
        if ret == 'error':
            raise ValueError(val)
        print(val)

Just replace the raise statement with whatever you want to do (write
to a file etc). Since all errors are handled in the master process
there are no issues with writing to a file.

--
Oscar

[toc] | [prev] | [next] | [standalone]


#92888

FromSteven D'Aprano <steve@pearwood.info>
Date2015-06-20 01:41 +1000
Message-ID<55843842$0$1644$c3e8da3$5496439d@news.astraweb.com>
In reply to#92883
On Sat, 20 Jun 2015 12:01 am, Fabien wrote:

> Folks,
> 
> I am developing a tool which works on individual entities (glaciers) and
> do a lot of operations on them. There are many tasks to do, one after
> each other, and each task follows the same interface:

I'm afraid your description is contradictory. Here you say the tasks run one
after another, but then you say:

> This way, the tasks can be run in parallel very easily:

and then later still you contradict this:

> Also, the task2 should not be run if task1 threw an error.


If task2 relies on task1, then you *cannot* run them in parallel. You have
to run them one after the other, sequentially.


You also ask:

> There are going to be errors, some 
> of them are even expected for special outliers. What I would like the 
> tool to do is that in case of error, it writes the identifier of the 
> problematic glacier somewhere, the error encountered and more info if 
> possible. Because of multiprocessing, I can't write in a shared file, so 
> I thought that the individual processes should write a unique "error 
> file" in a dedicated directory.

The documentation for the logging module has examples of using
multiprocessing write to a single log file from multiple processes. It's a
bit complicated, since *directly* writing to a single log from multiple
processes is not supported, but it is possible.

https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes


Or if you are on a Unix or Linux system, you can log to syslog and let
syslog handle it.

Since your sample code appears to have a lot of file I/O, it may be that you
can use threads rather than multiprocessing. That would allow all the
threads to communicate with a single thread that handles logging.

Or use a lock file:

http://stackoverflow.com/questions/1444790/python-module-for-creating-pid-based-lockfile



-- 
Steven

[toc] | [prev] | [next] | [standalone]


#92890

FromFabien <fabien.maussion@gmail.com>
Date2015-06-19 18:07 +0200
Message-ID<mm1enf$66l$1@speranza.aioe.org>
In reply to#92888
On 06/19/2015 05:41 PM, Steven D'Aprano wrote:
> On Sat, 20 Jun 2015 12:01 am, Fabien wrote:
>
>> >Folks,
>> >
>> >I am developing a tool which works on individual entities (glaciers) and
>> >do a lot of operations on them. There are many tasks to do, one after
>> >each other, and each task follows the same interface:
> I'm afraid your description is contradictory. Here you say the tasks run one
> after another, but then you say:
>
>> >This way, the tasks can be run in parallel very easily:
> and then later still you contradict this:
>
>> >Also, the task2 should not be run if task1 threw an error.
>
> If task2 relies on task1, then you*cannot*  run them in parallel. You have
> to run them one after the other, sequentially.

Hi Steve,

I meant: "for several glaciers in parallel" as shown by the code snippet:

import multiprocessing as mp
pool = mp.Pool(4)

dirs = [list_of_dirs]
pool.map(task1, dirs, chunksize=1)
pool.map(task2, dirs, chunksize=1)
pool.map(task3, dirs, chunksize=1)

which should be changed to something like (after some of the responses):

dirs = [list_of_dirs]
pool.map(task1, dirs, ...)
# handle exceptions
dirs_reduced = [dirs that did not fail]
pool.map(task2, dirs_reduced, ...)

this way the tasks are run sequentially for each glacier but in parallel 
over a list of glaciers...

Fabien

[toc] | [prev] | [next] | [standalone]


#92898

FromChris Angelico <rosuav@gmail.com>
Date2015-06-20 06:58 +1000
Message-ID<mailman.647.1434747496.13271.python-list@python.org>
In reply to#92888
On Sat, Jun 20, 2015 at 1:41 AM, Steven D'Aprano <steve@pearwood.info> wrote:
> On Sat, 20 Jun 2015 12:01 am, Fabien wrote:
>
>> Folks,
>>
>> I am developing a tool which works on individual entities (glaciers) and
>> do a lot of operations on them. There are many tasks to do, one after
>> each other, and each task follows the same interface:
>
> I'm afraid your description is contradictory. Here you say the tasks run one
> after another, but then you say:
>
>> This way, the tasks can be run in parallel very easily:
>
> and then later still you contradict this:
>
>> Also, the task2 should not be run if task1 threw an error.
>
>
> If task2 relies on task1, then you *cannot* run them in parallel. You have
> to run them one after the other, sequentially.

AIUI what he's doing is all the subparts of task1 in parallel, then
all the subparts of task2:

pool.map(task1, dirs, chunksize=1)
pool.map(task2, dirs, chunksize=1)
pool.map(task3, dirs, chunksize=1)

task1 can be done on all of dirs in parallel, as no instance of task1
depends on any other instance of task1; but task2 should be started
only if all task1s finish successfully. OP, is this how it is? If not,
I apologize for the noise.

ChrisA

[toc] | [prev] | [next] | [standalone]


#92906

FromFabien <fabien.maussion@gmail.com>
Date2015-06-20 11:01 +0200
Message-ID<mm3a5l$eb$1@speranza.aioe.org>
In reply to#92898
On 06/19/2015 10:58 PM, Chris Angelico wrote:
> AIUI what he's doing is all the subparts of task1 in parallel, then
> all the subparts of task2:
>
> pool.map(task1, dirs, chunksize=1)
> pool.map(task2, dirs, chunksize=1)
> pool.map(task3, dirs, chunksize=1)
>
> task1 can be done on all of dirs in parallel, as no instance of task1
> depends on any other instance of task1; but task2 should be started
> only if all task1s finish successfully. OP, is this how it is? If not,
> I apologize for the noise.

That's it! Thanks for clarifying, I might have trouble explaining myself 
sometimes ;-)

Fabien

[toc] | [prev] | [next] | [standalone]


#92972

FromPaul Rubin <no.email@nospam.invalid>
Date2015-06-21 13:27 -0700
Message-ID<87fv5kiz77.fsf@jester.gateway.sonic.net>
In reply to#92883
Fabien <fabien.maussion@gmail.com> writes:
> I am developing a tool which works on individual entities (glaciers)
> and do a lot of operations on them. There are many tasks to do, one
> after each other, and each task follows the same interface: ...

If most of the resources will be spent on computation and the
communications overhead is fairly low, the path of least resistance may
be to:

1) write a script that computes just one glacier (no multiprocessing)
2) write a control script that runs the glacier script through something
   like os.popen(), so normally it will collect an answer, but it can
   also notice if the glacier script crashes, or kill it from a timeout
   if it takes too long
3) Track the glacier tasks in an external queue server: I've used Redis
   (redis.io) for this, since it's simple and powerful, but there are
   other tools like 0mq that might be more precisely fitted.
4) The control script can read the queue server for tasks and update the
   queue server when results are ready

The advantages of this over multiprocessing are:

1) Redis is a TCP server which means you can spread your compute scripts
over multiple computers easily, getting more parallelism.  You can write
values into it as JSON strings if they are compound values that are not
too large.  Otherwise you probably have to use files, but can pass the
filenames through Redis.  You can connect new clients whenever you want
through the publish/subscribe interface, etc.

2) by using a simple control script you don't have to worry too much
about the many ways that the computation script might fail, you can
restart it, you can put the whole thing under your favorite supervision
daemon (cron, upstart, systemd or whatever) so it can restart
automatically even if your whole computer reboots, etc.  Redis can even
mirror itself to a failover server in real time if you think you need
that, plus it can checkpoint its state to disk.

[toc] | [prev] | [standalone]


Back to top | Article view | comp.lang.python


csiph-web