[Stackless] uthread latest version

Christian Tismer tismer at tismer.com
Tue Feb 20 15:03:04 CET 2001


Hi all,

find attached the (to my knowledge) latest version of
uthreads. It is the one that is installed as an example together
with the Stackless for Windows executable.

cheers - chris

-- 
Christian Tismer             :^)   <mailto:tismer at tismer.com>
Mission Impossible 5oftware  :     Have a break! Take a ride on Python's
Kaunstr. 26                  :    *Starship* http://starship.python.net
14163 Berlin                 :     PGP key -> http://wwwkeys.pgp.net
PGP Fingerprint       E182 71C7 1A9D 66E9 9D15  D3CC D4D7 93E2 1FAE F6DF
     where do you want to jump today?   http://www.stackless.com
-------------- next part --------------
"""Python Microthread Library, version 0.2

Microthreads are useful when you want to program many behaviors
happening simultaneously. Simulations and games often want to model
the simultaneous and independent behavior of many people, many
businesses, many monsters, many physical objects, many spaceships, and
so forth. With microthreads, you can code these behaviors as Python
functions. Microthreads use Stackless Python. For more details, see
	http://world.std.com/~wware/uthread.html
"""

__version__ = "0.2"

__license__ = """\
Python Microthread Library version %s
Copyright (C)2000  Will Ware, Christian Tismer, Just van Rossum

Permission to use, copy, modify, and distribute this software and its
documentation for any purpose and without fee is hereby granted,
provided that the above copyright notice appear in all copies and that
both that copyright notice and this permission notice appear in
supporting documentation, and that the names of the authors not be
used in advertising or publicity pertaining to distribution of the
software without specific, written prior permission.

THE AUTHORS DISCLAIM ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
EVENT SHALL THE AUTHORS BE LIABLE FOR ANY SPECIAL, INDIRECT OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.""" % __version__


import continuation, sys, traceback, bisect, time
try:
	from cStringIO import StringIO
except ImportError:
	from StringIO import StringIO

### Are there platforms where time.time is not accurate enough?
##waitTimer = time.clock
waitTimer = time.time


try:
	import thread
except ImportError:
	# No system threading available, 
	# create a dummy get_ident() function.
	def get_ident():
		return 1
	thread = None
else:
	# We have system threads. See also getScheduler().
	get_ident = thread.get_ident


class UThreadError(Exception): pass

ERR_UNKNOWN_THREAD = "Can't kill unknown thread."
ERR_THREAD_ACTIVE = "Thread is already active"
ERR_THREAD_NOT_ACTIVE = "Thread is not active"
ERR_THREAD_DEAD = "Can't restart a finished thread."
ERR_WRONG_SYSTEM_THREAD = "Can't call this method from " \
			"a system thread other than the owner."
ERR_NO_PENDING_TASKS = "Can't enter the scheduler loop " \
			"without any pending tasks."


#
# Functions related to the scheduler.
#

def new(func, *args, **kwargs):
	"""Create a new thread object for func(*args, **kwargs) and
	return it."""
	return apply(getScheduler().new, (func,) + args, kwargs)
def newResistent( func, *args, **kwargs ):
	'''Create a new resistant thread object for func(*args, **kwargs) and return it.'''
	def _a( func, args, kwargs ):
		thread = apply(getScheduler().new, (func,) + args, kwargs)
		thread.setResistant(1)
		return thread
	return atomic( _a, func, args, kwargs )
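
# A commented-out sketch of what 'resistant' means: when an exception
# escapes one microthread, the scheduler exits the other (non-resistant)
# active threads; a thread created with newResistent() keeps running.
# (Names below are illustrative only.)
##	def resilient():
##		for i in range(100):
##			switchContext()	# stays active, survives the other thread's crash
##	def failing():
##		switchContext()
##		raise RuntimeError("boom")	# exits all non-resistant threads
##	newResistent(resilient)
##	new(failing)
##	run()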

def run():
	"""The main scheduler loop: run a bunch of tasks, until
	they all complete."""
	getScheduler().run()
def runAndContinue():
	'''Scheduler loop that continues running from the current position
	in a micro-thread (i.e. appears to return immediately)'''
	thread = Thread( "<Thread: Main>", None )
	thread.start( task = continuation.caller() )
	getScheduler().run()
	print 'scheduler exited normally'
	raise SystemExit(0)

def switchContext():
	"""Force a context switch, giving up the rest of our time slot
	for other threads."""
	getScheduler().switchContext()

def block():
	"""Block the current thread."""
	getCurrentThread().block()

def wait(duration):
	"""Suspend a thread for duration seconds."""
	return getCurrentThread().wait(duration)

def waitUntil(until):
	"""Suspend a thread until time.time() >= until.
	Note: this uses time.time's concept of time, even if
	the module is currently using time.clock as its
	internal time-management mechanism.
	"""
	if waitTimer is not time.time:
		difference = until - time.time()
		until = waitTimer() + difference
	return getCurrentThread().waitUntil(until)

def exit():
	"""Exit the current thread, by raising SystemExit."""
	raise SystemExit

def exitAll(reallyAll=0):
	"""Exit all threads by raising SystemExit in each of them.
	If the 'reallyAll' argument is true, even threads marked
	'resistant' will be exited."""
	getScheduler().exitAll(reallyAll)

def exitOthers(reallyAll=0):
	"""Exit all threads except the current thread by raising
	SystemExit in each of them. If the 'reallyAll' argument
	is true, even threads marked 'resistant' will be exited."""
	getScheduler().exitOthers(reallyAll)

def getCurrentThread():
	"""Return the current thread."""
	return getScheduler().getCurrentThread()

def getActiveThreads():
	"""Return all active threads."""
	return getScheduler().getActiveThreads()

def getBlockedThreads():
	"""Return all blocked threads."""
	return getScheduler().getBlockedThreads()

def getAllThreads():
	"""Return all threads, whether active or blocked."""
	return getScheduler().getAllThreads()

def microThreadsRunning ():
	"""Return true if a micro-thread is currently running in this scheduler's system thread."""
	return getScheduler().getCurrentThread() is not None

#
# Some utilities
#

continuation_uthread_lock = continuation.uthread_lock
def atomic(func, *args, **kwargs):
	"""Perform a function call as a microthread-safe operation."""
##	print 'atomic:', func
	tmp = continuation_uthread_lock(1)
	try:
		return apply(func, args, kwargs)
	finally:
		continuation_uthread_lock(tmp)
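
# A small commented-out sketch of atomic() guarding shared state: the
# nested function runs without a microthread context switch, so the
# read-modify-write below cannot be interleaved.  (uniqueId() further
# down uses the same pattern.)
##	counters = {}
##	def bump(key):
##		def _bump(key=key):
##			counters[key] = counters.get(key, 0) + 1
##			return counters[key]
##		return atomic(_bump)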
def startCritical( ):
	'''DANGER: Critical sections can muck up your code!
	Consider using atomic wherever possible instead.'''
	return continuation_uthread_lock(1)
def endCritical( token ):
	'''DANGER: Critical sections can muck up your code!
	Consider using atomic wherever possible instead.'''
	return continuation_uthread_lock(token)


def getExceptionString():
	'''Attempt to get the "current" exception string.
	Unfortunately, there's always a chance that some
	other micro-thread has raised (and possibly caught)
	another exception before you call this function.'''
	tmp = continuation_uthread_lock(1)
	try:
		io = StringIO()
		traceback.print_exc( 20, io )
		return io.getvalue()
	finally:
		continuation_uthread_lock(tmp)


def external (scheduler, function,*arguments,**namedarguments):
	"""Perform an function within a micro-thread for an external thread"""
	def _external( function, *arguments, **namedarguments ):
		apply( function, arguments, namedarguments)
	slave = apply (
		Thread,
		( "External Message Slave", _external, function)+arguments,
		namedarguments,
	)
	slave.start( scheduler = scheduler )

def random(x=1):
	"""Microthread-safe version of random.random()"""
	import random
	return atomic(lambda x=x,rnd=random.random: x * rnd())


_nextId = 1
def uniqueId():
	"""Microthread-safe way to get unique numbers,
	handy for giving things unique ID numbers."""
	def _uniqueId():
		global _nextId
		z = _nextId
		_nextId = z + 1
		return z
	return atomic(_uniqueId)



#
# Widgets
#

class Blockable:
	
	"""Widgets that can block threads share some common
	behaviors, so package them in a base class."""

	def __init__(self):
		self.waiters = []
	
	def wait(self):
		th = getCurrentThread()
		# The following needs to be rethought, as it is not logically correct;
		# for now, you can only wait on a blockable object from a micro-thread.
##		if th is None:
##			# micro-threads are not currently running in this operating system thread
##			# we suspend the operating system thread...
##			if thread: # have operating system threading support
##				externalLock = thread.allocate_lock()
##				# add a thread to our waiters
##				externalLock.acquire () # should immediately succeed
##				newThread = Thread( "External Thread Lock", externalLock.release )
##				self.waiters.append( newThread )
##				externalLock.acquire() # should suspend until release
##		else:
		self.waiters.append(th)
		th.block()

	def notify(self):
		atomic(self.__notify)
	
	def __notify(self):
		if self.waiters:
			th = self.waiters.pop(0)
			th.start()
	
	def notifyAll(self):
		atomic(self.__notifyAll)
	
	def __notifyAll(self):
		while self.waiters:
			th = self.waiters.pop(0)
			th.start()

class ExternalBlockable:
	"""Mix-in provides mechanism for external process to release a blocked micro-thread"""
	def notify(self):
		self.__notify() # different method than the base class
	
	def __notify(self):
		if self.waiters:
			thread = self.waiters.pop(0)
			external (thread._scheduler, thread.start )


class Lock(Blockable):

	"A primitive lock object"

	def __init__(self):
		Blockable.__init__(self)
		self.locked = 0

	def acquire(self, blocking=1):
		return atomic(self.__acquire, blocking)

	def __acquire(self, blocking):
		while self.locked:
			if not blocking:
				return 0
			self.wait()
		self.locked = 1
		return 1
	
	def release(self):
		assert self.locked, "trying to release unlocked Lock"
		atomic(self.__release)

	def __release(self):
		self.notify()
		self.locked = 0

	def _is_owned(self):
		return self.locked
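
# A commented-out usage sketch for Lock: two microthreads serialize
# access to a shared resource (names are illustrative only).
##	lock = Lock()
##	def worker(name):
##		lock.acquire()
##		try:
##			print name, "has the lock"
##		finally:
##			lock.release()
##	new(worker, "A")
##	new(worker, "B")
##	run()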

class RLock(Blockable):

	"A reentrant lock object"

	def __init__(self):
		Blockable.__init__(self)
		self.locked = 0
		self.owner = None

	def acquire(self, blocking=1):
		return atomic(self.__acquire, blocking)

	def __acquire(self, blocking):
		th = getCurrentThread()
		if th == self.owner:
			self.locked = self.locked + 1
			return 1
		while self.locked:
			if not blocking:
				return 0
			self.wait()
		self.locked = 1
		self.owner = th
		return 1
	
	def release(self):
		atomic(self.__release)

	def __release(self):
		th = getCurrentThread()
		assert th is self.owner, "trying to release somebody else's RLock"
		assert self.locked > 0, "trying to release unlocked RLock"
		self.locked = self.locked - 1
		if self.locked == 0 and self.waiters:
			self.owner = None
			self.notify()

	def _is_owned(self):
		return self.locked > 0
		#return self.owner != None


class Condition:

	def __init__(self, lock=None):
		if lock is None:
			lock = RLock()
		self.__lock = lock
		# Export the lock's acquire() and release() methods
		self.acquire = lock.acquire
		self.release = lock.release
		self._is_owned = lock._is_owned
		self.__waiters = []

	def __repr__(self):
		return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

	def wait(self, timeout=None):
		me = getCurrentThread()
		assert timeout == None, "timeout not yet implemented"
		assert self._is_owned(), "wait() of un-acquired lock"
		self.release()
		# note that the following allows for multiple copies of the same thread in the list
		self.__waiters.append(me)
		me.block()
		self.acquire()

	def notify(self, n=1):
		atomic( self.__notify, n )
	def __notify (self, n= 1):
		assert self._is_owned(), "notify() of un-acquired lock"
		__waiters = self.__waiters
		waiters = __waiters[:n]
		for waiter in waiters:
			if waiter.isBlocked():
				# previously would attempt to start already started threads if they appeared twice...
				waiter.start()
			__waiters.remove(waiter)

	def notifyAll(self):
		self.notify(len(self.__waiters))


class Semaphore(Blockable):
	
	"""Semaphores protect globally accessible resources from
	the effects of context switching."""

	def __init__(self, maxcount=1):
		Blockable.__init__(self)
		self.count = maxcount

	def acquire(self):
		atomic(self.__acquire)

	def __acquire(self):
		if self.count == 0:
			self.wait()
		else:
			self.count = self.count - 1
	
	def release(self):
		atomic(self.__release)
	
	def __release(self):
		if self.waiters:
			self.notify()
		else:
			self.count = self.count + 1
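
# A commented-out sketch: a Semaphore created with maxcount=2 lets at
# most two microthreads use the resource at once; the rest block in
# acquire() until a release() wakes them (names are illustrative only).
##	slots = Semaphore(2)
##	def worker(n):
##		slots.acquire()
##		try:
##			wait(0.1)	# pretend to use the shared resource
##		finally:
##			slots.release()
##	for i in range(5):
##		new(worker, i)
##	run()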


class Queue(Blockable):
	'''A Queue is a micro-thread-safe mechanism
	for creating a First-in-First-out (FIFO)
	sequence object.
	
	Queues allow for processing messages in order
	when they are delivered (without the need for
	constantly polling to see if new messages are
	available).

	See also:
		ExternalQueue -- sub-class for use in
		multiple operating-system thread situations
		
		SnailQueue -- sub-class with built-in delay
		suitable for modeling communications delays
	Usage example:
		def writer( queue ):
			"""write 20 numbers to a queue"""
			for number in range( 20):
				queue.put (number) # add the number to the queue, awakening the reader
				wait( .2 ) # wait 1/5 of a second before the next number
		def reader(queue):
			"""read numbers when available"""
			number = 0
			while number != 19:
				# the queue.get call will return immediately
				# if there is a value available
				# if there isn't a value available,
				# then the thread (reader) will be suspended (blocked)
				# until a value is available (something calls queue.put,
				# which in turn calls queue.notify).  When that happens
				# the thread is (re)started and continues processing
				# with the returned value.
				number = queue.get()
				print 'read', number
		queue = Queue()
		new( reader, queue)
		new( writer, queue)
		run()
	'''

	def __init__(self):
		Blockable.__init__(self)
		self.contents = []
	def put(self, x):
		atomic(self.__put, x)
	
	def __put(self, x):
		self.contents.append(x)
		self.notify()

	def get(self):
		return atomic(self.__get)
	
	def __get(self):
		if not self.contents:
			self.wait()
		return self.contents.pop(0)

	def unget(self, x):
		"""Push x back onto the front of the queue."""
		self.contents.insert(0, x)

	def cget(self):
		"""Conditional get: return the next item immediately, or None if the queue is empty (never blocks)."""
		return atomic(self.__cget)

	def __cget(self):
		if self.contents:
			return self.contents.pop(0)
		else:
			return None

class ExternalQueue (ExternalBlockable, Queue):
	''' An ExternalQueue is a Queue which can be
	written to by operating system threads other
	than the one that hosts the waiting micro-threads'
	scheduler.
	'''
	pass
NULL = ()
class SnailQueue(Queue):
	''' A SnailQueue is an intentionally slow Queue.
	It allows for modeling communications latencies.
	Each new message arriving in the queue is delayed
	for either a set period (if randomize = 0), or a
	randomized fraction of that period (if randomize
	is true).
	'''
	
	def __init__(self, t=.1, randomize= 0):
		Queue.__init__(self)
		self.delay = t
		self.pipeline = Queue()
		self.randomize = randomize
		new( self.mythread )

	def mythread(self):
		randomize = self.randomize
		get = self.pipeline.get
		while 1: # should really use an "active" or similar flag to allow orderly closing
			value = get() # will block if no new messages...
			if value is NULL:
				break
			# delay some seconds
			if randomize: # if true, then sleep for a randomized fraction of the delay
				wait( random (self.delay))
			else:
				wait( self.delay )
			# now do standard Queue stuff
			Queue.put( self,  value)
	
	def put(self, value):
		self.pipeline.put( value )
	def close( self ):
		self.pipeline.put( NULL)
		self.pipeline = None
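
# A commented-out sketch: a SnailQueue delivering each message with a
# 0.5 second delay, as a crude model of network latency (names are
# illustrative only; see also queue_test at the bottom of this file).
##	link = SnailQueue(0.5)
##	def sender():
##		for i in range(3):
##			link.put(i)
##	def receiver():
##		for i in range(3):
##			print "received", link.get()
##	new(sender)
##	new(receiver)
##	run()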
	


class Synchronizer(Blockable):
	
	"""A synchronizer provides a method for synchronizing
	threads. All the threads block on the sync() method, except
	for the last one. When it reaches the sync() method, all
	the others are unblocked. The synchronizer must be initialized
	with the expected number of threads."""
	
	def __init__(self, maxcount):
		Blockable.__init__(self)
		self.count = self.maxcount = maxcount

	def sync(self):
		atomic(self.__sync)
	
	def __sync(self):
		self.count = self.count - 1
		if self.count > 0:
			self.wait()
			return
		# We're the last guy, so awaken everybody else,
		# and then reinitialize the counter so we can do
		# it again some day.
		self.notifyAll()
		self.count = self.maxcount
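
# A commented-out sketch: three microthreads meet at a Synchronizer
# before any of them continues (names are illustrative only; see also
# synchronizer_test at the bottom of this file).
##	barrier = Synchronizer(3)
##	def worker(n):
##		wait(random(1))	# arrive at different times
##		barrier.sync()	# blocks until all three have arrived
##		print n, "passed the barrier"
##	for i in range(3):
##		new(worker, i)
##	run()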

class ActiveTimer(Blockable):
	"""ActiveTimers block threads for a given amount of time,
	and then release them.
	Note: ActiveTimers consume a full time slice; they do not
	reduce processor load.  See Timer for processor load reduction.

	Note:
		Waiting on an ActiveTimer whose finish time is in the past
		will potentially hang your thread indefinitely (as the
		ActiveTimer's timing thread will have exited, it will not
		call notifyAll in this case).  XXX This should be fixed.
	
	Usage Example:
		timer = ActiveTimer ( 3 )
		timer.start () # fixes the "start" time for the timer
		timer.wait() # blocks this micro-thread until the time has elapsed
	"""

	def __init__(self, interval=1):
		Blockable.__init__(self)
		self.interval = interval

	def mythread(self):
		while waitTimer() < self.finish:
			switchContext()
		self.notifyAll()

	def start(self):
		self.finish = waitTimer() + self.interval
		new(self.mythread)


class Timer:
	"""Timers are object-oriented interfaces to the wait/waitUntil
	functions.  Timers block the current thread for a given period
	of time measured from the time when the timer's start method
	is called.

	The primary benefit of the Timer versus the ActiveTimer
	is that the Timer allows for yielding the processor for
	other micro-threads or operating system threads, preventing
	"hogging" the processor.

	See the global functions wait and waitUntil for procedural
	mechanisms with similar functionality.

	Note:
		A Timer whose finish time is in the past will not
		block (or even force a task switch).  Do not assume
		that you will give up the rest of your time slice
		by calling timer.wait() .

	Usage Example:
		timer = Timer ( 3 )
		timer.start () # fixes the "start" time for the timer
		timer.wait() # blocks this micro-thread until the time has elapsed
	"""

	def __init__(self, interval=1):
		self.interval = interval
	def start( self ):
		"""Dummy call, now uses wait instead of active polling thread"""
		self.finish = waitTimer() + self.interval
	def wait( self ):
		waitUntil( self.finish )


#
# Microthread kernel stuff: the Thread and Scheduler objects.
#

class Thread:
	
	"""The thread object. See also the new() factory function."""
	
	def __init__(self, name, func, *args, **kwargs):
		self.name = name
		self._func = func
		self._args = args
		self._kwargs = kwargs
		self._started = 0
		self._resistant = 0
		self._scheduler = None
		self._joins = []
	
	def __repr__(self):
		return "<Thread '%s' at 0x%x>" % (self.name, id(self))
	
	def setResistant(self, onOff=1):
		"""By default, when an exception occurs in a thread, all
		other threads will be exited as well. To prevent this from
		happening, you can call setResistant() with a non-zero value
		to indicate that this thread should survive exceptions in
		other threads. See also Thread.isResistant(), exitAll()
		and exitOthers()."""
		self._resistant = onOff
	
	def isResistant(self):
		"""Return a boolean indicating whether this thread will
		survive exceptions occurring in other threads."""
		return self._resistant
	
	def isActive(self):
		"Return true if this thread is active"
		if self._scheduler:
			return self._scheduler.isActive(self)
		else:
			return 0
	
	def isBlocked(self):
		"Return true if this thread is blocked"
		if self._scheduler:
			return self._scheduler.isBlocked(self)
		else:
			return 0

	def start(self, task=None, scheduler=None):
		"Start or restart this thread."
		atomic(self.__start, task, scheduler)
		
	def __start(self, task=None, scheduler=None):
		if self.isActive():
			raise UThreadError, ERR_THREAD_ACTIVE
		if self.isBlocked():
			assert task is None
			self._scheduler.start(self, None)
		else:
			if self._started:
				raise UThreadError, ERR_THREAD_DEAD
			if scheduler is None:
				scheduler = getScheduler()
			self._scheduler = scheduler
			if task is None:
				task = scheduler.makeTask(self._func, self._args,
						self._kwargs)
				task.caller = None
			scheduler.start(self, task)
			self._started = 1
	
	def block(self):
		"Block this thread. Use thread.start() to restart it."
		self._scheduler.block(self)
		
	def wait(self, duration):
		"Block thread for duration seconds"
		self._scheduler.wait(self, duration)
	
	def waitUntil(self, until):
		"Block thread until waitTimer() >= until"
		self._scheduler.waitUntil(self, until)
	
	def exit(self):
		"Exit this thread by raising a SystemExit exception."
		self._scheduler.exit(self)
	
	def postException(self, exc=SystemExit, val=None):
		"Post an exception to this thread."
		self._scheduler.postException(self, exc, val)
	
	def handleException(self, exception):
		"""This method gets called whenever an exception occurs
		in this thread. Override this method if you need different
		exception display behavior."""
		atomic(self.__handleException, exception)
	
	def __handleException(self, exception):
		traceback.print_exception(exception[0], exception[1], exception[2])
##	def join( self, others ):
##		'''"Join" another thread, that is, wait for the other thread to
##		complete execution before continuing processing in this thread.
##		'''
##		atomic( self.__join, others )
##	def __join( self, others ):
##		self._joins = others
##		others = filter(lambda x: x.isActive() or x.isBlocked(),  others)
##		for other in others:
##			# make the "other" a double-call, that is,
##			# it's the current task + a wrapper that calls back
##			# to us to tell us they're done.
			


class Scheduler:
	
	"""A scheduler object is responsible for managing a collection of
	microthreads. You usually don't need to use schedulers explicitly,
	since the correct scheduler object is always used automatically.
	In a non-threaded environment there will be just one scheduler
	object, but in a multi-threaded environment there will be a scheduler
	object for each system thread that uses microthreads. See also the
	getScheduler() function."""
	
	def __init__(self):
		# This is a "pointer" to the current thread (so we can use
		# a local reference in the scheduler loop for speed):
		self.__currentThreadPtr = [None]
		# Mapping of active threads to tasks:
		self.active = {}
		# Mapping of blocked threads to tasks (while this is not strictly
		# necessary, it's handy for debugging and monitoring):
		self.blocked = {}
		# A list of threads to be added to the internal task list:
		self.pendingThreads = []
		# A list of waiting threads, sorted by wake-up time
		self.waitingThreads = []
		self.__waitingThreadObject = None
		# Store the ID of the system thread that created this scheduler,
		# so we can later verify that we're being used in the correct
		# context:
		self._owner = get_ident()
	
	def new(self, func, *args, **kwargs):
		"""Create a new thread object for func(*args, **kwargs)
		and return it."""
		if self._owner <> get_ident():
			raise UThreadError, ERR_WRONG_SYSTEM_THREAD
		thread = apply(Thread, ("Thread-%s %s" % (uniqueId(), repr(func)), func) + args, kwargs)
		thread.start(scheduler=self)
		return thread
	
	def run(self):
		"""The main scheduler loop: run a bunch of tasks, until
		they all complete."""
		#
		# Scheduler Loop Implementation Notes (JvR)
		#
		# It is important that this loop is as fast as possible, so we
		# use all possible tricks. Using local variables exclusively is one.
		#
		# We're using a doubly linked list for the task list. There's no
		# head or tail in this list: the last node has the first node as
		# its 'next' value and so on. We don't even hold on to the first
		# element, we only keep the current node in the 'currentNode'
		# variable. A node is simply a Python list with length 3:
		#     [prevNode, thread, nextNode]
		#
		# This approach may look a little unpythonic, but we'd like to
		# avoid list.pop() and list.insert() calls, since they may have to
		# move quite a bit of memory if the list is long. With a linked
		# list, inserting and deleting arbitrary items is relatively cheap.
		# Still, the code below is quite verbose since we also like to
		# avoid Python function calls...
		#
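		# As a tiny illustration of the structure: with two threads A and B
		# the task list consists of two nodes that point at each other,
		#     nodeA = [nodeB, A, nodeB]
		#     nodeB = [nodeA, B, nodeA]
		# and with a single thread the node points at itself (see the
		# initialization further down).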
		if self._owner <> get_ident():
			raise UThreadError, ERR_WRONG_SYSTEM_THREAD
		
		# setup some local references
		import __builtin__
		None = __builtin__.None
		KeyError = __builtin__.KeyError
		timeslice = continuation.timeslice
		currentThreadPtr = self.__currentThreadPtr
		pendingThreads = self.pendingThreads
		active = self.active
		active_get = active.get
		
		if currentThreadPtr[0] is not None:
			return  # we're already running, so we're all set.
		if not pendingThreads:
			raise UThreadError, ERR_NO_PENDING_TASKS
		
		# constants to retrieve values from linked list nodes
		PREV, THREAD, NEXT = range(3)
		# initial dummy value, node will be created later
		currentNode = None
		
		while 1:
			# The actual scheduler loop; will stop when there are
			# no more tasks.
			while pendingThreads:
				# Add pending tasks to the task list. This must happen
				# inside the loop since new threads may be started
				# while we are running.
				# New version should be os-thread safe, as you can append new
				# pending threads while we are doing the initialisation
				thread = pendingThreads.pop(0)
				if currentNode:
					# insert a new node *before* the current node
					# (XXX we should be able to play with priorities here!)
					newNode = [currentNode[PREV], thread, currentNode]
					currentNode[PREV][NEXT] = newNode
					currentNode[PREV] = newNode
				else:
					# list is empty, initialize
					currentNode = []
					currentNode[:] = [currentNode, thread, currentNode]
			
			# make the current thread externally visible, assign to local variable
			currentThreadPtr[0] = currentThread = currentNode[THREAD]
##			print 'len(active)', len(active), currentThread
##			print 'len(blocked)', len(self.blocked), currentThread
			task = active_get( currentThread )
			if task:
				task = timeslice(task)
##				print '.',
				
			if task is None:
				# This thread has been blocked or killed;
				# remove the current node from the task list.
				deadNode = currentNode
				prevNode, currentNode = currentNode[PREV], currentNode[NEXT]
				prevNode[NEXT] = currentNode
				currentNode[PREV] = prevNode
				del deadNode[:] # clear, to remove cycles
				if not currentNode and not pendingThreads:
					break  # Exit the while loop: we're done!
			else:
				active[currentThread] = task
				currentNode = currentNode[NEXT]

		# Show the world we're no longer in business.
		currentThreadPtr[0] = None
		# We're not going to bother gracefully exiting orphans
		# that were left blocked (poor sods!).
		self.blocked.clear()
		## Clean up memory/references for waiting threads...
		self.__waitingThreadObject = None
		del self.waitingThreads[:]
		return
	
	def switchContext(self):
		"""Force a context switch."""
		if self._owner <> get_ident():
			raise UThreadError, ERR_WRONG_SYSTEM_THREAD
		continuation.uthread_lock(1)
		# jump back to the scheduler loop, passing our callers'
		# continuation as the 'remainder'
		scheduler = continuation.uthread_reset()
		# jump back to the scheduler loop
		scheduler(continuation.caller())
	def getActiveThreads(self):
		"""Return all active threads."""
		return self.active.keys()
	
	def getBlockedThreads(self):
		"""Return all blocked threads."""
		return self.blocked.keys()
	
	def getAllThreads(self):
		return atomic(self.__getAllThreads)
	
	def __getAllThreads(self):
		"""Return all threads, whether active or blocked."""
		return self.active.keys() + self.blocked.keys()
	
	def getCurrentThread(self):
		"""Return the current thread."""
		return self.__currentThreadPtr[0]
	
	def isRunning(self):
		return self.__currentThreadPtr[0] is not None
	
	#
	# The next methods work on specific threads. They are
	# used by the Thread object, and should not be needed
	# by client code.
	#
	def isActive(self, thread):
		"""Return true if 'thread' is running."""
		return self.active.has_key(thread)
	
	def isBlocked(self, thread):
		"""Return true if 'thread' is blocked."""
		return self.blocked.has_key(thread)
	
	def start(self, startingThread, startingTask):
		"""Start a new thread or restart a blocked thread."""
		if self._owner <> get_ident() and startingTask is None:
			# only a completely new task is allowed; all other starts must be in the same os thread
			raise UThreadError, ERR_WRONG_SYSTEM_THREAD
		atomic(self.__start, startingThread, startingTask)
	def __start(self, startingThread, startingTask):
		if self.active.has_key(startingThread):
			raise UThreadError, ERR_THREAD_ACTIVE
		if self.blocked.has_key(startingThread):
			# restart a blocked thread
			assert startingTask is None
			startingTask = self.blocked[startingThread]
			del self.blocked[startingThread]
		self.active[startingThread] = startingTask
		self.pendingThreads.append(startingThread)
	
	def block(self, thread):
		"""Block a thread."""
		if self._owner <> get_ident():
			raise UThreadError, ERR_WRONG_SYSTEM_THREAD
		tmp = continuation.uthread_lock(1)
		if thread is self.__currentThreadPtr[0]:
			del self.active[thread]
			### IFF we are currently atomic, then when we wake up
			### we want to be atomic (I think this is always true),
			### so we need to restore the atomic state...
			if tmp:
				def restoreLock( lockValue, target ):
					# when first run, give back the rest of this function...
					continuation.return_current()
					# this is what's run when you call the generated continuation
					continuation.uthread_lock(lockValue)
					# now call the original continuation (target)
					target()
				target = restoreLock( tmp, continuation.caller())
			else:
				target = continuation.caller()
			self.blocked[thread] = target
			# jump back to the scheduler loop, passing None
			# to indicate the task should be removed from
			# the task list.
			scheduler = continuation.uthread_reset()
			scheduler.jump(None)
		else:
			if not self.active.has_key(thread):
				raise UThreadError, ERR_THREAD_NOT_ACTIVE
			self.blocked[thread] = self.active[thread]
			del self.active[thread]
		continuation.uthread_lock(tmp)

	### Waiting-related mechanisms
	def wait(self, thread, duration):
		''' Make this micro-thread wait duration seconds '''
		self.waitUntil(thread, waitTimer() + duration)
	def waitUntil(self, thread, until):
		"""Make this micro-thread wait until waitTimer() >= until."""
		if waitTimer() < until:
			atomic( self.__waitUntil, thread, until)
	def __waitUntil( self, thread, until ):
		''' Atomic implementation for registering a thread for waiting '''
		bisect.insort( self.waitingThreads, (until, thread) )
		if not self.__waitingThreadObject:
##			print 'creating new waiting thread'
			# start the waiting thread...
			self.__setWaitingThreadObject(
				self.new ( self.__waitingThread )
			)
		elif self.isBlocked( self.__waitingThreadObject ):
##			print 'waiter isBlocked, starting'
			self.start( self.__waitingThreadObject, None )
##		print 'blocking thread for wait'
		thread.block()
	def __setWaitingThreadObject (self, object= None):
		'''
		Assign a particular object as the official waiting thread
		'''
		if object is None and self.__waitingThreadObject is self.__currentThreadPtr[0]:
			# attempt to clear, only allow if this is the current waiting thread
			self.__waitingThreadObject = object
		elif object is not None:
			# attempt to set, allow in all cases
			self.__waitingThreadObject = object
	def __waitingThread( self ):
		'''
		Implementation of the waiting algorithm
		
		The presence of this thread in the active list is what
		prevents the scheduler from exiting.  This thread
		will exit if the list of waiting threads drops to zero
		length, or if, on waking up, it discovers that it is
		no longer the official waiting thread.

		This thread controls the entire scheduler,
		potentially putting the whole OS thread
		to sleep.
		'''
		# Localization
		waitingThreads = self.waitingThreads
		pendingThreads = self.pendingThreads
		osThreadSleep = time.sleep
		currentTime = waitTimer
		schedule = self.switchContext
		try:
			while self.__currentThreadPtr[0] is not None: # is running
				while waitingThreads:
					# Waiting loop, triggered once per time through threads
					while waitingThreads and currentTime() >= waitingThreads[0][0]:
						# Restart waiting threads that are done waiting...
						# we are the only thread allowed to remove waiting threads...
##						print 'waking up sleeper'
						wakeupTime, thread = waitingThreads.pop (0)
##						try:
						thread.start()
##						except UThreadError:
##							pass
					if waitingThreads and len(self.active) == 1 and not pendingThreads:
						# need to put the os thread to sleep until we're supposed to wake up
						while waitingThreads and currentTime() < waitingThreads[0][0]:
							# There is a potential for another waiting thread to delete waitingThreads[0] here;
							# we should protect against that somehow, possibly by preventing that other
							# waiting thread from ever being created
							delta = waitingThreads[0][0] - currentTime()
##							print waitingThreads[0][0], currentTime()
##							print 'os thread sleep', delta
							# There's a bug somewhere that makes really
							# short sleep periods cause hangs (osThreadSleep never returns)
							# this is a hacky work-around, not a solution...
							if delta > 0.0001:
								osThreadSleep(delta)
##								print 'os thread wakeup', currentTime()
					# allow other micro-threads to run
					schedule()
				tmp = continuation.uthread_lock(1)
				if not waitingThreads and self.__currentThreadPtr[0] is not None:
					self.block( self.__currentThreadPtr[0] )
				continuation.uthread_lock(tmp)
		except ValueError:
			pass
		# eliminate the waiting thread if we are the current waiting thread...
		atomic( self.__setWaitingThreadObject )
	
	
	def exit(self, thread):
		self.postException(thread, SystemExit)
	
	def exitAll(self, reallyAll=0):
		"""Exit all threads by raising SystemExit in each of them.
		If the 'reallyAll' argument is true, even threads marked
		'resistant' will be exited."""
		self.exitOthers(reallyAll)
		# exit the current thread last
		raise SystemExit
	
	def exitOthers(self, reallyAll=0):
		"""Exit all threads except the current thread by raising
		SystemExit in each of them. If the 'reallyAll' argument
		is true, even threads marked 'resistant' will be exited."""
		atomic(self.__exitOthers, reallyAll)
	
	def __exitOthers(self, reallyAll):
		currentThread = self.__currentThreadPtr[0]
		for thread in self.getActiveThreads():
			if thread is not currentThread and (reallyAll or
					not thread.isResistant()):
				thread.postException(SystemExit)
	
	def postException(self, thread, exc, val=None):
		"""Post an exception to a thread."""
		if self._owner <> get_ident():
			raise UThreadError, ERR_WRONG_SYSTEM_THREAD
		atomic(self.__postException, thread, exc, val)
	
	def __postException(self, thread, exc, val):
		if thread is self.__currentThreadPtr[0]:
			raise exc, val
		else:
			def exceptionRaiser(exc, val):
				continuation.return_current()
				raise exc, val
			if not self.blocked.has_key(thread):
				task = self.active[thread]
			else:
				task = self.blocked[thread]
			implant = exceptionRaiser(exc, val)  # caught just before the exception
			implant.caller = task  # stick it on top as if it was called
			if not self.blocked.has_key(thread):
				self.active[thread] = implant
			else:
				# Note that the exception will not be raised
				# before the blocked task is restarted!
				self.blocked[thread] = implant
	
	def kill(self, thread):
		"""Kill a thread. Note that it's usually better to post
		a SystemExit exception or call exit(), since kill() is 
		very brutal: eg. pending 'finally' clauses will not be
		executed!"""
		if self._owner <> get_ident():
			raise UThreadError, ERR_WRONG_SYSTEM_THREAD
		tmp = continuation.uthread_lock(1)
		if thread is self.__currentThreadPtr[0]:
			del self.active[thread]
			# jump back to the scheduler loop, passing None
			# to indicate the task should be removed from
			# the task list.
			scheduler = continuation.uthread_reset()
			scheduler.jump(None)
		else:
			if self.active.has_key(thread):
				del self.active[thread]
			elif self.blocked.has_key(thread):
				del self.blocked[thread]
			else:
				raise UThreadError, ERR_UNKNOWN_THREAD
		continuation.uthread_lock(tmp)
	
	def makeTask(self, func, args, kw):
		"""Turn a function plus arguments into a continuation,
		suitable for consumption by the scheduler."""
		try:
			continuation.return_current()
			apply(func, args, kw)
		except (SystemExit, KeyboardInterrupt):
			# Let SystemExit & KeyboardInterrupt go by silently,
			# they mean the end of the thread.
			pass
		except:
			currentThread = self.__currentThreadPtr[0]
			currentThread.handleException(sys.exc_info())
			self.exitOthers()
		self.kill(self.__currentThreadPtr[0])
	def wrapTask( self, core ):
		'''Wrap a task (continuation) with a task that
		does the continuation then does something else.'''
		


#
# Some support for using microthreads under system threads:
# each system thread should have its own scheduler instance.
#

if thread is None:
	# No system threading available, make a stub.
	def getScheduler(threadID=None,
			scheduler=Scheduler()):
		"""Return the default scheduler object."""
		assert threadID is None
		return scheduler
else:
	# We have system threading, use one scheduler per system thread.
	def getScheduler(threadID=None,
			schedulerMap={},  # (Abusing a mutable default arg as a 
							  #  kind of "static" local variable.)
			get_ident=thread.get_ident):
		"""If no threadID is given, return the scheduler object
		belonging to the current system thread, otherwise the
		scheduler object for the specified threadID."""
		if threadID is None:
			threadID = get_ident()
		try:
			scheduler = schedulerMap[threadID]
		except KeyError:
			scheduler = schedulerMap[threadID] = Scheduler()
		return scheduler




#
# Tests
#

if __name__ == "__main__":
	sem = Semaphore()
	
	def print_safe(*args):
		sem.acquire()
		for arg in args:
			print arg,
		print
		sem.release()
	
	pwidth = 0
	def printw(str):
		global pwidth
		sem.acquire()
		if pwidth + len(str) + 1 > 70:
			print
			pwidth = 0
		print str,
		sys.stdout.flush()
		pwidth = pwidth + len(str) + 1
		sem.release()

	def pwclean():
		global pwidth
		if pwidth != 0:
			print
			pwidth = 0

	def semaphore_test():
		'''Semaphore test: several threads count concurrently while a Semaphore serializes their printed output.'''
		preambleString = 'ABCDEFGH'
		def count(preamble):
			for i in range(20):
				printw(preamble + `i`)
		for ch in preambleString:
			new(count, ch)
		run()
		pwclean()
		print

	def queue_test():
		"""Test queues and snailqueues"""
		def talker(q):
			for i in range(100):
				printw(`i`)
				wasteTime = 0
				for j in range(20):
					wasteTime = wasteTime + 1
				q.put(i)
			q.put(None)
		def listener(q):
			x = 2
			while x != None:
				x = q.get()
				printw('[' + `x` + ']')
		q = Queue()
		new(talker, q)
		new(listener, q)
		run()
		pwclean()
		print
		q = SnailQueue()
		new(talker, q)
		new(listener, q)
		run()
		pwclean()
		print

	def nap(x):
		t = Timer(x)
		t.start()
		t.wait()

	def timer_test():
		'''Test Timer object
		'''
		def tryTimer(x):
			t = Timer(x)
			t.start()
			t.wait()
			printw('%.3g' % x)
		for i in range(20):
			x = random(5)
			new(tryTimer, x)
		run()
		pwclean()
		print

	def synchronizer_test():
		'''Test Synchronizer object'''
		def trySynchronizer(s, x):
			t = Timer(x)
			t.start()
			t.wait()
			printw('waiting')
			s.sync()
			printw('done')
		N = 20
		s = Synchronizer(N)
		for i in range(N):
			new(trySynchronizer, s, random(5))
		run()
		pwclean()
		print

	def condition_test():
		'''Test Condition object'''
		m, n = 3, 3
		chowtime = [ ]
		cv = Condition()
		def consumer(cv, chowtime, m):
			for i in range(m):
				nap(.5)
				cv.acquire()
				while len(chowtime) == 0:
					cv.wait()
				chowtime.pop(0)
				print getCurrentThread(), 'eating'
				cv.release()
		def producer(cv, chowtime, m, n):
			for i in range(m * n):
				nap(.5 / n)
				cv.acquire()
				chowtime.append('morsel')
				print getCurrentThread(), 'feeding'
				cv.notify()
				cv.release()
		new(producer, cv, chowtime, m, n).name = 'Producer'
		for i in range(n):
			new(consumer, cv, chowtime, m).name = 'Consumer'
		run()
		pwclean()
		print

	def justs_test():
		'''Just's test
Expected results:
	Three threads (ABC) will print out interspersed iteration counts
	with each thread having slightly different timing for their
	counting loops.'''
		def loop(name, duration, interval=1):
			count = 0
			start = t = time.clock()
			while 1:
				now = time.clock()
				if now - start > duration:
					printw("(%s is done!)" % name)
					break
				elif now - t > interval:
					t = now
					printw("%s%s" % (name, count))
					count = count + 1
				switchContext()
		t1 = new(loop, "A", 1.5, 0.1)
		t2 = new(loop, "B", 2.5, 0.25)
		t3 = new(loop, "C", 4, 0.5)
		run()
		pwclean()
		print

	def external_control_test():
		'''Attempt to control threads from another thread
	Second thread attempts to pause another (thread.block())
	then restart it (thread.start()) after a given time.
Expected results:
	The two threads are counting. Results should be interspersed.
	After a period, thread A will be paused from thread B.
	Thread B (and only thread B) should continue counting.
	After a period, thread A will be restarted from thread B.
	Thread A should continue counting until it finishes.'''
		def loop1(name, duration):
			print_safe("begin:", name)
			last = t = time.clock()
			counter = 0
			while 1:
				now = time.clock()
				if now - t > duration:
					print_safe("done!")
					break
				if now - last > 0.2:
					print_safe(name, counter)
					counter = counter + 1
					last = now
		
		def loop2(name, thread, when, whenstart):
			print_safe("begin:", name)
			last = t = time.clock()
			counter = 0
			ispaused = 0
			while 1:
				now = time.clock()
				if not ispaused and now - t > when:
					print_safe("pausing", thread, "from thread", 
							getCurrentThread())
					thread.block()
					ispaused = 1
				if now - t > whenstart:
					print_safe("restarting", thread, "from thread", 
							getCurrentThread())
					thread.start()
					break
				if now - last > 0.25:
					print_safe(name, counter)
					counter = counter + 1
					last = now
		
		name1 = "A"
		name2 = "B"
		thA = Thread(name1, loop1, name1, 4)
		thB = Thread(name2, loop2, name2, thA, 1.5, 3)
		thA.start()
		thB.start()
		run()
		print

	def post_exception_test():
		'''Attempt to post an exception to another thread
Expected results:
	The two threads are counting upward.
	After 2.5 seconds, the second thread will interrupt the first
	by posting the exception "FakeError" to it.  You should see a
	traceback printed in the middle of the
	"<Thread 'Thread-166' at X> number" lines.
	The first thread should stop counting.
	The second thread should continue counting.'''
		def loop1(duration, when=0, thread=None):
			last = t = time.clock()
			counter = 0
			while 1:
				now = time.clock()
				if now - t > duration:
					break
				if when and now - t > when:
					print_safe("posting exception to %s from %s" % 
							(thread, getCurrentThread()))
					thread.postException("FakeError", (-123, "just checking the walls"))
					when = 0
				if now - last > 0.5:
					print_safe(getCurrentThread(), counter)
					counter = counter + 1
					last = now
		th1 = new(loop1, 5)
		th2 = new(loop1, 5, 2.5, th1)
		th2.setResistant(1)
		run()
		print

	def wait_test():
		'''Test small number of micro-threads waiting for moderate duration:
	Creates 5 micro-threads, each goes to sleep for random duration
	of up to five seconds, after which the requested time and actual time
	slept are printed.
Expected results:
	A number of "Start wait, duration" lines
	A pause equal to the shortest duration.
	A number of "wait completed requested: duration actual: duration"
	lines, with pauses between them equal to the difference between the
	durations'''
		def tryWait(x):
			t = waitTimer()
			printw('Start wait %.3g\n'%(x))
			wait(x)
			printw('    wait completed requested:%.3g  actual:%.3g\n' % (x, waitTimer()-t))
		for i in range(5):
			x = random(5)
			new(tryWait, x)
##		print 'running 5 paused threads (random pause), should be low processor load'
		run()
		pwclean()
		print
		
	def lotsa_wait_test():
		'''Test large numbers of micro-threads waiting for moderate duration:
	Creates 500 micro-threads, each of which waits for 3 seconds
Expected results:
	Should be a period of time with very high processor load
		(creating and waiting micro-threads)
	Should be followed by a period of time with no processing load
		(all threads sleeping)
	Should be a period of time with very high processor load
		(waking and destroying micro-threads)'''
		def tryWait(x):
			wait(x)
		for i in range(500):
			new(tryWait, 3)
##		print 'running 500 paused threads (3 second pause), should be low processor load'
		run()
		print 'finished'
		pwclean()
		print
	def micro_wait_test():
		'''Test large number of micro-thread waiting, short duration:
	Creates 500 threads, each of which waits for 0.05 seconds
Expected results:
	On slow machines, may wind up with no sleep, threads will ignore
	the wait, and continue to completion immediately.
	On fast machines, there may be a (very) brief low-processor-load
	moment, then all threads should continue.'''
		def tryWait(x):
			wait(x)
		for i in range(500):
			new(tryWait, .05)
##		print 'running 500 paused threads (.05 second pause)'
		run()
		print 'finished'
		pwclean()
		print
	def os_thread_test():
		'''Test ExternalQueue object:
	Creates two operating system threads, one with a micro-thread
	set consisting of "readers", the other consisting of "writers".
	The two operating system threads (and their respective micro-
	threads) communicate via an ExternalQueue object.
Expected results:
	A series of read <number> lines, interspersed with a series of
	write <number> lines.  Note: the reads may be printed before the
	writes due to task switching.'''
		def holder( ): # hold the micro-threads library open, should have a better mechanism for this
			while 1:
				wait( 0.01 )
			# why does this exit???
		def reader( queue, number=5 ):
			for x in range( number ):
				value = queue.get()
				print "read",value
			print 'reader exit'
		def writer( queue, number=5 ):
			for x in range( number ):
				queue.put( x )
				print "write",x
				wait( random(.1) )
			print 'writer exit'
		def microReader( queue, reader=reader, holder=holder ):
			new( reader, queue )
			new( holder )
			run()
		def microWriter( queue, writer=writer ):
			new( writer, queue )
			run()
		queue = ExternalQueue()
		thread.start_new_thread( microReader, (queue,) )
		microWriter(queue,)
	TESTS = [
		semaphore_test,
		queue_test,
		timer_test,
		synchronizer_test,
		condition_test,
		justs_test,
		external_control_test,
		post_exception_test,
		wait_test,
		lotsa_wait_test,
		micro_wait_test,
	]
	if thread:
		TESTS.append( os_thread_test)
	for test in TESTS:
		print '##',test.__doc__
		test()


