Path: csiph.com!fu-berlin.de!uni-berlin.de!not-for-mail From: David Gabriel Newsgroups: comp.lang.python Subject: Re: parallel (concurrent) eventlet Date: Mon, 18 Jan 2016 16:03:23 +0100 Lines: 356 Message-ID: References: Mime-Version: 1.0 Content-Type: text/plain; charset=UTF-8 X-Trace: news.uni-berlin.de XWJxjEXmruY2NlNxYyVXIwQ4ZR4N6rA/CYSEpSeX/pvA== Return-Path: X-Original-To: python-list@python.org Delivered-To: python-list@mail.python.org X-Spam-Status: OK 0.000 X-Spam-Evidence: '*H*': 1.00; '*S*': 0.00; 'else:': 0.03; 'receives': 0.03; 'debug': 0.04; 'handler': 0.04; '"""': 0.05; 'base.': 0.05; 'none:': 0.05; 'skip:e 50': 0.05; 'skip:p 60': 0.05; "'',": 0.07; '*args,': 0.07; '-*-': 0.07; 'attributes': 0.07; 'executed': 0.07; 'exit': 0.07; 'implements': 0.07; 'run,': 0.07; 'see.': 0.07; 'utf-8': 0.07; 'welcome.': 0.07; 'api': 0.09; 'blocked': 0.09; 'coding:': 0.09; 'concurrent': 0.09; 'deletion': 0.09; 'exception,': 0.09; 'failed:': 0.09; 'handlers': 0.09; 'msgid': 0.09; 'notes:': 0.09; 'recieved': 0.09; 'threads.': 0.09; 'url:github': 0.09; 'thread': 0.10; 'advance': 0.10; 'python': 0.10; '<>.': 0.11; 'jan': 0.11; 'programs.': 0.11; 'exception': 0.13; 'def': 0.13; 'properly': 0.15; 'result.': 0.15; 'variables': 0.15; "'connecting": 0.16; '2016': 0.16; 'attributes,': 0.16; 'case...': 0.16; 'deleted,': 0.16; 'detects': 0.16; 'false:': 0.16; 'pathname': 0.16; 'received:io': 0.16; 'received:psf.io': 0.16; 'run.': 0.16; 'sync': 0.16; 'sys.exit(0)': 0.16; 'sys.exit(1)': 0.16; 'threads': 0.16; 'url:py': 0.16; 'wrote:': 0.16; 'later': 0.16; '<': 0.18; 'attribute': 0.18; 'debugging': 0.18; 'skip:l 30': 0.18; 'skip:u 30': 0.18; 'try:': 0.18; 'email addr:gmail.com>': 0.18; '>>>': 0.20; 'extension': 0.20; 'issue.': 0.20; 'skip:" 30': 0.20; 'skip:" 40': 0.20; 'fix': 0.21; '%s"': 0.22; "aren't": 0.22; 'delay': 0.22; 'parsing': 0.22; 'skip:% 10': 0.22; 'pass': 0.22; 'am,': 0.23; 'code,': 0.23; 'code.': 0.23; 'advance.': 0.23; 'import': 0.24; 'header:In-Reply- To:1': 0.24; 'mon,': 0.24; 'wondering': 0.25; 'script': 0.25; 'install': 0.25; 'example': 0.26; 'define': 0.27; 'skip:# 10': 0.27; 'message-id:@mail.gmail.com': 0.27; '8bit%:3': 0.27; 'skip:e 30': 0.27; 'function': 0.28; "skip:' 10": 0.28; 'values': 0.28; 'fine': 0.28; 'interface': 0.29; '**kwargs)': 0.29; 'blocking': 0.29; 'command-line': 0.29; 'cookie': 0.29; 'running.': 0.29; 'reset': 0.29; 'asked': 0.29; 'print': 0.30; '(including': 0.30; 'classes': 0.30; 'connection': 0.30; 'url:mailman': 0.30; 'code': 0.30; 'entry': 0.31; 'skip:s 30': 0.31; 'post': 0.31; "can't": 0.32; 'skip:_ 10': 0.32; 'subject:) ': 0.32; 'run': 0.33; 'class': 0.33; 'url:python': 0.33; 'skip:_ 30': 0.33; 'url:listinfo': 0.34; "skip:' 20": 0.34; 'except': 0.34; 'handle': 0.34; 'previous': 0.34; 'running': 0.34; 'add': 0.34; 'server': 0.34; '>>>>>': 0.66; 'here': 0.66; 'fact,': 0.67; 'regards.': 0.67; 'skip:\xc2 10': 0.67; 'consumer': 0.67; '8bit%:100': 0.70; 'inform': 0.79; '7:00': 0.84; 'batchelder': 0.84; 'david,': 0.84; 'dict()': 0.84; 'gabriel': 0.84; 'url:master': 0.84; '\xc2\xa0\xc2\xa0\xc2\xa0\xc2\xa0': 0.84; 'commence': 0.91; 'url:demo': 0.91 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type; bh=jNjyb97Py4bSy6LnNRzJESm5+jGFPAkl3zaSoJuA3z0=; b=IPXu/M/G57eL8vTi7vRMUAk9UrAGNI0mCpYXOWkPXAUMFLbG4qGaUqxHZfFmCMKYQz 7YOu/tI72qihvXQo5of6QbhUvwdITqIwXWFSlGvikCBhi4bNsiZ27F0MhqcNNh3FvqjN E7tXgbTM1zJCnVqkyofsqNravYy4OzKtwbVt08iTI9PQBf+iUL0mDQC8dGuYmzZY7x/h OhCyTagwUcHbj3xlH8SWOgHgaLNqYkS1dtdqh7dFEebLu5N24/0++MEly4Oe3U4kid9m 7cW/DI+0ID2Wr+V5mF7MPpcXFjruNWF5JRWTJg9K+LsTN47UeJZgxiTXA+5wwbrpRtRF GKUw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:date :message-id:subject:from:to:content-type; bh=jNjyb97Py4bSy6LnNRzJESm5+jGFPAkl3zaSoJuA3z0=; b=RJB2u24W4l/psoKPo+950eiQBgTvyr5w0JXOJkP0tERbcYQHBiZnfpFFLoZwiD99Xw BZ9QSyp/BPh9+t4s0wP/S7RVx+ln4lXSBNkm5SpIwIY+hNzVKcV0FbPeyWx2MkVisuWq D9GO+IdBeda+5un2nZ9q/RSyHP2ulQEIHdQadycoBHqHkkV8BWierqqSmkWimstzVC0A 0FneC9nE92gUsW0I9jgCMyisGdLpONVV7/wQAWDNFn8nR4XSuldavmrp3aFoZYtXIm9K tmP2y7M8HEOmssjpllBoxzbBVy8tSqllX2BKZyEud9MzWpsdZaQMRzq1a6iqvjfrxP0C /Ltw== X-Gm-Message-State: AG10YOSZ20ehhFlLLLkVDiZC0VSm68HaUUk1TaR7QREjmz/bHAFBEfcVKWFriG+YLFoUGrfgLNO+0GiNJPDjPg== X-Received: by 10.50.131.201 with SMTP id oo9mr10867378igb.68.1453129403702; Mon, 18 Jan 2016 07:03:23 -0800 (PST) In-Reply-To: X-Content-Filtered-By: Mailman/MimeDel 2.1.20+ X-BeenThere: python-list@python.org X-Mailman-Version: 2.1.20+ Precedence: list List-Id: General discussion list for the Python programming language List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Xref: csiph.com comp.lang.python:101875 Dears, Let me add one more detail: When I add these two lines to check whether my modules are monkey_patched or not I get *False* as a result. I think it is strange to get this result since I patched my modules at the beginning using: eventlet.monkey_patch() as detailed here . print "*****is_monkey_patched(ldap.syncrepl) : %s*****" % eventlet.patcher.is_monkey_patched('ldap.syncrepl') print "*****is_monkey_patched(ldap.ldapobject) : %s*****" % eventlet.patcher.is_monkey_patched('ldap.ldapobject') Please advise me how to fix this issue. Kind regards. 2016-01-18 12:03 GMT+01:00 David Gabriel : > Dears, > > I have an issue when I use eventlet Api to create parallel threads. > In fact, when I run the below code, only the program dealing with the > synchronozation with ldap data base is working and is continuously blocking > the others to run. > But, when I use the 'thread' Api the program is working fine without any > blocking issue. However, I can not use thread Api and I must to use > eventlet. > So I am wondering how to get the thread Api behavior using the eventlet > Api ? > > Could you please inform me how to fix this issue ? > > Kindly find below my code. > But you need some configurations regarding ldap server/client. > > > #!/usr/bin/python > # -*- coding: utf-8 -*- > """ > This script implements a syncrepl consumer which syncs data from an > OpenLDAP > server to a local (shelve) database. > Notes: > The bound user needs read access to the attributes entryDN and entryCSN. > This needs the following software: > Python > pyasn1 0.1.4+ > pyasn1-modules > python-ldap 2.4.10+ > """ > > # Import the python-ldap modules > import ldap,ldapurl > # Import specific classes from python-ldap > from ldap.ldapobject import ReconnectLDAPObject > from ldap.syncrepl import SyncreplConsumer > > # Import modules from Python standard lib > import shelve,signal,time,sys,logging > import eventlet > #import thread > eventlet.monkey_patch() > > # Global state > watcher_running = True > ldap_connection = False > > > > class SyncReplConsumer(ReconnectLDAPObject,SyncreplConsumer): > """ > Syncrepl Consumer interface > """ > def __init__(self,db_path,*args,**kwargs): > # Initialise the LDAP Connection first > ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs) > # Now prepare the data store > self.__data = shelve.open(db_path, 'c') > # We need this for later internal use > self.__presentUUIDs = dict() > > def __del__(self): > # Close the data store properly to avoid corruption > self.__data.close() > > def syncrepl_get_cookie(self): > if 'cookie' in self.__data: > return self.__data['cookie'] > > def syncrepl_set_cookie(self,cookie): > self.__data['cookie'] = cookie > > def syncrepl_entry(self,dn,attributes,uuid): > > # First we determine the type of change we have here (and store > away the previous data for later if needed) > previous_attributes = dict() > if uuid in self.__data: > change_type = 'modify' > previous_attributes = self.__data[uuid] > else: > change_type = 'add' > # Now we store our knowledge of the existence of this entry > (including the DN as an attribute for convenience) > attributes['dn'] = dn > self.__data[uuid] = attributes > # Debugging > print 'Detected', change_type, 'of entry:', dn > # If we have a cookie then this is not our first time being run, > so it must be a change > if 'ldap_cookie' in self.__data: > self.perform_application_sync(dn, attributes, > previous_attributes) > > def syncrepl_delete(self,uuids): > # Make sure we know about the UUID being deleted, just in case... > uuids = [uuid for uuid in uuids if uuid in self.__data] > # Delete all the UUID values we know of > for uuid in uuids: > print 'Detected deletion of entry:', self.__data[uuid]['dn'] > del self.__data[uuid] > > def syncrepl_present(self,uuids,refreshDeletes=False): > # If we have not been given any UUID values, then we have recieved > all the present controls... > if uuids is None: > # We only do things if refreshDeletes is false as the syncrepl > extension will call syncrepl_delete instead when it detects a delete notice > if refreshDeletes is False: > deletedEntries = [uuid for uuid in self.__data.keys() if > uuid not in self.__presentUUIDs and uuid != 'ldap_cookie'] > self.syncrepl_delete( deletedEntries ) > # Phase is now completed, reset the list > self.__presentUUIDs = {} > else: > # Note down all the UUIDs we have been sent > for uuid in uuids: > self.__presentUUIDs[uuid] = True > > def perform_application_sync(self,dn,attributes,previous_attributes): > print 'Performing application sync for:', dn > return True > > > # Shutdown handler > #def commenceShutdown(signum, stack): > def commenceShutdown(): > # Declare the needed global variables > global watcher_running, ldap_connection > print 'Shutting down!' > > # We are no longer running > watcher_running = False > > # Tear down the server connection > if( ldap_connection ): > del ldap_connection > > # Shutdown > sys.exit(0) > > def mainOfSyncrepl(threadName): > # Time to actually begin execution > # Install our signal handlers > # signal.signal(signal.SIGTERM,commenceShutdown) > # signal.signal(signal.SIGINT,commenceShutdown) > try: > ldap_url = > ldapurl.LDAPUrl('ldap://localhost/dc=example,dc=org?*?sub?(objectClass=*)?bindname=cn=admin%2cdc=test%2cdc=com,X-BINDPW=myPassword')#ldapurl.LDAPUrl(sys.argv[1]) > # ldap_url = ldapurl.LDAPUrl(link) > database_path = 'test.com'#sys.argv[2] > # database_path = pathName > except IndexError,e: > print 'Usage: syncrepl-client.py ' > sys.exit(1) > except ValueError,e: > print 'Error parsing command-line arguments:',str(e) > sys.exit(1) > > while watcher_running: > print 'Connecting to LDAP server now...' > # Prepare the LDAP server connection (triggers the connection as > well) > ldap_connection = > SyncReplConsumer(database_path,ldap_url.initializeUrl()) > > # Now we login to the LDAP server > try: > ldap_connection.simple_bind_s(ldap_url.who,ldap_url.cred) > except ldap.INVALID_CREDENTIALS, e: > print 'Login to LDAP server failed: ', str(e) > sys.exit(1) > except ldap.SERVER_DOWN: > continue > > # Commence the syncing > print 'Commencing sync process' > ldap_search = ldap_connection.syncrepl_search( > ldap_url.dn or '', > ldap_url.scope or ldap.SCOPE_SUBTREE, > mode = 'refreshAndPersist', > filterstr = ldap_url.filterstr or '(objectClass=*)') > print 'After syncrepl_search.' > try: > while ldap_connection.syncrepl_poll( all = 1, msgid = > ldap_search): > pass > except KeyboardInterrupt: > # User asked to exit > commenceShutdown() > pass > except Exception, e: > # Handle any exception > if watcher_running: > print 'Encountered a problem, going to retry. Error:', > str(e) > eventlet.sleep(5) > pass > > # Define a function for the 2nd thread > def print_time(ThreadName): > count = 0 > delay = 3 > while 1:#count < 5: > count += 1 > print "%s: %s" % (ThreadName, time.ctime(time.time()) ) > eventlet.sleep(delay) > > > > print 'Before call threads' > > evt1 = eventlet.spawn(mainOfSyncrepl, "Thread-1",) > evt2 = eventlet.spawn(print_time, "Thread-2",) > evt3 = eventlet.spawn(print_time, "Thread-3",) > > print 'After call threads' > > evt1.wait() > evt2.wait() > evt3.wait() > > print 'After wait' > > > > 2016-01-12 7:20 GMT-08:00 Ned Batchelder : > >> David, >> >> We aren't going to be able to debug code that we can't see. Please post >> a link to the *actual* code that you are running. >> >> --Ned. >> >> >> On 1/12/16 7:00 AM, David Gabriel wrote: >> >> Dears >> >> For more details, I am using this code >> in >> order to ensure the updates from my data base. >> However, when I create an eventlet basing on this code, my program is >> blocked there and is not running other eventlets !!! >> Please advise me how to fix this issue ? >> >> Thanks in advance. >> Regards >> >> 2016-01-11 7:29 GMT-08:00 David Gabriel : >> >>> Thanks so much John >>> In fact your proposal works fine for this simple example but when I use >>> it for a complex code (a data base client that receives all updates from >>> the db), my program is continously running this db client and not other >>> programs. >>> >>> Any suggestions >>> Thanks in advance >>> regards >>> >>> 2016-01-11 5:50 GMT-08:00 John Eskew < >>> john.eskew@gmail.com>: >>> >>>> Add this line below your imports: >>>> >>>> eventlet.monkey_patch() >>>> >>>> Here's why that line should fix things: >>>> >>>> http://eventlet.net/doc/patching.html#greening-the-world >>>> >>>> On Mon, Jan 11, 2016 at 6:27 AM, David Gabriel < >>>> davidgab283@gmail.com> wrote: >>>> >>>>> Dears, >>>>> It is the first time I am developping with python and I want to >>>>> execute parallel threads using eventlet.When I run the below code, only one >>>>> thread is executed and not both.Please could you tell me how to fix this >>>>> issue ? >>>>> Please advise me how to ensure a concurrent behavior between evt1 and >>>>> evt2. >>>>> >>>>> import eventlet >>>>> import time >>>>> >>>>> def print_time(ThreadName): >>>>> count = 0 >>>>> delay = 3 >>>>> while 1:#count < 5: >>>>> count += 1 >>>>> print "%s: %s" % (ThreadName, time.ctime(time.time()) ) >>>>> time.sleep(delay) >>>>> >>>>> print 'Before call threads' >>>>> >>>>> evt1 = eventlet.spawn(print_time, "Thread-1",) >>>>> evt2 = eventlet.spawn(print_time, "Thread-2",) >>>>> >>>>> print 'After call threads' >>>>> >>>>> evt1.wait() >>>>> evt2.wait() >>>>> >>>>> >>>>> >>>>> Any answer is welcome. >>>>> Thanks in advance. >>>>> Regards. >>>>> >>>>> _______________________________________________ >>>>> Boston mailing list >>>>> Boston@python.org >>>>> https://mail.python.org/mailman/listinfo/boston >>>>> >>>>> >>>> >>> >> >> >> _______________________________________________ >> Boston mailing listBoston@python.orghttps://mail.python.org/mailman/listinfo/boston >> >> >> >> _______________________________________________ >> Boston mailing list >> Boston@python.org >> https://mail.python.org/mailman/listinfo/boston >> >> > >