[Stackless] evsocket

Phoenix Sol phoenix at burninglabs.com
Mon Mar 17 20:43:33 CET 2008


Damn, I'm making a fool of myself. I found an error in this code: In 'recv',
there's a 'break' keyword, but it's not in a loop. I, however seem to be
stuck in one...
If all this noise is unwelcome, please accept my apologies.

On Mon, Mar 17, 2008 at 12:33 PM, Phoenix Sol <phoenix at burninglabs.com>
wrote:

> Here is a slightly improved version of evsocket. I don't want to keep
> posting updated versions to the list, so please let me know if I can acquire
> svn privileges on the stacklessexamples project;
> if not, I can start a Google code project for it. (Or inform me of the
> common solution; I see there are few 'owners' of the stacklessexamples
> project...)
>
> Once more, here it is; it is also included as a file attachment.
>
>
> _____evsocket.py_________________________________________________________________
>
>
> ################################################################################
> #
> #   A libevent/ pyevent based Stackless-compatible socket module
> #
> #   (Owing much to Richard Tew's stacklesssocket (and Sam Rushing's
> Asyncore))
> #
> #
> #   MIT License
> phoenix at burninglabs.com
> #
>
> ################################################################################
>
>
> import traceback
> import stackless
> import event
> import socket as stdsocket
>
> from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
>      ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, errorcode
>
>
> sockets = []
> managerRunning = False
>
>
> # Arnar Birgisson's neat little sleep function ;-)
> def sleep(seconds):
>     def wakeup(ch):
>         ch.send(None)
>     ch = stackless.channel()
>     event.timeout(seconds, wakeup, ch)
>     ch.receive()
>
>
> # If we are to masquerade as the socket module, we need to provide the
> constants.
> if "__all__" in stdsocket.__dict__:
>     __all__ = stdsocket.__dict__
>     for k, v in stdsocket.__dict__.iteritems():
>         if k in __all__:
>             globals()[k] = v
> else:
>     for k, v in stdsocket.__dict__.iteritems():
>         if k.upper() == k:
>             globals()[k] = v
>     error = stdsocket.error
>     timeout = stdsocket.timeout
>     # WARNING: this function blocks and is not thread safe.
>     # The only solution is to spawn a thread to handle all
>     # getaddrinfo requests.  Implementing a stackless DNS
>     # lookup service is only second best as getaddrinfo may
>     # use other methods.
>     getaddrinfo = stdsocket.getaddrinfo
>
> # urllib2 apparently uses this directly.  We need to cater for that.
> _fileobject = stdsocket._fileobject
>
>
> # Event Loop:
> def ManageSockets():
>     while len(sockets):
>         #print "event loop"
>         event.loop()
>         stackless.schedule()
>
>     managerRunning = False
>
> def StartManager():
>     global managerRunning
>     if not managerRunning:
>         managerRunning = True
>         stackless.tasklet(ManageSockets)()
>
> #
> # Replacement for standard socket() constructor.
> #
> def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
>     realSocket = stdsocket.socket(family, type, proto)
>     realSocket.setblocking(0)
>     evsock = evSocket(realSocket)
>     global sockets
>     sockets.append(evsock)
>     StartManager()
>     return evsock
>
>
> class evSocket(object):
>     """a pyEvent based socket proxy object"""
>
>     address = None
>     connected = False
>     sending = False
>     receiving = True
>     acceptChannel = None
>     recvChannel = None
>
>     def __init__(self, sock):
>         # Assert that we have a real socket, not a proxy object
>         if not isinstance(sock, stdsocket.socket):
>             raise StandardError("Invalid socket passed to dispatcher")
>
>         self.sock = sock
>         self._fileno = sock.fileno()
>         self.recvChannel = stackless.channel()
>         self.sendChannel = stackless.channel()
>
>     def __getattr__(self, attr):
>         if not attr.startswith('__'):
>             return getattr(self.sock, attr)
>
>     def makefile(self, mode='r', bufsize=-1):
>         return stdsocket._fileobject(self, mode, bufsize)
>
>     def accept(self):
>
>         # For some reason this option degrades performance
>         #self.sock.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR,
> 1)
>
>         if not self.acceptChannel:
>             self.acceptChannel = stackless.channel()
>
>         def cb(ev, sock, event_type, *arg):
>             s, a = self.sock.accept()
>             self.acceptChannel.send((s, a))
>
>         event.event(cb, handle=self.sock, evtype=event.EV_READ |
>                                                  event.EV_PERSIST).add()
>
>         return (self.acceptChannel.receive())
>
>     def bind(self, address):
>         self.address = address
>         return self.sock.bind(address)
>
>     def close(self):
>         stackless.tasklet(self.handle_close)()
>
>     def handle_close(self):
>
>         # XXX There just might be a better way to do this:
>         while self.receiving:
>             # Tweaking this value for performance has yet to be conclusive
> ;-)
>             sleep(.5)
>
>         self.connected = False
>         self.sending = False  # breaks the loop in sendall
>
>         global sockets
>         sockets.remove(self)
>         self._fileno = None
>         self.sock.close()
>
>         # XXX Am I forgetting anything here?
>
>         #Clear out all the channels with relevant errors.
>         while self.acceptChannel and self.acceptChannel.balance < 0:
>             self.acceptChannel.send_exception(error, 9, 'Bad file
> descriptor')
>
>         while self.recvChannel and self.recvChannel.balance < 0:
>             self.recvChannel.send("")
>
>     def connect(self, address):
>         while not self.connected:
>             err = self.sock.connect_ex(address)
>
>             if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
>                 stackless.schedule()
>                 continue
>
>             if err in (0, EISCONN):
>                 self.connected = True
>                 self.address = address
>             else:
>                 raise socket.error, (err, errorcode[err])
>
>     def connect_ex(self, address):
>         err = self.sock.connect_ex(address)
>
>         if err in (0, EISCONN):
>             self.connected = True
>             self.address = address
>
>         return err
>
>     def recv(self, byteCount):
>         self.receiving = True
>
>         def cb():
>             self.recvChannel.send(self.sock.recv(byteCount))
>
>         data = ""
>         if len(data) < byteCount:
>             try:
>                 event.read(self.sock, cb)
>                 data += self.recvChannel.receive()
>             except socket.error, error:
>                 if error[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
>                 self.close()
>                 break
>             else:
>                 raise
>
>         self.receiving = False
>         return data
>
>
>     def recvfrom(self, byteCount):
>         if self.socket.type == SOCK_STREAM:
>             return (self.recv(byteCount), None)
>         else:
>             return (self.recv(byteCount), address)
>
>     def send(self, data):
>         def cb():
>             try:
>                 self.sendChannel.send(self.sock.send(data))
>             except stdsocket.error, err:
>                 if err[0] == EWOULDBLOCK:
>                     return 0
>                 else:
>                     raise
>                 return 0
>
>         event.write(self.sock, cb)
>         return self.sendChannel.receive()
>
>     def sendall(self, data):
>         stackless.tasklet(handle_sendall)(data)
>
>     def handle_sendall(self, data):
>         self.sending = True
>         while data and self.sending:
>             sent = self.send(data)
>             data = data[sent + 1:]
>             stackless.schedule()
>         self.sending = False
>
>     def sendto(self, data, address):
>         def cb():
>             try:
>                 self.sendChannel(self.sock.sendto(data, address))
>             except stdsocket.error, err:
>                 if err[0] == EWOULDBLOCK:
>                     return 0
>                 else:
>                     raise
>                 return 0
>
>         event.write(self.sock, cb)
>         return self.sendChannel.receive()
>
>
> _____evsocket.py_________________________________________________________________
>
> --
> Phoenix Sol
> 541-646-8612
> 130 'A' Street
> Ashland, Oregon
> 97520




-- 
Phoenix Sol
541-646-8612
130 'A' Street
Ashland, Oregon
97520
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://www.stackless.com/pipermail/stackless/attachments/20080317/2507e67b/attachment.htm>


More information about the Stackless mailing list