# socketlibevent.py - MIT License # phoenix@burninglabs.com # # Non-blocking socket I/O for Stackless Python using libevent, via pyevent. # # Usage: # import sys, socketlibevent; sys.modules['socket'] = socketlibevent # # Based on Richard Tew's stacklesssocket module. # Uses Dug Song's pyevent. # # Thanks a Heap ! import stackless import sys import errno import weakref import signal import socket as stdsocket try: import event except: try: import rel; rel.override() import event except: print "please install libevent and pyevent" # http://code.google.com/p/pyevent/ print "(or 'stackless ez_setup.py rel' for quick testing)" # http://code.google.com/p/registeredeventlistener/ sys.exit() # For SSL support, this module uses the 'ssl' module (built in from 2.6 up): # ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/) try: import ssl as ssl_ ssl_enabled = True except: ssl_enabled = False # import everything from the regular socket module if __name__ == '__main__': globals().update(stdsocket.__dict__) __name__ = '__main__' else: globals().update(stdsocket.__dict__) _GLOBAL_DEFAULT_TIMEOUT = 2 setdefaulttimeout(_GLOBAL_DEFAULT_TIMEOUT) class _fileobject(stdsocket._fileobject): def close(self): try: if self._sock: self.flush() finally: if self._close: self._sock.close() self._sock = None # simple decorator to run a function in a tasklet def tasklet(task): def run(*args, **kwargs): stackless.tasklet(task)(*args, **kwargs) return run # Event Loop Management loop_running = False sockets = weakref.WeakValueDictionary() @tasklet def eventLoop(): global loop_running global event_errors while len(sockets) is not 0: # If there are other tasklets scheduled: # use the nonblocking loop # else: use the blocking loop event.loop( stackless.getruncount() > 2 ) # main tasklet + this one stackless.schedule() loop_running = False def abort(): print '\nKeyboardInterrupt' sys.exit() def runEventLoop(): global loop_running if not loop_running: event.init() event.signal(signal.SIGINT, abort) event.signal(signal.SIGQUIT, abort) eventLoop() loop_running = True # Replacement Socket Module Functions def socket(family=AF_INET, type=SOCK_STREAM, proto=0): return evsocket(stdsocket.socket(family, type, proto)) def create_connection(address, timeout=None): if timeout is None: timeout = getdefaulttimeout() s = socket() s.connect(address, timeout) return s def ssl(sock, keyfile=None, certfile=None): if ssl_enabled: return evsocketssl(sock, keyfile, certfile) else: raise RuntimeError(\ "SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'") # Socket Proxy Class class evsocket(): # XXX Not all socketobject methods are implemented! # XXX Currently, the sockets are using the default, blocking mode. def __init__(self, sock): self.sock = sock sock.setblocking(0) self.accepting = False self.connected = False self.remote_addr = None self.fileobject = None self.read_channel = stackless.channel() self.write_channel = stackless.channel() self.accept_channel = None self.connect_channel = None global sockets sockets[id(self)] = self runEventLoop() def __getattr__(self, attr): return getattr(self.sock, attr) def listen(self, backlog=5): self.accepting = True return self.sock.listen(backlog) def accept(self): if not self.accept_channel: self.accept_channel = stackless.channel() event.event(self.handle_accept, handle=self.sock, evtype=event.EV_READ | event.EV_PERSIST).add() return self.accept_channel.receive() @tasklet def handle_accept(self, ev, sock, event_type, *arg): try: s, a = self.sock.accept() s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1) except Exception, e: self.accept_channel.send_exception(type(e), *e.args) else: s = evsocket(s) self.accept_channel.send((s,a)) @tasklet def handle_connect(self, ev, sock, event_type, *arg): if event_type == event.EV_TIMEOUT: self.connect_channel.send_exception( stdsocket.error, errno.ETIMEDOUT, errno.errorcode[errno.ETIMEDOUT]) else: self.connect_channel.send(None) def connect(self, address, timeout=None): if timeout is None: timeout = getdefaulttimeout() err = self.sock.connect_ex(address) if err == 0: self.connected = True self.remote_addr = address elif err == errno.EINPROGRESS: if self.connect_channel is None: self.connect_channel = stackless.channel() event.event(self.handle_connect, handle=self.sock, evtype=event.EV_WRITE).add(timeout) self.connect_channel.receive() else: raise stdsocket.error(err, errno.errorcode[err]) def send(self, data, *args): event.write(self.sock, self.handle_send, data) return self.write_channel.receive() @tasklet def handle_send(self, data): if self.write_channel.balance < 0: self.write_channel.send(self.sock.send(data)) def sendall(self, data, *args): while data: sent = self.send(data) data = data[sent:] def recv(self, bytes, *args): event.read(self.sock, self.handle_recv, bytes) return self.read_channel.receive() @tasklet def handle_recv(self, bytes): print 'recv check' if self.read_channel.balance < 0: print 'recv start' print self.read_channel.balance self.read_channel.send(self.sock.recv(bytes)) print 'recv end' # TODO def recvfrom(self, bytes, *args): event.read(self.sock, self.handle_recv, bytes) return self.read_channel.receive() # TODO @tasklet def handle_recvfrom(self, bytes): self.read_channel.send(self.sock.recvfrom(bytes)) def makefile(self, mode='r', bufsize=-1): if self.fileobject is None: self.close = lambda : None self.fileobject = _fileobject(self, mode, bufsize) return self.fileobject def close(self): self.sock.close() # SSL Proxy Class class evsocketssl(evsocket): def __init__(self, sock, keyfile=None, certfile=None): if certfile: server_side = True else: server_side = False # XXX This currently performs a BLOCKING handshake operation # TODO Implement a non-blocking handshake self.sock = ssl_.wrap_socket(sock, keyfile, certfile, server_side) @tasklet def handle_accept(self, ev, sock, event_type, *arg): s, a = self.sock.accept() s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1) s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1) s = evsocketssl(s) self.accept_channel.send((s,a)) if __name__ == "__main__": sys.modules["socket"] = __import__(__name__) import urllib2 import gc @tasklet def test(url): print "url read", url print urllib2.urlopen(url).read(32) test('http://localhost') test('http://yahoo.com') stackless.run() #gc.get_count()