"""CSP-style helpers on top of Stackless Python tasklets and channels.

Provides a cooperative ``sleep``, a ``par`` combinator that runs callables
in parallel tasklets, and ``Alt`` / ``AltChan`` for waiting on several
channels at once, followed by a small demo.
"""
from __future__ import print_function

import stackless

# NOTE(review): every ``atom = stackless.atomic()`` below relies on the
# returned object switching the current tasklet to atomic on construction
# and restoring the previous state when the object is released (hence the
# explicit ``del atom`` in Alt.select).  Newer Stackless releases document
# ``stackless.atomic()`` as a context manager -- confirm against the
# Stackless version in use before relying on these sections being atomic.


# {{{ def sleep(duration):
def sleep(duration):
    """Busy-wait for ``duration`` seconds while yielding to other tasklets.

    The clock is polled in a loop and ``stackless.schedule()`` is called on
    every iteration, so other runnable tasklets keep making progress.
    """
    import time
    # time.clock() was removed in Python 3.8; interpreters that lack
    # perf_counter (Python 2) keep using time.clock() exactly as before.
    clock = getattr(time, 'perf_counter', None) or time.clock
    until = clock() + duration
    while clock() < until:
        stackless.schedule()
# }}}


# {{{ def par(*funcs):
def signalEnd(f, chan):
    """Run ``f()`` and then send ``1`` on ``chan`` to report completion."""
    f()
    chan.send(1)


def par(*funcs):
    """Run every callable in ``funcs`` in its own tasklet and wait for all.

    Each callable is wrapped in ``signalEnd`` so that it reports on a shared
    channel when it returns; ``par`` returns after one message per callable
    has been received.
    """
    taskEnd = stackless.channel()
    for f in funcs:
        stackless.tasklet(signalEnd)(f, taskEnd)
    stackless.schedule()
    count = len(funcs)
    while count > 0:
        taskEnd.receive()
        count -= 1
    # give processes a chance to terminate
    stackless.schedule()
# }}}


# {{{ class Alt:
class Alt:
    """Wait until one of several channels has a sender ready.

    ``channels`` must provide ``enable(alt)`` and ``disable()`` as
    ``AltChan`` does.  ``state`` moves through the four class-level
    constants: ``inactive`` -> ``enabling`` -> (``waiting`` ->) ``ready``
    -> ``inactive``.
    """

    inactive, enabling, waiting, ready = range(4)

    def __init__(self, channels):
        # Private channel select() blocks on until schedule() wakes it.
        self.waitChan = stackless.channel()
        self.state = self.inactive
        self.channels = channels

    # {{{ def select(self):
    def select(self):
        """Block until a channel is ready and return its index.

        Channels are enabled in order until one reports a waiting sender.
        If none does, the caller blocks on ``waitChan`` until a sender
        calls ``schedule()``.  Afterwards every channel is disabled; the
        disable loop does not stop at the first hit, so when several
        channels are ready the highest index is returned.
        """
        self.state = self.enabling
        for index, chan in enumerate(self.channels):
            if chan.enable(self):
                self.state = self.ready
                selected = index
                break
        atom = stackless.atomic()
        if self.state == self.enabling:
            # Nobody was ready and no sender arrived meanwhile: go to sleep.
            self.state = self.waiting
            self.waitChan.receive()
            self.state = self.ready
        del atom
        # assert self.state == self.ready
        for index, chan in enumerate(self.channels):
            if chan.disable():
                selected = index
        self.state = self.inactive
        return selected
    # }}}

    # {{{ def schedule(self):
    def schedule(self):
        """Mark this Alt ready; called by a channel that wants to send.

        If ``select()`` is still enabling channels only the state is
        changed; if it is already blocked, it is woken through
        ``waitChan``.
        """
        atom = stackless.atomic()
        if self.state == self.enabling:
            self.state = self.ready
        elif self.state == self.waiting:
            self.state = self.ready
            self.waitChan.send(0)
    # }}}
# }}}


# {{{ class AltChan(channel):
class AltChan(stackless.channel):
    """A ``stackless.channel`` that can take part in an ``Alt`` selection."""

    def __init__(self):
        stackless.channel.__init__(self)
        # The Alt currently enabled on this channel, or None.
        self.alt = None
        # Set by send() when it notifies an Alt; cleared by receive().
        self.wantsToSend = False

    # {{{ def send(self, x):
    def send(self, x):
        """Send ``x``, first notifying an Alt enabled on this channel."""
        atom = stackless.atomic()
        if self.alt is not None:
            # a reader was alting on this Channel
            self.wantsToSend = True
            self.alt.schedule()
        stackless.channel.send(self, x)
    # }}}

    # {{{ def receive(self):
    def receive(self):
        """Receive one value and clear the pending-send flag."""
        atom = stackless.atomic()
        res = stackless.channel.receive(self)
        self.wantsToSend = False
        return res
    # }}}

    # {{{ def enable(self, alt):
    def enable(self, alt):
        """Register ``alt`` unless a sender is already waiting.

        Returns True when ``balance`` is positive (a sender is waiting),
        otherwise stores ``alt`` for send() to notify and returns False.
        """
        atom = stackless.atomic()
        if self.balance <= 0:
            # no senders waiting
            self.alt = alt
            return False
        return True
    # }}}

    # {{{ def disable(self):
    def disable(self):
        """Unregister the Alt and report whether a sender is ready."""
        atom = stackless.atomic()
        self.alt = None
        return self.wantsToSend or self.balance > 0  # senders are waiting
    # }}}
# }}}


# {{{ Demo
# {{{ def tick(chan):
def tick(chan):
    """Print 'x' with a counter from 1 to 4, receiving once per step."""
    i = 1
    while i < 5:
        print('x', i)
        i += 1
        chan.receive()
# }}}


# {{{ def counter(c, rate, out, message=None):
def counter(c, rate, out, message=None):
    """Send 1..c on ``out``, sleeping ``rate`` before each, then 'end'.

    ``message`` is only referenced by the commented-out debug prints.
    """
    for x in range(1, c + 1):
        sleep(rate)
        out.send(x)
    # print('**** sending end ' + message)
    out.send('end')
    # print('**** counter end ' + message)
# }}}


# {{{ def accumulate(inp):
def accumulate(inp):
    """Print and sum values received from ``inp`` until 'end' arrives."""
    acc = 0
    while 1:
        x = inp.receive()
        print(x)
        if x == 'end':
            break
        acc += x
    print(acc)
# }}}


# {{{ def testpar():
def testpar():
    """Demo: run a counter and an accumulator in parallel on one channel."""
    c = stackless.channel()
    par(
        lambda: counter(5, 0.1, c),
        lambda: accumulate(c)
    )
    print('End par test')
    print()
# }}}


# {{{ def server(inA, inB, end):
def server(inA, inB, end):
    """Serve values from ``inA`` and ``inB`` until ``end`` becomes ready.

    When ``end`` is selected, two values are read from it before the loop
    stops.
    """
    running = 1
    alt = Alt([inA, inB, end])
    while running:
        i = alt.select()
        if i == 0:
            x = inA.receive()
            print('Got an A: ', x)
        if i == 1:
            x = inB.receive()
            print('   Got a B: ', x)
        if i == 2:
            end.receive()
            end.receive()
            running = 0
    print('Server end')
# }}}


# {{{ def testalt():
def testalt():
    """Demo: three counters feed a server that alts over their channels."""
    a = AltChan()
    b = AltChan()
    c = AltChan()
    par(
        lambda: counter(30, .008, a, 'A'),
        lambda: counter(20, .012, b, 'B'),
        lambda: counter(1, 1, c, 'ender'),
        lambda: server(a, b, c)
    )
    print('End alt test')
# }}}
# }}}


if __name__ == '__main__':
    testpar()
    testalt()