[Stackless] Re: mutex class containing tool(s) for preemptive in SLP3.0

Christian Tismer tismer at tismer.com
Sun May 4 21:04:46 CEST 2003

Eric van Riet Paap wrote:
> Hi All,
> I've created this small piece of code (that I can't test yet) to provide the 
> preemptive multitasking with stackless python that Chris is currently working 
> on.
> Comments/hints are very welcome: 
> (file: preemptive.py)
> #!/usr/bin/python
> '''Provide tools for preemptive multitasking with stackless python.
> Actual scheduling/locking is performed by communicating over (stackless') 
> channels.
> The goal here is to set the atomic flag for a short a period as possible.

I think you mixed matters a bit with the way Stackless 1.0 worked.
There was a global /per thread) schedule_lock, which is gone now.
The new atomic flag is much better, since it is defined
per tasklet, which makes things very very easy.
Setting the atomic flag simply disallows auto-scheduling for *this*
tasklet. But an explicit switch by schedule() or channel.send is
just fine. When switched back, the tasklet will again be locked,
and it can remove the atomic flag without any danger.
Also not, that for ease of use, there is an atomic() object,
which does this for your running function, automagically.

> Author: Eric van Riet Paap <eric at vanrietpaap.com>'''
> import stackless
> class mutex:
>         '''mutual exclusion object'''
>         data = {}

### Why a class variable and not per-mutex?

>         def __init__(self, name):
>                 '''mutexes are initialy unlocked'''
>                 self.name = name
>                 if not self.channel.has_key(name):

### Did you maybe want to write self.ddata, above?

>                         self.data[name] = [0, stackless.channel()]
>         def isLocked(self):
>                 '''return non-zero if locked'''
>                 return self.data[self.name][0]
>         def lock(self):
>                 '''acquire the lock'''
>                 a = stackless.setatomic(1)
>                 n, ch = self.data[self.name]
>                 self.data[self.name][0] = n+1
>                 if n: #other tasklet acquired the mutex
>                         #note: we actually might need to setatomic(0) here, 
> however I'm a bit afraid of switches that
>                         #      might happen between setatomic(0) and the next 
> line! (Chris, what do you think?)
>                         ch.receive()    #wait for tasklet with lock to send 
> data
>                 stackless.setatomic(a)

### here is my version, see the comment on top:

         def lock(self):
                 '''acquire the lock'''
                 atom = stackless.atomic()
                 n, ch = self.data[self.name]
                 self.data[self.name][0] = n+1
                 if n: #other tasklet acquired the mutex
                         #wait for tasklet with lock to send data
                 # no action needed, since atom will restore the
                 # tasklet's atomic flag when it is collected

### Alternative proposal: You don't need any extra counter,
### since the channel has the channel.balance property,
### which gives the number of waiting tasklets.
### balance > 0: There are senders, < 0: there are receivers

>         def unlock(self):
>                 '''release the lock'''
>                 a = stackless.setatomic(1)
>                 n, ch = self.data[self.name]
>                 self.data[self.name][0] = n-1
>                 if n: #other tasklets would like to aqcuire the mutex
>                         #note: same note is with lock method.
>                         ch.send(n)      #wake up a tasklets that is trying to 
> receive data from this channel
>                 stackless.setatomic(a)

### Same as above.

Here a slightly different implementation, which I just checked
into the new demo subfolder in the cvs tree.
It implements a simple mutex class and also shows how to use
callbacks on schedule() and channel actions for debugging.

from stackless import *

debug = 1

class mutex:
     def __init__(self, capacity=1):
         self.queue = channel()
         self.capacity = capacity

     def isLocked(self):
         '''return non-zero if locked'''
         return self.capacity == 0

     def lock(self):
         '''acquire the lock'''
         atom = atomic()
         if self.capacity:
             self.capacity -= 1

     def unlock(self):
         '''release the lock'''
         atom = atomic()
         if self.queue.balance < 0:
             self.capacity += 1

class MyTasklet(tasklet):
#    __slots__ = ["name"]
# refcount bug in 2.2, enable in 2.3
     def __init__(self, func, name=None):
         tasklet.__init__(self, func)
         if not name:
             name = "at %08x" % (id(self))
         self.name = name
     def __new__(self, func, name=None):
         return tasklet.__new__(self, func)
     def __repr__(self):
         return "Tasklet %s" % self.name

m = mutex()

def f():
     name = getcurrent().name
     print name, "acquiring"
     print name, "switching"
     print name, "releasing"

MyTasklet(f, "tick")()
MyTasklet(f, "trick")()
MyTasklet(f, "track")()

def channel_cb(channel, tasklet, sending, willblock):
     print tasklet, ("recv", "send")[sending], ("nonblocking", 

def schedule_cb(prev, next):
     if not prev:
         print "starting", next
     elif not next:
         print "ending", prev
         print "jumping from %s to %s" % (prev, next)

if debug:



Christian Tismer             :^)   <mailto:tismer at tismer.com>
Mission Impossible 5oftware  :     Have a break! Take a ride on Python's
Johannes-Niemeyer-Weg 9a     :    *Starship* http://starship.python.net/
14109 Berlin                 :     PGP key -> http://wwwkeys.pgp.net/
work +49 30 89 09 53 34  home +49 30 802 86 56  pager +49 173 24 18 776
PGP 0x57F3BF04       9064 F4E1 D754 C2FF 1619  305B C09C 5A3B 57F3 BF04
      whom do you want to sponsor today?   http://www.stackless.com/

Stackless mailing list
Stackless at www.tismer.com

More information about the Stackless mailing list