How to properly implement worker processes

Started by: Dennis Jacobfeuerborn <djacobfeuerborn@gmail.com>
First post: 2012-08-22 10:29 -0700
Last post: 2012-08-22 16:09 -0400
Articles: 8 (3 participants)

Contents

  How to properly implement worker processes Dennis Jacobfeuerborn <djacobfeuerborn@gmail.com> - 2012-08-22 10:29 -0700
    Re: How to properly implement worker processes Ian Kelly <ian.g.kelly@gmail.com> - 2012-08-22 11:46 -0600
      Re: How to properly implement worker processes Dennis Jacobfeuerborn <djacobfeuerborn@gmail.com> - 2012-08-22 12:40 -0700
        Re: How to properly implement worker processes Ian Kelly <ian.g.kelly@gmail.com> - 2012-08-22 15:15 -0600
          Re: How to properly implement worker processes Dennis Jacobfeuerborn <djacobfeuerborn@gmail.com> - 2012-08-22 19:28 -0700
    Re: How to properly implement worker processes Dennis Lee Bieber <wlfraed@ix.netcom.com> - 2012-08-22 16:09 -0400

#27660 — How to properly implement worker processes

From: Dennis Jacobfeuerborn <djacobfeuerborn@gmail.com>
Date: 2012-08-22 10:29 -0700
Subject: How to properly implement worker processes
Message-ID: <db59479c-198f-468d-9e73-40b1b992895c@googlegroups.com>
Hi,
I'm trying to implement a system for periodically checking URLs and I've run into problems with some of the implementation details. The URLs are supposed to be checked continuously until the config for a URL is explicitly removed.

The plan is to spawn a worker process for each URL that sends the status of the last check to its parent, which keeps track of the state of all URLs. When a URL is no longer supposed to be checked, the parent process should shut down or kill the respective worker process.

What I've been going for so far is that the parent process creates a global queue that is passed to all children upon creation, which they use to send status messages to the parent. Then for each process a dedicated queue is created that the parent uses to issue commands to the child.

The issue is that since the child processes spend some time in sleep(), they cannot respond immediately when a command from the parent arrives, which is rather undesirable. What I would rather do is have the parent simply kill the child instead, which is instantaneous and more reliable.

My problem is that according to the multiprocessing docs if I kill the child while it uses the queue to send a status to the parent then the queue becomes corrupted and since that queue is shared that means the whole thing pretty much stops working.

How can I get around this problem and receive status updates from all children efficiently without a shared queue and with the ability to simply kill the child process when it's no longer needed?

Regards,
  Dennis

#27665

From: Ian Kelly <ian.g.kelly@gmail.com>
Date: 2012-08-22 11:46 -0600
Message-ID: <mailman.3673.1345657626.4697.python-list@python.org>
In reply to: #27660
On Wed, Aug 22, 2012 at 11:29 AM, Dennis Jacobfeuerborn
<djacobfeuerborn@gmail.com> wrote:
> Hi,
> I'm trying to implement a system for periodically checking URLs and I've run into problems with some of the implementation details. The URLs are supposed to be checked continuously until the config for a URL is explicitly removed.
>
> The plan is to spawn a worker process for each URL that sends the status of the last check to its parent, which keeps track of the state of all URLs. When a URL is no longer supposed to be checked, the parent process should shut down or kill the respective worker process.
>
> What I've been going for so far is that the parent process creates a global queue that is passed to all children upon creation, which they use to send status messages to the parent. Then for each process a dedicated queue is created that the parent uses to issue commands to the child.
>
> The issue is that since the child processes spend some time in sleep(), they cannot respond immediately when a command from the parent arrives, which is rather undesirable. What I would rather do is have the parent simply kill the child instead, which is instantaneous and more reliable.
>
> My problem is that according to the multiprocessing docs if I kill the child while it uses the queue to send a status to the parent then the queue becomes corrupted and since that queue is shared that means the whole thing pretty much stops working.
>
> How can I get around this problem and receive status updates from all children efficiently without a shared queue and with the ability to simply kill the child process when it's no longer needed?

The usual approach to killing worker processes safely is to send them
an "exit" command, which they should respond to by terminating
cleanly.  Instead of using sleep(), have the workers do a blocking
get() on the queue with a timeout.  This way they'll receive the
"exit" message immediately as desired, but they'll still wake up at
the desired intervals in order to do their work.
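
A minimal sketch of that pattern, for illustration only. The names run_worker, commands, results, check and interval are assumptions, not part of the original post; commands and results can be multiprocessing.Queue objects, whose get() raises queue.Empty on timeout just like queue.Queue.

```python
import queue


def run_worker(url, commands, results, check, interval=5.0):
    """Check `url` every `interval` seconds until "exit" arrives on `commands`.

    `check` is a hypothetical callable that performs one URL check and
    returns its status.
    """
    while True:
        results.put((url, check(url)))
        try:
            # Waits like sleep(interval), but returns as soon as a command arrives.
            command = commands.get(timeout=interval)
        except queue.Empty:
            continue  # interval elapsed with no command: run the next check
        if command == "exit":
            break  # leave the loop at a clean point
```

The parent would start this with something like multiprocessing.Process(target=run_worker, args=(url, commands, results, check)) and later call commands.put("exit"). Note that the worker still cannot react while check() itself is blocking.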



#27681

From: Dennis Jacobfeuerborn <djacobfeuerborn@gmail.com>
Date: 2012-08-22 12:40 -0700
Message-ID: <f1de8e1a-d836-4741-8c06-2e6adabbeed5@googlegroups.com>
In reply to: #27665
On Wednesday, August 22, 2012 7:46:34 PM UTC+2, Ian wrote:
> On Wed, Aug 22, 2012 at 11:29 AM, Dennis Jacobfeuerborn
> <djacobfeuerborn@gmail.com> wrote:
> > Hi,
> > I'm trying to implement a system for periodically checking URLs and I've run into problems with some of the implementation details. The URLs are supposed to be checked continuously until the config for a URL is explicitly removed.
> >
> > The plan is to spawn a worker process for each URL that sends the status of the last check to its parent, which keeps track of the state of all URLs. When a URL is no longer supposed to be checked, the parent process should shut down or kill the respective worker process.
> >
> > What I've been going for so far is that the parent process creates a global queue that is passed to all children upon creation, which they use to send status messages to the parent. Then for each process a dedicated queue is created that the parent uses to issue commands to the child.
> >
> > The issue is that since the child processes spend some time in sleep(), they cannot respond immediately when a command from the parent arrives, which is rather undesirable. What I would rather do is have the parent simply kill the child instead, which is instantaneous and more reliable.
> >
> > My problem is that according to the multiprocessing docs if I kill the child while it uses the queue to send a status to the parent then the queue becomes corrupted and since that queue is shared that means the whole thing pretty much stops working.
> >
> > How can I get around this problem and receive status updates from all children efficiently without a shared queue and with the ability to simply kill the child process when it's no longer needed?
>
> The usual approach to killing worker processes safely is to send them
> an "exit" command, which they should respond to by terminating
> cleanly.  Instead of using sleep(), have the workers do a blocking
> get() on the queue with a timeout.  This way they'll receive the
> "exit" message immediately as desired, but they'll still wake up at
> the desired intervals in order to do their work.

I was thinking about something like that but the issue is that this really only works when you don't do any actual blocking work. I may be able to get around the sleep() but then I have to fetch the URL or do some other work that might block for a while so the get() trick doesn't work.
Also the child process might not be able to deal with such an exit command at all for one reason or another so the only safe way to get rid of it is for the parent to kill it.

The better option would be to not use a shared queue for communication and instead use only dedicated pipes/queues for each child process, but there doesn't seem to be a way to wait for a message from multiple queues/pipes. If there were, I could simply kill the child and get rid of the respective pipes/queues without affecting the other processes or communication channels.

Regards,
  Dennis



#27685

From: Ian Kelly <ian.g.kelly@gmail.com>
Date: 2012-08-22 15:15 -0600
Message-ID: <mailman.3688.1345670143.4697.python-list@python.org>
In reply to: #27681
On Wed, Aug 22, 2012 at 1:40 PM, Dennis Jacobfeuerborn
<djacobfeuerborn@gmail.com> wrote:
> I was thinking about something like that but the issue is that this really only works when you don't do any actual blocking work. I may be able to get around the sleep() but then I have to fetch the URL or do some other work that might block for a while so the get() trick doesn't work.

At a lower level, it is possible to poll on both the pipe and the
socket simultaneously.  At this point though you might want to start
looking at an asynchronous or event-driven framework like twisted or
gevent.

> Also the child process might not be able to deal with such an exit command at all for one reason or another so the only safe way to get rid of it is for the parent to kill it.

I think you mean that it is the most "reliable" way.  In general, the
only "safe" way to cause a process to exit is the cooperative
approach, because it may otherwise leave external resources such as
file data in an unexpected state that could cause problems later.

> The better option would be to not use a shared queue for communication and instead use only dedicated pipes/queues for each child process, but there doesn't seem to be a way to wait for a message from multiple queues/pipes. If there were, I could simply kill the child and get rid of the respective pipes/queues without affecting the other processes or communication channels.

Assuming that you're using a Unix system:

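from select import select

# pipes: list of the parent's ends of the per-worker multiprocessing pipes
# timeout: seconds to wait for a message before giving up on this round
while True:
    ready, _, _ = select(pipes, [], [], timeout)
    if not ready:
        pass  # process timeout
    else:
        for pipe in ready:
            message = pipe.recv()  # pipe connections use recv(), not get()
            # process message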
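
A portable variant of the same idea, sketched with multiprocessing.connection.wait (available since Python 3.3, so it postdates this thread). The helper name collect is an assumption. It also handles the case where a worker has been killed: the parent's end of that pipe becomes ready and recv() raises EOFError.

```python
from multiprocessing.connection import wait


def collect(pipes, timeout):
    """Return the messages from every ready pipe in `pipes`.

    `pipes` is a list of the parent's Connection ends, one per worker.
    Pipes whose worker has gone away are removed from the list in place.
    """
    messages = []
    for pipe in wait(pipes, timeout):
        try:
            messages.append(pipe.recv())
        except EOFError:
            # The other end was closed, e.g. because the worker was killed.
            pipes.remove(pipe)
    return messages
```

An empty return value means the timeout expired with nothing to read, or that only dead pipes were found.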



#27691

From: Dennis Jacobfeuerborn <djacobfeuerborn@gmail.com>
Date: 2012-08-22 19:28 -0700
Message-ID: <b78aca97-555b-4abc-b3de-59eed5df42bb@googlegroups.com>
In reply to: #27685
On Wednesday, August 22, 2012 11:15:10 PM UTC+2, Ian wrote:
> On Wed, Aug 22, 2012 at 1:40 PM, Dennis Jacobfeuerborn
> <djacobfeuerborn@gmail.com> wrote:
> > I was thinking about something like that but the issue is that this really only works when you don't do any actual blocking work. I may be able to get around the sleep() but then I have to fetch the URL or do some other work that might block for a while so the get() trick doesn't work.
>
> At a lower level, it is possible to poll on both the pipe and the
> socket simultaneously.  At this point though you might want to start
> looking at an asynchronous or event-driven framework like twisted or
> gevent.

I was looking at twisted, and while the Agent class would allow me to make async requests, it doesn't seem to support setting a timeout or aborting a running request. That's the important part, since the HTTP request is the only thing that might block for a while. If I can make the request asynchronously and abort it when I receive a QUIT command from the parent, then this would pretty much solve the issue.
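
As a simpler alternative to an async framework (not what was proposed in the thread), the blocking time of each request can at least be bounded with the standard library. This is only a sketch: check_url and its return convention are assumptions, and a timeout limits how long the worker is unresponsive but does not let the parent abort a request in flight.

```python
import urllib.error
import urllib.request


def check_url(url, timeout=5.0):
    """Fetch `url`; return the HTTP status code, or an error string on failure."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as exc:
        return exc.code  # the server answered, but with an error status
    except (urllib.error.URLError, OSError) as exc:
        return str(exc)  # DNS failure, refused connection, timeout, ...
```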

> > Also the child process might not be able to deal with such an exit command at all for one reason or another so the only safe way to get rid of it is for the parent to kill it.
>
> I think you mean that it is the most "reliable" way.  In general, the
> only "safe" way to cause a process to exit is the cooperative
> approach, because it may otherwise leave external resources such as
> file data in an unexpected state that could cause problems later.

True but the child is doing nothing but making http requests and reporting the result to the parent so killing the process shouldn't be too much of a deal in this case. A segfault in an Apache worker process is very similar in that it's an uncontrolled termination of the process and that works out fine.

> > The better option would be to not use a shared queue for communication and instead use only dedicated pipes/queues for each child process, but there doesn't seem to be a way to wait for a message from multiple queues/pipes. If there were, I could simply kill the child and get rid of the respective pipes/queues without affecting the other processes or communication channels.
>
> Assuming that you're using a Unix system:
>
> from select import select
>
> while True:
>     ready, _, _ = select(pipes, [], [], timeout)
>     if not ready:
>         pass  # process timeout
>     else:
>         for pipe in ready:
>             message = pipe.recv()
>             # process message

That looks like a workable solution. When I decide to kill a worker process I can remove the pipe from the pipes list and discard it since it's not shared.
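
A rough sketch of that removal step, assuming the parent keeps a dict named workers that maps each URL to a (process, pipe) pair. Both the dict and the helper name remove_worker are hypothetical.

```python
def remove_worker(workers, url):
    """Kill the worker for `url` and discard its private pipe.

    `workers` maps url -> (multiprocessing.Process, parent Connection).
    """
    process, pipe = workers.pop(url)
    process.terminate()  # abrupt: the worker gets no chance to clean up
    process.join()       # reap the child so it does not linger as a zombie
    pipe.close()         # only this worker's channel is thrown away
```

The list handed to select() can then be rebuilt on each pass as [pipe for _, pipe in workers.values()], so a removed worker drops out automatically.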

Regards,
  Dennis



#27682

From: Dennis Lee Bieber <wlfraed@ix.netcom.com>
Date: 2012-08-22 16:09 -0400
Message-ID: <mailman.3685.1345666196.4697.python-list@python.org>
In reply to: #27660
On Wed, 22 Aug 2012 10:29:49 -0700 (PDT), Dennis Jacobfeuerborn
<djacobfeuerborn@gmail.com> declaimed the following in
gmane.comp.python.general:

> How can I get around this problem and receive status updates from all children efficiently without a shared queue and with the ability to simply kill the child process when it's no longer needed?
> 

	How much actual processing is done during the "check"? 

	Your description makes it sound like these are I/O bound operations
(combined with sleep()) -- and plain old threading tends to work fine
for I/O bound systems.

	If you used threading, you could signal a thread to die simply by
setting an attribute on the thread object:

	t[x].die = True

The thread would wrap a loop of the form:

while not self.die:
	status = check_url(url)	#do URL check (check_url is a placeholder)
	resque.put(status)
	time.sleep(interval)	#pause between checks

Thereby doing away with many of your queues -- you'd only need the
result/status queue, and the thread would only exit at a clean point.
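
One possible shape for such a thread, as a sketch only. It swaps the plain die attribute for a threading.Event so that stop() also interrupts the pause between checks; the class name UrlChecker, the check callable and interval are assumptions.

```python
import queue
import threading


class UrlChecker(threading.Thread):
    """Periodically run `check(url)` and put (url, status) on `results`."""

    def __init__(self, url, results, check, interval=5.0):
        super().__init__(daemon=True)
        self.url = url
        self.results = results
        self.check = check  # hypothetical callable doing one URL check
        self.interval = interval
        self._stop_requested = threading.Event()

    def stop(self):
        """Ask the thread to exit at the next clean point."""
        self._stop_requested.set()

    def run(self):
        while not self._stop_requested.is_set():
            self.results.put((self.url, self.check(self.url)))
            # Sleeps up to `interval`, but returns at once when stop() is called.
            self._stop_requested.wait(self.interval)
```

Usage would be along the lines of t = UrlChecker(url, queue.Queue(), check); t.start(); and later t.stop() followed by t.join().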

	The next step up, depending on the overhead of spawning processes,
would be to still use control threads and a local result queue, but have
"do URL check" create the check process each time -- you could probably
use "proc.communicate()" to obtain the status via the process stdout
[and pass the URL via stdin]. The rest of the control thread remains the
same.
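
A minimal sketch of that second step, assuming the check runs as a short-lived child Python process that reads the URL from stdin and prints a status line. The CHILD script here is a stand-in that does no real check, and check_in_subprocess is a hypothetical name.

```python
import subprocess
import sys

# Stand-in child program: reads the URL from stdin, prints a status line.
CHILD = "import sys; url = sys.stdin.read().strip(); print('checked ' + url)"


def check_in_subprocess(url, timeout=10.0):
    """Run one check in a fresh process and return its stdout as the status."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    try:
        out, _ = proc.communicate(url, timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()         # the check hung: killing it affects nothing shared
        proc.communicate()  # reap the killed child
        return "timeout"
    return out.strip()
```

Because each check has its own process and pipes, killing a stuck one cannot corrupt a channel that other checks depend on.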

	The third step: Still use controller threads, but the controller
thread would create a queue pair (to-process, from-process) on
initialization, and then spawn the process. You might even be able to
remove the time.sleep() from the thread level. Actually, checking the
docs, forget about the Queue... Use a Pipe

#Pipe() returns the two ends of one duplex connection: the controller
#keeps one end and the worker gets the other ("from" is a reserved word,
#so it cannot be used as an attribute name)
self.conn, child_conn = multiprocessing.Pipe()
self.p = multiprocessing.Process(target = worker,
				args = (child_conn, URL) )
self.p.start()
child_conn.close()	#parent keeps only its own end
workerdead = False
while not self.die and not workerdead:
	try:
		if self.conn.poll(1.0):	#wake up regularly to re-check self.die
			status = self.conn.recv()
			resque.put(status)	#local Queue collecting results
	except EOFError:
		# whatever for unexpected shutdown
		workerdead = True
if not workerdead:
	self.conn.send("SHUTDOWN")
	while True:
		status = self.conn.recv()
		if status == "SHUTTING DOWN": break
		resque.put(status)	#might have had a last cycle
	self.p.join()


	The worker should poll rather than sleep.

def worker(conn, URL):	#conn is the worker's end of the pipe
	while True:
		status = check_url(URL)	#do URL check (check_url is a placeholder)
		conn.send(status)
		if conn.poll(1.0):		#sleep until command or time-out
			command = conn.recv()
			if command == "SHUTDOWN":
				conn.send("SHUTTING DOWN")
				break
-- 
	Wulfraed                 Dennis Lee Bieber         AF6VN
        wlfraed@ix.netcom.com    HTTP://wlfraed.home.netcom.com/
