Path: csiph.com!feeder.erje.net!2.eu.feeder.erje.net!newsreader4.netcologne.de!news.netcologne.de!fu-berlin.de!uni-berlin.de!not-for-mail From: Ian Kelly Newsgroups: comp.lang.python Subject: Re: Passing data across callbacks in ThreadPoolExecutor Date: Fri, 19 Feb 2016 11:32:27 -0700 Lines: 110 Message-ID: References: <63d71b83fd9c43868f6a97ecaa6af31c@activenetwerx.com> Mime-Version: 1.0 Content-Type: text/plain; charset=UTF-8 X-Trace: news.uni-berlin.de ZP3/8wi2g84FSVj8uwrH/AbRxax/l0xveVv9ZHDitE0w== Return-Path: X-Original-To: python-list@python.org Delivered-To: python-list@mail.python.org X-Spam-Status: OK 0.000 X-Spam-Evidence: '*H*': 1.00; '*S*': 0.00; 'received:209.85.223': 0.03; 'essentially': 0.04; '"c"': 0.07; 'caller': 0.07; 'continuation': 0.07; 'exception.': 0.07; 'raises': 0.07; 'valueerror:': 0.07; '"a"': 0.09; 'callback': 0.09; 'exception,': 0.09; 'tasks,': 0.09; 'url:github': 0.09; 'itself.': 0.11; 'def': 0.13; 'result.': 0.15; '"b"': 0.16; '2016': 0.16; '55,': 0.16; 'brand-new': 0.16; 'chained': 0.16; 'did.': 0.16; 'distinct': 0.16; 'executor': 0.16; 'failed.': 0.16; 'received:io': 0.16; 'received:psf.io': 0.16; 'reraise': 0.16; 'task.': 0.16; 'wrote:': 0.16; 'runs': 0.18; 'do.': 0.22; '"",': 0.22; 'arguments': 0.22; 'complete,': 0.22; 'decorator': 0.22; 'pass': 0.22; 'trying': 0.22; 'am,': 0.23; 'feb': 0.23; 'sets': 0.23; 'this:': 0.23; 'import': 0.24; '(most': 0.24; 'header:In-Reply-To:1': 0.24; "i've": 0.25; "doesn't": 0.26; 'fri,': 0.27; 'message- id:@mail.gmail.com': 0.27; 'cancel': 0.27; 'specifically': 0.28; 'function': 0.28; 'initial': 0.28; 'resolution': 0.28; 'looks': 0.29; 'about.': 0.29; 'raise': 0.29; 'allows': 0.30; 'work.': 0.30; 'task': 0.30; 'another': 0.32; "can't": 0.32; 'run': 0.33; 'problem': 0.33; 'apply,': 0.33; 'traceback': 0.33; 'file': 0.34; 'handle': 0.34; 'previous': 0.34; 'received:google.com': 0.35; 'skip:c 30': 0.35; 'clear': 0.35; 'set.': 0.35; 'skip:" 50': 0.35; 'tasks': 0.35; 'but': 0.36; 'instead': 0.36; 'received:209.85': 0.36; 'to:addr:python-list': 0.36; 'subject:: ': 0.37; 'really': 0.37; 'thanks': 0.37; 'suggestion': 0.37; 'received:209': 0.38; 'does': 0.39; "didn't": 0.39; 'submit': 0.39; 'skip:e 20': 0.39; 'well.': 0.40; 'to:addr:python.org': 0.40; 'still': 0.40; 'future': 0.60; 'waiting': 0.60; 'determine': 0.61; 'back': 0.62; 'course': 0.62; 'more': 0.63; 'different': 0.63; 'detail.': 0.66; 'promise': 0.66; 'subject:skip:T 10': 0.66; 'below.': 0.66; 'future.': 0.67; 'reply': 0.68; 'completed': 0.69; 'completes': 0.84; 'to:name:python': 0.84; 'abstracts': 0.91; 'do:': 0.91; 'url:2013': 0.91 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:from:date:message-id:subject:to :content-type; bh=8S7Bp5svJER4xWrP9ULUiLhv2DEjNXmu3TNoxpAADOM=; b=af1A7w11+WNDC9VCPYH4SunPSm+9lHFm4Vu0RFFe+0C5+WKQ6vhBl5shh6HPVOK0wy e+G6VnxCHmr+Gv3rgS3D2DGWz0fJnKgJnwmyOovsaOVjC76sXPbQed2eMZUlwXZFji4x NFvm/HHPgyfNgfZgpr1jrInjRzgyPML4zngjQKRLcslsgVaNQ1lYYgIe7EQmKErSsmnc DJFHVY/KRUmdvgT62EkCzti5o71LqsKX+LYvSwaPmYdQhS9ac+6XNz+v1TUtudQUSwG7 kdyqlcqea00EqXCG25sPhW22dJogKA1LTG/K6C6T4fCT4DnqJjsawNS7BMgXFiTFs+JX R7uA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to:content-type; bh=8S7Bp5svJER4xWrP9ULUiLhv2DEjNXmu3TNoxpAADOM=; b=SmvPEol3kN992BlxLN0KSKX29felAsXubxeeiUtq6wWfNcE1aOwWGLji0WomHvqJLs bs98YdAdZvIvD24wq2+OdhNBeYA+A2Wy/gsbzBOXT4kGNaHwU4orjIoJnzAkXWi3z62L Nu8rS0JoiG5wn80sb7fzqqi9zMY9WEzx5tDfGlmeW/jJa8qv2aG207r27yBj9byahVOI wlBoziyXM9EYBXQC4se8gObmNpUKLfNYMwu2MMg5mLbYMzkUEjvyJ8pibcMFK5XrV42R Uf1G6KD9YBD5wePcKZF9ZFGNlBM/Uj71nlwlVN6gI0ID3RidxEhD7FIA6s6RarGSmgOM rwSw== X-Gm-Message-State: AG10YOSMgkOM6APIuS72oHbP42BroSNGff7eTE0NNQEWrXDEEA8/Wqv2+C/WkUDM66jMushgnwXzO3Qf0k6rtw== X-Received: by 10.107.11.93 with SMTP id v90mr15745985ioi.188.1455906787359; Fri, 19 Feb 2016 10:33:07 -0800 (PST) In-Reply-To: <63d71b83fd9c43868f6a97ecaa6af31c@activenetwerx.com> X-BeenThere: python-list@python.org X-Mailman-Version: 2.1.21rc2 Precedence: list List-Id: General discussion list for the Python programming language List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Xref: csiph.com comp.lang.python:103216 On Fri, Feb 19, 2016 at 10:18 AM, Joseph L. Casale wrote: >> It's still not clear to me specifically what you're trying to do. It >> would really help if you would describe the problem in more detail. >> Here's what I think you're trying to do: >> >> 1) Submit a task to a ThreadPoolExecutor and get back a future. >> >> 2) When the task is complete, submit another task that needs the >> result of the first task to do its work. >> >> 3) When the chained task is complete, return the final result of the >> chained task back to whoever submitted the original task via the >> original task's future. >> >> The problem arises in that the original task's future already >> completed when the original task did. The chained task sets its result >> in a different future that the submitter didn't know about. > > Yes, I may have 2 or more tasks that depend on the previous. > They are each distinct tasks, or continuations, so they each want the result > of the previous task however each one can cancel the set. > > The callback model doesn't apply, its based on the result of _one_ task. > > What I need I are continuations, much like the .NET TPL. > > def task_a(): > return "a" > > def task_b(): > return "b" > > def task_c(): > return "c" > > So I submit task_a, if it completes successfully, task_b runs on its result. > If task_b completes successfully, task_c runs on its result. They "look" > like callbacks, but they are continuations. > > Task_b must be able to cancel the continuation of task_c if it see's task_a > has failed. Thanks for the clarification. The first suggestion that I gave in my initial reply will handle cancellations implicitly. If task_b() is waiting on future_a.result() and task_a() raises an uncaught exception, then future_a.result() will reraise the exception. If it then propagates uncaught out of task_b() then future_b.result() will again reraise the exception, and task_c() that is waiting on it will reraise it as well. See this in action below. You might also be interested in the promise decorator described at https://bloerg.net/2013/04/05/chaining-python-futures.html. It does essentially the same as what I've described, but it abstracts it into a decorator and allows the caller to determine which arguments to pass as futures and which to pass as concrete values, instead of hard-coding the resolution of futures of dependencies into the task function itself. In the course of digging that up, I also found this: https://github.com/dvdotsenko/python-future-then which is brand-new and looks promising, but I can't see how one would use it with a ThreadPoolExecutor. py> import concurrent.futures py> def task_a(): ... raise ValueError(42) ... py> def task_b(future_a): ... return future_a.result() ... py> def task_c(future_b): ... return future_b.result() ... py> executor = concurrent.futures.ThreadPoolExecutor() py> future_a = executor.submit(task_a) py> future_b = executor.submit(task_b, future_a) py> future_c = executor.submit(task_c, future_b) py> future_c.result() Traceback (most recent call last): File "", line 1, in File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 398, in result return self.__get_result() File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result raise self._exception File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run result = self.fn(*self.args, **self.kwargs) File "", line 2, in task_c File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 398, in result return self.__get_result() File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result raise self._exception File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run result = self.fn(*self.args, **self.kwargs) File "", line 2, in task_b File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 398, in result return self.__get_result() File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result raise self._exception File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run result = self.fn(*self.args, **self.kwargs) File "", line 2, in task_a ValueError: 42