Groups | Search | Server Info | Keyboard shortcuts | Login | Register [http] [https] [nntp] [nntps]


Groups > comp.lang.python > #101872

parallel (concurrent) eventlet

Path csiph.com!fu-berlin.de!uni-berlin.de!not-for-mail
From David Gabriel <davidgab283@gmail.com>
Newsgroups comp.lang.python
Subject parallel (concurrent) eventlet
Date Mon, 18 Jan 2016 12:03:13 +0100
Lines 331
Message-ID <mailman.86.1453115159.15297.python-list@python.org> (permalink)
Mime-Version 1.0
Content-Type text/plain; charset=UTF-8
X-Trace news.uni-berlin.de 6cehLLCkPPYN91gGQ1V4xwcdKkp9l+pzdIBQAOd7Ru1A==
Return-Path <davidgab283@gmail.com>
X-Original-To python-list@python.org
Delivered-To python-list@mail.python.org
X-Spam-Status OK 0.000
X-Spam-Evidence '*H*': 1.00; '*S*': 0.00; 'else:': 0.03; 'receives': 0.03; 'debug': 0.04; 'handler': 0.04; '"""': 0.05; 'base.': 0.05; 'none:': 0.05; 'skip:p 60': 0.05; "'',": 0.07; '*args,': 0.07; '-*-': 0.07; 'attributes': 0.07; 'executed': 0.07; 'exit': 0.07; 'implements': 0.07; 'run,': 0.07; 'see.': 0.07; 'utf-8': 0.07; 'welcome.': 0.07; 'api': 0.09; 'blocked': 0.09; 'coding:': 0.09; 'concurrent': 0.09; 'deletion': 0.09; 'exception,': 0.09; 'failed:': 0.09; 'handlers': 0.09; 'msgid': 0.09; 'notes:': 0.09; 'recieved': 0.09; 'threads.': 0.09; 'url:github': 0.09; 'thread': 0.10; 'advance': 0.10; 'python': 0.10; 'jan': 0.11; 'programs.': 0.11; 'exception': 0.13; 'def': 0.13; 'properly': 0.15; 'variables': 0.15; "'connecting": 0.16; '2016': 0.16; 'attributes,': 0.16; 'case...': 0.16; 'deleted,': 0.16; 'detects': 0.16; 'false:': 0.16; 'pathname': 0.16; 'received:io': 0.16; 'received:psf.io': 0.16; 'run.': 0.16; 'sync': 0.16; 'sys.exit(0)': 0.16; 'sys.exit(1)': 0.16; 'threads': 0.16; 'url:py': 0.16; 'wrote:': 0.16; 'later': 0.16; '&lt;': 0.18; 'attribute': 0.18; 'debugging': 0.18; 'skip:l 30': 0.18; 'try:': 0.18; 'email addr:gmail.com&gt;': 0.18; '>>>': 0.20; 'extension': 0.20; 'issue.': 0.20; 'fix': 0.21; '%s"': 0.22; "aren't": 0.22; 'delay': 0.22; 'parsing': 0.22; 'pass': 0.22; 'am,': 0.23; 'code,': 0.23; 'code.': 0.23; 'advance.': 0.23; 'import': 0.24; 'mon,': 0.24; 'wondering': 0.25; 'script': 0.25; 'install': 0.25; 'example': 0.26; 'define': 0.27; 'skip:# 10': 0.27; 'message- id:@mail.gmail.com': 0.27; 'skip:e 30': 0.27; 'function': 0.28; "skip:' 10": 0.28; 'values': 0.28; 'fine': 0.28; 'interface': 0.29; '**kwargs)': 0.29; 'blocking': 0.29; 'command-line': 0.29; 'cookie': 0.29; 'running.': 0.29; 'reset': 0.29; 'asked': 0.29; 'print': 0.30; '(including': 0.30; 'classes': 0.30; 'connection': 0.30; 'url:mailman': 0.30; 'code': 0.30; 'entry': 0.31; 'skip:s 30': 0.31; 'post': 0.31; "can't": 0.32; 'skip:_ 10': 0.32; 'subject:) ': 0.32; 'run': 0.33; 'class': 0.33; 'url:python': 0.33; 'skip:_ 30': 0.33; 'url:listinfo': 0.34; "skip:' 20": 0.34; 'except': 0.34; 'handle': 0.34; 'previous': 0.34; 'running': 0.34; 'add': 0.34; 'server': 0.34; 'list': 0.34; 'skip:& 20': 0.35; 'received:google.com': 0.35; 'could': 0.35; 'execution': 0.35; 'false': 0.35; 'knowledge': 0.35; 'but': 0.36; 'should': 0.36; 'between': 0.65; 'here': 0.66; 'fact,': 0.67; 'regards.': 0.67; 'skip:\xc2 10': 0.67; 'consumer': 0.67; '8bit%:100': 0.70; 'inform': 0.79; '7:00': 0.84; 'batchelder': 0.84; 'david,': 0.84; 'dict()': 0.84; 'gabriel': 0.84; 'url:master': 0.84; '\xc2\xa0\xc2\xa0\xc2\xa0\xc2\xa0': 0.84; 'commence': 0.91; 'url:demo': 0.91
DKIM-Signature v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:date:message-id:subject:from:to:content-type; bh=WVJjsl444StzAtaDGdLAj7Y4COByPRzesNNT+Rze5bg=; b=qcFruk5mfdSfNmy2IRyWNZMZb5zwwAMAa27/KzJM8j68cdi8ge0iwLYprh1MQlnsdx fKZIRsq2x+1Kd1q5t1DrQ2Jxf+Ke70E79hOegb5GdNsGWA3nfxGTbQCU1sg10QH00HH/ rz+uGVx9SVKURuXbSlIxwke4BLH8cXV5wlUiCrfWO4HryQKsx1sU0vdov44krQ3b604O n96obvcZu+Rk4AA60JvDFAhSzdr8vCxs6Fw3eSv/AE+zkwoDxo1sQsZaOb6jTqCYWtKT 6XCIqsjMK5ANHO/M3qexGIxQ6mPEodqjlu8TiugUOd0XJ8Bba8DSaKCFmKJCKsMqOLZ2 sfOw==
X-Google-DKIM-Signature v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:date:message-id:subject:from:to :content-type; bh=WVJjsl444StzAtaDGdLAj7Y4COByPRzesNNT+Rze5bg=; b=JnD2Xa+++e1ZDbsd4IpKdoD5sN31rQecaODC8MsCAZ2y2Y34Hqa36T0zqfOItwEwSB wfOiZIMNwp+FIIsfcVxh524PozmLFXuf89Um2Gvv8Iu9y/LmTPEcyojGGJk6TO7wjPri NeWywRpsgr7pZDeDz9OgGSjJ/0aZxAmdtdEub1uBusFnn5i6cCin/e76GoW1e/GKcaEq 3hs+dY4OtS9bOjo/8kJ8GlEl7f5cnFfeWTPAJr36RcujQg/ZKLQcl25dgCzKr+V8XK91 zz3pLoxtHDjqRohNNJbCoApH4d+WFeQuOtHhjaH5ZpvZ1/TRf/sHb0840e+SNPMJ2EeH lliQ==
X-Gm-Message-State AG10YOThrEUYVD5IvQRN2r2EuZbexg5AM8V/xLo4DOHKDuFrhl2DoEmugKaictxoAwXMRyr+Adf4WQXowK5T4Q==
X-Received by 10.50.36.105 with SMTP id p9mr11053562igj.54.1453114993310; Mon, 18 Jan 2016 03:03:13 -0800 (PST)
X-Mailman-Approved-At Mon, 18 Jan 2016 06:05:57 -0500
X-Content-Filtered-By Mailman/MimeDel 2.1.20+
X-BeenThere python-list@python.org
X-Mailman-Version 2.1.20+
Precedence list
List-Id General discussion list for the Python programming language <python-list.python.org>
List-Unsubscribe <https://mail.python.org/mailman/options/python-list>, <mailto:python-list-request@python.org?subject=unsubscribe>
List-Archive <http://mail.python.org/pipermail/python-list/>
List-Post <mailto:python-list@python.org>
List-Help <mailto:python-list-request@python.org?subject=help>
List-Subscribe <https://mail.python.org/mailman/listinfo/python-list>, <mailto:python-list-request@python.org?subject=subscribe>
Xref csiph.com comp.lang.python:101872

Show key headers only | View raw


Dears,

I have an issue when I use eventlet Api to create parallel threads.
In fact, when I run the below code, only the program dealing with the
synchronozation with ldap data base is working and is continuously blocking
the others to run.
But, when I use the 'thread' Api the program is working fine without any
blocking issue. However, I can not use thread Api and I must to use
eventlet.
So I am wondering how to get the thread Api behavior using the eventlet Api
?

Could you please inform me how to fix this issue ?

Kindly find below my code.
But you need some configurations regarding ldap server/client.

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This script implements a syncrepl consumer which syncs data from an OpenLDAP
server to a local (shelve) database.
Notes:
The bound user needs read access to the attributes entryDN and entryCSN.
This needs the following software:
Python
pyasn1 0.1.4+
pyasn1-modules
python-ldap 2.4.10+
"""

# Import the python-ldap modules
import ldap,ldapurl
# Import specific classes from python-ldap
from ldap.ldapobject import ReconnectLDAPObject
from ldap.syncrepl import SyncreplConsumer

# Import modules from Python standard lib
import shelve,signal,time,sys,logging
import eventlet
#import thread
eventlet.monkey_patch()

# Global state
watcher_running = True
ldap_connection = False



class SyncReplConsumer(ReconnectLDAPObject,SyncreplConsumer):
    """
    Syncrepl Consumer interface
    """
    def __init__(self,db_path,*args,**kwargs):
        # Initialise the LDAP Connection first
        ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)
        # Now prepare the data store
        self.__data = shelve.open(db_path, 'c')
        # We need this for later internal use
        self.__presentUUIDs = dict()

    def __del__(self):
            # Close the data store properly to avoid corruption
            self.__data.close()

    def syncrepl_get_cookie(self):
        if 'cookie' in self.__data:
            return self.__data['cookie']

    def syncrepl_set_cookie(self,cookie):
        self.__data['cookie'] = cookie

    def syncrepl_entry(self,dn,attributes,uuid):

        # First we determine the type of change we have here (and store
away the previous data for later if needed)
        previous_attributes = dict()
        if uuid in self.__data:
            change_type = 'modify'
            previous_attributes = self.__data[uuid]
        else:
            change_type = 'add'
        # Now we store our knowledge of the existence of this entry
(including the DN as an attribute for convenience)
        attributes['dn'] = dn
        self.__data[uuid] = attributes
        # Debugging
        print 'Detected', change_type, 'of entry:', dn
        # If we have a cookie then this is not our first time being run, so
it must be a change
        if 'ldap_cookie' in self.__data:
                self.perform_application_sync(dn, attributes,
previous_attributes)

    def syncrepl_delete(self,uuids):
        # Make sure we know about the UUID being deleted, just in case...
        uuids = [uuid for uuid in uuids if uuid in self.__data]
        # Delete all the UUID values we know of
        for uuid in uuids:
            print 'Detected deletion of entry:', self.__data[uuid]['dn']
            del self.__data[uuid]

    def syncrepl_present(self,uuids,refreshDeletes=False):
        # If we have not been given any UUID values, then we have recieved
all the present controls...
        if uuids is None:
            # We only do things if refreshDeletes is false as the syncrepl
extension will call syncrepl_delete instead when it detects a delete notice
            if refreshDeletes is False:
                deletedEntries = [uuid for uuid in self.__data.keys() if
uuid not in self.__presentUUIDs and uuid != 'ldap_cookie']
                self.syncrepl_delete( deletedEntries )
            # Phase is now completed, reset the list
            self.__presentUUIDs = {}
        else:
            # Note down all the UUIDs we have been sent
            for uuid in uuids:
                    self.__presentUUIDs[uuid] = True

    def perform_application_sync(self,dn,attributes,previous_attributes):
        print 'Performing application sync for:', dn
        return True


# Shutdown handler
#def commenceShutdown(signum, stack):
def commenceShutdown():
    # Declare the needed global variables
    global watcher_running, ldap_connection
    print 'Shutting down!'

    # We are no longer running
    watcher_running = False

    # Tear down the server connection
    if( ldap_connection ):
            del ldap_connection

    # Shutdown
    sys.exit(0)

def mainOfSyncrepl(threadName):
     # Time to actually begin execution
     # Install our signal handlers
#     signal.signal(signal.SIGTERM,commenceShutdown)
#     signal.signal(signal.SIGINT,commenceShutdown)
     try:
       ldap_url =
ldapurl.LDAPUrl('ldap://localhost/dc=example,dc=org?*?sub?(objectClass=*)?bindname=cn=admin%2cdc=test%2cdc=com,X-BINDPW=myPassword')#ldapurl.LDAPUrl(sys.argv[1])
     #  ldap_url = ldapurl.LDAPUrl(link)
       database_path = 'test.com'#sys.argv[2]
     #  database_path = pathName
     except IndexError,e:
       print 'Usage: syncrepl-client.py <LDAP URL> <pathname of database>'
       sys.exit(1)
     except ValueError,e:
       print 'Error parsing command-line arguments:',str(e)
       sys.exit(1)

     while watcher_running:
         print 'Connecting to LDAP server now...'
         # Prepare the LDAP server connection (triggers the connection as
well)
         ldap_connection =
SyncReplConsumer(database_path,ldap_url.initializeUrl())

         # Now we login to the LDAP server
         try:
             ldap_connection.simple_bind_s(ldap_url.who,ldap_url.cred)
         except ldap.INVALID_CREDENTIALS, e:
             print 'Login to LDAP server failed: ', str(e)
             sys.exit(1)
         except ldap.SERVER_DOWN:
             continue

         # Commence the syncing
         print 'Commencing sync process'
         ldap_search = ldap_connection.syncrepl_search(
           ldap_url.dn or '',
           ldap_url.scope or ldap.SCOPE_SUBTREE,
           mode = 'refreshAndPersist',
           filterstr = ldap_url.filterstr or '(objectClass=*)')
         print 'After syncrepl_search.'
         try:
             while ldap_connection.syncrepl_poll( all = 1, msgid =
ldap_search):
                  pass
         except KeyboardInterrupt:
          # User asked to exit
             commenceShutdown()
             pass
         except Exception, e:
          # Handle any exception
             if watcher_running:
                 print 'Encountered a problem, going to retry. Error:',
str(e)
                 eventlet.sleep(5)
             pass

# Define a function for the 2nd thread
def print_time(ThreadName):
     count = 0
     delay = 3
     while 1:#count < 5:
         count += 1
         print "%s: %s" % (ThreadName, time.ctime(time.time()) )
         eventlet.sleep(delay)



print 'Before call threads'

evt1 = eventlet.spawn(mainOfSyncrepl, "Thread-1",)
evt2 = eventlet.spawn(print_time, "Thread-2",)
evt3 = eventlet.spawn(print_time, "Thread-3",)

print 'After call threads'

evt1.wait()
evt2.wait()
evt3.wait()

print 'After wait'



2016-01-12 7:20 GMT-08:00 Ned Batchelder <ned@nedbatchelder.com>:

> David,
>
> We aren't going to be able to debug code that we can't see.  Please post a
> link to the *actual* code that you are running.
>
> --Ned.
>
>
> On 1/12/16 7:00 AM, David Gabriel wrote:
>
> Dears
>
> For more details, I am using this code
> <https://github.com/rbarrois/python-ldap/blob/master/Demo/pyasn1/syncrepl.py>in
> order to ensure the updates from my data base.
> However, when I create an eventlet basing on this code, my program is
> blocked there and is not running other eventlets !!!
> Please advise me how to fix this issue ?
>
> Thanks in advance.
> Regards
>
> 2016-01-11 7:29 GMT-08:00 David Gabriel <davidgab283@gmail.com>:
>
>> Thanks so much John
>> In fact your proposal works fine for this simple example but when I use
>> it for a complex code (a data base client that receives all updates from
>> the db), my program is continously running this db client and not other
>> programs.
>>
>> Any suggestions
>> Thanks in advance
>> regards
>>
>> 2016-01-11 5:50 GMT-08:00 John Eskew < <john.eskew@gmail.com>
>> john.eskew@gmail.com>:
>>
>>> Add this line below your imports:
>>>
>>> eventlet.monkey_patch()
>>>
>>> Here's why that line should fix things:
>>>
>>> http://eventlet.net/doc/patching.html#greening-the-world
>>>
>>> On Mon, Jan 11, 2016 at 6:27 AM, David Gabriel < <davidgab283@gmail.com>
>>> davidgab283@gmail.com> wrote:
>>>
>>>> Dears,
>>>> It is the first time I am developping with python and I want to execute
>>>> parallel threads using eventlet.When I run the below code, only one thread
>>>> is executed and not both.Please could you tell me how to fix this issue ?
>>>> Please advise me how to ensure a concurrent behavior between evt1 and
>>>> evt2.
>>>>
>>>> import eventlet
>>>> import time
>>>>
>>>> def print_time(ThreadName):
>>>>         count = 0
>>>>         delay = 3
>>>>         while 1:#count < 5:
>>>>             count += 1
>>>>             print "%s: %s" % (ThreadName, time.ctime(time.time()) )
>>>>             time.sleep(delay)
>>>>
>>>> print 'Before call threads'
>>>>
>>>> evt1 = eventlet.spawn(print_time, "Thread-1",)
>>>> evt2 = eventlet.spawn(print_time, "Thread-2",)
>>>>
>>>> print 'After call threads'
>>>>
>>>> evt1.wait()
>>>> evt2.wait()
>>>>
>>>>
>>>>
>>>> Any answer is welcome.
>>>> Thanks in advance.
>>>> Regards.
>>>>
>>>> _______________________________________________
>>>> Boston mailing list
>>>> Boston@python.org
>>>> https://mail.python.org/mailman/listinfo/boston
>>>>
>>>>
>>>
>>
>
>
> _______________________________________________
> Boston mailing listBoston@python.orghttps://mail.python.org/mailman/listinfo/boston
>
>
>
> _______________________________________________
> Boston mailing list
> Boston@python.org
> https://mail.python.org/mailman/listinfo/boston
>
>

Back to comp.lang.python | Previous | Next | Find similar | Unroll thread


Thread

parallel (concurrent) eventlet David Gabriel <davidgab283@gmail.com> - 2016-01-18 12:03 +0100

csiph-web