[Stackless] a pyevent-based socket proxy module

Phoenix Sol phoenix at burninglabs.com
Mon Mar 17 06:22:09 CET 2008


Greetings all,

I started on a pyEvent-based socket proxy module.

It's still rough and could use a lot of love, but so far I have it working
with urllib and urllib2 clients (no client benchmarks yet...).
Using a variant of Richard's basicWebserver.py (with the delays
removed, lol), it handles ~505 requests per second on my old
Sempron 2800 with "ab -n 10000 -c 500 http://localhost:8080/". This is, of
course, with ApacheBench running on the same box and
all kinds of other things running. The same code and benchmark yield ~81
requests per second using stacklesssocket. Hopefully
this makes it interesting enough for some of you fine hackers to step up and
contribute.

I will be testing it on Amazon EC2 soon, with more serious applications
(custom code built using the cherrypy WSGI server,
very similar to Arnar's stacklesswsgi example). Let me know if you are
interested in the results.

If anyone has the time and the inclination, please look this over and
contribute if possible. It's much appreciated =)
The source follows below, and is also included as a file attachment.

Take care,

-- 
Phoenix Sol
541-646-8612
130 'A' Street
Ashland, Oregon
97520


____evsocket.py___________________________________________________________________________________________________

################################################################################
#
#   A libevent/ pyevent based Stackless-compatible socket module
#
#   (With much (inspired by | taken from) Richard Tew's stacklesssocket
#     and Sam Rushing's Asyncore)
#
#   No License; No Warranty; Nothing...
#
#   Do with it what you will, but please share any improvements.
#
#   phoenix at burninglabs.com
#
################################################################################


MODULE_NAME = 'evsocket'
VERSION = '0.1'
AUTHOR = 'Phoenix Sol'
LICENSE = 'Nihilist'


import traceback
import stackless
import event
import socket as stdsocket

from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
     ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, errorcode


sockets = []
managerRunning = False


# Arnar Birgisson's neat little sleep function ;-)
def sleep(seconds):
    """Park the calling tasklet until a pyevent timer of `seconds` fires.

    The tasklet blocks on a private channel; the timer callback sends
    None on that channel, which lets the blocked receive() return.
    """
    wake_channel = stackless.channel()

    def fire(channel):
        # Runs from the event loop once the timeout expires.
        channel.send(None)

    event.timeout(seconds, fire, wake_channel)
    wake_channel.receive()


# If we are to masquerade as the socket module, we need to provide the
# constants.
if "__all__" in stdsocket.__dict__:
    # Re-export exactly the stdlib's public names.  Build a fresh list so
    # this module never aliases (or mutates) stdsocket.__all__, and so that
    # module dunders such as __name__ / __file__ are NOT copied over ours.
    __all__ = [k for k in stdsocket.__all__ if k in stdsocket.__dict__]
    for k in __all__:
        globals()[k] = stdsocket.__dict__[k]
else:
    # No public-name list available: fall back to copying every
    # ALL-UPPERCASE name (the constants) plus the few callables we need.
    for k, v in stdsocket.__dict__.items():
        if k.upper() == k:
            globals()[k] = v
    error = stdsocket.error
    timeout = stdsocket.timeout
    # WARNING: this function blocks and is not thread safe.
    # The only solution is to spawn a thread to handle all
    # getaddrinfo requests.  Implementing a stackless DNS
    # lookup service is only second best as getaddrinfo may
    # use other methods.
    getaddrinfo = stdsocket.getaddrinfo

# urllib2 apparently uses this directly.  We need to cater for that.
# This is the same stdlib class that evSocket.makefile() instantiates.
_fileobject = stdsocket._fileobject


# Event Loop:
def ManageSockets():
    """Run the pyevent loop for as long as any proxied socket is registered.

    Intended to run in its own tasklet (see StartManager).  Each pass runs
    event.loop() and then yields to the other tasklets.  When the `sockets`
    list becomes empty the function returns and clears `managerRunning`,
    so the next socket() call can start a fresh manager.
    """
    # Without this declaration the assignment below would only bind a
    # local name and the module-level flag would stay True forever.
    global managerRunning
    try:
        while sockets:
            #print "event loop"
            event.loop()
            stackless.schedule()
    finally:
        # Also reset when the loop dies with an exception; otherwise
        # StartManager() would never launch a replacement manager.
        managerRunning = False

def StartManager():
    """Launch the ManageSockets tasklet unless one is already flagged as running."""
    global managerRunning
    if managerRunning:
        return
    # Set the flag before spawning so repeated calls do not spawn twice.
    managerRunning = True
    stackless.tasklet(ManageSockets)()

#
# Replacement for standard socket() constructor.
#
def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
    """Create a non-blocking standard socket and return it wrapped in an evSocket.

    The proxy is appended to the module-level `sockets` list and the
    manager tasklet is started if it is not already running.
    """
    raw = stdsocket.socket(family, type, proto)
    raw.setblocking(0)
    proxy = evSocket(raw)
    # `sockets` is only mutated, never rebound, so no `global` is needed.
    sockets.append(proxy)
    StartManager()
    return proxy


class evSocket(object):
    """A pyevent-based proxy around a standard socket object.

    Blocking-style calls (accept, recv, send, sendto) register a pyevent
    callback and park the calling tasklet on a stackless channel; the
    callback performs the real socket operation and passes the result (or
    the socket error) back through the channel.  Attributes not defined
    here are looked up on the wrapped socket.
    """

    address = None        # last address given to bind() / connect()
    connected = False
    sending = False       # cleared by handle_close() to stop sendall()
    # NOTE(review): starts True, so handle_close() keeps polling until some
    # recv() call has finished; a socket that never reads is never actually
    # closed.  Left unchanged -- confirm whether this is intentional.
    receiving = True
    acceptChannel = None  # created lazily by accept()
    acceptEvent = None    # the single persistent read event used by accept()
    recvChannel = None

    def __init__(self, sock):
        """Wrap `sock`, which must be a real stdlib socket, not a proxy.

        Raises TypeError (a StandardError subclass on Python 2) otherwise.
        """
        if not isinstance(sock, stdsocket.socket):
            raise TypeError("Invalid socket passed to dispatcher")

        self.sock = sock
        self._fileno = sock.fileno()
        self.readBufferString = ''
        self.recvChannel = stackless.channel()
        self.sendChannel = stackless.channel()

    def __getattr__(self, attr):
        """Delegate unknown attributes to the wrapped socket.

        Dunder names are never delegated and raise AttributeError, so
        protocol probes such as hasattr(obj, '__iter__') answer correctly
        instead of receiving None.  'sock' itself is refused as well to
        avoid unbounded recursion when the proxy is not yet initialised.
        """
        if attr == 'sock' or attr.startswith('__'):
            raise AttributeError(attr)
        return getattr(self.sock, attr)

    def makefile(self, mode='r', bufsize=-1):
        """Return a stdlib file object that reads and writes through this proxy."""
        return stdsocket._fileobject(self, mode, bufsize)

    def accept(self):
        """Block the calling tasklet until a connection is accepted.

        Returns the (socket, address) pair produced by the wrapped
        socket's accept().
        NOTE(review): the returned socket is the raw stdlib socket, not an
        evSocket proxy -- confirm whether callers are expected to wrap it.
        """
        if self.acceptChannel is None:
            self.acceptChannel = stackless.channel()

        # The read event is persistent, so register it once only; adding a
        # fresh persistent event on every call would pile up callbacks on
        # the listening socket.
        if self.acceptEvent is None:
            def cb(ev, sock, event_type, *arg):
                self.acceptChannel.send(self.sock.accept())

            self.acceptEvent = event.event(
                cb, handle=self.sock,
                evtype=event.EV_READ | event.EV_PERSIST)
            self.acceptEvent.add()

        return self.acceptChannel.receive()

    def bind(self, address):
        """Remember `address` and bind the wrapped socket to it."""
        self.address = address
        return self.sock.bind(address)

    def close(self):
        """Schedule the close in a separate tasklet; returns immediately."""
        stackless.tasklet(self.handle_close)()

    def handle_close(self):
        """Wait for any receive to finish, then close and wake blocked tasklets."""
        # XXX There just might be a better way to do this:
        while self.receiving:
            sleep(.2)

        self.connected = False
        self.sending = False  # breaks the loop in sendall

        # Tolerate a second close(): the proxy may already be unregistered.
        if self in sockets:
            sockets.remove(self)
        self._fileno = None
        self.sock.close()

        # XXX Am I forgetting anything here?
        # NOTE(review): acceptEvent is not removed from the event loop here.

        # Clear out all the channels with relevant errors.
        while self.acceptChannel is not None and self.acceptChannel.balance < 0:
            self.acceptChannel.send_exception(
                stdsocket.error, EBADF, 'Bad file descriptor')

        while self.recvChannel is not None and self.recvChannel.balance < 0:
            self.recvChannel.send("")

    def connect(self, address):
        """Connect to `address`, yielding to other tasklets while in progress.

        Raises the stdlib socket error carrying the errno and its symbolic
        name if the connection attempt fails.
        """
        while not self.connected:
            err = self.sock.connect_ex(address)

            if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
                stackless.schedule()
                continue

            if err in (0, EISCONN):
                self.connected = True
                self.address = address
            else:
                # `socket` in this module is the factory function, so the
                # exception class has to come from the stdlib module.
                raise stdsocket.error(err, errorcode.get(err, str(err)))

    def connect_ex(self, address):
        """Make one non-blocking connect attempt and return its errno (0 on success)."""
        err = self.sock.connect_ex(address)

        if err in (0, EISCONN):
            self.connected = True
            self.address = address

        return err

    def recv(self, byteCount):
        """Block the calling tasklet until data is readable; return up to `byteCount` bytes.

        Returns "" when byteCount is not positive, when the socket is
        closed while waiting, or when the receive fails (the traceback is
        printed in that case).
        """
        self.receiving = True
        data = ""
        try:
            if byteCount > 0:
                def cb():
                    # Hand socket errors to the waiting tasklet instead of
                    # letting them escape into the event loop.
                    try:
                        chunk = self.sock.recv(byteCount)
                    except stdsocket.error as err:
                        self.recvChannel.send_exception(
                            stdsocket.error, *err.args)
                    else:
                        self.recvChannel.send(chunk)

                try:
                    event.read(self.sock, cb)
                    data += self.recvChannel.receive()
                except Exception:
                    # XXX Add some handling here
                    traceback.print_exc()
        finally:
            # handle_close() polls this flag, so it must always be reset.
            self.receiving = False
        return data

    def recvfrom(self, byteCount):
        """Return (data, address) using recv().

        For stream sockets the address is None.  For other socket types
        the address recorded by bind()/connect() is returned.
        NOTE(review): that is not necessarily the datagram's sender --
        confirm whether a real recvfrom() on the wrapped socket is needed.
        """
        if self.sock.type == stdsocket.SOCK_STREAM:
            return (self.recv(byteCount), None)
        return (self.recv(byteCount), self.address)

    def _send_when_writable(self, operation):
        """Run `operation` once the socket is writable and return its result.

        `operation` is a zero-argument callable performing the real send.
        A socket error other than EWOULDBLOCK is raised in the calling
        tasklet.
        """
        def cb():
            try:
                count = operation()
            except stdsocket.error as err:
                if err.args and err.args[0] == EWOULDBLOCK:
                    # Presumably a non-None return asks pyevent to re-arm
                    # the write event and retry -- TODO confirm.
                    return 0
                self.sendChannel.send_exception(stdsocket.error, *err.args)
                return None
            self.sendChannel.send(count)

        event.write(self.sock, cb)
        return self.sendChannel.receive()

    def send(self, data):
        """Send `data` once the socket is writable; return the byte count sent."""
        return self._send_when_writable(lambda: self.sock.send(data))

    def sendall(self, data):
        """Send all of `data`, yielding between chunks.

        Stops early if handle_close() clears the `sending` flag.
        """
        self.sending = True
        while data and self.sending:
            sent = self.send(data)
            # Drop exactly the bytes that were written.
            data = data[sent:]
            stackless.schedule()

    def sendto(self, data, address):
        """Send `data` to `address` once the socket is writable; return the byte count."""
        return self._send_when_writable(
            lambda: self.sock.sendto(data, address))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://www.stackless.com/pipermail/stackless/attachments/20080316/3247e645/attachment-0001.htm 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: evsocket.py
Type: text/x-python
Size: 7129 bytes
Desc: not available
Url : http://www.stackless.com/pipermail/stackless/attachments/20080316/3247e645/attachment-0001.py 


More information about the Stackless mailing list