[Stackless] Stackless + Twisted perspective broker example

Carlos Eduardo de Paula carlosedp at gmail.com
Wed Mar 7 15:16:06 CET 2007


I created a new example using Stackless with Twisted perspective broker.

>From Twisted docs, PB is an asynchronous, symmetric network protocol
for secure, remote method calls and transferring of objects.

Here follows a description of the example:

# This example uses Stackless together with Twisted Perspective Broker(PB)
# Perspective Broker (affectionately known as PB) is an asynchronous,
# symmetric network protocol for secure, remote method calls and
transferring of objects.
# PB has support for direct or authenticated sessions where the user
receives a "Perspective"
# containning the methods it could call.
#
# This example mimics the producer consumer example having the production
# queue (stack) in a server and the producers and consumers accessing it
# over the network using predefined exported methods.
#
# For more information on PB check
http://twistedmatrix.com/projects/core/documentation/howto/index.html
#
# The examples provided by Greg Hazel were used to allow the integration between
# tasklets and deferred calls. Also the sleep manager code from
Richard Tew to handle
# sleep requests.
#
# Author: Carlos Eduardo de Paula <carlosedp at gmail.com>

These examples will be checked in StacklessExamples project page under
examples/twisted.

Any commentaries are welcome!

Best regards,

Carlos
-------------- next part --------------
#
# This example uses Stackless together with Twisted Perspective Broker(PB)
# Perspective Broker (affectionately known as PB) is an asynchronous, 
# symmetric network protocol for secure, remote method calls and transferring of objects.
# PB has support for direct or authenticated sessions where the user receives a "Perspective"
# containning the methods it could call.
#
# This example mimics the producer consumer example having the production
# queue (stack) in a server and the producers and consumers accessing it
# over the network using predefined exported methods.
#
# For more information on PB check http://twistedmatrix.com/projects/core/documentation/howto/index.html
#
# The examples provided by Greg Hazel were used to allow the integration between
# tasklets and deferred calls. Also the sleep manager code from Richard Tew to handle
# sleep requests.
#
# Author: Carlos Eduardo de Paula <carlosedp at gmail.com>
#
# This code was written to serve as an example of Stackless Python usage.
# Feel free to email me with any questions, comments, or suggestions for
# improvement.
#
# But a better place to discuss Stackless Python related matters is the
# mailing list:
#
#   http://www.tismer.com/mailman/listinfo/stackless
#

import stackless
import random
import time
from twisted.spread import pb
from twisted.internet import reactor, task
from twisted.cred import credentials

sleepingTasklets = []
def Sleep(secondsToWait):
    channel = stackless.channel()
    endTime = time.time() + secondsToWait
    sleepingTasklets.append((endTime, channel))
    sleepingTasklets.sort()
    # Block until we get sent an awakening notification.
    channel.receive()

def ManageSleepingTasklets():
    while 1:
        if len(sleepingTasklets):
            endTime = sleepingTasklets[0][0]
            if endTime <= time.time():
                channel = sleepingTasklets[0][1]
                del sleepingTasklets[0]
                # We have to send something, but it doesn't matter what as it is not used.
                channel.send(None)
            if len(sleepingTasklets) <= 0: 
                time.sleep(endTime-time.time())
        stackless.schedule()
        #if len(sleepingTasklets) <= 0: stackless.getcurrent().kill()
        if stackless.getruncount <= 1: stackless.getcurrent().kill()

class NWChannel(stackless.channel):
    '''
    Greg Hazel's twisted_yield.py example
    '''
    def send_nowait(self, v):
        if self.balance == 0:
            self.value = v
        else:
            self.send(v)

    def send_exception_nowait(self, type, value):
        if self.balance == 0:
            self.exc = (type, value)
        else:
            self.send_exception(type, value)        

    def receive(self):
        if hasattr(self, 'value'):
            v = self.value
            del self.value
            return v
        if hasattr(self, 'exc'):
            type, value = self.exc
            del self.exc
            raise type, value
        return stackless.channel.receive(self)

class Agent(object):
    '''
    This class is the base for the producer and consumer classes
    It contains all init stuff and execution skeleton.
    '''
    def __init__(self, name, login, password):
        self.me = stackless.tasklet(self.runAction)()
        self.ch = NWChannel()
        self.name = name
        self.items = 0
        self.time = random.random()
        self.login = login
        self.pwd = password

    def runAction(self):
        factory = pb.PBClientFactory()
        reactor.connectTCP("localhost", 8800, factory)
        def1 = factory.login(credentials.UsernamePassword(self.login, self.pwd))
        def1.addCallback(self.good, self.me, self.ch)
        def1.addErrback(self.bad, self.me, self.ch)
        self.perspective = self.ch.receive()
        #print "got perspective ref:", self.perspective
        self.connected = 1
        self.kill = False
        
        def1 = self.perspective.callRemote("getqueuesize")
        def1.addCallback(self.good, self.me, self.ch)
        def1.addErrback(self.bad, self.me, self.ch)
        self.qmaxsize = self.ch.receive()
        while not self.kill:
            self.action()
    
    def action(self):
        pass

    def good(self, r, me, return_channel):
        return_channel.send_nowait(r)
        # if the deferred is called back immediately, this function will be called
        # from the original tasklet. no need to reschedule.
        if stackless.getcurrent() != me:
            stackless.schedule()

    def bad(self, f, me, return_channel):
        return_channel.send_exception_nowait(f.type, f.value)
        # if the deferred fails immediately, this function will be called
        # from the original tasklet. no need to reschedule.
        if stackless.getcurrent() != me:
            stackless.schedule()

class Producer(Agent):
    def __init__(self, name, login, password):
        Agent.__init__(self, name, login, password)

    def action(self):
        # gets the queue size for the first time
        def1 = self.perspective.callRemote("getqueue")
        def1.addCallback(self.good, self.me, self.ch)
        def1.addErrback(self.bad, self.me, self.ch)
        self.qsize = self.ch.receive()

        while self.qsize+1 < self.qmaxsize:
            if self.qsize < 5:
                print self.name, "sleeping time"
                Sleep(self.time)
            else:
                print self.name, "sleeping 2*time"
                Sleep(self.time*2)
            # asks the queue for its size
            def1 = self.perspective.callRemote("getqueue")
            def1.addCallback(self.good, self.me, self.ch)
            def1.addErrback(self.bad, self.me, self.ch)
            self.qsize = self.ch.receive()
            
            # puts 1 production unit into queue
            def1 = self.perspective.callRemote("put", 1)
            def1.addCallback(self.good, self.me, self.ch)
            def1.addErrback(self.bad, self.me, self.ch)
            resp = self.ch.receive()
            print self.name, " put ", resp
            self.items += resp
        else:
            # queue is full, no work to be done, just sleep
            print self.name, " sleeping"
            Sleep(self.time)


class Consumer(Agent):
    def __init__(self, name, login, password):
        Agent.__init__(self, name, login, password)
    
    def action(self):
        # gets the queue size for the first time
        def1 = self.perspective.callRemote("getqueue")
        def1.addCallback(self.good, self.me, self.ch)
        def1.addErrback(self.bad, self.me, self.ch)
        self.qsize = self.ch.receive()

        while self.qsize > 0:
            if self.qsize > 5:
                print self.name, "sleeping time"
                Sleep(self.time)
            else:
                print self.name, "sleeping 2*time"
                Sleep(self.time*2)
            # asks the queue for its size
            def1 = self.perspective.callRemote("getqueue")
            def1.addCallback(self.good, self.me, self.ch)
            def1.addErrback(self.bad, self.me, self.ch)
            self.qsize = self.ch.receive()

            # gets 1 production unit from queue
            def1 = self.perspective.callRemote("get", 1)
            def1.addCallback(self.good, self.me, self.ch)
            def1.addErrback(self.bad, self.me, self.ch)
            resp = self.ch.receive()
            print self.name, " got ", resp
            self.items += resp
        else:
            # queue is empty, no work to be done, just sleep
            print self.name, " sleeping"
            Sleep(self.time)

def main():
    PID = 0
    CID = 0
    num_prod = 10                # Number of starting producers
    num_cons = 10                # Number of starting consumers

    for i in range(num_prod):
        ID = PID
        name = "P"+str(ID)
        a = Producer(name,"producer", "prod")
        PID += 1

    for i in range(num_cons):
        ID = CID
        name = "C"+str(ID)
        a = Consumer(name, "consumer", "cons")
        CID += 1

    t = task.LoopingCall(stackless.schedule).start(0.00033)
    sleepman = stackless.tasklet(ManageSleepingTasklets)()
    re = stackless.tasklet(reactor.run)()
    stackless.run()

if __name__ == "__main__":
    main()
-------------- next part --------------
#
# This example uses Stackless together with Twisted Perspective Broker(PB)
# Perspective Broker (affectionately known as PB) is an asynchronous, 
# symmetric network protocol for secure, remote method calls and transferring of objects.
# PB has support for direct or authenticated sessions where the user receives a "Perspective"
# containning the methods it could call.
#
# This example mimics the producer consumer example having the production
# queue (stack) in a server and the producers and consumers accessing it
# over the network using predefined exported methods.
#
# For more information on PB check http://twistedmatrix.com/projects/core/documentation/howto/index.html
#
# The examples provided by Greg Hazel were used to allow the integration between
# tasklets and deferred calls. Also the sleep manager code from Richard Tew to handle
# sleep requests.
#
# Author: Carlos Eduardo de Paula <carlosedp at gmail.com>
#
# This code was written to serve as an example of Stackless Python usage.
# Feel free to email me with any questions, comments, or suggestions for
# improvement.
#
# But a better place to discuss Stackless Python related matters is the
# mailing list:
#
#   http://www.tismer.com/mailman/listinfo/stackless
#

import stackless
import time
from zope.interface import implements
from twisted.spread import pb
from twisted.cred import checkers, portal, credentials
from twisted.internet import reactor, task

sleepingTasklets = []

def Sleep(secondsToWait):
    channel = stackless.channel()
    endTime = time.time() + secondsToWait
    sleepingTasklets.append((endTime, channel))
    sleepingTasklets.sort()
    # Block until we get sent an awakening notification.
    channel.receive()

def ManageSleepingTasklets():
    while 1:
        if len(sleepingTasklets):
            endTime = sleepingTasklets[0][0]
            if endTime <= time.time():
                channel = sleepingTasklets[0][1]
                del sleepingTasklets[0]
                # We have to send something, but it doesn't matter what as it is not used.
                channel.send(None)
#            if len(sleepingTasklets) <= 0: time.sleep(endTime-time.time())
        stackless.schedule()
        if stackless.getruncount() <= 1: stackless.getcurrent().kill()
        #if len(sleepingTasklets) <= 0: stackless.getcurrent().kill()

class Queue(object):
    def __init__(self, name):
        self.name = name
        self.queue_start=0
        self.qt = self.queue_start
        self.max_size = 60
        self.running = False
        self.kill = False
        self.produced = 0
        self.consumed = 0
        self.q_empty = 0
        self.q_full = 0
        self.producers = []
        self.consumers = []

    def put(self, qty=1, who=None):
        if self.qt+qty < self.max_size:
            self.qt += qty
            self.produced += 1
        else:
            self.q_full += 1

    def get(self, qty=1, who=None):
        if self.qt >= qty:
            self.qt -= qty
            self.consumed += 1
        else:
            self.q_empty += 1


class Monitor(object):
    '''
    This class monitors the attached queue and prints a representation in terminal
    When its instance is created, a tasklet is created and the monitor starts
    to scan the queue status at 30fps (configurable in the Sleep call)
    This could also be part of the Queue object itself.
    '''
    def __init__(self, queue):
        self.running = 1
        self.queue = queue
        stackless.tasklet(self.run)()

    def run(self):
        print "Started monitor for queue " + self.queue.name
        while self.running:
            print "Size: [" + "#" * self.queue.qt + " " * (self.queue.max_size-self.queue.qt) + "] Qty:" , self.queue.qt , "\r",
            Sleep(0.033)
            #stackless.schedule()


class MyPerspective(pb.Avatar):
    '''
    This perspective is passed to the client when the login phase completes
    It contains all methods accessible by the clients prefixed by perspective_
    A non authentication version could be created using pb.Root, check documentation
    in http://twistedmatrix.com/projects/core/documentation/howto/index.html
    Perspective Broker session.
    '''
    def __init__(self, name):
        global q
        self.queue = q
        self.name = name
    def perspective_getqueue(self):
        return self.queue.qt

    def perspective_getqueuesize(self):
        return self.queue.max_size

    def perspective_put(self, arg):
        self.queue.put(arg)
        return arg
        
    def perspective_get(self, arg):
        self.queue.get(arg)
        return arg
    
class MyRealm:
    '''
    The realm handles the user authentications, we could have different 
    perspectives for the producer and the consumer where each would have
    access to its methods.
    '''
    implements(portal.IRealm)
    def requestAvatar(self, avatarId, mind, *interfaces):
        assert pb.IPerspective in interfaces
        return pb.IPerspective, MyPerspective(avatarId), lambda:None


p = portal.Portal(MyRealm())
c1 = checkers.InMemoryUsernamePasswordDatabaseDontUse(producer="prod", consumer="cons")
p.registerChecker(c1)
q = Queue('q1')
m = Monitor(q)

reactor.listenTCP(8800, pb.PBServerFactory(p))
t = task.LoopingCall(stackless.schedule).start(0.0033)
sleepman = stackless.tasklet(ManageSleepingTasklets)()
re = stackless.tasklet(reactor.run)()
stackless.run()

-------------- next part --------------
_______________________________________________
Stackless mailing list
Stackless at stackless.com
http://www.stackless.com/mailman/listinfo/stackless


More information about the Stackless mailing list