

Newsgroup: comp.lang.python, thread #53736

Multiprocessing / threading confusion

Started by: Paul Pittlerson <menkomigen6@gmail.com>
First post: 2013-09-05 12:27 -0700
Last post: 2013-09-06 17:15 -0400
Articles: 12, participants: 6



Contents

  Multiprocessing / threading confusion Paul Pittlerson <menkomigen6@gmail.com> - 2013-09-05 12:27 -0700
    Re: Multiprocessing / threading confusion "marduk@python.net" <marduk@python.net> - 2013-09-05 18:28 -0400
      Re: Multiprocessing / threading confusion Paul Pittlerson <menkomigen6@gmail.com> - 2013-09-05 16:34 -0700
        Re: Multiprocessing / threading confusion Chris Angelico <rosuav@gmail.com> - 2013-09-06 13:00 +1000
    Re: Multiprocessing / threading confusion Chris Angelico <rosuav@gmail.com> - 2013-09-06 08:46 +1000
      Re: Multiprocessing / threading confusion Paul Pittlerson <menkomigen6@gmail.com> - 2013-09-05 17:03 -0700
        Re: Multiprocessing / threading confusion Piet van Oostrum <piet@vanoostrum.org> - 2013-09-05 23:54 -0400
          Re: Multiprocessing / threading confusion Piet van Oostrum <piet@vanoostrum.org> - 2013-09-06 00:28 -0400
    Re: Multiprocessing / threading confusion Paul Pittlerson <menkomigen6@gmail.com> - 2013-09-06 11:27 -0700
      Re: Multiprocessing / threading confusion Skip Montanaro <skip@pobox.com> - 2013-09-06 13:53 -0500
      Re: Multiprocessing / threading confusion Dave Angel <davea@davea.name> - 2013-09-06 20:34 +0000
      Re: Multiprocessing / threading confusion Piet van Oostrum <piet@vanoostrum.org> - 2013-09-06 17:15 -0400

#53736 — Multiprocessing / threading confusion

From: Paul Pittlerson <menkomigen6@gmail.com>
Date: 2013-09-05 12:27 -0700
Subject: Multiprocessing / threading confusion
Message-ID: <ca7ea9d1-4dad-4a30-97b2-ad8536a1860b@googlegroups.com>

I'm trying to understand data handling using multiprocessing and threading, but I haven't gotten very far without running into problems. This is my code:

#!/usr/bin/python

from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import current_process

from threading import Thread

import time

def gogo(qu):
    w = Worker(qu)
    w.start()

class Worker(Thread):
    def __init__(self, Que):
        super (Worker, self).__init__()
        
        self._pid = current_process().pid  
        
        self.que = Que        
        self.que.put('started worker %s' % self._pid)
    
    def run(self):
        self.que.put('%s ticked' % self._pid)
    
    def __del__(self):
        self.que.put('%s has exited' % self._pid)

class Debugger(Thread):
    def __init__(self, q):
        super(Debugger, self).__init__()
        
        self.q = q
        
    def run(self):
        while True:
            time.sleep(1)
            
            if not self.q.empty():
                print self.q.get()
            
            else: break;

if __name__ == '__main__':
    
    debug_q = Queue()
    
    debug = Debugger(debug_q)
    debug.start()
    
    for i in range(5):
        d = Process(target=gogo, args=(debug_q,))
        d.start()

What I expect to happen is that the Debugger object will receive one string at a time and read it from the queue. But that's not what I see in the output: the "started worker" stuff seems to print for every process, but "ticked" and "exited" show up in unpredictable ways. I'm guessing they overwrite each other and therefore do not always appear in the output.

So I'm looking for help making sense of this kind of stuff. I thought this was the basic functionality that Queue() would take care of, unless there is some other problem in my code.



#53743

From: "marduk@python.net" <marduk@python.net>
Date: 2013-09-05 18:28 -0400
Message-ID: <mailman.107.1378420122.5461.python-list@python.org>
In reply to: #53736

On Thu, Sep 5, 2013, at 03:27 PM, Paul Pittlerson wrote:
> I'm trying to understand data handling using multiprocessing and
> threading, but I haven't gotten very far without running into problems.
> This is my code:

[snip] (Not sure why you are using multiprocessing and threading at the
same time.)


> What I expect to happen is that the Debugger object will receive one
> string at a time and read it from the queue. But that's not what I see
> in the output: the "started worker" stuff seems to print for every
> process, but "ticked" and "exited" show up in unpredictable ways. I'm
> guessing they overwrite each other and therefore do not always appear
> in the output.
> 
> So I'm looking for help making sense of this kind of stuff. I thought
> this was the basic functionality that Queue() would take care of,
> unless there is some other problem in my code.

My output is probably totally different from your output.  I only get
the processes starting.  Here's why:  This stuff all runs
asynchronously.  When you start the "Debugger" thread... I see you put a
sleep() in it, but that guarantees nothing.  At least on my machine,
which is somewhat loaded ATM, by the time the Processes are started, the
Debugger thread has already finished (because of the check to see if the
queue was empty).  Apparently it took longer than 1 second between the
time the Debugger was started and the time the first Process was started.
Likewise, what you are getting is probably a case where the queue is
momentarily empty by the time the debugger loop gets hold of the queue
lock and checks to see if it's empty.  Therefore the Debugger quits.
Also, because of the asynchronicity of processes and threads, you cannot
guarantee the order in which the processes will get the opportunity to
put() into the queue.

Also you can't (and shouldn't) depend on the time that __del__ gets
called.  It can get called at any time, in any order and sometimes not
at all.*

Hope this helps.

*
http://docs.python.org/3/reference/datamodel.html?highlight=__del__#object.__del__



#53745

From: Paul Pittlerson <menkomigen6@gmail.com>
Date: 2013-09-05 16:34 -0700
Message-ID: <9d088493-8224-4a48-a2d3-5b7207dc8947@googlegroups.com>
In reply to: #53743

On Friday, September 6, 2013 1:28:39 AM UTC+3, mar...@python.net wrote:

> Also you can't (and shouldn't) depend on the time that __del__ gets
> called.  It can get called at any time, in any order and sometimes not
> at all.*

Wow, I did not know that! I was counting on it reliably getting called when the object is destroyed.



#53752

From: Chris Angelico <rosuav@gmail.com>
Date: 2013-09-06 13:00 +1000
Message-ID: <mailman.111.1378436451.5461.python-list@python.org>
In reply to: #53745

On Fri, Sep 6, 2013 at 9:34 AM, Paul Pittlerson <menkomigen6@gmail.com> wrote:
> On Friday, September 6, 2013 1:28:39 AM UTC+3, mar...@python.net wrote:
>
>> Also you can't (and shouldn't) depend on the time that __del__ gets
>> called.  It can get called at any time, in any order and sometimes not
>> at all.*
>
> Wow, I did not know that! I was counting on it reliably getting called when the object is destroyed.

Even that isn't technically reliable, though in CPython, objects will
usually be __del__'d promptly as long as they're not in reference
cycles.  But the main problem here is that the destruction of the
object has nothing to do with the ending of the thread or process; the
object will hang around for as long as the caller might want it.
You'll want to put your "end of process" code at the bottom of run(),
I think, unless there's some other place for it.
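
A minimal sketch of the reference-cycle point, assuming CPython 3.4 or later (in the Python 2 used in this thread, a cycle containing an object with __del__ was never collected at all; it ended up in gc.garbage). Tracked and finalization_demo are hypothetical names. Automatic garbage collection is switched off during the demo so that the cycle is reclaimed only at the explicit gc.collect() call; that is what makes the timing predictable here.

```python
import gc


class Tracked:
    """Hypothetical class whose finalizer records that it ran."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def __del__(self):
        self.log.append("%s finalized" % self.name)


def finalization_demo():
    log = []
    was_enabled = gc.isenabled()
    gc.disable()                   # keep the cycle collector from running on its own
    try:
        plain = Tracked("plain", log)
        del plain                  # refcount drops to zero: CPython runs __del__ now
        after_plain = list(log)

        cyclic = Tracked("cyclic", log)
        cyclic.me = cyclic         # a reference cycle keeps the refcount above zero
        del cyclic                 # unreachable, but __del__ has NOT run yet
        after_del = list(log)

        gc.collect()               # only the cycle collector finalizes it
        after_collect = list(log)
    finally:
        if was_enabled:
            gc.enable()
    return after_plain, after_del, after_collect


if __name__ == "__main__":
    for snapshot in finalization_demo():
        print(snapshot)
```

The plain object is finalized immediately, while the cyclic one waits for the collector. A real program gets no guarantee about when the collector runs, which is why an "exited" message belongs at the end of run() rather than in __del__.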

ChrisA



#53744

From: Chris Angelico <rosuav@gmail.com>
Date: 2013-09-06 08:46 +1000
Message-ID: <mailman.108.1378421209.5461.python-list@python.org>
In reply to: #53736

On Fri, Sep 6, 2013 at 5:27 AM, Paul Pittlerson <menkomigen6@gmail.com> wrote:
> I'm trying to understand data handling using multiprocessing and threading, but I haven't gotten very far without running into problems. This is my code:
>
>
> What I expect to happen is that the Debugger object will receive one string at a time and read it from the queue. But that's not what I see in the output: the "started worker" stuff seems to print for every process, but "ticked" and "exited" show up in unpredictable ways. I'm guessing they overwrite each other and therefore do not always appear in the output.
>
> So I'm looking for help making sense of this kind of stuff. I thought this was the basic functionality that Queue() would take care of, unless there is some other problem in my code.

The first thing I notice is that your Debugger will quit as soon as
its one-secondly poll results in no data. This may or may not be a
problem for your code, but I'd classify it as code smell at best. Is
your goal here to make sure Debugger doesn't stop your process from
exiting? If so, a simpler and safer solution is to make it a daemon
thread.
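
A minimal Python 3 sketch of the daemon-thread suggestion, with threads standing in for the worker processes so it behaves the same on every platform. The names debugger, producer and daemon_demo are hypothetical. Instead of stopping when the queue happens to look empty, the consumer blocks on get() forever. Because it is a daemon thread it cannot keep the program alive, and the main thread uses queue.Queue.join() together with task_done() to wait until every message has been handled.

```python
import queue
import threading


def debugger(q, received):
    """Consume messages forever; as a daemon it cannot block interpreter exit."""
    while True:
        msg = q.get()           # blocks until a message arrives: no polling
        print(msg)
        received.append(msg)
        q.task_done()           # tell q.join() this message has been handled


def producer(q, num):
    """Hypothetical stand-in for a worker: three messages, exit message last."""
    q.put("started worker %d" % num)
    q.put("%d ticked" % num)
    q.put("%d has exited" % num)


def daemon_demo(n_workers=5):
    q = queue.Queue()
    received = []
    threading.Thread(target=debugger, args=(q, received), daemon=True).start()

    workers = [threading.Thread(target=producer, args=(q, i)) for i in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()                # no more messages will be put after this

    q.join()                    # wait until the debugger has handled every message
    return received


if __name__ == "__main__":
    daemon_demo()
```

For process-based producers, multiprocessing.JoinableQueue offers the same task_done()/join() pair.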

The other thing I see here is your use of __del__ to print your exit
message. I don't know if Thread objects are involved in reference
loops, but if they are, __del__ (probably) won't be called immediately
on thread termination.

Your subprocesses are a little odd; they spin off another thread, then
halt the first thread. Why not simply do the work in the first thread?
You then treat the thread's __del__ method as the process's death,
which isn't strictly true, but probably close enough.

ChrisA



#53747

From: Paul Pittlerson <menkomigen6@gmail.com>
Date: 2013-09-05 17:03 -0700
Message-ID: <bd1902cd-30c9-40f9-a54d-5ed05a2b07ae@googlegroups.com>
In reply to: #53744

On Friday, September 6, 2013 1:46:40 AM UTC+3, Chris Angelico wrote:

> The first thing I notice is that your Debugger will quit as soon as
> its one-secondly poll results in no data. This may or may not be a
> problem for your code, but I'd classify it as code smell at best. Is
> your goal here to make sure Debugger doesn't stop your process from
> exiting? If so, a simpler and safer solution is to make it a daemon
> thread.

I didn't think it would be a problem, because unless the system is very
slow, the functions will finish in a fraction of a second. On my machine
it does not matter whether I have it set to 0.1 second or several seconds;
the output is still the same. It's not elegant, but the point was just to
exit the test when no more prints are to be made.

But how can I fix the actual bug I was asking about, though? I want to
print "ticked" and "exited" for all the processes, just to confirm to
myself that the code is working, so I can proceed to experiment with
more complexity! :D



#53757

From: Piet van Oostrum <piet@vanoostrum.org>
Date: 2013-09-05 23:54 -0400
Message-ID: <m2mwnqr4ed.fsf@cochabamba.vanoostrum.org>
In reply to: #53747

Paul Pittlerson <menkomigen6@gmail.com> writes:

> On Friday, September 6, 2013 1:46:40 AM UTC+3, Chris Angelico wrote:
>
>> The first thing I notice is that your Debugger will quit as soon as
>> its one-secondly poll results in no data. This may or may not be a
>> problem for your code, but I'd classify it as code smell at best. Is
>> your goal here to make sure Debugger doesn't stop your process from
>> exiting? If so, a simpler and safer solution is to make it a daemon
>> thread.
>
> I didn't think it would be a problem, because unless the system is very
> slow, the functions will finish in a fraction of a second. On my machine
> it does not matter whether I have it set to 0.1 second or several seconds;
> the output is still the same. It's not elegant, but the point was just to
> exit the test when no more prints are to be made.
>
> But how can I fix the actual bug I was asking about, though? I want to
> print "ticked" and "exited" for all the processes, just to confirm to
> myself that the code is working, so I can proceed to experiment with
> more complexity! :D

On my system I get the output:

started worker 75501
75501 ticked
75501 has exited
started worker 75505
75505 ticked
75505 has exited
started worker 75504
started worker 75502
started worker 75503
75502 ticked
75502 has exited
75504 ticked
75504 has exited
75503 ticked
75503 has exited

So all the processes have their 'started', 'ticked' and 'exited' messages.
But as others have indicated, because your code is timing dependent,
that is just coincidental. Because multiprocessing/multithreading is
inherently non-deterministic, the order of the messages will be
unpredictable. But you should not make the exiting of the Debugger
non-deterministic, as you have it now. One way to avoid this is to count
the number of processes that have exited, and wait until all are done. In
this case you could count the number of 'exited' messages that have
arrived.

    def run(self):
        nbr_process = 5
        while True:
            time.sleep(1)
            
            msg = self.q.get()
            print msg
            if 'exited' in msg:
                nbr_process -= 1
                if nbr_process == 0:
                    break
 
Of course the 5 should be given as a parameter.
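
The counting approach as a self-contained Python 3 sketch, with the number of workers passed in as a parameter. Threads stand in for the worker processes; a multiprocessing.Queue offers the same put()/get() calls, so the Debugger loop should carry over unchanged. The worker function, its messages and counting_demo are hypothetical, and the loop assumes each worker sends its "has exited" message last.

```python
import queue
import threading


class Debugger(threading.Thread):
    """Print messages until `n_workers` exit messages have been seen."""

    def __init__(self, q, n_workers):
        super().__init__()
        self.q = q
        self.remaining = n_workers
        self.received = []

    def run(self):
        while self.remaining > 0:
            msg = self.q.get()              # block; never decide based on empty()
            print(msg)
            self.received.append(msg)
            if msg.endswith("has exited"):  # each worker sends this last
                self.remaining -= 1


def worker(q, num):
    """Hypothetical stand-in for a worker process."""
    q.put("started worker %d" % num)
    q.put("%d ticked" % num)
    q.put("%d has exited" % num)


def counting_demo(n_workers=5):
    q = queue.Queue()
    debug = Debugger(q, n_workers)
    debug.start()
    workers = [threading.Thread(target=worker, args=(q, i)) for i in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    debug.join()    # returns once every exit message has arrived
    return debug.received


if __name__ == "__main__":
    counting_demo()
```

Because each worker's messages stay in order in the queue, seeing every exit message implies that every earlier message from that worker has already been printed.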

This still leaves you with the uncertainty of the __del__ being called.
Why not just put the message at the end of the Worker run code?

    def run(self):
        self.que.put('%s ticked' % self._pid)
        # do some work
        time.sleep(1)
        self.que.put('%s has exited' % self._pid)

However, in my experiments (both Python 2.7.5 and 3.3.2 on Mac OS X
10.6.8) it seems that there is a problem with a Thread inside a Process:
when the main thread of the Process is finished, the other Thread is
also terminated, as if it were a daemon thread, even though self.daemon ==
False!!

You can check this with the following Worker code:

    def run(self):
        for n in range(5):
            self.que.put('%s tick %d' % (self._pid, n))
            # do some work
            time.sleep(1)
        self.que.put('%s has exited' % self._pid)

It appears that not all ticks are delivered. On my system, only one tick
per thread arrives, and then the thread disappears. I have no idea if this
is a bug. I certainly couldn't find it documented.

The solution to this is to put a join statement in gogo:

def gogo(qu):
    w = Worker(qu)
    w.start()
    w.join()
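
A Python 3 sketch of the join() fix, assuming the platform's default multiprocessing start method. gogo mirrors the function above and ticker plays the role of Worker.run; n_ticks and joined_demo are hypothetical additions. The parent reads exactly as many messages as it expects before joining the children, because the multiprocessing programming guidelines warn that joining a process that still has items buffered for a queue can deadlock. Whether an unjoined non-daemon thread outlives its process's main thread appears to have varied between Python versions; with the explicit join() the result no longer depends on that.

```python
import multiprocessing as mp
import threading
import time


def ticker(q, pid, n_ticks):
    """The thread that does the work inside each child process."""
    for n in range(n_ticks):
        q.put("%s tick %d" % (pid, n))
        time.sleep(0.01)            # stand-in for real work
    q.put("%s has exited" % pid)


def gogo(q, n_ticks):
    """Process target: start the worker thread, then wait for it."""
    pid = mp.current_process().pid
    t = threading.Thread(target=ticker, args=(q, pid, n_ticks))
    t.start()
    t.join()                        # the process now lives until the thread is done


def joined_demo(n_procs=3, n_ticks=5):
    q = mp.Queue()
    procs = [mp.Process(target=gogo, args=(q, n_ticks)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    # Consume every expected message *before* joining the children.
    msgs = [q.get() for _ in range(n_procs * (n_ticks + 1))]
    for p in procs:
        p.join()
    return msgs


if __name__ == "__main__":
    for m in joined_demo():
        print(m)
```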

-- 
Piet van Oostrum <piet@vanoostrum.org>
WWW: http://pietvanoostrum.com/
PGP key: [8DAE142BE17999C4]



#53763

From: Piet van Oostrum <piet@vanoostrum.org>
Date: 2013-09-06 00:28 -0400
Message-ID: <m2ioyer2ui.fsf@cochabamba.vanoostrum.org>
In reply to: #53757

Piet van Oostrum <piet@vanoostrum.org> writes:


>     def run(self):
>         for n in range(5):
>             self.que.put('%s tick %d' % (self._pid, n))
>             # do some work
>             time.sleep(1)
>         self.que.put('%s has exited' % self._pid)

To prevent the 'exited' message from disappearing if there is an exception
in the thread, you should protect it with try ... finally:

    def run(self):
        try:
            for n in range(5):
                self.que.put('%s tick %d' % (self._pid, n))
                # do some work
                time.sleep(1)
        finally:
            self.que.put('%s has exited' % self._pid)

This doesn't help for the premature termination of the thread, as that
isn't an exception. But use the w.join() for that. Or you could put the
'exited' message after the w.join() command.
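
A small Python 3 check of the try/finally advice, using a Thread subclass like the original Worker. The fail_at parameter and run_demo function are hypothetical, added only to simulate a crash in one worker. Python still prints the failing thread's traceback to stderr, but the "has exited" message arrives either way.

```python
import queue
import threading


class Worker(threading.Thread):
    """Thread whose exit message is sent even if its work raises."""

    def __init__(self, q, worker_id, fail_at=None):
        super().__init__()
        self.q = q
        self.worker_id = worker_id  # not "ident": Thread already defines that
        self.fail_at = fail_at      # hypothetical knob: tick number at which to fail

    def run(self):
        try:
            for n in range(5):
                if n == self.fail_at:
                    raise RuntimeError("worker %d failed" % self.worker_id)
                self.q.put("%d tick %d" % (self.worker_id, n))
        finally:
            # Executed on normal completion and when run() raises.
            self.q.put("%d has exited" % self.worker_id)


def run_demo():
    q = queue.Queue()
    workers = [Worker(q, 0), Worker(q, 1, fail_at=2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    msgs = []
    while not q.empty():            # safe only because every producer has been joined
        msgs.append(q.get())
    return msgs


if __name__ == "__main__":
    print(run_demo())
```

As noted above, this protects against exceptions only; a thread that is killed outright still needs the join().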
-- 
Piet van Oostrum <piet@vanoostrum.org>
WWW: http://pietvanoostrum.com/
PGP key: [8DAE142BE17999C4]



#53801

From: Paul Pittlerson <menkomigen6@gmail.com>
Date: 2013-09-06 11:27 -0700
Message-ID: <eaa938b9-518a-4c79-a252-a60cab696383@googlegroups.com>
In reply to: #53736

Ok here is the fixed and shortened version of my script:

#!/usr/bin/python

from multiprocessing import Process, Queue, current_process
from threading import Thread
from time import sleep

class Worker():
    def __init__(self, Que):
        self._pid = current_process().pid        
        self.que = Que
        self.que.put('started worker %s' % self._pid)
        
        for n in range(5):
            self.que.put('%s tick %d' % (self._pid, n))
            # do some work
            sleep(0.01)
            
        self.que.put('%s has exited' % self._pid) 
        
class Debugger(Thread):
    def __init__(self, q):
        super(Debugger, self).__init__()
        self.q = q
        
    def run(self):
        while True:
            
            sleep(0.1)
            
            if not self.q.empty():
                print self.q.get()
                
            else:
                break
        #

if __name__ == '__main__':
    
    debug_q = Queue()
    debug = Debugger(debug_q)
    debug.start()
    
    for i in range(5):
        
        d = Process(target=Worker, args=(debug_q,))
        d.start()

This works great on Linux, but does not run on Windows (7). The behavior was: I
opened it by double-clicking, and a window appeared and disappeared (no
text). Then I opened it with IDLE and ran it there, where it worked a couple of
times. Then I reopened it with IDLE, and this time it did not work at all. After
that, the script did not run either through IDLE or by opening it directly.

What might be the reason it works on Linux but seems buggy on Windows?



#53804

From: Skip Montanaro <skip@pobox.com>
Date: 2013-09-06 13:53 -0500
Message-ID: <mailman.133.1378493640.5461.python-list@python.org>
In reply to: #53801

On Fri, Sep 6, 2013 at 1:27 PM, Paul Pittlerson <menkomigen6@gmail.com> wrote:
> Ok here is the fixed and shortened version of my script:

Before going any further, I think you need to return to marduk's
response and consider if you really and truly need both threads and
fork (via multiprocessing).

http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them

Skip



#53809

From: Dave Angel <davea@davea.name>
Date: 2013-09-06 20:34 +0000
Message-ID: <mailman.136.1378499715.5461.python-list@python.org>
In reply to: #53801

On 6/9/2013 14:27, Paul Pittlerson wrote:

> Ok here is the fixed and shortened version of my script:
>
> #!/usr/bin/python
>
> from multiprocessing import Process, Queue, current_process
> from threading import Thread
> from time import sleep
>
> class Worker():
>     def __init__(self, Que):
>         self._pid = current_process().pid        
>         self.que = Que
>         self.que.put('started worker %s' % self._pid)
>         
>         for n in range(5):
>             self.que.put('%s tick %d' % (self._pid, n))
>             # do some work
>             sleep(0.01)
>             
>         self.que.put('%s has exited' % self._pid) 
>         
> class Debugger(Thread):
>     def __init__(self, q):
>         super(Debugger, self).__init__()
>         self.q = q
>         
>     def run(self):
>         while True:
>             
>             sleep(0.1)
>             
>             if not self.q.empty():
>                 print self.q.get()
>                 
>             else:
>                 break
>         #
>
> if __name__ == '__main__':
>     
>     debug_q = Queue()
>     debug = Debugger(debug_q)
>     debug.start()
>     
>     for i in range(5):
>         
>         d = Process(target=Worker, args=(debug_q,))
>         d.start()
>
> This works great on Linux, but does not run on Windows (7). The behavior was: I
> opened it by double-clicking, and a window appeared and disappeared (no
> text). Then I opened it with IDLE and ran it there, where it worked a couple of
> times. Then I reopened it with IDLE, and this time it did not work at all. After
> that, the script did not run either through IDLE or by opening it directly.
>
> What might be the reason it works on Linux but seems buggy on Windows?

In Linux, the Process() class works very differently.  One effect is
that it's probably much quicker than in Windows.  But also, the
relationship between the original process and the extra 5 is different.

I wouldn't even try to debug anything else till I add join() calls for
both the extra thread and the 5 extra processes.  You could just stick a
raw_input() at the end to fake it, but that's just a temporary hack.

Untested:


if __name__ == '__main__':
    
    debug_q = Queue()
    debug = Debugger(debug_q)
    debug.start()

    processes = []    
    for i in range(5):
        
        d = Process(target=Worker, args=(debug_q,))
        processes.append(d)
        d.start()
    for proc in processes:
        proc.join()
    debug.join()


As for running it in various shells, you missed the only one worth
using:  run it in the cmd shell.  When you double-click, you can't
really be sure if that temporary cmd shell was really empty before it
vanished;  it might just not have bothered to update its pixels on its
way out.  And any IDE shell like IDLE has so many other glitches in it,
that bugs like these are as likely to be the IDE's fault as anything
else.


-- 
DaveA



#53811

From: Piet van Oostrum <piet@vanoostrum.org>
Date: 2013-09-06 17:15 -0400
Message-ID: <m2sixhps6r.fsf@cochabamba.vanoostrum.org>
In reply to: #53801

Paul Pittlerson <menkomigen6@gmail.com> writes:

[...]
>     def run(self):
>         while True:
>             
>             sleep(0.1)
>             
>             if not self.q.empty():
>                 print self.q.get()
>                 
>             else:
>                 break
[...]

> This works great on Linux, but does not run on Windows (7). The behavior was: I
> opened it by double-clicking, and a window appeared and disappeared (no
> text). Then I opened it with IDLE and ran it there, where it worked a couple of
> times. Then I reopened it with IDLE, and this time it did not work at all. After
> that, the script did not run either through IDLE or by opening it directly.
>
> What might be the reason it works on Linux but seems buggy on Windows?

That it works on Linux is just coincidence. Your script is still timing
dependent because the while loop in Debugger.run stops when the queue is
empty. As has been explained in other answers, the queue can become
empty simply because the Debugger empties it faster than the other
processes can fill it. That is entirely dependent on the scheduling of
the OS, so you have no control over it. You must use a safe way to stop,
for example counting the 'exited' messages.

Another way is to join all the processes in the main program, and after
that put a special END message on the queue, which causes the Debugger to stop:

class Debugger(Thread):
    # ... __init__ unchanged ...
    def run(self):
        while True:
            msg = self.q.get()   # blocks until a message arrives
            print(msg)
            if msg == 'END':
                break

if __name__ == '__main__':
    # ... create debug_q and start the Debugger as before ...
    processes = []
    for i in range(5):
        d = Process(target=Worker, args=(debug_q,))
        d.start()
        processes.append(d)

    for p in processes:
        p.join()
    debug_q.put('END')
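
Putting the pieces of this thread together, a complete Python 3 sketch of the join-then-sentinel approach. Using None as the sentinel (a hypothetical choice) rather than the string 'END' avoids any chance of a real message being mistaken for the stop signal; sentinel_demo, n_ticks and the message texts are also illustrative. The process-creating code sits under if __name__ == '__main__', which the multiprocessing documentation requires with the "spawn" start method used on Windows, because each child re-imports the main module. The consumer thread is started after the workers so that no process is forked while a second thread is running, the concern raised by Skip's link.

```python
import multiprocessing as mp
import threading

END = None  # hypothetical sentinel: a value no worker ever sends


def worker(q, n_ticks):
    """Process target: report start, a few ticks, and exit."""
    pid = mp.current_process().pid
    q.put("started worker %s" % pid)
    try:
        for n in range(n_ticks):
            q.put("%s tick %d" % (pid, n))
    finally:
        q.put("%s has exited" % pid)


def debugger(q, received):
    """Consumer thread: block on get() until the END sentinel arrives."""
    while True:
        msg = q.get()
        if msg is END:
            break
        print(msg)
        received.append(msg)


def sentinel_demo(n_procs=5, n_ticks=3):
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q, n_ticks)) for _ in range(n_procs)]
    for p in procs:
        p.start()

    # Started after the children, so no process is forked while this thread runs.
    received = []
    debug = threading.Thread(target=debugger, args=(q, received))
    debug.start()

    for p in procs:
        p.join()      # each child flushes its queued messages before exiting
    q.put(END)        # so the sentinel is queued after every worker message
    debug.join()
    return received


# Required with the "spawn" start method (the Windows default): children
# re-import this module and must not start processes of their own.
if __name__ == "__main__":
    sentinel_demo()
```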

-- 
Piet van Oostrum <piet@vanoostrum.org>
WWW: http://pietvanoostrum.com/
PGP key: [8DAE142BE17999C4]




