# An implementation of the Stackless Python "stackless" module using greenlets. import greenlet import time class DummyRetval: pass class Scheduler: def __init__(self): self.taskletsByID = {} self.tasklets = [] self.sleepingTasklets = [] # This could be a heapq instead of a list. self.nextIdx = 0 self.currentTasklet = None # Note what greenlet we are started in, this is the main thread of execution. self.greenlet = greenlet.getcurrent() def Run(self, timeout, remove=False): # Ensure that tasklets are only scheduled by the main thread of execution. if greenlet.getcurrent() is not self.greenlet: raise RuntimeError("Run must be run from the main tasklet.") # Schedule needs to respect timeout and if a tasklet times out, pass it to us to return. self.Schedule(remove, timeout) def Schedule(self, remove=False, timeout=0): if len(self.tasklets): if remove: self.RemoveTasklet(self.currentTasklet) # self.WakenSleepingTasklets() # The current tasklet index can change through whatever logic # happens as a result of the run() call. So we want to hold # a reference to it in order to ensure we can provide it if needs be. # Run the current tasklet until it yields or exits. self.currentTasklet = self.GetNextTasklet() self.IncrementIndex() return self.currentTasklet.run() def Sleep(self, seconds): raise RuntimeError("Not yet implemented") # self.SleepUntil(time.time() + seconds) def SleepUntil(self, endTime): raise RuntimeError("Not yet implemented") # t = self.currentTasklet # t.sleeping = True # t.remove() # self.sleepingTasklets.append((endTime, t)) # return self.Schedule() def WakenSleepingTasklets(self): raise RuntimeError("Not yet implemented") # Check the first element. If its time has come, append it to the tasklet list. # Repeat until the sleeper list is empty or the first element is still pending. # while len(self.sleepingTasklets): # if self.sleepingTasklets[0][0] > time.time(): # break # endTime, t = self.sleepingTasklets.pop(0) # t.sleeping = False # self.AddTasklet(t) def GetCurrentTasklet(self): return self.currentTasklet def GetNextTasklet(self): return self.tasklets[self.nextIdx] def IncrementIndex(self): self.nextIdx += 1 if self.nextIdx >= len(self.tasklets): self.nextIdx = 0 def DecrementIndex(self): self.nextIdx -= 1 if self.nextIdx < 0: self.nextIdx = len(self.tasklets) - 1 # Make the current tasklet last and add a new one first. def InsertTasklet(self, t): if not self.AddTasklet(t, last=False): # There are probably better ways.. self.RemoveTasklet(t) self.AddTasklet(t, last=False) # Add a new tasklet after all the others. def AddTasklet(self, t, last=True): if t.blocked: raise RuntimeError("You cannot run a blocked tasklet") if self.taskletsByID.has_key(t.id): return False self.taskletsByID[t.id] = t if last: if len(self.tasklets) != 0: self.tasklets.insert(self.nextIdx, t) self.nextIdx += 1 else: self.tasklets.append(t) self.nextIdx = 0 else: if len(self.tasklets) and self.nextIdx < len(self.tasklets) - 1: self.tasklets.insert(self.nextIdx, t) else: self.tasklets.append(t) self.nextIdx = len(self.tasklets) - 1 self.UpdateRunCount() return True def RemoveTasklet(self, t, idx=None): if not self.taskletsByID.has_key(t.id): return del self.taskletsByID[t.id] if idx is None: idx = self.tasklets.index(t) del self.tasklets[idx] if self.nextIdx > idx: self.nextIdx -= 1 if self.nextIdx >= len(self.tasklets): self.nextIdx = 0 self.UpdateRunCount() def UpdateRunCount(self): global runcount runcount = len(self.tasklets) + len(self.sleepingTasklets) scheduler = Scheduler() runcount = 0 def run(timeout): """Run the current tasklet until it yields or exits. If it takes longer than the timeout period, then it will be returned. Otherwise None will be returned. The timeout parameter is ignored at this time.""" return scheduler.Run(timeout, remove=False) def getcurrent(): """If a tasklet is currently being executed by the scheduler, return it.""" return scheduler.GetCurrentTasklet() def schedule(retval=DummyRetval): """schedule(retval=stackless.current) -- switch to the next runnable tasklet. The return value for this call is retval, with the current tasklet as default.""" if retval is DummyRetval: retval = getcurrent() scheduler.Schedule(remove=False) return retval def schedule_remove(retval=DummyRetval): """schedule_remove(retval=stackless.current) -- same as schedule but remove self.""" if retval is DummyRetval: retval = getcurrent() scheduler.Schedule(remove=True) return retval def sleep(seconds): scheduler.Sleep(seconds) def getruncount(self): return runcount class channel: def __init__(self): self.balance = 0 self.queue = [] def send(self, value): """channel.send(value) -- send a value over the channel. If no other tasklet is already receiving on the channel, the sender will be blocked. Otherwise, the receiver will be activated immediately, and the sender is put at the end of the runnables list. Note that an exception instance sent will be raised at the receiver (see also channel.send_exception)""" if self.balance < 0: self.balance += 1 t = self.queue.pop(0) t.args = (value,) t.blocked = 0 scheduler.InsertTasklet(t) else: t = getcurrent() # Remove the tasklet from the list of running tasklets. t.remove() # Let it wait for a receiver to come along. self.balance += 1 self.queue.append((t, value)) t.blocked = 1 return scheduler.Schedule() def receive(self): """channel.receive() -- receive a value over the channel. If no other tasklet is already sending on the channel, the receiver will be blocked. Otherwise, the receiver will continue immediately, and the sender is put at the end of the runnables list.""" if self.balance > 0: self.balance -= 1 t, value = self.queue.pop(0) t.blocked = 0 scheduler.AddTasklet(t) return value t = getcurrent() t.remove() self.balance -= 1 self.queue.append(t) t.blocked = -1 return scheduler.Schedule() class tasklet: nextID = 1 def __init__(self, func): self.func = func self.args = () self.kwargs = {} self.greenlet = None self.name = "" self.alive = False self.blocked = 0 self.paused = False self.sleeping = False self.id = tasklet.nextID tasklet.nextID += 1 def __repr__(self): return "" % (self.id, self.name) def __call__(self, *args, **kwargs): # Store the arguments for when we are scheduled. self.args = args self.kwargs = kwargs self.insert() return self def insert(self): """ Insert this tasklet at the end of the scheduler list, given that it isn't blocked. Blocked tasklets need to be reactivated by channels.""" scheduler.AddTasklet(self) def remove(self): """Removing a tasklet from the runnables queue. Note: If this tasklet has a non-trivial C stack attached, it will be destructed when the containing thread state is destroyed. Since this will happen in some unpredictable order, it may cause unwanted side-effects. Therefore it is recommended to either run tasklets to the end or to explicitly kill() them. """ scheduler.RemoveTasklet(self) def run(self): """ Run this tasklet, given that it isn't blocked. Blocked tasks need to be reactivated by channels.""" if self.blocked: raise RuntimeError("You cannot run a blocked tasklet") elif self.sleeping: raise RuntimeError("You cannot run a sleeping tasklet") if self.greenlet is None: self.alive = True func, args, kwargs = self.func, self.args, self.kwargs self.func, self.args, self.kwargs = None, (), {} self.greenlet = greenlet.greenlet(func, scheduler.greenlet) # switch is flawed in its lack of support for keyword arguments for # the first call. Without this, .. unexpected behaviour.. ret = self.greenlet.switch(*args) else: args = self.args self.args = () ret = self.greenlet.switch(*args) # If the greenlet is not non-zero, then it died on us. if not self.greenlet: self.alive = False scheduler.RemoveTasklet(self) return ret def become(self, value): """t.become(retval) -- catch the current running frame in a tasklet. It is also inserted at the end of the runnables chain. If it is a toplevel frame (and therefore has no caller), an exception is raised. The function result is the tasklet itself. retval is passed to the calling frame. If retval is not given, the tasklet is used as default.""" pass # Cannot see how to do this offhand. def capture(self, value): """t.capture(retval) -- capture the current running frame in a tasklet, like t.become(). In addition the tasklet is run immediately, and the parent tasklet is removed from the runnables and returned as the value.""" pass # Cannot see how to do this offhand.