[Stackless-checkins] r51501 - stackless/trunk/PCbuild/uthread_ccp.py
richard.tew
python-checkins at python.org
Wed Aug 23 11:10:25 CEST 2006
Author: richard.tew
Date: Wed Aug 23 11:10:23 2006
New Revision: 51501
Modified:
stackless/trunk/PCbuild/uthread_ccp.py
Log:
An updated version of the uthread module which CCP uses internally. It still requires some changes, like the addition of Sleep and a BeNice methods, and the removal or replacement of some internal CCP functions like those related to local storage.
Modified: stackless/trunk/PCbuild/uthread_ccp.py
==============================================================================
--- stackless/trunk/PCbuild/uthread_ccp.py (original)
+++ stackless/trunk/PCbuild/uthread_ccp.py Wed Aug 23 11:10:23 2006
@@ -29,225 +29,791 @@
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE."""
-#import continuation
import stackless
+import sys
+import time
+import types
+import weakref
+import traceback
+import copy
+import logging
+
+# This is a simple replacement for what CCP uses which is linked into our
+# framework.
+def WriteTraceback(text, tb):
+ logging.error(text)
+ for s in traceback.format_tb(tb):
+ logging.error(s.strip())
+ logging.error(str(excInstance))
+
+def LogTraceback(text):
+ if text is None:
+ text = "Traceback:"
+ tb = traceback.extract_stack()
+ WriteTraceback(text, tb)
+
+def StackTrace(text=None):
+ excClass, excInstance, tb = sys.exc_info()
+ if excClass:
+ if text is None:
+ text = "Stacktrace:"
+ WriteTraceback(text, tb)
+ else:
+ LogTraceback(text)
-MAX_TIMESLICE = 20000000
+tasks = [] # bogus breyta
-'''
-tasks = [ ]
-ghandler = None
-'''
-
-'''
-def endtask():
- """force this thread to die"""
- continuation.uthread_lock(1)
- sched = continuation.uthread_reset()
- if sched != None:
- sched()
-'''
-# not needed yet
-
-'''
-def schedule():
- """force a context switch"""
- continuation.uthread_lock(1)
- sched = continuation.uthread_reset()
- sched(continuation.caller())
-'''
# handled internally
schedule = stackless.schedule
-'''
+# We need to subclass it so that we can store attributes on it.
+class Tasklet(stackless.tasklet):
+ pass
+
def new(func, *args, **kw):
- """Start a new task"""
- if func == None:
- raise "Don't you make me try to call None"
- global tasks
- assert not kw
+ return Tasklet(func)(*args, **kw)
- def wrapper(func, args):
- continuation.return_current()
- try:
- apply(func, args)
- except:
- continuation.uthread_reset()
- raise
- endtask()
-
- tmp = continuation.uthread_lock(1)
- task = wrapper(func, args)
- task.caller = None
- tasks.append(task)
- continuation.uthread_lock(tmp)
-'''
-def new(func, *args, **kw):
- """Start a new task"""
- gen = stackless.taskoutlet(func)
- t = gen(*args, **kw)
- t.insert()
-
-'''
-def run():
- """Run a bunch of tasks, until they all complete"""
- global tasks
- pop = tasks.pop
- append = tasks.append
- timeslice = continuation.timeslice
- if not tasks:
- # just enter the loop
- append(None)
- while tasks:
- task = pop(0)
- if task:
- try:
- task = timeslice(task, MAX_TIMESLICE)
- except UserError, e:
- global ghandler
- if ghandler:
- new(ghandler, e)
- else:
- print e
-
- if task:
- import runner
- runner.PostException(
- task,
- "THIS THREAD WAS PROBABLY STUCK IN AN INFINITE LOOP AND HAS BEEN NUKED!!!",
- None)
-
- #append(task)
-'''
-def run():
- """Run a bunch of tasks, until they all complete"""
- # have to add error handling like above
- while stackless.getruncount() > 1:
- try:
- # running until the end or until something happens
- victim = stackless.run_watchdog(MAX_TIMESLICE)
- if victim:
- print "THIS THREAD WAS PROBABLY STUCK IN AN INFINITE LOOP AND HAS BEEN NUKED!!!",
- # todo: kill the task cleanly
- except Exception, e:
- print "Exception occoured:", e
+def newWithoutTheStars(func, args, kw):
+ return Tasklet(func)(*args, **kw)
idIndex = 0
-def sethandler(handler):
- global ghandler
- ghandler = handler
-
-
def uniqueId():
"""Microthread-safe way to get unique numbers, handy for
giving things unique ID numbers"""
global idIndex
- tmp = stackless.scheduler_lock(1)
+ ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
+ #tmp = stackless.atomic()
z = idIndex
- idIndex = z + 1
- stackless.scheduler_lock(tmp)
+ idIndex += 1
return z
def irandom(n):
"""Microthread-safe version of random.randrange(0,n)"""
import random
- tmp = stackless.scheduler_lock(1)
+ ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
+ #tmp = stackless.atomic()
n = random.randrange(0, n)
- stackless.scheduler_lock(tmp)
return n
-'''
-class Semaphore:
- """Semaphores protect globally accessible resources from
- the effects of context switching."""
- __guid__ = 'uthread.Semaphore'
- def __init__(self, maxcount=1):
- self.count = maxcount
- self.waiting = []
- def claim(self):
- tmp = continuation.uthread_lock(1)
- if self.count == 0:
- self.waiting.append(continuation.caller())
- endtask()
- else:
- self.count = self.count - 1
- continuation.uthread_lock(tmp)
- def release(self):
- tmp = continuation.uthread_lock(1)
- if self.waiting:
- tasks.append(self.waiting.pop(0))
- else:
- self.count = self.count + 1
- continuation.uthread_lock(tmp)
-'''
+synonyms = {}
+
+def MakeSynonymOf(threadid, synonym_threadid):
+ global synonyms
+ key = (threadid, synonym_threadid)
+ if key not in synonyms:
+ synonyms[key] = 1
+ else:
+ synonyms[key] += 1
+
+def MakeCurrentSynonymOf(synonym_threadid):
+ return MakeSynonymOf(id(stackless.getcurrent()), synonym_threadid)
+
+def RemoveSynonymOf(threadid, synonym_threadid):
+ global synonyms
+ key = (threadid, synonym_threadid)
+ if key not in synonyms:
+ StackTrace("RemoveSynonymOf unexpected call threadid:%s synonym_threadid:%s" % key)
+ return
+ synonyms[key] -= 1
+ if 0 == synonyms[key]:
+ del synonyms[key]
+
+def RemoveCurrentSynonymOf(synonym_threadid):
+ return RemoveSynonymOf(id(stackless.getcurrent()), synonym_threadid)
+
+def IsSynonymOf(threadid, synonym_threadid):
+ global synonyms
+ key = (threadid, synonym_threadid)
+ return key in synonyms
+
+def IsCurrentSynonymOf(synonym_threadid):
+ return IsSynonymOf(id(stackless.getcurrent()), synonym_threadid)
+
+
+semaphores = weakref.WeakKeyDictionary({})
+
+def GetSemaphores():
+ return semaphores
class Semaphore:
"""Semaphores protect globally accessible resources from
the effects of context switching."""
- __guid__ = 'uthread_ccp.Semaphore'
-
- def __init__(self, maxcount=1):
- self.count = maxcount
- self.waiting = stackless.channel()
-
+
+ def __init__(self, semaphoreName=None, maxcount=1, strict=True):
+ global semaphores
+
+ semaphores[self] = 1
+
+ self.semaphoreName = semaphoreName
+ self.maxcount = maxcount
+ self.count = maxcount
+ self.waiting = stackless.channel()
+ self.thread = None
+ self.lockedWhen = None
+ self.strict = strict
+
+ def IsCool(self):
+ '''
+ returns true if and only if nobody has, or is waiting for, this lock
+ '''
+ return self.count==self.maxcount
+
+ def __str__(self):
+ return "Semaphore("+ str(self.semaphoreName) +")"
+
+ def __del__(self):
+ if not self.IsCool():
+ logger.error("Semaphore "+ str(self) +" is being destroyed in a locked or waiting state")
+
def acquire(self):
- tmp = stackless.atomic()
- if self.count == 0:
+ if self.strict:
+ assert self.thread is not stackless.getcurrent()
+ if self.thread is stackless.getcurrent():
+ raise RuntimeError, "tasklet deadlock, acquiring tasklet holds strict semaphore"
+ self.count -= 1
+ if self.count < 0:
self.waiting.receive()
- else:
- self.count = self.count - 1
-
+
+ self.lockedWhen = time.time()
+ self.thread = stackless.getcurrent()
+
claim = acquire
-
+
def release(self):
- tmp = stackless.atomic()
- if self.waiting.queue:
- self.waiting.send(None)
+ if self.strict:
+ assert self.thread is stackless.getcurrent()
+ if self.thread is not stackless.getcurrent():
+ raise RuntimeError, "wrong tasklet releasing strict semaphore"
+
+ self.count += 1
+ self.thread = None
+ self.lockedWhen = None
+ if self.count <= 0:
+ PoolWorker("uthread::Semaphore::delayed_release",self.__delayed_release)
+
+ #This allows the release thread to continue without being interrupted
+ def __delayed_release(self):
+ self.waiting.send(None)
+
+class CriticalSection(Semaphore):
+ def __init__(self, semaphoreName = None, strict=True):
+ Semaphore.__init__(self, semaphoreName)
+ self.__reentrantRefs = 0
+
+ def acquire(self):
+ # MEB: if (self.count<=0) and (self.thread is stackless.getcurrent() or stackless.getcurrent() is synonymof self.thread):
+ if (self.count<=0) and (self.thread is stackless.getcurrent() or IsCurrentSynonymOf(self.thread)):
+ self.__reentrantRefs += 1
+ else:
+ Semaphore.acquire(self)
+
+ def release(self):
+ if self.__reentrantRefs:
+ # MEB: assert self.thread is stackless.getcurrent()
+ assert self.thread is stackless.getcurrent() or IsCurrentSynonymOf(self.thread)
+ # MEB: if self.thread is not stackless.getcurrent():
+ if not (self.thread is stackless.getcurrent() or IsCurrentSynonymOf(self.thread)):
+ raise RuntimeError, "wrong tasklet releasing reentrant CriticalSection"
+ self.__reentrantRefs -= 1
else:
- self.count = self.count + 1
+ Semaphore.release(self)
+
+def FNext(f):
+ first = stackless.getcurrent()
+ try:
+ cursor = first.next
+ while cursor != first:
+ if cursor.frame.f_back == f:
+ return FNext(cursor.frame)
+ cursor = cursor.next
+ return f
+ finally:
+ first = None
+ cursor = None
+
+class SquidgySemaphore:
+ '''
+ This is a semaphore which allows exclusive locking
+ '''
+
+ def __init__(self, lockName):
+ self.__outer__ = Semaphore(lockName)
+ self.lockers = {}
+ self.__wierdo__ = 0
+
+ def IsCool(self):
+ '''
+ returns true if and only if nobody has, or is waiting for, this lock
+ '''
+ while 1:
+ lockers = []
+ try:
+ for each in self.lockers:
+ return 0
+ break
+ except:
+ StackTrace()
+ sys.exc_clear()
+ return self.__outer__.IsCool() and not self.__wierdo__
+
+ def acquire_pre_friendly(self):
+ '''
+ Same as acquire, but with respect for pre_acquire_exclusive
+ '''
+ while 1:
+ if self.__wierdo__:
+ Sleep(0.5)
+ else:
+ self.acquire()
+ if self.__wierdo__:
+ self.release()
+ else:
+ break
+
+ def pre_acquire_exclusive(self):
+ '''
+ Prepares the lock for an acquire_exclusive call, so that
+ acquire_pre_friendly will block on the dude.
+ '''
+ self.__wierdo__ += 1
+
+ def acquire_exclusive(self):
+ i = 0
+ while 1:
+ self.__outer__.acquire()
+ theLocker = None
+ try:
+ # self.lockers is a dict, and we just want one entry from it.
+ # for each in/break is a convenient way to get one entry.
+ for each in self.lockers:
+ theLocker = each
+ break
+ except:
+ StackTrace()
+ sys.exc_clear()
+
+ if theLocker is not None:
+ self.__outer__.release() # yielding to the sucker is fine, since we're waiting for somebody anyhow.
+ if i and ((i%(3*4*60))==0):
+ logger.error("Acquire-exclusive is waiting for the inner lock (%d seconds total, lockcount=%d)" % (i/4, len(self.lockers)))
+ LogTraceback("This acquire_exclusive is taking a considerable amount of time")
+ logger.error("This dude has my lock:")
+ logger.error("tasklet: "+str(theLocker))
+ for s in traceback.format_list(traceback.extract_stack(FNext(theLocker.frame),40)):
+ for n in range(0,10120,253): # forty lines max.
+ if n==0:
+ if len(s)<=255:
+ x = s
+ else:
+ x = s[:(n+253)]
+ else:
+ x = " - " + s[n:(n+253)]
+ logger.error(x, 4)
+ if (n+253)>=len(s):
+ break
+ Sleep(0.500)
+ else:
+ break
+ i += 1
+
+ def release_exclusive(self):
+ self.__outer__.release()
+ self.__wierdo__ -= 1
+
+ def acquire(self):
+ # you don't need the outer lock to re-acquire
+ self.__outer__.acquire()
+ self.__acquire_inner()
+ self.__outer__.release()
+
+ def release(self, t=None):
+ if t is None:
+ t = stackless.getcurrent()
+ self.__release_inner(t)
+
+ def __acquire_inner(self):
+ while 1:
+ try:
+ if self.lockers.has_key(stackless.getcurrent()):
+ self.lockers[stackless.getcurrent()] += 1
+ else:
+ self.lockers[stackless.getcurrent()] = 1
+ break
+ except:
+ StackTrace()
+ sys.exc_clear()
+
+ def __release_inner(self, t):
+ while 1:
+ try:
+ if self.lockers.has_key(t):
+ self.lockers[t] -= 1
+ if self.lockers[t]==0:
+ del self.lockers[t]
+ else:
+ StackTrace("You can't release a lock you didn't acquire")
+ break
+ except:
+ StackTrace()
+ sys.exc_clear()
+
+channels = weakref.WeakKeyDictionary()
+
+def GetChannels():
+ return channels
+
+class Channel:
+ """
+ A Channel is a stackless.channel() with administrative spunk
+ """
+
+ def __init__(self,channelName=None):
+ global channels
+ self.channelName = channelName
+ self.channel = stackless.channel()
+ self.send = self.channel.send
+ self.send_exception = self.channel.send_exception
+ channels[self] = 1
+
+ def receive(self):
+ return self.channel.receive()
+
+ def __getattr__(self,k):
+ return getattr(self.channel,k)
+
+
+
+
+# -----------------------------------------------------------------------------------
+# FIFO Class
+# -----------------------------------------------------------------------------------
+class FIFO(object):
+
+ __slots__ = ('data',)
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - Constructor
+ # -----------------------------------------------------------------------------------
+ def __init__(self):
+ self.data = [[], []]
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - push
+ # -----------------------------------------------------------------------------------
+ def push(self, v):
+ self.data[1].append(v)
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - pop
+ # -----------------------------------------------------------------------------------
+ def pop(self):
+ d = self.data
+ if not d[0]:
+ d.reverse()
+ d[0].reverse()
+ return d[0].pop()
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - __nonzero__
+ # -----------------------------------------------------------------------------------
+ # NB: Please don't define this function, as it will break some legacy code in client
+ # Use the len() function instead
+ #def __nonzero__(self):
+ # d = self.data
+ # return not (not (d[0] or d[1]))
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - __contains__
+ # -----------------------------------------------------------------------------------
+ def __contains__(self, o):
+ d = self.data
+ return (o in d[0]) or (o in d[1])
+
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - Length
+ # -----------------------------------------------------------------------------------
+ def Length(self):
+ d = self.data
+ return len(d[0]) + len(d[1])
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - clear
+ # -----------------------------------------------------------------------------------
+ def clear(self):
+ self.data = [[], []]
+
+ # -----------------------------------------------------------------------------------
+ # FIFO - clear
+ # -----------------------------------------------------------------------------------
+ def remove(self, o):
+ d = self.data
+ try:
+ d[0].remove(o)
+ except ValueError:
+ sys.exc_clear()
-class Queue:
+ try:
+ d[1].remove(o)
+ except ValueError:
+ sys.exc_clear()
+
+
+# -----------------------------------------------------------------------------------
+# Queue - QueueCheck
+# -----------------------------------------------------------------------------------
+def QueueCheck(o):
+
+ while True:
+ try:
+ o.pump()
+ except ReferenceError:
+ sys.exc_clear()
+ break
+ except StandardError:
+ StackTrace()
+ sys.exc_clear()
+
+ Sleep(0.1)
+
+
+# -----------------------------------------------------------------------------------
+# Queue Class
+# -----------------------------------------------------------------------------------
+class Queue(FIFO):
"""A queue is a microthread-safe FIFO."""
- __guid__ = 'uthread_ccp.Queue'
-
+
+ # -----------------------------------------------------------------------------------
+ # Queue - Constructor
+ # -----------------------------------------------------------------------------------
def __init__(self):
- self.contents = [ ]
- self.channel = stackless.channel()
-
+ FIFO.__init__(self)
+ self.channel = stackless.channel()
+ self.blockingThreadRunning = False
+
+ # -----------------------------------------------------------------------------------
+ # Queue - put
+ # -----------------------------------------------------------------------------------
def put(self, x):
- tmp = stackless.atomic()
- self.contents.append(x)
- while self.channel.queue and self.contents:
- self.channel.send(self.contents.pop(0))
-
+ self.push(x)
+ self.pump()
+
+ # -----------------------------------------------------------------------------------
+ # Queue - pump
+ # -----------------------------------------------------------------------------------
+ def pump(self):
+
+ while self.channel.queue and self.Length() and self.channel.balance < 0:
+ o = self.pop()
+ self.channel.send(o)
+
+ # -----------------------------------------------------------------------------------
+ # Queue - non_blocking_put
+ # -----------------------------------------------------------------------------------
+ def non_blocking_put(self, x):
+
+ # Create a non blocking worker thread if this is the first time this gets called
+ if not self.blockingThreadRunning:
+ self.blockingThreadRunning = True
+ new(QueueCheck, weakref.proxy(self)).context = "uthread::QueueCheck"
+
+ self.push(x)
+
+ # -----------------------------------------------------------------------------------
+ # Queue - get
+ # -----------------------------------------------------------------------------------
def get(self):
- tmp = stackless.atomic()
- if self.contents:
- return self.contents.pop(0)
+ if self.Length():
+ return self.pop()
+
return self.channel.receive()
-
- def unget(self, x):
- tmp = stackless.atomic()
- self.contents.insert(0, x)
-
- def cget(self):
- return self.contents.pop(0)
-exports = {
- "uthread_ccp.new": new,
- "uthread_ccp.irandom": irandom,
- "uthread_ccp.uniqueId": uniqueId,
- "uthread_ccp.run": run,
- "uthread_ccp.schedule": schedule,
- "uthread_ccp.sethandler": sethandler,
- }
-
-def test(n):
- global cnt
- for cnt in xrange(n):
- if cnt == 12345:
- 1/0
-
-
\ No newline at end of file
+# --------------------------------------------------------------------
+class Event:
+
+ # --------------------------------------------------------------------
+ def __init__(self, manual=1, signaled=0):
+ self.channel = stackless.channel()
+ self.manual = manual
+ self.signaled = signaled
+
+ # --------------------------------------------------------------------
+ def Wait(self, timeout=-1):
+ if timeout != -1:
+ raise RuntimeError("No timeouts supported in Event")
+
+ if not self.signaled:
+ self.channel.receive()
+
+ # --------------------------------------------------------------------
+ def SetEvent(self):
+ if self.manual:
+ self.signaled = 1
+
+ while self.channel.queue:
+ self.channel.send(None)
+
+ # --------------------------------------------------------------------
+ def ResetEvent(self):
+ self.signaled = 0
+
+
+
+def LockCheck():
+ global semaphores
+ while 1:
+ each = None
+ Sleep(5 * 60)
+ now = time.time()
+ try:
+ for each in semaphores.keys():
+ BeNice()
+ if (each.count<=0) and (each.waiting.balance < 0) and (each.lockedWhen and (now - each.lockedWhen)>=(5*MIN)):
+ logger.error("Semaphore %s appears to have threads in a locking conflict."%id(each))
+ logger.error("holding thread:")
+ try:
+ for s in traceback.format_list(traceback.extract_stack(each.thread.frame,40)):
+ logger.error(s)
+ except:
+ sys.exc_clear()
+ first = each.waiting.queue
+ t = first
+ while t:
+ logger.error("waiting thread %s:"%id(t),4)
+ try:
+ for s in traceback.format_list(traceback.extract_stack(t.frame,40)):
+ logger.error(s,4)
+ except:
+ sys.exc_clear()
+ t = t.next
+ if t is first:
+ break
+ logger.error("End of locking conflict log")
+ except StandardError:
+ StackTrace()
+ sys.exc_clear()
+
+new(LockCheck).context = "uthread::LockCheck"
+
+__uthread__queue__ = None
+def PoolHelper(queue):
+ t = stackless.getcurrent()
+ t.localStorage = {}
+ respawn = True
+ try:
+ try:
+ while 1:
+ BeNice()
+ ctx, callingContext, func, loc, args, keywords = queue.get()
+ if (queue.channel.balance >= 0):
+ new(PoolHelper, queue).context = "uthread::PoolHelper"
+ SetLocalStorage(loc)
+ # _tmpctx = t.PushTimer(ctx)
+ try:
+ apply( func, args, keywords )
+ finally:
+ ctx = None
+ callingContext = None
+ func = None
+ t.localStorage = {}
+ loc = None
+ args = None
+ keywords = None
+ # t.PopTimer(_tmpctx)
+ except SystemExit:
+ respawn = False
+ raise
+ except:
+ if callingContext is not None:
+ extra = "spawned at %s %s(%s)"%callingContext
+ else:
+ extra = ""
+ StackTrace("Unhandled exception in %s%s" % (ctx, extra))
+ sys.exc_clear()
+ finally:
+ if respawn:
+ del t
+ new(PoolHelper, queue).context = "uthread::PoolHelper"
+
+def PoolWorker(ctx,func,*args,**keywords):
+ '''
+ Same as uthread.pool, but without copying local storage, thus resetting session, etc.
+
+ Should be used for spawning worker threads.
+ '''
+ return PoolWithoutTheStars(ctx,func,args,keywords,0,1)
+
+def PoolWorkerWithoutTheStars(ctx,func,args,keywords):
+ '''
+ Same as uthread.worker, but without copying local storage, thus resetting session, etc.
+
+ Should be used for spawning worker threads.
+ '''
+ return PoolWithoutTheStars(ctx,func,args,keywords,0,1)
+
+def PoolWithoutTheStars(ctx,func,args,keywords,unsafe=0,worker=0):
+ if type(ctx) not in types.StringTypes:
+ StackTrace("uthread.pool must be called with a context string as the first parameter")
+ global __uthread__queue__
+ callingContext = None
+ if ctx is None:
+ if unsafe:
+ ctx = "uthread::PoolHelper::UnsafeCrap"
+ else:
+ tb = traceback.extract_stack(limit=2)[0]
+ ctx = getattr(stackless.getcurrent(), "context", "")
+ callingContext = tb[2], tb[0], tb[1] #function , file, lineno
+ del tb
+
+ if __uthread__queue__ is None:
+ __uthread__queue__ = Queue()
+ for i in range(60):
+ new(PoolHelper, __uthread__queue__).context = "uthread::PoolHelper"
+ if unsafe or worker:
+ st = None
+ else:
+ st = copy.copy(GetLocalStorage())
+ __uthread__queue__.non_blocking_put( (str(ctx), callingContext, func, st, args, keywords,) )
+ return None
+
+def Pool(ctx,func,*args,**keywords):
+ '''
+ executes apply(args,keywords) on a new uthread. The uthread in question is taken
+ from a thread pool, rather than created one-per-shot call. ctx is used as the
+ thread context. This should generally be used for short-lived threads to reduce
+ overhead.
+ '''
+ return PoolWithoutTheStars(ctx,func,args,keywords)
+
+def UnSafePool(ctx,func,*args,**keywords):
+ '''
+ uthread.pool, but without any dangerous calls to stackless.getcurrent(), which could
+ have dramatic and drastic effects in the wrong context.
+ '''
+ return PoolWithoutTheStars(ctx,func,args,keywords,1)
+
+def ParallelHelper(ch,idx,what):
+ ch, threadid = ch
+ MakeCurrentSynonymOf(threadid)
+ try:
+ ei = None
+ try:
+ if len(what)==3:
+ ret = (idx, apply(what[0], what[1], what[2] ))
+ if ch.balance < 0 :
+ ch.send( (1, ret) )
+ else:
+ ret = (idx, apply(what[0], what[1] ))
+ if ch.balance < 0:
+ ch.send( (1, ret) )
+ except StandardError:
+ ei = sys.exc_info()
+ sys.exc_clear()
+
+ if ei:
+ if ch.balance < 0:
+ ch.send((0,ei))
+ del ei
+ finally:
+ RemoveCurrentSynonymOf(threadid)
+
+def Parallel(funcs,exceptionHandler=None,maxcount=30):
+ '''
+ Executes in parallel all the function calls specified in the list/tuple 'funcs', but returns the
+ return values in the order of the funcs list/tuple. If an exception occurs, only the first exception
+ will reach you. The rest will dissapear in a puff of logic.
+
+ Each 'func' entry should be a tuple/list of:
+ 1. a function to call
+ 2. a tuple of arguments to call it with
+ 3. optionally, a dict of keyword args to call it with.
+ '''
+ if not funcs:
+ return
+
+ context = "ParallelHelper::"+getattr(stackless.getcurrent(),"context","???")
+ ch = stackless.channel(), id(stackless.getcurrent())
+ ret = [ None ] * len(funcs)
+ n = len(funcs)
+ if n > maxcount:
+ n = maxcount
+ for i in range(n):
+ if type(funcs[i]) != types.TupleType:
+ raise RuntimeError("Parallel requires a list/tuple of (function, args tuple, optional keyword dict,)")
+ Pool(context, ParallelHelper, ch, i, funcs[i])
+ for i in range(len(funcs)):
+ ok, bunch = ch[0].receive()
+ if ok:
+ idx,val = bunch
+ if len(funcs[i])==4:
+ ret[idx] = (funcs[i][3], val,)
+ else:
+ ret[idx] = val
+ else:
+ try:
+ raise bunch[0],bunch[1],bunch[2]
+ except StandardError:
+ if exceptionHandler:
+ exctype, exc, tb = sys.exc_info()
+ try:
+ try:
+ apply( exceptionHandler, (exc,) )
+ except StandardError:
+ raise exc, None, tb
+ finally:
+ exctype, exc, tb = None, None, None
+ else:
+ StackTrace()
+ raise
+
+ if n<len(funcs):
+ if type(funcs[n]) != types.TupleType:
+ raise RuntimeError("Parallel requires a list/tuple of (function, args tuple, optional keyword dict,)")
+ Pool(context, ParallelHelper, ch, n, funcs[n])
+ n+=1
+ return ret
+
+locks = {}
+def Lock(object, *args):
+ global locks
+ t = (id(object), args)
+ if t not in locks:
+ locks[t] = Semaphore(t, strict=False)
+ locks[t].acquire()
+
+def TryLock(object, *args):
+ global locks
+ t = (id(object), args)
+
+ if t not in locks:
+ locks[t] = Semaphore(t, strict=False)
+ if not locks[t].IsCool():
+ return False
+ locks[t].acquire()
+ return True
+
+def ReentrantLock(object, *args):
+ global locks
+ t = (id(object), args)
+ if t not in locks:
+ locks[t] = CriticalSection(t)
+ locks[t].acquire()
+
+def UnLock(object, *args):
+ global locks
+ t = (id(object), args)
+ locks[t].release()
+ if (t in locks) and (locks[t].IsCool()): # may be gone or changed by now
+ del locks[t]
+
+# Exported names.
+parallel = Parallel
+worker = PoolWorker
+workerWithoutTheStars = PoolWorkerWithoutTheStars
+unsafepool = UnSafePool
+pool = Pool
+poolWithoutTheStars = PoolWithoutTheStars
+
+
_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins
More information about the Stackless-checkins
mailing list