[Stackless] uthread latest version
Christian Tismer
tismer at tismer.com
Tue Feb 20 15:03:04 CET 2001
Hi all,
find attached the (to my knowledge) latest version of
uthreads. It is the one that is installed as an example together
with the Stackless for windows executable.
cheers - chris
--
Christian Tismer :^) <mailto:tismer at tismer.com>
Mission Impossible 5oftware : Have a break! Take a ride on Python's
Kaunstr. 26 : *Starship* http://starship.python.net
14163 Berlin : PGP key -> http://wwwkeys.pgp.net
PGP Fingerprint E182 71C7 1A9D 66E9 9D15 D3CC D4D7 93E2 1FAE F6DF
where do you want to jump today? http://www.stackless.com
-------------- next part --------------
"""Python Microthread Library, version 0.2
Microthreads are useful when you want to program many behaviors
happening simultaneously. Simulations and games often want to model
the simultaneous and independent behavior of many people, many
businesses, many monsters, many physical objects, many spaceships, and
so forth. With microthreads, you can code these behaviors as Python
functions. Microthreads use Stackless Python. For more details, see
http://world.std.com/~wware/uthread.html
"""
__version__ = "0.2"
__license__ = """\
Python Microthread Library version %s
Copyright (C)2000 Will Ware, Christian Tismer, Just van Rossum
Permission to use, copy, modify, and distribute this software and its
documentation for any purpose and without fee is hereby granted,
provided that the above copyright notice appear in all copies and that
both that copyright notice and this permission notice appear in
supporting documentation, and that the names of the authors not be
used in advertising or publicity pertaining to distribution of the
software without specific, written prior permission.
THE AUTHORS DISCLAIM ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
EVENT SHALL THE AUTHORS BE LIABLE FOR ANY SPECIAL, INDIRECT OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.""" % __version__
import continuation, sys, traceback, bisect, time, StringIO
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
### Are there platforms where time.time is not accurate enough?
##waitTimer = time.clock
waitTimer = time.time
try:
import thread
except ImportError:
# No system threading available,
# create a dummy get_ident() function.
def get_ident():
return 1
thread = None
else:
# We have system threads. See also getScheduler().
get_ident = thread.get_ident
class UThreadError(Exception): pass
ERR_UNKNOWN_THREAD = "Can't kill unknown thread."
ERR_THREAD_ACTIVE = "Thread is already active"
ERR_THREAD_NOT_ACTIVE = "Thread is not active"
ERR_THREAD_DEAD = "Can't restart a finished thread."
ERR_WRONG_SYSTEM_THREAD = "Can't call this method from " \
"a system thread other than the owner."
ERR_NO_PENDING_TASKS = "Can't enter the scheduler loop " \
"without any pending tasks."
#
# Functions related to the scheduler.
#
def new(func, *args, **kwargs):
"""Create a new thread object for func(*args, **kwargs) and
return it."""
return apply(getScheduler().new, (func,) + args, kwargs)
def newResistent( func, *args, **kwargs ):
'''Create a new resistent thread object for func(*args, **kwargs) and return it'''
def _a( func, args, kwargs ):
thread = apply(getScheduler().new, (func,) + args, kwargs)
thread.setResistant(1)
return thread
return atomic( _a, func, args, kwargs )
def run():
"""The main scheduler loop: run a bunch of tasks, until
they all complete."""
getScheduler().run()
def runAndContinue():
'''Scheduler loop that continues running from the current position
in a micro-thread (i.e. appears to return immediately)'''
thread = Thread( "<Thread: Main>", None )
thread.start( task = continuation.caller() )
getScheduler().run()
print 'scheduler exited normally'
raise SystemExit(0)
def switchContext():
"""Force a context switch, giving up the rest of our time slot
for other threads."""
getScheduler().switchContext()
def block():
"""Block the current thread."""
getCurrentThread().block()
def wait(duration):
"""Suspend a thread for duration seconds."""
return getCurrentThread().wait(duration)
def waitUntil(until):
"""Suspend a thread until time.time() >= until.
Note: this uses time.time's concept of time, even if
the module is currently using time.clock as it's
internal time-management mechanism.
"""
if waitTimer is not time.time:
difference = until - time.time()
until = waitTimer() + difference
return getCurrentThread().waitUntil(until)
def exit():
"""Exit the current thread, by raising SystemExit."""
raise SystemExit
def exitAll(reallyAll=0):
"""Exit all threads by raising SystemExit in each of them.
If the 'reallyAll' argument is true, even threads marked
'resistant' will be exited."""
getScheduler().exitAll(reallyAll)
def exitOthers(reallyAll=0):
"""Exit all threads except the current thread by raising
SystemExit in each of them. If the 'reallyAll' argument
is true, even threads marked 'resistant' will be exited."""
getScheduler().exitOthers(reallyAll)
def getCurrentThread():
"""Return the current thread."""
return getScheduler().getCurrentThread()
def getActiveThreads():
"""Return all active threads."""
return getScheduler().getActiveThreads()
def getBlockedThreads():
"""Return all blocked threads."""
return getScheduler().getBlockedThreads()
def getAllThreads():
"""Return all threads, whether active or blocked."""
return getScheduler().getAllThreads()
def microThreadsRunning ():
return getScheduler().getCurrentThread() is not None
#
# Some utilities
#
continuation_uthread_lock = continuation.uthread_lock
def atomic(func, *args, **kwargs):
"""Perform a function call as a microthread-safe operation."""
## print 'atomic:', func
tmp = continuation_uthread_lock(1)
try:
return apply(func, args, kwargs)
finally:
continuation_uthread_lock(tmp)
def startCritical( ):
'''DANGER: Critical sections can muck up your code!
consider using atomic whereever possible instead.'''
return continuation_uthread_lock(1)
def endCritical( token ):
'''DANGER: Critical sections can muck up your code!
consider using atomic whereever possible instead.'''
return continuation_uthread_lock(token)
def getExceptionString():
'''Attempt to get the "current" exception string.
Unfortunately, there's always a chance that some
other micro-thread has raised (and possibly caught
another exception before you call this function.'''
tmp = continuation_uthread_lock(1)
try:
io = StringIO()
traceback.print_exc( 20, io )
return io.getvalue()
finally:
continuation_uthread_lock(tmp)
def external (scheduler, function,*arguments,**namedarguments):
"""Perform an function within a micro-thread for an external thread"""
def _external( function, *arguments, **namedarguments ):
apply( function, arguments, namedarguments)
slave = apply (
Thread,
( "External Message Slave", _external, function)+arguments,
namedarguments,
)
slave.start( scheduler = scheduler )
def random(x=1):
"""Microthread-safe version of random.random()"""
import random
return atomic(lambda x=x,rnd=random.random: x * rnd())
_nextId = 1
def uniqueId():
"""Microthread-safe way to get unique numbers,
handy for giving things unique ID numbers."""
def _uniqueId():
global _nextId
z = _nextId
_nextId = z + 1
return z
return atomic(_uniqueId)
#
# Widgets
#
class Blockable:
"""Widgets that can block threads share some common
behaviors, so package them in a base class."""
def __init__(self):
self.waiters = []
def wait(self):
th = getCurrentThread()
# Following needs to be re-thought, as it is not logically correct
# for now, you can only wait a blockable object from a micro-thread.
## if th is None:
## # micro-threads are not currently running in this operating system thread
## # we suspend the operating system thread...
## if thread: # have operating system threading support
## externalLock = thread.allocate_lock()
## # add a thread to our waiters
## externalLock.acquire () # should immediately succeed
## newThread = Thread( "External Thread Lock", externalLock.release )
## self.waiters.append( newThread )
## externalLock.acquire() # should suspend until release
## else:
self.waiters.append(th)
th.block()
def notify(self):
atomic(self.__notify)
def __notify(self):
if self.waiters:
th = self.waiters.pop(0)
th.start()
def notifyAll(self):
atomic(self.__notifyAll)
def __notifyAll(self):
while self.waiters:
th = self.waiters.pop(0)
th.start()
class ExternalBlockable:
"""Mix-in provides mechanism for external process to release a blocked micro-thread"""
def notify(self):
self.__notify() # different method than the base class
def __notify(self):
if self.waiters:
thread = self.waiters.pop(0)
external (thread._scheduler, thread.start )
class Lock(Blockable):
"A primitive lock object"
def __init__(self):
Blockable.__init__(self)
self.locked = 0
def acquire(self, blocking=1):
return atomic(self.__acquire, blocking)
def __acquire(self, blocking):
while self.locked:
if not blocking:
return 0
self.wait()
self.locked = 1
return 1
def release(self):
assert self.locked, "trying to release unlocked Lock"
atomic(self.__release)
def __release(self):
self.notify()
self.locked = 0
def _is_owned(self):
return self.locked
class RLock(Blockable):
"A reentrant lock object"
def __init__(self):
Blockable.__init__(self)
self.locked = 0
self.owner = None
def acquire(self, blocking=1):
return atomic(self.__acquire, blocking)
def __acquire(self, blocking):
th = getCurrentThread()
if th == self.owner:
self.locked = self.locked + 1
return 1
while self.locked:
if not blocking:
return 0
self.wait()
self.locked = 1
self.owner = th
return 1
def release(self):
atomic(self.__release)
def __release(self):
th = getCurrentThread()
assert th is self.owner, "trying to release somebody else's RLock"
assert self.locked > 0, "trying to release unlocked RLock"
self.locked = self.locked - 1
if self.locked == 0 and self.waiters:
self.owner = None
self.notify()
def _is_owned(self):
return self.locked > 0
#return self.owner != None
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self.__lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
self._is_owned = lock._is_owned
self.__waiters = []
def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
def wait(self, timeout=None):
me = getCurrentThread()
assert timeout == None, "timeout not yet implemented"
assert self._is_owned(), "wait() of un-acquired lock"
self.release()
# note that the following allows for multiple copies of the same thread in the list
self.__waiters.append(me)
me.block()
self.acquire()
def notify(self, n=1):
atomic( self.__notify, n )
def __notify (self, n= 1):
assert self._is_owned(), "notify() of un-acquired lock"
__waiters = self.__waiters
waiters = __waiters[:n]
for waiter in waiters:
if waiter.isBlocked():
# previously would attempt to start already started threads if they appeared twice...
waiter.start()
__waiters.remove(waiter)
def notifyAll(self):
self.notify(len(self.__waiters))
class Semaphore(Blockable):
"""Semaphores protect globally accessible resources from
the effects of context switching."""
def __init__(self, maxcount=1):
Blockable.__init__(self)
self.count = maxcount
def acquire(self):
atomic(self.__acquire)
def __acquire(self):
if self.count == 0:
self.wait()
else:
self.count = self.count - 1
def release(self):
atomic(self.__release)
def __release(self):
if self.waiters:
self.notify()
else:
self.count = self.count + 1
class Queue(Blockable):
'''A Queue is a micro-thread-safe mechanism
for creating a First-in-First-out (FIFO)
sequence object.
Queue's allow for processing messages in order
when they are delivered (without the need for
constantly polling to see if new messages are
available).
See also:
ExternalQueue -- sub-class for use in
multiple operating-system thread situations
SnailQueue -- sub-class with built-in delay
suitable for modeling communications delays
Usage example:
def writer( queue ):
"""write 20 numbers to a queue"""
for number in range( 20):
queue.put (number) # add the number to the queue, awakening the reader
wait( .2 ) # wait 1/5 of a second before the next number
def reader(queue):
"""read numbers when available"""
number = 0
while number != 19:
# the queue.get call will return immediately
# if there is a value available
# if there isn't a value available,
# then the thread (reader) will be suspended (blocked)
# until a value is available (something calls queue.put,
# which in turn calls queue.notify). When that happens
# the thread is (re)started and continues processing
# with the returned value.
number = queue.get()
print 'read', number
queue = Queue()
new( reader, queue)
new( writer, queue)
run()
'''
def __init__(self):
Blockable.__init__(self)
self.contents = []
def put(self, x):
atomic(self.__put, x)
def __put(self, x):
self.contents.append(x)
self.notify()
def get(self):
return atomic(self.__get)
def __get(self):
if not self.contents:
self.wait()
return self.contents.pop(0)
def unget(self, x):
self.contents.insert(0, x)
def cget(self):
return atomic(self.__cget)
def __cget(self):
if self.contents:
return self.contents.pop(0)
else:
return None
class ExternalQueue (ExternalBlockable, Queue):
''' An ExternalQueue is a Queue which can be
written to by operating system threads other
than those that host the waiting micro-threads
scheduler's
'''
pass
NULL = ()
class SnailQueue(Queue):
''' A SnailQueue is an intentionally slow Queue
It allows for modeling communications latencies.
Each new message arriving in the queue is delayed
for either a set period (if randomize = 0), or a
randomized fraction of that period (if randomize
is true).
'''
def __init__(self, t=.1, randomize= 0):
Queue.__init__(self)
self.delay = t
self.pipeline = Queue()
self.randomize = randomize
new( self.mythread )
def mythread(self):
randomize = self.randomize
get = self.pipeline.get
while 1: # should really use an "active" or similar flag to allow orderly closing
value = get() # will block if no new messages...
if value is NULL:
break
# delay some seconds
if randomize: # if true (default), then used a randomized time to sleep
wait( random (self.delay))
else:
wait( self.delay )
# now do standard Queue stuff
Queue.put( self, value)
def put(self, value):
self.pipeline.put( value )
def close( self ):
self.pipeline.put( NULL)
self.pipeline = None
class Synchronizer(Blockable):
"""A synchronizer provides a method for synchronizing
threads. All the threads block on the sync() method, except
for the last one. When he reaches the sync() method, all
the others are unblocked. The synchronizer must be initialized
with the expected number of threads."""
def __init__(self, maxcount):
Blockable.__init__(self)
self.count = self.maxcount = maxcount
def sync(self):
atomic(self.__sync)
def __sync(self):
self.count = self.count - 1
if self.count > 0:
self.wait()
return
# We're the last guy, so awaken everybody else,
# and then reinitialize the counter so we can do
# it again some day.
self.notifyAll()
self.count = self.maxcount
class ActiveTimer(Blockable):
"""ActiveTimers block threads for a given amount of time,
and then release them.
Note: ActiveTimers consume a full time slice, they do not
reduce processor load. See Timer for processor load reduction
Note:
Waiting on a ActiveTimer whose finish time is in the past
will potentially hang your thread indefinitely (as the
ActiveTimer's timing thread will have exited, it will not
call notifyAll in this case. XXX This should be fixed.
Usage Example:
timer = ActiveTimer ( 3 )
timer.start () # fixes the "start" time for the timer
timer.wait() # blocks this micro-thread until the time has elapsed
"""
def __init__(self, interval=1):
Blockable.__init__(self)
self.interval = interval
def mythread(self):
while waitTimer() < self.finish:
switchContext()
self.notifyAll()
def start(self):
self.finish = waitTimer() + interval
new(self.mythread)
class Timer:
"""Timers are object-oriented interfaces to the wait/waitUntil
functions. Timers block the current thread for a given period
of time measured from the time when the timer's start method
is called.
The primary benefit of the Timer versus the ActiveTimer
is that the Timer allows for yielding the processor for
other micro-threads or operating system threads, preventing
"hogging" the processor.
See the global functions wait and waitUntil for procedural
mechanisms with the similar functionality.
Note:
A Timer whose finish time is in the past will not
block (or even force a task switch). Do not assume
that you will give up the rest of your time slice
by calling timer.wait() .
Usage Example:
timer = Timer ( 3 )
timer.start () # fixes the "start" time for the timer
timer.wait() # blocks this micro-thread until the time has elapsed
"""
def __init__(self, interval=1):
self.interval = interval
def start( self ):
"""Dummy call, now uses wait instead of active polling thread"""
self.finish = waitTimer() + self.interval
def wait( self ):
waitUntil( self.finish )
#
# Microthread kernel stuff: the Thread and Scheduler objects.
#
class Thread:
"""The thread object. See also the new() factory function."""
def __init__(self, name, func, *args, **kwargs):
self.name = name
self._func = func
self._args = args
self._kwargs = kwargs
self._started = 0
self._resistant = 0
self._scheduler = None
self._joins = []
def __repr__(self):
return "<Thread '%s' at 0x%x>" % (self.name, id(self))
def setResistant(self, onOff=1):
"""By default, when an exception occurs in a thread, all
other threads will be exited as well. To prevent this to
happen, you can call setResistant() with a non-zero value
to indicate that this thread should survive exceptions in
other threads. See also Thread.isResistant(), exitAll()
and exitOthers()."""
self._resistant = onOff
def isResistant(self):
"""Return a boolean indicating whether this thread will
survive exceptions occuring in other threads."""
return self._resistant
def isActive(self):
"Return true if this thread is active"
if self._scheduler:
return self._scheduler.isActive(self)
else:
return 0
def isBlocked(self):
"Return true if this thread is blocked"
if self._scheduler:
return self._scheduler.isBlocked(self)
else:
return 0
def start(self, task=None, scheduler=None):
"Start or restart this thread."
atomic(self.__start, task, scheduler)
def __start(self, task=None, scheduler=None):
if self.isActive():
raise UThreadError, ERR_THREAD_ACTIVE
if self.isBlocked():
assert task is None
self._scheduler.start(self, None)
else:
if self._started:
raise UThreadError, ERR_THREAD_DEAD
if scheduler is None:
scheduler = getScheduler()
self._scheduler = scheduler
if task is None:
task = scheduler.makeTask(self._func, self._args,
self._kwargs)
task.caller = None
scheduler.start(self, task)
self._started = 1
def block(self):
"Block this thread. Use thread.start() to restart it."
self._scheduler.block(self)
def wait(self, duration):
"Block thread for duration seconds"
self._scheduler.wait(self, duration)
def waitUntil(self, until):
"Block thread until waitTimer() >= until"
self._scheduler.waitUntil(self, until)
def exit(self):
"Exit this thread by raising a SystemExit exception."
self._scheduler.exit(self)
def postException(self, exc=SystemExit, val=None):
"Post an exception to this thread."
self._scheduler.postException(self, exc, val)
def handleException(self, exception):
"""This method gets called whenever an exception occurs
in this thread. Override this method if you need different
exception display behavior."""
atomic(self.__handleException, exception)
def __handleException(self, exception):
traceback.print_exception(exception[0], exception[1], exception[2])
## def join( self, others ):
## '''"Join" another thread, that is, wait for the other thread to
## complete execution before continuing processing in this thread.
## '''
## atomic( self.__join, others )
## def __join( self, others ):
## self._joins = others
## others = filter(lambda x: x.isActive() or x.isBlocked(), others)
## for other in others:
## # make the "other" a double-call, that is,
## # it's the current task + a wrapper that calls back
## # to us to tell us they're done.
class Scheduler:
"""A scheduler object is responsible for managing a collection of
microthreads. You usually don't need to use schedulers explicitly,
since the correct scheduler object is always used automatically.
In a non-threaded environment there will be just one scheduler
object, but in a multi-threaded environment there will be a scheduler
object for each system thread that uses microthreads. See also the
getScheduler() function."""
def __init__(self):
# This is a "pointer" to the current thread (so we can use
# a local reference in the scheduler loop for speed):
self.__currentThreadPtr = [None]
# Mapping of active threads to tasks:
self.active = {}
# Mapping of blocked threads to tasks (while this is not strictly
# neccesary, it's handy for debugging and monitoring):
self.blocked = {}
# A list of threads to be added to the internal task list:
self.pendingThreads = []
# A list of waiting threads, sorted by wake-up time
self.waitingThreads = []
self.__waitingThreadObject = None
# Store the ID of the system thread that created this scheduler,
# so we can later verify that we're being used in the correct
# context:
self._owner = get_ident()
def new(self, func, *args, **kwargs):
"""Create a new thread object for func(*args, **kwargs)
and return it."""
if self._owner <> get_ident():
raise UThreadError, ERR_WRONG_SYSTEM_THREAD
thread = apply(Thread, ("Thread-%s %s" % (uniqueId(), repr(func)), func) + args, kwargs)
thread.start(scheduler=self)
return thread
def run(self):
"""The main scheduler loop: run a bunch of tasks, until
they all complete."""
#
# Scheduler Loop Implementation Notes (JvR)
#
# It is important that this loop is as fast as possible, so we
# use all possible tricks. Using local variables exclusively is one.
#
# We're using a doubly linked list for the task list. There's no
# head or tail in this list: the last node has the first node as
# its 'next' value and so on. We don't even hold on to the first
# element, we only keep the current code in the 'currentNode'
# variable. A node is simply a Python list with length 3:
# [prevNode, thread, nextNode]
#
# This approach may look a little unpythonic, but we'd like to
# avoid list.pop() and list.insert() calls, since they may have to
# move quite a bit of memory if the list is long. With a linked
# list, inserting and deleting arbitrary items is relatively cheap.
# Still, the code below is quite verbose since we also like to
# avoid Python function calls...
#
if self._owner <> get_ident():
raise UThreadError, ERR_WRONG_SYSTEM_THREAD
# setup some local references
import __builtin__
None = __builtin__.None
KeyError = __builtin__.KeyError
timeslice = continuation.timeslice
currentThreadPtr = self.__currentThreadPtr
pendingThreads = self.pendingThreads
active = self.active
active_get = active.get
if currentThreadPtr[0] is not None:
return # we're already running, so we're all set.
if not pendingThreads:
raise UThreadError, ERR_NO_PENDING_TASKS
# constants to retrieve values from linked list nodes
PREV, THREAD, NEXT = range(3)
# initial dummy value, node will be created later
currentNode = None
while 1:
# The actual scheduler loop; will stop when there are
# no more tasks.
while pendingThreads:
# Add pending tasks to the task list. This must happen
# inside the loop since new threads may be started
# while we are running.
# New version should be os-thread safe, as you can append new
# pending threads while we are doing the initialisation
thread = pendingThreads.pop(0)
if currentNode:
# insert a new node *before* the current node
# (XXX we should be able to play with priorities here!)
newNode = [currentNode[PREV], thread, currentNode]
currentNode[PREV][NEXT] = newNode
currentNode[PREV] = newNode
else:
# list is empty, initialize
currentNode = []
currentNode[:] = [currentNode, thread, currentNode]
# make the current thread externally visible, assign to local variable
currentThreadPtr[0] = currentThread = currentNode[THREAD]
## print 'len(active)', len(active), currentThread
## print 'len(blocked)', len(self.blocked), currentThread
task = active_get( currentThread )
if task:
task = timeslice(task)
## print '.',
if task is None:
# This thread has been blocked or killed;
# remove the current node from the task list.
deadNode = currentNode
prevNode, currentNode = currentNode[PREV], currentNode[NEXT]
prevNode[NEXT] = currentNode
currentNode[PREV] = prevNode
del deadNode[:] # clear, to remove cycles
if not currentNode and not pendingThreads:
break # Exit the while loop: we're done!
else:
active[currentThread] = task
currentNode = currentNode[NEXT]
# Show the world we're no longer in business.
currentThreadPtr[0] = None
# We're not going to bother gracefully exiting orphans
# that were left blocked (poor sods!).
self.blocked.clear()
## Clean up memory/references for waiting threads...
self.__waitingThreadObject = None
del self.waitingThreads[:]
return
def switchContext(self):
"""Force a context switch."""
if self._owner <> get_ident():
raise UThreadError, ERR_WRONG_SYSTEM_THREAD
continuation.uthread_lock(1)
# jump back to the scheduler loop, passing our callers'
# continuation as the 'remainder'
scheduler = continuation.uthread_reset()
# jump back to the scheduler loop
scheduler(continuation.caller())
def getActiveThreads(self):
"""Return all active threads."""
return self.active.keys()
def getBlockedThreads(self):
"""Return all blocked threads."""
return self.blocked.keys()
def getAllThreads(self):
return atomic(self.__getAllThreads)
def __getAllThreads(self):
"""Return all threads, whether active or blocked."""
return self.active.keys() + self.blocked.keys()
def getCurrentThread(self):
"""Return the current thread."""
return self.__currentThreadPtr[0]
def isRunning(self):
return self.__currentThreadPtr[0] is not None
#
# The next methods work on specific threads. They are
# used by the Thread object, and should not be needed
# by client code.
#
def isActive(self, thread):
"""Return true if 'thread' is running."""
return self.active.has_key(thread)
def isBlocked(self, thread):
"""Return true if 'thread' is blocked."""
return self.blocked.has_key(thread)
def start(self, startingThread, startingTask):
"""Start a new thread or restart a blocked thread."""
if self._owner <> get_ident() and startingTask is None:
# only a completely new task is allowed, all other starts must be in same os thread
raise UThreadError, ERR_WRONG_SYSTEM_THREAD
atomic(self.__start, startingThread, startingTask)
def __start(self, startingThread, startingTask):
if self.active.has_key(startingThread):
raise UThreadError, ERR_THREAD_ACTIVE
if self.blocked.has_key(startingThread):
# restart a blocked thread
assert startingTask is None
startingTask = self.blocked[startingThread]
del self.blocked[startingThread]
self.active[startingThread] = startingTask
self.pendingThreads.append(startingThread)
def block(self, thread):
"""Block a thread."""
if self._owner <> get_ident():
raise UThreadError, ERR_WRONG_SYSTEM_THREAD
tmp = continuation.uthread_lock(1)
if thread is self.__currentThreadPtr[0]:
del self.active[thread]
### IFF we are currently atomic, then when we wake up
### we want to be atomic (I think this is always true),
### so we need to restore the atomic state...
if tmp:
def restoreLock( lockValue, target ):
# when first run, give back the rest of this function...
continuation.return_current()
# this is what's run when you call the generated continuation
continuation.uthread_lock(lockValue)
# now call the original continuation (target)
target()
target = restoreLock( tmp, continuation.caller())
else:
target = continuation.caller()
self.blocked[thread] = target
# jump back to the scheduler loop, passing None
# to indicate the task should be removed from
# the task list.
scheduler = continuation.uthread_reset()
scheduler.jump(None)
else:
if not self.active.has_key(thread):
raise UThreadError, ERR_THREAD_NOT_ACTIVE
self.blocked[thread] = self.active[thread]
del self.active[thread]
continuation.uthread_lock(tmp)
### Waiting-related mechanisms
def wait(self, thread, duration):
''' Make this micro-thread wait duration seconds '''
self.waitUntil(thread, waitTimer() + duration)
def waitUntil(self, thread, until):
"""Make this micro-thread wait until waitTimer() >= until."""
if waitTimer() < until:
atomic( self.__waitUntil, thread, until)
def __waitUntil( self, thread, until ):
''' Atomic implementation for registering a thread for waiting '''
bisect.insort( self.waitingThreads, (until, thread) )
if not self.__waitingThreadObject:
## print 'creating new waiting thread'
# start the waiting thread...
self.__setWaitingThreadObject(
self.new ( self.__waitingThread )
)
elif self.isBlocked( self.__waitingThreadObject ):
## print 'waiter isBlocked, starting'
self.start( self.__waitingThreadObject, None )
## print 'blocking thread for wait'
thread.block()
def __setWaitingThreadObject (self, object= None):
'''
Assign a particular object as the official waiting thread
'''
if object is None and self.__waitingThreadObject is self.__currentThreadPtr[0]:
# attempt to clear, only allow if this is the current waiting thread
self.__waitingThreadObject = object
elif object is not None:
# attempt to set, allow in all cases
self.__waitingThreadObject = object
def __waitingThread( self ):
'''
Implementation of the waiting algorithm
The presence of this thread in the active list is what
prevents the scheduler from exiting. This thread
will exit if the list of waiting threads drops to zero
length, or if, on waking up, it discovers that it is
no longer the official waking thread.
This thread controls the entire scheduler,
potentially putting the whole OS thread
to sleep.
'''
# Localization
waitingThreads = self.waitingThreads
pendingThreads = self.pendingThreads
osThreadSleep = time.sleep
currentTime = waitTimer
schedule = self.switchContext
try:
while self.__currentThreadPtr[0] is not None: # is running
while waitingThreads:
# Waiting loop, triggered once per time through threads
while waitingThreads and currentTime() >= waitingThreads[0][0]:
# Restart waiting threads that are done waiting...
# we are the only thread allowed to remove waiting threads...
## print 'waking up sleeper'
wakeupTime, thread = waitingThreads.pop (0)
## try:
thread.start()
## except UThreadError:
## pass
if waitingThreads and len(self.active) == 1 and not pendingThreads:
# need to put the os thread to sleep until we're supposed to wake up
while waitingThreads and currentTime() < waitingThreads[0][0]:
# There is a potential for another waiting thread to delete the waitingThreads [0] here,
# should protect against that somehow, possibly we need to protect against that other
# waiting thread ever been created
delta = waitingThreads[0][0] - currentTime()
## print waitingThreads[0][0], currentTime()
## print 'os thread sleep', delta
# There's a bug somewhere that makes really
# short sleep periods cause hangs (osThreadSleep never returns)
# this is a hacky work-around, not a solution...
if delta > 0.0001:
osThreadSleep(delta)
## print 'os thread wakeup', currentTime()
# allow other micro-threads to run
schedule()
tmp = continuation.uthread_lock(1)
if not waitingThreads and self.__currentThreadPtr[0] is not None:
self.block( self.__currentThreadPtr[0] )
continuation.uthread_lock(tmp)
except ValueError:
pass
# eliminate the waiting thread if we are the current waiting thread...
atomic( self.__setWaitingThreadObject )
def exit(self, thread):
self.postException(thread, SystemExit)
def exitAll(self, reallyAll=0):
"""Exit all threads by raising SystemExit in each of them.
If the 'reallyAll' argument is true, even threads marked
'resistant' will be exited."""
self.exitOthers(reallyAll)
# exit the current thread last
raise SystemExit
def exitOthers(self, reallyAll=0):
"""Exit all threads except the current thread by raising
SystemExit in each of them. If the 'reallyAll' argument
is true, even threads marked 'resistant' will be exited."""
atomic(self.__exitOthers, reallyAll)
def __exitOthers(self, reallyAll):
currentThread = self.__currentThreadPtr[0]
for thread in self.getActiveThreads():
if thread is not currentThread and (reallyAll or
not thread.isResistant()):
thread.postException(SystemExit)
def postException(self, thread, exc, val=None):
"""Post an exception to a thread."""
if self._owner <> get_ident():
raise UThreadError, ERR_WRONG_SYSTEM_THREAD
atomic(self.__postException, thread, exc, val)
def __postException(self, thread, exc, val):
if thread is self.__currentThreadPtr[0]:
raise exc, val
else:
def exceptionRaiser(exc, val):
continuation.return_current()
raise exc, val
if not self.blocked.has_key(thread):
task = self.active[thread]
else:
task = self.blocked[thread]
implant = exceptionRaiser(exc, val) # caught just before the exception
implant.caller = task # stick it on top as if it was called
if not self.blocked.has_key(thread):
self.active[thread] = implant
else:
# Note that the exception will not be raised
# before the blocked task is restarted!
self.blocked[thread] = implant
def kill(self, thread):
"""Kill a thread. Note that it's usually better to post
a SystemExit exception or call exit(), since kill() is
very brutal: eg. pending 'finally' clauses will not be
executed!"""
if self._owner <> get_ident():
raise UThreadError, ERR_WRONG_SYSTEM_THREAD
tmp = continuation.uthread_lock(1)
if thread is self.__currentThreadPtr[0]:
del self.active[thread]
# jump back to the scheduler loop, passing None
# to indicate the task should be removed from
# the task list.
scheduler = continuation.uthread_reset()
scheduler.jump(None)
else:
if self.active.has_key(thread):
del self.active[thread]
elif self.blocked.has_key(thread):
del self.blocked[thread]
else:
raise UThreadError, ERR_UNKNOWN_THREAD
continuation.uthread_lock(tmp)
def makeTask(self, func, args, kw):
"""Turn a function plus arguments into a continuation,
suitable for consumption by the scheduler."""
try:
continuation.return_current()
apply(func, args, kw)
except (SystemExit, KeyboardInterrupt):
# Let SystemExit & KeyboardInterrupt go by silently,
# they mean the end of the thread.
pass
except:
currentThread = self.__currentThreadPtr[0]
currentThread.handleException(sys.exc_info())
self.exitOthers()
self.kill(self.__currentThreadPtr[0])
def wrapTask( self, core ):
'''Wrap a task (continuation) with a task that
does the continuation then does something else.'''
#
# Some support for using microthreads under system threads:
# each system thread should have its own scheduler instance.
#
if thread is None:
# No system threading available, make a stub.
def getScheduler(threadID=None,
scheduler=Scheduler()):
"""Return the default scheduler object."""
assert threadID is None
return scheduler
else:
# We have system threading, use one scheduler per system thread.
def getScheduler(threadID=None,
schedulerMap={}, # (Abusing a mutable default arg as a
# kind of "static" local variable.)
get_ident=thread.get_ident):
"""If no threadID is given, return the scheduler object
belonging to the current system thread, otherwise the
scheduler object for the specified threadID."""
if threadID is None:
threadID = get_ident()
try:
scheduler = schedulerMap[threadID]
except KeyError:
scheduler = schedulerMap[threadID] = Scheduler()
return scheduler
#
# Tests
#
if __name__ == "__main__":
sem = Semaphore()
def print_safe(*args):
sem.acquire()
for arg in args:
print arg,
print
sem.release()
pwidth = 0
def printw(str):
global pwidth
sem.acquire()
if pwidth + len(str) + 1 > 70:
print
pwidth = 0
print str,
sys.stdout.flush()
pwidth = pwidth + len(str) + 1
sem.release()
def pwclean():
global pwidth
if pwidth != 0:
print
pwidth = 0
def semaphore_test():
'''Semaphore test, tests??'''
preambleString = 'ABCDEFGH'
def count(preamble):
for i in range(20):
printw(preamble + `i`)
for ch in preambleString:
new(count, ch)
run()
pwclean()
print
def queue_test():
"""Test queues and snailqueues"""
def talker(q):
for i in range(100):
printw(`i`)
wasteTime = 0
for j in range(20):
wasteTime = wasteTime + 1
q.put(i)
q.put(None)
def listener(q):
x = 2
while x != None:
x = q.get()
printw('[' + `x` + ']')
q = Queue()
new(talker, q)
new(listener, q)
run()
pwclean()
print
q = SnailQueue()
new(talker, q)
new(listener, q)
run()
pwclean()
print
def nap(x):
t = Timer(x)
t.start()
t.wait()
def timer_test():
'''Test Timer object
'''
def tryTimer(x):
t = Timer(x)
t.start()
t.wait()
printw('%.3g' % x)
for i in range(20):
x = random(5)
new(tryTimer, x)
run()
pwclean()
print
def synchronizer_test():
'''Test Synchronizer object'''
def trySynchronizer(s, x):
t = Timer(x)
t.start()
t.wait()
printw('waiting')
s.sync()
printw('done')
N = 20
s = Synchronizer(N)
for i in range(N):
new(trySynchronizer, s, random(5))
run()
pwclean()
print
def condition_test():
'''Test Condition object'''
m, n = 3, 3
chowtime = [ ]
cv = Condition()
def consumer(cv, chowtime, m):
for i in range(m):
nap(.5)
cv.acquire()
while len(chowtime) == 0:
cv.wait()
chowtime.pop(0)
print getCurrentThread(), 'eating'
cv.release()
def producer(cv, chowtime, m, n):
for i in range(m * n):
nap(.5 / n)
cv.acquire()
chowtime.append('morsel')
print getCurrentThread(), 'feeding'
cv.notify()
cv.release()
new(producer, cv, chowtime, m, n).name = 'Producer'
for i in range(n):
new(consumer, cv, chowtime, m).name = 'Consumer'
run()
pwclean()
print
def justs_test():
'''Just's test
Expected results:
Three threads (ABC) will print out interspersed iteration counts
with each thread having slightly different timing for their
counting loops.'''
def loop(name, duration, interval=1):
count = 0
start = t = time.clock()
while 1:
now = time.clock()
if now - start > duration:
printw("(%s is done!)" % name)
break
elif now - t > interval:
t = now
printw("%s%s" % (name, count))
count = count + 1
switchContext()
t1 = new(loop, "A", 1.5, 0.1)
t2 = new(loop, "B", 2.5, 0.25)
t3 = new(loop, "C", 4, 0.5)
run()
pwclean()
print
def external_control_test():
'''Attempt to control threads from another thread
Second thread attempts to pause another (thread.block())
then restart it (thread.start()) after a given time.
Expected results:
The two threads are counting. Results should be interspersed.
After a period, thread A will be paused from thread B.
Thread B (and only thread B) should continue counting.
After a period, thread A will be restarted from thread B.
Thread A should continue counting until it finishes.'''
def loop1(name, duration):
print_safe("begin:", name)
last = t = time.clock()
counter = 0
while 1:
now = time.clock()
if now - t > duration:
print_safe("done!")
break
if now - last > 0.2:
print_safe(name, counter)
counter = counter + 1
last = now
def loop2(name, thread, when, whenstart):
print_safe("begin:", name)
last = t = time.clock()
counter = 0
ispaused = 0
while 1:
now = time.clock()
if not ispaused and now - t > when:
print_safe("pausing", thread, "from thread",
getCurrentThread())
thread.block()
ispaused = 1
if now - t > whenstart:
print_safe("restarting", thread, "from thread",
getCurrentThread())
thread.start()
break
if now - last > 0.25:
print_safe(name, counter)
counter = counter + 1
last = now
name1 = "A"
name2 = "B"
thA = Thread(name1, loop1, name1, 4)
thB = Thread(name2, loop2, name2, thA, 1.5, 3)
thA.start()
thB.start()
run()
print
def post_exception_test():
'''Attempt to post an exception to another thread
Expected results:
The two threads are counting upward.
After 2.5 seconds, the second thread will interrupt the first
by posting the exception "FakeError" to it. You should see a
traceback printed in the middle of the
"<Thread 'Thread-166' at X> number" lines.
The first thread should stop counting.
The second thread should continue counting.'''
def loop1(duration, when=0, thread=None):
last = t = time.clock()
counter = 0
while 1:
now = time.clock()
if now - t > duration:
break
if when and now - t > when:
print_safe("posting exception to %s from %s" %
(thread, getCurrentThread()))
thread.postException("FakeError", (-123, "just checking the walls"))
when = 0
if now - last > 0.5:
print_safe(getCurrentThread(), counter)
counter = counter + 1
last = now
th1 = new(loop1, 5)
th2 = new(loop1, 5, 2.5, th1)
th2.setResistant(1)
run()
print
def wait_test():
'''Test small number of micro-threads waiting for moderate duration:
Creates 5 micro-threads, each goes to sleep for random duration
of up to five seconds, after which the requested time and actual time
slept are printed.
Expected results:
A number of "Start wait, duration" lines
A pause equal to the shortest duration.
A number of "wait completed requested: duration actual: duration"
lines, with pauses between them equal to the difference between the
durations'''
def tryWait(x):
t = waitTimer()
printw('Start wait %.3g\n'%(x))
wait(x)
printw(' wait completed requested:%.3g actual:%.3g\n' % (x, waitTimer()-t))
for i in range(5):
x = random(5)
new(tryWait, x)
## print 'running 5 paused threads (random pause), should be low processor load'
run()
pwclean()
print
def lotsa_wait_test():
'''Test large numbers of micro-threads waiting for moderate duration:
Creates 3000 micro-threads, each of which waits for 3 seconds
Expected results:
Should be a period of time with very high processor load
(creating and waiting micro-threads)
Should be followed by a period of time with no processing load
(all threads sleeping)
Should be a period of time with very high processor load
(waking and destroying micro-threads)'''
def tryWait(x):
wait(x)
for i in range(500):
new(tryWait, 3)
## print 'running 3000 paused threads (3 second pause), should be low processor load'
run()
print 'finished'
pwclean()
print
def micro_wait_test():
'''Test large number of micro-thread waiting, short duration:
Creates 3000 threads, each of which waits for 0.05 seconds
Expected results:
On slow machines, may wind up with no sleep, threads will ignore
the wait, and continue to completion immediately.
On fast machines, there may be a (very) brief low-processor-load
moment, then all threads should continue.'''
def tryWait(x):
wait(x)
for i in range(500):
new(tryWait, .05)
## print 'running 3000 paused threads (.05 second pause)'
run()
print 'finished'
pwclean()
print
def os_thread_test():
'''Test ExternalQueue object:
Creates two operating system threads, one with a micro-thread
set consisting of "readers", the other consisting of "writers".
The two operating system threads (and their respective micro-
threads) communicate via an ExternalQueue object.
Expected results:
A series of read <number> lines, interspersed with a series of
write <number> lines note, the reads may be printed before the
writes are printed due to task switching'''
def holder( ): # hold the micro-threads library open, should have a better mechanism for this
while 1:
wait( 0.01 )
# why does this exit???
def reader( queue, number=5 ):
for x in range( number ):
value = queue.get()
print "read",value
print 'reader exit'
def writer( queue, number=5 ):
for x in range( number ):
queue.put( x )
print "write",x
wait( random(.1) )
print 'writer exit'
def microReader( queue, reader=reader, holder=holder ):
new( reader, queue )
new( holder )
run()
def microWriter( queue, writer=writer ):
new( writer, queue )
run()
queue = ExternalQueue()
thread.start_new_thread( microReader, (queue,) )
microWriter(queue,)
TESTS = [
semaphore_test,
queue_test,
timer_test,
synchronizer_test,
condition_test,
justs_test,
external_control_test,
post_exception_test,
wait_test,
lotsa_wait_test,
micro_wait_test,
]
if thread:
TESTS.append( os_thread_test)
for test in TESTS:
print '##',test.__doc__
test()
More information about the Stackless
mailing list