from collections import deque
import stackless
try:
    from time import clock
except ImportError:
    # time.clock() was removed in Python 3.8; perf_counter() is the
    # documented replacement for timing short intervals.
    from time import perf_counter as clock
import traceback


class Scheduler(object):
    """Schedules jobs co-operatively until a timeout elapses.

    Manually manages a round-robin queue of tasklets to schedule.

    Jobs are plain callables wrapped in stackless tasklets.  The tasklets
    are kept out of the default stackless run queue and are instead held
    in ``self.queue``; the tasklet at the front of the queue is the one
    that is (or will next be) running.  Jobs are expected to call
    ``benice()`` periodically so that the scheduler can switch to the
    next job or hand control back to the caller of ``run()``.
    """

    # Smallest time slice, in seconds, that a job is ever given.
    MIN_TIME_SLICE = 0.001

    def __init__(self):
        self.queue = deque()
        self.running = False
        # Not used by this class; kept so that existing code reading the
        # attribute keeps working.
        self.last_tasklet = None
        # The tasklet that called run(), only set while a run is active.
        self.main = None
        # The tasklet that initiated the most recent switch and that the
        # resumed tasklet must remove from the stackless run queue.
        self._scheduled_tasklet = None
        # Timing state; properly set up by run().
        self.runLen = self.sliceLen = 0.0
        self.runStart = self.sliceStart = 0.0

    def submit_job(self, job, args=(), kwargs=None):
        """Queue ``job(*args, **kwargs)`` to be executed by ``run()``.

        job    -- the callable to execute.
        args   -- positional arguments for the job.
        kwargs -- keyword arguments for the job (``None`` means none).
        """
        if kwargs is None:
            kwargs = {}
        #create a tasklet, remove it from the default run
        #queue and put it here at the end.
        t = stackless.tasklet(self._tasklet_function)
        t(job, args, kwargs)
        t.remove()
        self.queue.append(t)

    def run(self, duration):
        """Run the queued jobs for roughly ``duration`` seconds.

        Does nothing if a run is already in progress or if there are no
        jobs.  Returns when a job notices that the time is up (through
        ``benice()`` / ``schedule()``) or when the last job finishes.
        """
        if self.running or not self.queue:
            return
        #compute the time slice for this run.  float() guards against
        #integer division when an integer duration is passed in.
        timeSlice = float(duration) / len(self.queue)
        if timeSlice < self.MIN_TIME_SLICE:
            timeSlice = self.MIN_TIME_SLICE  #minimum timeslice
        self.runLen = duration
        self.sliceLen = timeSlice
        self.runStart = self.sliceStart = clock()
        self.running = True
        self.main = stackless.getcurrent()
        try:
            self._tasklet_switch(self.queue[0])
        finally:
            #always leave the scheduler in a state where run() can be
            #called again, even if an exception reaches this tasklet.
            self.main = None
            self.running = False

    def active_job(self):
        """returns true if the current thread is the currently active job
        """
        return bool(self.running and self.queue and
                    stackless.getcurrent() is self.queue[0])

    def benice(self):
        """Yield to the scheduler if the current time slice is used up.

        Meant to be called regularly from inside a job.
        """
        t = clock()
        if t - self.sliceStart >= self.sliceLen:
            self.schedule()

    def schedule(self):
        """Give up the rest of the slice.

        If the whole run has timed out, control goes back to the caller
        of ``run()``; otherwise the next job in the queue is resumed.
        Has no effect when called from anything but the active job.
        """
        t = clock()
        if t - self.runStart >= self.runLen:
            self.end_run()
            #When we get back here we have been resumed with a fresh
            #time slice, so carry on with the job instead of giving the
            #slice straight to the next one.
            return
        #switch to the next job
        if not self.active_job():
            return
        if len(self.queue) == 1:
            #nobody else to run: start a new slice rather than trying
            #to switch to ourselves.
            self.sliceStart = clock()
            return
        self.queue.rotate(-1)
        self.sliceStart = clock()
        self._tasklet_switch(self.queue[0])

    #the job ends
    def end_job(self):
        """Take the active job off the queue and pass control on.

        Control goes to the next job, or to the caller of ``run()`` if
        this was the last one.  Has no effect when called from anything
        but the active job.
        """
        if not self.active_job():
            return
        self.queue.popleft()
        #now start the appropriate next guy, with us leaving the
        #scheduler
        if self.queue:
            #run the next guy in the queue
            self.sliceStart = clock()
            self._tasklet_end(self.queue[0])
        else:
            self._tasklet_end(self.main)

    def end_run(self):
        """Move the active job to the back and return to ``run()``'s caller.

        Has no effect when called from anything but the active job.
        """
        if not self.active_job():
            return
        self.queue.rotate(-1)
        self._tasklet_switch(self.main)

    def _tasklet_function(self, job, args, kwargs):
        """Body of every job tasklet: run the job, then leave the queue.

        Exceptions derived from ``Exception`` are printed and swallowed
        so that one failing job does not take the scheduler down.
        """
        self._tasklet_start()
        try:
            job(*args, **kwargs)
        except Exception:
            traceback.print_exc()
        finally:
            #This job is finished
            self.end_job()

    #manual scheduling since there is not run_remove() function, we have to
    #simulate it.
    def _tasklet_resumed(self):
        """Finish a switch from the side of the tasklet that was resumed.

        Removes the tasklet that switched to us from the stackless run
        queue, if it asked for that.  Nothing is recorded when we were
        started by ``_tasklet_end()``, because a finishing tasklet has to
        stay runnable in order to terminate.
        """
        previous = self._scheduled_tasklet
        self._scheduled_tasklet = None
        if previous is not None:
            previous.remove()

    def _tasklet_start(self):
        #A tasklet starts running, must do this to complete the tasklet_run()
        self._tasklet_resumed()

    def _tasklet_switch(self, new):
        """Switch to ``new``, to be taken off the run queue by it."""
        #switch to a new tasklet.
        assert self._scheduled_tasklet is None
        self._scheduled_tasklet = stackless.getcurrent()
        new.run()
        #somebody switched back to us
        self._tasklet_resumed()

    def _tasklet_end(self, new):
        """Switch to ``new`` without asking to be removed."""
        #run a given tasklet, but
        #leave us in the runnable queue to die gracefully
        assert self._scheduled_tasklet is None
        new.run()