[Stackless-checkins] r51687 - in stackless/sandbox: examples examples/mud.py examples/rpc.py examples/stacklesssocket.py

richard.tew python-checkins at python.org
Sat Sep 2 16:10:55 CEST 2006


Author: richard.tew
Date: Sat Sep  2 16:10:54 2006
New Revision: 51687

Added:
   stackless/sandbox/
   stackless/sandbox/examples/
   stackless/sandbox/examples/mud.py
   stackless/sandbox/examples/rpc.py
   stackless/sandbox/examples/stacklesssocket.py
Log:
Copied my Stackless networking examples from the wiki, so they can be fixed as need be with version control.

Added: stackless/sandbox/examples/mud.py
==============================================================================
--- (empty file)
+++ stackless/sandbox/examples/mud.py	Sat Sep  2 16:10:54 2006
@@ -0,0 +1,163 @@
+#
+# A very simple MUD server based on the Stackless compatible sockets.
+#
+# Author: Richard Tew <richard.m.tew at gmail.com>
+#
+# This code was written to serve as an example of Stackless Python usage.
+# Feel free to email me with any questions, comments, or suggestions for
+# improvement.
+#
+# - Could possibly be even simpler if the TelnetConnection class were removed
+#   and read/readline were done locally to the MUD code.  Then disconnection
+#   detection could be built into the stacklesssocket.dispatcher class.
+#
+
+import stackless
+import stacklesssocket as socket
+import random, time
+import traceback
+
+class TelnetConnection(socket.dispatcher): # Dispatcher with telnet-style read/readline helpers.
+    echo = False # When true, read() sends every received chunk back to the peer.
+    connectionID = None # Set by RunServer after the connection is accepted.
+    disconnectChannel = None # When set, close() sends this connection on it (once).
+
+    def read(self): # TELNET - block until the next chunk arrives on readChannel and return it.
+        ret = self.readChannel.receive()
+        if self.echo:
+            if ret == '\x08':
+                self.send(ret+" ") # Backspace is echoed as "\x08 " and then "\x08" again below.
+            self.send(ret)
+        return ret
+
+    def readline(self): # TELNET - return the next line, including its trailing '\r\n'.
+        buf = self.readBuffer
+
+        while True:
+            if buf.find('\r\n') > -1:
+                i = buf.index('\r\n')
+                ret = buf[:i+2]
+                self.readBuffer = buf[i+2:] # Keep whatever follows the line for the next call.
+                while '\x08' in ret: # Apply backspaces, left to right.
+                    i = ret.index('\x08')
+                    if i == 0:
+                        ret = ret[1:] # Nothing before it to erase; just drop the backspace.
+                    else:
+                        ret = ret[:i-1]+ret[i+1:] # Drop the backspace and the character before it.
+                return ret
+
+            buf += self.read() # No complete line yet; block for more input.
+
+    def close(self):
+        # Notify the server.
+        if self.disconnectChannel is not None:
+            stackless.tasklet(self.disconnectChannel.send)(self) # Sent from a new tasklet so close() itself does not block.
+            self.disconnectChannel = None # Ensures the notification is only sent once.
+        # Do the standard socket closure handling.
+        socket.dispatcher.close(self)
+
+def RunServer(host, port):
+    listenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    listenSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    listenSocket.bind((host, port))
+    listenSocket.listen(5)
+
+    # Connecting sockets should be wrapped in TelnetConnection, rather than
+    # stacklesssocket.dispatcher (the default wrapper).
+    def wrap_accept_socket(currentSocket):
+        return TelnetConnection(currentSocket)
+    listenSocket.wrap_accept_socket = wrap_accept_socket
+
+    nextConnectionID = 1
+    infoByConnectionID = {}
+    disconnectChannel = stackless.channel()
+    stackless.tasklet(MonitorDisconnections)(disconnectChannel, infoByConnectionID)
+
+    print "Accepting connections on", host, port
+    try:
+        while listenSocket.accepting:
+            clientSocket, clientAddress = listenSocket.accept()
+            clientSocket = TelnetConnection(clientSocket.socket)
+            clientSocket.disconnectChannel = disconnectChannel
+            clientSocket.connectionID = nextConnectionID
+            print "Received connection #", clientSocket.connectionID, "from", clientAddress
+            infoByConnectionID[clientSocket.connectionID] = clientAddress, clientSocket
+            nextConnectionID += 1
+            stackless.tasklet(MonitorIncomingConnection)(clientSocket, clientAddress, infoByConnectionID)
+            stackless.schedule()
+    except socket.error:
+        print "RunServer.error"
+        traceback.print_exc()
+    print "RunServer.exit"
+
+def MonitorDisconnections(disconnectChannel, infoByConnectionID): # Tasklet body: forget each connection that reports its closure.
+    print "MonitorDisconnections"
+    try:
+        while True:
+            clientSocket = disconnectChannel.receive() # Blocks until a closed connection is sent on the channel.
+            print "Received disconnection of #", clientSocket.connectionID, "from", infoByConnectionID[clientSocket.connectionID][0]
+            del infoByConnectionID[clientSocket.connectionID] # Keeps the "look" listing limited to live connections.
+    except socket.error:
+        print "MonitorDisconnections.error"
+        traceback.print_exc()
+    print "MonitorDisconnections.exit"
+
+def MonitorIncomingConnection(clientSocket, clientAddress, infoByConnectionID): # Tasklet body: command loop for one connected client.
+    clientSocket.send("Welcome to a basic Stackless Python MUD server.\r\n")
+    try:
+        while clientSocket.connected:
+            clientSocket.send("> ") # Prompt.
+            line = clientSocket.readline()[:-2].strip() # Drop the trailing '\r\n' and any surrounding whitespace.
+            words = [ word.strip() for word in line.split(" ") ]
+            verb = words[0] # Only the first word selects the command.
+
+            if verb == "look": # List the address of every registered connection.
+                clientSocket.send("There are %d users connected:\r\n" % len(infoByConnectionID))
+                clientSocket.send("Name\tHost\t\tPort\r\n")
+                clientSocket.send("-" * 40 +"\r\n")
+                for clientAddress2, clientSocket2 in infoByConnectionID.itervalues():
+                    clientSocket.send("Unknown\t"+ str(clientAddress2[0]) +"\t"+ str(clientAddress2[1]) +"\r\n")
+            elif verb == "say": # Broadcast the rest of the line to every registered connection.
+                line = line[4:] # Skip past "say " to the message text.
+                secondPartyPrefix = "Someone says: "
+                for clientAddress2, clientSocket2 in infoByConnectionID.itervalues():
+                    if clientSocket2 is clientSocket:
+                        prefix = "You say: "
+                    else:
+                        prefix = secondPartyPrefix
+                    clientSocket2.send(prefix + "\"%s\"\r\n" % line)
+            elif verb == "quit":
+                clientSocket.close() # The loop ends at the next "connected" check.
+            elif verb == "help":
+                clientSocket.send("Commands:\r\n")
+                for verb in [ "look", "say", "quit", "help" ]: # NOTE: rebinds "verb"; harmless, it is reassigned on the next pass.
+                    clientSocket.send("  "+ verb +"\r\n")
+            else:
+                clientSocket.send("Unknown command.  Type 'help' to see a list of available commands.\r\n")
+
+            stackless.schedule() # Let other tasklets run between commands.
+    except socket.error: # Raised out of readline() when the connection closes while waiting for input.
+        print "MonitorIncomingConnection.socket.error"
+        traceback.print_exc()
+
+if __name__ == "__main__": # Script entry point: run the MUD server on the address below.
+    host = "127.0.0.1"
+    port = 3000
+
+    # We do not want this to start as we will run it ourselves.
+    socket.managerRunning = True # With the flag set, socket.socket() will not launch its own ManageSockets tasklet.
+
+    t = stackless.tasklet(RunServer)(host, port)
+    # ManageSockets will exit if there are no sockets existing, so
+    # by running this tasklet once, we will get the listen socket in
+    # place before we invoke ManageSockets to run.
+    t.run()
+
+    try:
+        socket.ManageSockets() # Runs in the main tasklet until no sockets remain.
+    except KeyboardInterrupt:
+        print "** Detected ctrl-c in the console"
+    except: # Deliberately broad: last-chance reporting before exit.
+        print "main error"
+        traceback.print_exc()
+    print "EXIT"

Added: stackless/sandbox/examples/rpc.py
==============================================================================
--- (empty file)
+++ stackless/sandbox/examples/rpc.py	Sat Sep  2 16:10:54 2006
@@ -0,0 +1,158 @@
+#
+# Remote procedure calls over sockets with Stackless Python.
+#
+# Author: Richard Tew <richard.m.tew at gmail.com>
+#
+# This code was written to serve as an example of Stackless Python usage.
+# Feel free to email me with any questions, comments, or suggestions for
+# improvement.
+#
+# With just a page of code and the replacement socket module that is
+# currently known as "stacklesssocket", it is possible to easily write
+# a straightforward remote procedure call layer over a socket.
+#
+
+import stackless
+import stacklesssocket as socket
+import types, struct, cPickle
+
+class EndPoint:
+    def __init__(self, epSocket):
+        self.socket = epSocket
+        self.callID = 0
+        self.channelsByCallID = {}
+        self.otherEnd = RemoteEndPoint(self)
+
+        stackless.tasklet(self.ManageSocket)()
+
+    def ManageSocket(self):
+        try:
+            self.ReceiveIncomingData()
+        except socket.error:
+            # Disconnection while blocking on a recv call.
+            return
+
+    def ReceiveIncomingData(self):
+        while self.socket.connected:
+            data = self.socket.recv(struct.calcsize("I"))
+            dataLength = struct.unpack("I", data)[0]
+            data = self.socket.recv(dataLength)
+            packet = cPickle.loads(data)
+            callID = packet[1]
+            if packet[0]:
+                channel = self.channelsByCallID[callID]
+                del self.channelsByCallID[callID]
+                channel.send(packet[2])
+            else:
+                ret = self.HandleIncomingCall(packet[2], packet[3], packet[4])
+                self.SendPacket(True, callID, ret)
+            stackless.schedule()
+
+    def HandleIncomingCall(self, name, args, kwargs):
+        if name.startswith("__") or hasattr(EndPoint, name):
+            return # Raise error?
+        method = getattr(self, name)
+        if type(method) is not types.MethodType:
+            return # Raise error?
+        return method(*args, **kwargs)
+
+    def RemoteCall(self, methodInfo):
+        self.callID += 1
+        callID = self.callID
+
+        channel = self.channelsByCallID[callID] = stackless.channel()
+        self.SendPacket(False, callID, methodInfo.name, methodInfo.args, methodInfo.kwargs)
+        return channel.receive()
+
+    def SendPacket(self, *bits):
+        data = cPickle.dumps(bits)
+        data = struct.pack("I", len(data)) + data
+        self.socket.send(data)
+
+class RemoteEndPoint: # Proxy for the peer: any attribute lookup yields a callable RemoteFunction.
+    def __init__(self, endpoint):
+        self.endpoint = endpoint # The local EndPoint used to send the calls.
+
+    def __getattr__(self, name): # Only reached for names not found normally, i.e. everything but "endpoint".
+        return RemoteFunction(self.endpoint, name)
+
+class RemoteFunction: # Callable that records a method name plus arguments and forwards the call.
+    def __init__(self, endpoint, name):
+        self.endpoint = endpoint
+        self.name = name # Name of the method to invoke on the other end.
+
+    def __call__(self, *args, **kwargs): # Blocks until the remote result is returned.
+        self.args = args # Stored on self because RemoteCall reads name/args/kwargs from this object.
+        self.kwargs = kwargs
+
+        return self.endpoint.RemoteCall(self)
+
+if __name__ == "__main__":
+    # This test/example code is a little artificial, but it should both
+    # adequately test the RPC code above and demonstrate how easy it is
+    # to write this sort of code with Stackless Python.
+
+    class Server: # Listens on an address and creates a ServerEndPoint per accepted connection.
+        def __init__(self, address):
+            self.socket = listenSocket = socket.socket()
+            listenSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            listenSocket.bind(address)
+            listenSocket.listen(5)
+
+            self.endPoints = [] # One ServerEndPoint per accepted client.
+
+            stackless.tasklet(self.ManageSocket)(listenSocket)
+
+        def ManageSocket(self, listenSocket): # Tasklet body: accept clients until the listen socket closes.
+            try:
+                while listenSocket.accepting:
+                    epSocket, clientAddress = listenSocket.accept()
+                    endPoint = ServerEndPoint(epSocket)
+                    self.endPoints.append(endPoint)
+            except socket.error:
+                pass # Listen socket disconnected.  Our job is done.
+
+            if listenSocket.accepting:
+                listenSocket.close()
+            self.endPoints = []
+
+    class ClientEndPoint(EndPoint): # Methods the server may call on the client.
+        def Hello(self):
+            return "Client Hello!"
+
+    class ServerEndPoint(EndPoint): # Methods the client may call on the server.
+        def Hello(self):
+            return "Server Hello!"
+
+    address = "127.0.0.1", 3000
+
+    # Start the server.
+    server = Server(address)
+
+    clientSocket = socket.socket()
+    clientSocket.connect(address)
+
+    # Then connect the client.
+    client = ClientEndPoint(clientSocket)
+
+    def ClientTasklet(client, server, clientSocket): # Call the server, then wait for the server's own call to finish.
+        # Tell the server hello.
+        ret = client.otherEnd.Hello()
+        print "  CLIENT GOT", ret
+
+        stackless.tasklet(ServerTasklet)(server)
+        while server.socket.connected: # ServerTasklet closes server.socket when it is done.
+            stackless.schedule()
+        clientSocket.close()
+
+    def ServerTasklet(server): # Call Hello on every connected client, then shut the server down.
+        # Tell all the clients hello.
+        for endpoint in server.endPoints:
+            ret = endpoint.otherEnd.Hello()
+            print "  SERVER GOT", ret, "FROM CLIENT"
+        server.socket.close()
+
+    stackless.tasklet(ClientTasklet)(client, server, clientSocket)
+    stackless.run()
+
+    print "Done"

Added: stackless/sandbox/examples/stacklesssocket.py
==============================================================================
--- (empty file)
+++ stackless/sandbox/examples/stacklesssocket.py	Sat Sep  2 16:10:54 2006
@@ -0,0 +1,235 @@
+#
+# Stackless compatible socket module:
+#
+# Author: Richard Tew <richard.m.tew at gmail.com>
+#
+# This code was written to serve as an example of Stackless Python usage.
+# Feel free to email me with any questions, comments, or suggestions for
+# improvement.
+#
+# This wraps the asyncore module and the dispatcher class it provides in order
+# to write a socket module replacement that uses channels to allow calls to it to
+# block until a delayed event occurs.
+#
+# Not all aspects of the socket module are provided by this file.  Examples of
+# it in use can be seen at the bottom of this file.
+#
+# NOTE: Versions of the asyncore module from Python 2.4 or later include bug
+#       fixes and earlier versions will not guarantee correct behaviour.
+#       Specifically, it monitors for errors on sockets where the version in
+#       Python 2.3.3 does not.
+#
+
+# Possible improvements:
+# - More correct error handling.  When there is an error on a socket found by
+#   poll, there is no idea what it actually is.
+# - Launching each bit of incoming data in its own tasklet on the readChannel
+#   send is a little over the top.  It should be possible to add it to the
+#   rest of the queued data.
+
+import stackless
+import asyncore
+import socket as stdsocket # We need the "socket" name for the function we export.
+
+# If we are to masquerade as the socket module, we need to provide the constants.
+for k, v in stdsocket.__dict__.iteritems():
+    if k.upper() == k:
+        locals()[k] = v
+error = stdsocket.error
+
+managerRunning = False # True while a ManageSockets loop is (or is about to be) running.
+
+def ManageSockets(): # Poll asyncore until no sockets remain, yielding to other tasklets between polls.
+    global managerRunning
+
+    while len(asyncore.socket_map):
+        # Check the sockets for activity.
+        asyncore.poll(0.0) # Zero timeout: never wait inside the poll.
+        # Yield to give other tasklets a chance to be scheduled.
+        stackless.schedule()
+
+    managerRunning = False # Lets the next socket() call start a fresh manager tasklet.
+
+def socket(family=AF_INET, type=SOCK_STREAM, proto=0): # Create a standard socket and return it wrapped in a dispatcher.
+    global managerRunning
+
+    currentSocket = stdsocket.socket(family, type, proto)
+    ret = dispatcher(currentSocket)
+    # Ensure that the sockets actually work.
+    if not managerRunning: # Start the polling tasklet on first use only.
+        managerRunning = True
+        stackless.tasklet(ManageSockets)()
+    return ret
+
+class dispatcher(asyncore.dispatcher):
+    def __init__(self, sock):
+        # This is worth doing.  I was passing in an invalid socket which was
+        # an instance of dispatcher and it was causing tasklet death.
+        if not isinstance(sock, stdsocket.socket):
+            raise StandardError("Invalid socket passed to dispatcher")
+
+        asyncore.dispatcher.__init__(self, sock)
+
+        self.acceptChannel = stackless.channel()
+        self.connectChannel = stackless.channel()
+        self.readChannel = stackless.channel()
+
+        self.readBuffer = ''
+        self.outBuffer = ''
+
+    def writable(self):
+        return (not self.connected) or len(self.outBuffer)
+
+    def accept(self):
+        return self.acceptChannel.receive()
+
+    def connect(self, address):
+        asyncore.dispatcher.connect(self, address)
+        if not self.connected:
+            self.connectChannel.receive()
+
+    def send(self, data):
+        self.outBuffer += data
+        return len(data)
+
+    # Read at most byteCount bytes.
+    def recv(self, byteCount):
+        if len(self.readBuffer) < byteCount:
+            self.readBuffer += self.readChannel.receive()
+        ret = self.readBuffer[:byteCount]
+        self.readBuffer = self.readBuffer[byteCount:]
+        return ret
+
+    def close(self):
+        asyncore.dispatcher.close(self)
+        self.connected = False
+        self.accepting = False
+
+        # Clear out all the channels with relevant errors.
+        while self.acceptChannel.balance < 0:
+            self.acceptChannel.send_exception(error, 9, 'Bad file descriptor')
+        while self.connectChannel.balance < 0:
+            self.connectChannel.send_exception(error, 10061, 'Connection refused')
+        while self.readChannel.balance < 0:
+            self.readChannel.send_exception(error, 10054, 'Connection reset by peer')
+
+    def handle_accept(self):
+        if self.acceptChannel.balance < 0:
+            currentSocket, clientAddress = asyncore.dispatcher.accept(self)
+            currentSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+            # Give them the asyncore based socket, not the standard one.
+            currentSocket = self.wrap_accept_socket(currentSocket)
+            stackless.tasklet(self.acceptChannel.send)((currentSocket, clientAddress))
+
+    # Inform the blocked connect call that the connection has been made.
+    def handle_connect(self):
+        self.connectChannel.send(None)
+
+    # Just close the socket.  That should send kind of relevant errors to waiting calls.
+    def handle_close(self):
+        self.close()
+
+    # Some error, just close the channel and let that raise errors to blocked calls.
+    def handle_expt(self):
+        self.close()
+
+    def handle_read(self):
+        buf = asyncore.dispatcher.recv(self, 20000)
+        stackless.tasklet(self.readChannel.send)(buf)
+
+    def handle_write(self):
+        if len(self.outBuffer):
+            sentBytes = asyncore.dispatcher.send(self, self.outBuffer[:512])
+            self.outBuffer = self.outBuffer[sentBytes:]
+
+    # In order for incoming connections to be stackless compatible, they need to be
+    # wrapped by an asyncore based dispatcher subclass.
+    def wrap_accept_socket(self, currentSocket):
+        return dispatcher(currentSocket)
+
+
+if __name__ == '__main__':
+    import struct
+    # Test code goes here.
+    testAddress = "127.0.0.1", 3000
+    info = -12345678 # Value the server sends and the client expects back in test 2.
+    data = struct.pack("i", info)
+    dataLength = len(data)
+
+    print "creating listen socket"
+    def ManageListener(address): # Server side: accept NUM_TESTS connections, one scenario each.
+        global info, data, dataLength
+
+        listenSocket = socket(AF_INET, SOCK_STREAM)
+        listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+        listenSocket.bind(address)
+        listenSocket.listen(5)
+        # We will have to manually close the listening socket, at which point it
+        # should be the last one left and we want the socket manager tasklet to
+        # exit then.
+
+        NUM_TESTS = 2
+
+        i = 1
+        while i < NUM_TESTS + 1:
+            # No need to schedule this tasklet as the accept should yield most
+            # of the time on the underlying channel.
+            print "waiting for connection test", i
+            currentSocket, clientAddress = listenSocket.accept()
+            print "received connection", i, "from", clientAddress
+
+            if i == 1: # Test 1: close immediately; the client's recv should raise.
+                currentSocket.close()
+            elif i == 2: # Test 2: send the packed value, then expect recv to fail once the client closes.
+                print "server test", i, "send"
+                currentSocket.send(data)
+                try:
+                    print "server test", i, "recv"
+                    currentSocket.recv(4)
+                    break # Receiving data here is unexpected; leaves i at 2 so FAIL is reported below.
+                except error:
+                    pass
+            else:
+                currentSocket.close()
+
+            print "server test", i, "OK"
+            i += 1
+
+        if i != NUM_TESTS+1:
+            print "server: FAIL", i
+        else:
+            print "server: OK", i
+
+        listenSocket.close()
+
+    def TestClientConnections(address): # Client side of the two scenarios above.
+        global info, data, dataLength
+
+        # Attempt 1:
+        clientSocket = socket()
+        clientSocket.connect(address)
+        print "client connection", 1, "waiting to recv"
+        try:
+            clientSocket.recv(5)
+            print "client test", 1, "FAIL" # recv was expected to raise because the server closes at once.
+            return
+        except error:
+            pass
+        print "client test", 1, "OK"
+
+        # Attempt 2:
+        clientSocket = socket()
+        clientSocket.connect(address)
+        print "client connection", 2, "waiting to recv"
+        s = clientSocket.recv(dataLength)
+        t = struct.unpack("i", s)
+        if t[0] == info:
+            print "client test", 2, "OK"
+        else:
+            print "client test", 2, "FAIL"
+        clientSocket.close()
+
+    stackless.tasklet(ManageListener)(testAddress)
+    stackless.tasklet(TestClientConnections)(testAddress)
+    stackless.run()
+    print "result: SUCCESS" # NOTE(review): printed unconditionally, even if a test above reported FAIL.

_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins



More information about the Stackless-checkins mailing list