[Stackless-checkins] r51749 - in stackless/sandbox/libraries: slpmonkeypatch uthread-asgeir uthread-asgeir/uthread.py uthread-ccp
richard.tew
python-checkins at python.org
Tue Sep 5 19:47:47 CEST 2006
Author: richard.tew
Date: Tue Sep 5 19:47:46 2006
New Revision: 51749
Added:
stackless/sandbox/libraries/
stackless/sandbox/libraries/slpmonkeypatch/
stackless/sandbox/libraries/uthread-asgeir/
stackless/sandbox/libraries/uthread-asgeir/uthread.py (contents, props changed)
stackless/sandbox/libraries/uthread-ccp/
Log:
Working towards adding the CCP uthread.py and then starting on a slpmonkeypatch module based on Andrew Dalke's suggestions.
Added: stackless/sandbox/libraries/uthread-asgeir/uthread.py
==============================================================================
--- (empty file)
+++ stackless/sandbox/libraries/uthread-asgeir/uthread.py Tue Sep 5 19:47:46 2006
@@ -0,0 +1,535 @@
+#
+# Copyright (c) 2005, Asgeir Bjarni Ingvarsson
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+# * Neither the name of the original author nor the names of other contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+#
+# Obtain the latest version from:
+#
+# http://ares.hlekkir.com/repo/shady-utils/trunk/uthread.py
+#
+
+import stackless
+import weakref
+import cPickle
+import atexit
+import sys
+import time
+
+#----------------------------------------------------------------------------
+# A class to support named microthreads
+#----------------------------------------------------------------------------
+class NamedTasklet(stackless.tasklet):
+ __slots__ = ['name']
+
+ def __init__(self, func, name=None):
+ stackless.tasklet.__init__(self, func)
+
+ if not name:
+ name = '%08x' % (id(self))
+ self.name = name
+
+ def __new__(self, func, name=None):
+ return stackless.tasklet.__new__(self, func)
+
+ def __repr__(self):
+ try:
+ return '%s' % self.name
+ except AttributeError:
+ # I think that this will only happen with the main tasklet
+ return '%08x' % (id(self))
+
+#----------------------------------------------------------------------------
+# A class to manage sleeping tasklets
+#----------------------------------------------------------------------------
+_timeKeeperTaskletName = '__internal__TimeKeeper__mainLoop__'
+
+class TimeKeeper(object):
+ """Manages sleeping tasklets
+
+ Uses channels to block tasklets that want to sleep.
+ """
+ __slots__ = ['chnPool', 'sleepers']
+
+ def __init__(self):
+ self.chnPool = [stackless.channel() for i in range(100)]
+ self.sleepers = []
+
+ def getSleeperCount(self):
+ """Returns the number of sleeping tasklets
+ """
+ return len(self.sleepers)
+
+ def sleep(self, delay=0):
+ """Suspend the active tasklet for a specified amount of seconds
+
+ If delay is zero (default) then the tasklet just blocks.
+ Returns seconds passed since sleep was called.
+ """
+ startTime = time.clock()
+ when = startTime + delay
+
+ if delay:
+ try:
+ try:
+ chn = self.chnPool.pop()
+ except IndexError:
+ chn = stackless.channel()
+ # could also allocate more channels for chnPool
+
+ self.sleepers.append((when, chn))
+ chn.receive()
+ finally:
+ self.chnPool.append(chn)
+ else:
+ stackless.schedule()
+
+ return time.clock() - startTime
+
+ def mainLoop(self):
+ """Internal function
+
+ Created in a new tasklet by the scheduler
+ """
+ try:
+ while True:
+ try:
+ # assume that the sleeper list is ordered
+ (when, chn) = self.sleepers.pop(0)
+ # an empty list will raise IndexError
+ while when <= time.clock():
+ chn.send(None)
+ (when, chn) = self.sleepers.pop(0)
+ # an empty list will raise IndexError
+
+ # The only way to reach this point is if the while evaluates as False.
+ # That means that the current sleeper wants to sleep longer.
+ self.sleepers.insert(0, (when, chn))
+ except IndexError:
+ # The sleeper list is empty, nothing to do
+ pass
+
+ stackless.schedule()
+ except Exception, e:
+ # Restart TimeKeeper unless it's being killed
+ if isinstance(e, TaskletExit):
+ # TimeKeeper is being killed.
+ # Send a TaskletExit to all sleepers
+ for (when, chn) in self.sleepers:
+ chn.send_exception(e)
+
+ self.sleepers = []
+ else:
+ # recreate the timeKeeper tasklet from another tasklet so
+ # the timeKeeper name may be reused
+ new(newNamed, self.mainLoop, _timeKeeperTaskletName)
+
+#----------------------------------------------------------------------------
+# Schedulers to manage tasklets
+#----------------------------------------------------------------------------
+class BaseScheduler(object):
+ """This is the base thread scheduler
+
+ _activeScheduler should not be set to an instance of this class.
+ Subclasses must override the run and runSingle methods
+ """
+ __slots__ = ['running', 'paused', 'timeKeeper']
+
+ def __init__(self):
+ # References to tasklets will be dropped as soon as they
+ # stop executing
+ self.running = weakref.WeakValueDictionary()
+ self.paused = weakref.WeakValueDictionary()
+
+ self.timeKeeper = TimeKeeper()
+ self.new(self.timeKeeper.mainLoop, _timeKeeperTaskletName)
+
+ def new(self, func, name=None, *args, **kw):
+ """Create a new tasklet
+
+ If name is already in use, a *random* name will be chosen
+ """
+ if name not in self.listTasks():
+ task = NamedTasklet(func, name)
+ else:
+ task = NamedTasklet(func)
+ task(*args, **kw) # tasklet will be lost in dict if this is not done
+ task.insert()
+
+ currID = repr(task)
+ self.running[currID] = task
+ return currID
+
+ def killAll(self):
+ """Kills all tasklets
+ """
+ for taskID in (self.running.keys() + self.paused.keys()):
+ try:
+ self.kill(taskID)
+ except:
+ pass
+
+ def kill(self, taskID):
+ """Stops a running tasklet
+ """
+ task = self.running.pop(taskID, None)
+ workingDict = self.running
+ if task is None:
+ task = self.paused.pop(taskID, None)
+ workingDict = self.paused
+
+ if task:
+ try:
+ task.kill()
+ return
+ except:
+ workingDict[taskID] = task
+ raise
+
+ def pause(self, taskID):
+ """Pauses a running tasklet
+ """
+ task = self.running.pop(taskID, None)
+ if task:
+ task.remove()
+ self.paused[taskID] = task
+
+ def resume(self, taskID):
+ """Resumes a paused tasklet
+ """
+ task = self.paused.pop(taskID, None)
+ if task:
+ task.insert()
+ self.running[taskID] = task
+
+ def run(self):
+ """Execute all tasklets
+ """
+ pass
+
+ def runSingle(self):
+ """Execute the schedule list once, then return
+ """
+ pass
+
+ def dumpTask(self, taskID, kill=0):
+ """Returns a pickled tasklet
+ """
+ task = self.running.get(taskID, None) or self.paused.get(taskID, None)
+ if task:
+ data = cPickle.dumps(task, 2)
+ if kill:
+ self.kill(repr(task))
+ return data
+
+ def loadTask(self, data, paused=1):
+ """Loads a pickled tasklet
+ """
+ task = cPickle.loads(data)
+ currID = repr(task)
+ if paused:
+ self.paused[currID] = task
+ else:
+ self.running[currID] = task
+ task.insert()
+ return currID
+
+ def listTasks(self):
+ """Returns a list of all tasklets
+ """
+ return self.running.keys() + self.paused.keys()
+
+ def getRunCount(self):
+ """Same as stackless.getruncount but takes sleeping tasklets into account
+
+ Will return 1 when only the main tasklet is left, the timeKeeper tasklet
+ will not be counted since it is not a client tasklet.
+
+ For consideration: If stackless.getruncount() returns 2
+ only the main tasklet and the TimeKeeper tasklets are currently
+ running. This might then be a good place to sleep for one millisecond
+ to reduce CPU load
+ """
+ runCount = stackless.getruncount() + self.timeKeeper.getSleeperCount()
+ return (runCount - 1) # subtract the timeKeeper tasklet
+
+ def sleepTask(self, delay=0):
+ """Suspend the active tasklet for a specified amount of seconds
+
+ If delay is zero (default) then the tasklet just blocks.
+ Returns seconds passed since sleep was called.
+ """
+ return self.timeKeeper.sleep(delay)
+
+ def printException(self):
+ """Uses sys.excepthook to report exceptions
+ """
+ exc_info = sys.exc_info()
+ sys.excepthook(exc_info[0], exc_info[1], exc_info[2])
+
+class PreemptiveScheduler(BaseScheduler):
+ """This is a preemptive tasklet scheduler
+
+ This scheduler will execute each thread for as little time as possible.
+ When it interrupts tasklets, it will keep them in the scheduler list.
+
+ THIS IS EXPERIMENTAL AND HAS NOT BEEN TESTED. UTHREAD HAS NOT BEEN
+ DESIGNED WITH THIS KIND OF USE IN MIND. You may need to modify code
+ in this module if you plan on using this scheduler.
+ """
+ def __init__(self):
+ BaseScheduler.__init__(self)
+ self.maxSlice = 50
+
+ def run(self):
+ """Execute all tasklets
+ """
+ while self.getRunCount() > 1:
+ try:
+ # Run a single slice, if the currently executing thread is
+ # still running append it to the scheduler list
+ victim = stackless.run(self.maxSlice)
+ if victim:
+ victim.insert()
+ except:
+ self.printException()
+
+ def runSingle(self):
+ """Execute the schedule list once, then return
+ """
+ try:
+ # Run a single slice, if the currently executing thread is
+ # still running append it to the scheduler list
+ victim = stackless.run(self.maxSlice)
+ if victim:
+ victim.insert()
+ except:
+ self.printException()
+
+class DefaultScheduler(BaseScheduler):
+ """This is the default tasklet scheduler
+
+ This scheduler will execute threads sequentially.
+ If it needs to interrupt the execution of a tasklet it will remove
+ that tasklet from the scheduler list
+ """
+ def __init__(self):
+ BaseScheduler.__init__(self)
+
+ def run(self):
+ """Execute all tasklets
+ """
+ while self.getRunCount() > 1:
+ try:
+ stackless.schedule()
+ except:
+ self.printException()
+
+ def runSingle(self):
+ """Execute the schedule list once, then return
+ """
+ try:
+ stackless.schedule()
+ except:
+ self.printException()
+
+
+#----------------------------------------------------------------------------
+# Functions to interact with the active scheduler
+#----------------------------------------------------------------------------
+_activeScheduler = DefaultScheduler()
+
+def setScheduler(newMgr, killOld=1, copy=0):
+ """Replace the active tasklet scheduler
+
+ It is not recommended to call this after threads have been started.
+ If killOld is set to 1 then all threads in the old manager are killed.
+ If copy is set to 1 then the running and paused dicts are copied into
+ the new scheduler
+ """
+ assert isinstance(newMgr, BaseScheduler), 'newMgr must inherit from BaseScheduler'
+
+ global _activeScheduler
+ if killOld:
+ _activeScheduler.KillAll()
+ if copy:
+ newMgr.running = _activeScheduler.running
+ newMgr.paused = _activeScheduler.paused
+ _activeScheduler = newMgr
+
+def getScheduler():
+ """Returns the active tasklet scheduler
+ """
+ global _activeScheduler
+ return _activeScheduler
+
+def _exitHandler():
+ """This function calls killAll on the active scheduler
+
+ This function is called by the atexit module before interpreter shutdown.
+ This will give all tasklets a chance to shutdown cleanly if they are not
+ blocked.
+ """
+ _activeScheduler.killAll()
+
+atexit.register(_exitHandler)
+
+#----------------------------------------------------------------------------
+# Functions to expose the scheduler to users
+#----------------------------------------------------------------------------
+def new(func, *args, **kw):
+ """Create a new tasklet
+ """
+ return _activeScheduler.new(func, None, *args, **kw)
+
+def newNamed(func, name, *args, **kw):
+ """Create a new named tasklet
+
+ If name is already in use, a *random* name will be chosen
+ """
+ return _activeScheduler.new(func, name, *args, **kw)
+
+def run():
+ """Execute all tasklets
+ """
+ _activeScheduler.run()
+
+def runSingle():
+ """Execute the schedule list once, then return
+ """
+ _activeScheduler.runSingle()
+
+def pause(taskID):
+ """Pauses a running tasklet
+ """
+ _activeScheduler.pause(taskID)
+
+def resume(taskID):
+ """Resumes a paused tasklet
+ """
+ _activeScheduler.resume(taskID)
+
+def kill(taskID):
+ """Stops a running tasklet
+ """
+ _activeScheduler.kill(taskID)
+
+def dumpTask(taskID, kill=0):
+ """Returns a pickled tasklet
+ """
+ return _activeScheduler.dumpTask(taskID, kill)
+
+def loadTask(data, paused=1):
+ """Loads a pickled tasklet
+ """
+ return _activeScheduler.loadTask(data, paused)
+
+def sleep(delay=0):
+ """Suspend the active thread for a specified amount of seconds
+
+ If delay is zero (default) then the tasklet just blocks.
+ Returns seconds passed since sleep was called.
+ """
+ return _activeScheduler.sleepTask(delay)
+
+def getCurrent():
+ """Returns the currently running tasklet
+ """
+ return stackless.getcurrent()
+
+#----------------------------------------------------------------------------
+# Utility classes
+#----------------------------------------------------------------------------
+class Semaphore(object):
+ """Protects globally accessible resources from context switching
+ """
+ __slots__ = ['count', 'channel']
+
+ def __init__(self, maxcount=1):
+ self.count = maxcount
+ self.channel = stackless.channel()
+
+ def acquire(self):
+ if self.count == 0:
+ self.channel.receive()
+ else:
+ self.count = self.count - 1
+
+ def release(self):
+ if self.channel.queue:
+ self.channel.send(None)
+ else:
+ self.count = self.count + 1
+
+class Queue(object):
+ """A queue is a microthread-safe FIFO.
+ """
+ __slots__ = ['contents', 'channel']
+
+ def __init__(self):
+ self.contents = []
+ self.channel = stackless.channel()
+
+ def put(self, x):
+ self.contents.append(x)
+ self.pump()
+
+ def pump(self):
+ # Channel balance is <0 when there are tasklets waiting to recieve
+ while self.channel.queue and self.contents and self.channel.balance < 0:
+ self.channel.send(self.contents.pop(0))
+
+ def nonBlockingPut(self, x):
+ self.contents.append(x)
+
+ def get(self):
+ if self.contents:
+ return self.contents.pop(0)
+ return self.channel.receive()
+
+ def unget(self, x):
+ self.contents.insert(0, x)
+
+class Synchronizer(object):
+ """A traffic light for microthreads
+
+ No synchronized thread can continue execution until all the other
+ synchronized threads have called sync.
+ """
+ __slots__ = ['maxCount', 'queue', 'count']
+
+ def __init__(self, count):
+ self.maxCount = count
+ self.queue = Queue()
+ self.count = 0
+
+ def sync(self):
+ self.count += 1
+ if self.count == self.maxCount:
+ for i in range(self.maxCount):
+ self.queue.put(None)
+ self.count = 0
+ return self.queue.get()
_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins
More information about the Stackless-checkins
mailing list