[Stackless] a pyevent-based socket proxy module
Phoenix Sol
phoenix at burninglabs.com
Mon Mar 17 06:22:09 CET 2008
Greetings all,
I started on a pyevent-based socket proxy module.
It's still rough and could use a lot of love, but so far I have it working
with urllib and urllib2 clients (no client benchmarks yet...).
Using a variant of Richard's basicWebserver.py (with the delays
removed, lol), it handles ~505 requests per second on my old
Sempron 2800 with "ab -n 10000 -c 500 http://localhost:8080/". This is, of
course, with ApacheBench running on the same box alongside
all kinds of other things. The same code and benchmark yield ~81
requests per second using stacklesssocket. Hopefully
this makes it interesting enough for some of you fine hackers to step up and
contribute.
I will be testing it on Amazon EC2 soon, with more serious applications
(custom code built using the cherrypy WSGI server,
very similar to Arnar's stacklesswsgi example). Let me know if you are
interested in the results.
If anyone has the time and the inclination, please look this over and
contribute if possible. It's much appreciated =)
The source follows below, and is also included as a file attachment.
Take care,
--
Phoenix Sol
541-646-8612
130 'A' Street
Ashland, Oregon
97520
____evsocket.py___________________________________________________________________________________________________
################################################################################
#
# A libevent/ pyevent based Stackless-compatible socket module
#
# (With much (inspired by | taken from) Richard Tew's stacklesssocket
# and Sam Rushing's Asyncore)
#
# No License; No Warranty; Nothing...
#
# Do with it what you will, but please share any improvements.
#
# phoenix at burninglabs.com
#
################################################################################
# Module metadata: plain descriptive strings.
MODULE_NAME = 'evsocket'
VERSION = '0.1'
AUTHOR = 'Phoenix Sol'
# Matches the "No License; No Warranty" statement in the file header.
LICENSE = 'Nihilist'
import traceback
import stackless
import event
import socket as stdsocket
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, errorcode
# Registry of live evSocket proxies: socket() appends to it, handle_close()
# removes from it, and ManageSockets() keeps looping while it is non-empty.
sockets = []
# Set to True by StartManager() when it launches the ManageSockets tasklet,
# so that only one manager tasklet is started at a time.
managerRunning = False
# Arnar Birgisson's neat little sleep function ;-)
def sleep(seconds):
    """Suspend the calling tasklet until a pyevent timeout fires.

    A private channel is created, a timeout of `seconds` is registered whose
    callback sends None on that channel, and the caller then blocks receiving
    from it.
    """
    wake_channel = stackless.channel()

    def fire(channel):
        # Runs from the event loop; releases the tasklet parked below.
        channel.send(None)

    event.timeout(seconds, fire, wake_channel)
    wake_channel.receive()
# If we are to masquerade as the socket module, we need to provide the
# constants.
if "__all__" in stdsocket.__dict__:
    # Re-export exactly the standard module's public names.  Using the
    # module's __dict__ here (as this code once did) makes the membership
    # test below always true, which copies private and dunder entries such
    # as __name__ and __file__ and clobbers this module's own identity.
    __all__ = list(stdsocket.__all__)
    # items() takes a snapshot (on Python 2) so the loop never iterates a
    # dict that is being looked at elsewhere, and also works on Python 3.
    for k, v in stdsocket.__dict__.items():
        if k in __all__:
            globals()[k] = v
else:
    # No __all__ published: fall back to copying the upper-case names only,
    # which is how the constants (AF_INET, SOCK_STREAM, ...) are spelled.
    for k, v in stdsocket.__dict__.items():
        if k.upper() == k:
            globals()[k] = v
# Re-export the standard exception classes under their usual names so that
# code written against the socket module can catch them from this module.
error = stdsocket.error
timeout = stdsocket.timeout
# WARNING: this function blocks and is not thread safe.
# The only solution is to spawn a thread to handle all
# getaddrinfo requests. Implementing a stackless DNS
# lookup service is only second best as getaddrinfo may
# use other methods.
getaddrinfo = stdsocket.getaddrinfo
# urllib2 apparently uses this directly. We need to cater for that.
# (It is the same class evSocket.makefile() instantiates.)
_fileobject = stdsocket._fileobject
# Event Loop:
def ManageSockets():
    """Run the pyevent loop for as long as any evSocket is registered.

    Each pass calls event.loop() and then yields to the other tasklets.
    When the `sockets` registry becomes empty the function returns and
    clears `managerRunning`, so that the next StartManager() call can launch
    a fresh manager tasklet.
    """
    # Without this declaration the assignment below would only bind a local
    # name and the module-level flag would stay True forever, which would
    # prevent the manager from ever being started again.
    global managerRunning
    try:
        while sockets:
            event.loop()
            stackless.schedule()
    finally:
        # Also reset the flag when event.loop() raises, otherwise a single
        # failing callback would leave the module without a manager.
        managerRunning = False
def StartManager():
    """Launch the ManageSockets tasklet unless one is already flagged as running."""
    global managerRunning
    if managerRunning:
        return
    # Flag first, then spawn, so the flag is already set when the new
    # tasklet gets to run.
    managerRunning = True
    stackless.tasklet(ManageSockets)()
#
# Replacement for standard socket() constructor.
#
def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
    """Create a non-blocking real socket and return its evSocket proxy.

    The proxy is added to the module-level `sockets` registry and the
    manager tasklet is started if it is not already running.  The parameters
    are passed straight through to the standard socket constructor.
    """
    raw = stdsocket.socket(family, type, proto)
    raw.setblocking(0)
    proxy = evSocket(raw)
    sockets.append(proxy)
    StartManager()
    return proxy
class evSocket(object):
    """a pyEvent based socket proxy object

    Wraps a real socket and turns its blocking calls into "register a
    pyevent read/write event, then park the calling tasklet on a channel
    until the event callback delivers the result".  Anything not defined
    here is forwarded to the wrapped socket by __getattr__.
    """
    address = None
    connected = False
    sending = False
    # Must start out False: handle_close() waits until no recv() is in
    # flight, so a True default would make closing a socket that never
    # called recv() (e.g. a listening socket) wait forever.
    receiving = False
    acceptChannel = None
    recvChannel = None
    sendChannel = None
    # The single persistent read event used by accept(), created lazily.
    _acceptEvent = None

    def __init__(self, sock):
        """Wrap `sock`, which must be a real standard-library socket.

        Raises TypeError (a StandardError subclass on Python 2, so existing
        handlers keep working) when handed anything else, e.g. another proxy.
        """
        # Assert that we have a real socket, not a proxy object
        if not isinstance(sock, stdsocket.socket):
            raise TypeError("Invalid socket passed to dispatcher")
        self.sock = sock
        self._fileno = sock.fileno()
        self.readBufferString = ''
        self.recvChannel = stackless.channel()
        self.sendChannel = stackless.channel()

    def __getattr__(self, attr):
        """Forward unknown attributes to the wrapped socket.

        Dunder names are never forwarded and raise AttributeError, as the
        attribute protocol requires (silently returning None confuses
        hasattr(), copy and friends).  'sock' itself is refused as well so
        that a proxy without a wrapped socket cannot recurse endlessly.
        """
        if attr.startswith('__') or attr == 'sock':
            raise AttributeError(attr)
        return getattr(self.sock, attr)

    def _await_io(self, register, channel, operation):
        """Run `operation()` from an I/O event and return its result.

        `register` is event.read or event.write, `channel` is the channel
        the calling tasklet parks on.  A socket error raised by the
        operation is re-raised in the calling tasklet rather than inside
        the event loop.
        """
        def cb():
            if channel.balance >= 0:
                # Nobody is waiting any more (the waiter was released by
                # handle_close), so do not touch the socket.
                return None
            try:
                result = operation()
            except stdsocket.error as err:
                if err.args[0] == EWOULDBLOCK:
                    # pyevent re-adds a simple event whose callback returns
                    # a non-None value: try again when the socket is ready.
                    return True
                channel.send_exception(stdsocket.error, *err.args)
            else:
                channel.send(result)
            # Falling through returns None, which keeps the event one-shot.
            return None

        # Keep a reference to the event until the result has arrived.
        pending = register(self.sock, cb)
        return channel.receive()

    def makefile(self, mode='r', bufsize=-1):
        """Return a file object layered on this proxy (not on the raw socket)."""
        return stdsocket._fileobject(self, mode, bufsize)

    def accept(self):
        """Wait for an incoming connection and return (socket, address).

        NOTE(review): the returned socket is the raw object produced by the
        wrapped socket's accept(); it is not wrapped in an evSocket here.
        Callers that want event-driven I/O on it have to wrap it themselves.
        """
        if self.acceptChannel is None:
            self.acceptChannel = stackless.channel()
        if self._acceptEvent is None:
            # Register the persistent read event only once; adding a new
            # one per call would pile up callbacks that all fire for every
            # connection.
            def cb(ev, sock, event_type, *arg):
                try:
                    pair = self.sock.accept()
                except stdsocket.error as err:
                    if err.args[0] in (EWOULDBLOCK, EINTR):
                        # Nothing to accept after all; wait for the next event.
                        return
                    self.acceptChannel.send_exception(stdsocket.error,
                                                      *err.args)
                    return
                self.acceptChannel.send(pair)

            self._acceptEvent = event.event(
                cb, handle=self.sock,
                evtype=event.EV_READ | event.EV_PERSIST)
            self._acceptEvent.add()
        return self.acceptChannel.receive()

    def bind(self, address):
        """Remember `address` and bind the wrapped socket to it."""
        self.address = address
        return self.sock.bind(address)

    def close(self):
        """Schedule the socket to be closed by a separate tasklet."""
        stackless.tasklet(self.handle_close)()

    def handle_close(self):
        """Close the wrapped socket and release every tasklet waiting on it."""
        # XXX There just might be a better way to do this:
        while self.receiving:
            sleep(.2)
        self.connected = False
        self.sending = False  # breaks the loop in sendall
        if self._acceptEvent is not None:
            self._acceptEvent.delete()
            self._acceptEvent = None
        # close() may be called more than once; only unregister the first time.
        if self in sockets:
            sockets.remove(self)
        self._fileno = None
        self.sock.close()
        # Clear out all the channels with relevant errors.
        while self.acceptChannel is not None and self.acceptChannel.balance < 0:
            self.acceptChannel.send_exception(stdsocket.error, EBADF,
                                              'Bad file descriptor')
        while self.recvChannel is not None and self.recvChannel.balance < 0:
            self.recvChannel.send("")
        while self.sendChannel is not None and self.sendChannel.balance < 0:
            self.sendChannel.send_exception(stdsocket.error, EBADF,
                                            'Bad file descriptor')

    def connect(self, address):
        """Connect to `address`, yielding to other tasklets while in progress.

        Raises the standard socket error, with (errno, symbolic name)
        arguments, when the connection attempt fails.
        """
        while not self.connected:
            err = self.sock.connect_ex(address)
            if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
                stackless.schedule()
                continue
            if err in (0, EISCONN):
                self.connected = True
                self.address = address
            else:
                # Within this module the bare name `socket` is the
                # replacement constructor function, which has no `error`
                # attribute; the exception class lives on the real module.
                raise stdsocket.error(err, errorcode.get(err, str(err)))

    def connect_ex(self, address):
        """Single non-blocking connect attempt; returns the errno value."""
        err = self.sock.connect_ex(address)
        if err in (0, EISCONN):
            self.connected = True
            self.address = address
        return err

    def recv(self, byteCount):
        """Return up to `byteCount` bytes once the socket is readable.

        Best effort: a failure is printed and reported as "" (which callers
        see as end of stream) instead of being raised.
        """
        self.receiving = True
        try:
            if byteCount <= 0:
                return ""
            return self._await_io(event.read, self.recvChannel,
                                  lambda: self.sock.recv(byteCount))
        except Exception:
            # XXX Add some handling here
            # `except Exception` (not a bare except) so that killing the
            # tasklet while it is blocked here is not swallowed.
            traceback.print_exc()
            return ""
        finally:
            self.receiving = False

    def recvfrom(self, byteCount):
        """Return (data, address); address is None for stream sockets.

        For other socket types the wrapped socket's recvfrom() is used once
        the socket is readable, and socket errors are raised to the caller.
        """
        if self.sock.type == stdsocket.SOCK_STREAM:
            return (self.recv(byteCount), None)
        self.receiving = True
        try:
            return self._await_io(event.read, self.recvChannel,
                                  lambda: self.sock.recvfrom(byteCount))
        finally:
            self.receiving = False

    def send(self, data):
        """Send `data` once the socket is writable; return the bytes sent."""
        return self._await_io(event.write, self.sendChannel,
                              lambda: self.sock.send(data))

    def sendall(self, data):
        """Keep sending until all of `data` is written or the socket closes."""
        self.sending = True
        while data and self.sending:
            sent = self.send(data)
            # Drop exactly the bytes that were written.  (Slicing from
            # sent + 1 would silently lose one byte per partial send.)
            data = data[sent:]
            stackless.schedule()

    def sendto(self, data, address):
        """Send `data` to `address` once writable; return the bytes sent."""
        return self._await_io(event.write, self.sendChannel,
                              lambda: self.sock.sendto(data, address))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://www.stackless.com/pipermail/stackless/attachments/20080316/3247e645/attachment-0001.htm
-------------- next part --------------
A non-text attachment was scrubbed...
Name: evsocket.py
Type: text/x-python
Size: 7129 bytes
Desc: not available
Url : http://www.stackless.com/pipermail/stackless/attachments/20080316/3247e645/attachment-0001.py
More information about the Stackless
mailing list