[Stackless] rough PyCon'07 draft; feedback wanted

Andrew Dalke dalke at dalkescientific.com
Tue Feb 13 02:38:48 CET 2007


Hi all,

   Here's my rough draft for what I can/hope to talk about
during PyCon.  It's rather long at 2441 lines.  It contains
a lot of code.

  According to http://us.pycon.org/apps07/talks/ (see #76),
I have 45 minutes for my Stackless slot, so <=40 minutes to talk.

   This has turned out to be the most difficult presentation
I've had to do because of the length of the talk, because
of my relative lack of experience with Stackless, and because
I generated a lot of timing numbers and alternative approaches.

   I'm sending it now for review and feedback.  Some
specific questions are:
   - what should I stress / deemphasize?  Are the examples
interesting to people (eg, XML processing, format identification,
handling blocking function calls and I/O)?

   - am I missing something obvious?

   - if people build a stackless binary do they usually call
      that binary "python", "stackless", "spython" or something else?

   - ???


   I hope to have a version as a text article but as this draft took
a few weeks of on-and-off work and thought, I can't make guarantees.
There's still a lot of work for the slides for the conference in
10 days.

  ========


Stackless is kind of an oddball in the Python community.  It's been
around for years but few people know about it.  When Christian first
started developing it he didn't know how to explain it well, or even
quite what he was looking for.  As a result I think most people put it
aside and have ignored it since.  It's sometimes said that the culture
around a project reflects its founder and that's probably another
reason people haven't heard much about Stackless.  Christian is a
soft-spoken person, and few people using Stackless are outspoken
enough to let others know just how cool it is.

I hope I can change some of this.


=== Extends CPython ===


Normal code works

def main(who):
     print who, "reporting for duty!"

if __name__ == "__main__":
     main("Python")


It can replace your normal Python but I built mine to use "spython"
XXX others use "stackless".  Preference?
XXX NOTE: known compilation problem on Intel Mac

./Stackless/platf/switch_x86_unix.h:37: error: PIC register 'ebx' clobbered in 'asm'

I built my version with

./configure --enable-universalsdk --enable-stacklessfewerregisters

% spython simple.py
Python reporting for duty!
%

Stackless: cooperative multitasking

Like old Mac programming, or using an event-based system.  Prime the
system then enter the event loop.  Process events until done.

import stackless

def main(who):
     print who, "reporting for duty!"

if __name__ == "__main__":
     stackless.tasklet(main)("Stackless Python")
     stackless.run()


% spython simple.py
Stackless Python reporting for duty!
%

I'll start three tasklets

import stackless

def main(who):
     print who, "reporting for duty!"

if __name__ == "__main__":
     stackless.tasklet(main)("Threadlet 1")
     stackless.tasklet(main)("Threadlet 2")
     stackless.tasklet(main)("Threadlet 3")
     stackless.run()


% spython simple.py
Threadlet 1 reporting for duty!
Threadlet 2 reporting for duty!
Threadlet 3 reporting for duty!
%

Tasklets are processed in first-come-first-served (FIFO) order.  A
tasklet has control until it explicitly yields control.  One way to
yield control is with "stackless.schedule()"

import stackless

def main(who):
     if who.endswith("1"):
         stackless.schedule()
     print who, "reporting for duty!"

if __name__ == "__main__":
     stackless.tasklet(main)("Threadlet 1")
     stackless.tasklet(main)("Threadlet 2")
     stackless.tasklet(main)("Threadlet 3")
     stackless.run()


% spython simple.py
Threadlet 2 reporting for duty!
Threadlet 3 reporting for duty!
Threadlet 1 reporting for duty!
%


I'll use a new example to print text into three different columns.
The tasklets do not yield control just yet so the output is all of the
"numbers" in the first column then all of the "shapes" in the second
ending with all of the "colors" in the third.

import stackless

def print_column(column, lines):
     prefix = (" "*10) * column
     for line in lines:
         print prefix+line

def main():
     numbers = ["one", "two", "three", "four", "five", "six", "seven"]
     shapes = ["circle", "triangle", "square"]
     colors = ["red", "orange", "yellow", "green"]

     stackless.tasklet(print_column)(0, numbers)
     stackless.tasklet(print_column)(1, shapes)
     stackless.tasklet(print_column)(2, colors)
     stackless.run()

if __name__ == "__main__":
     main()


one
two
three
four
five
six
seven
           circle
           triangle
           square
                     red
                     orange
                     yellow
                     green


I'll change the code slightly to call the Stackless scheduler.  This
yields control from the current tasklet to the next one.  It will
print one line of the numbers, then one of the shapes, then one of the
colors, then start over again.  Once a tasklet has finished it is
removed and will not be scheduled again.


import stackless

def print_column(column, lines):
     prefix = (" "*10) * column
     for line in lines:
         print prefix+line
         stackless.schedule()

def main():
     numbers = ["one", "two", "three", "four", "five", "six", "seven"]
     shapes = ["circle", "triangle", "square"]
     colors = ["red", "orange", "yellow", "green"]

     stackless.tasklet(print_column)(0, numbers)
     stackless.tasklet(print_column)(1, shapes)
     stackless.tasklet(print_column)(2, colors)
     stackless.run()

if __name__ == "__main__":
     main()


% spython print_text.py
one
           circle
                     red
two
           triangle
                     orange
three
           square
                     yellow
four
                     green
five
six
seven
%

At this point you might be thinking Stackless is little more than a
set of functions built around Python's "yield" functionality.  So far
you're right, as I'm only showing the basics of Stackless.
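
To make that comparison concrete, here is a minimal sketch of the
column example using only generators and a hand-written round-robin
loop.  The run() helper and the "out" list are my own illustrative
names, not part of Stackless, and the "yield" stands in for
stackless.schedule().  It is written to run on a stock interpreter.

```python
from collections import deque

def print_column(column, lines, out):
    prefix = (" " * 10) * column
    for line in lines:
        out.append(prefix + line)
        yield  # plays the role of stackless.schedule()

def run(tasks):
    # Hypothetical scheduler: round-robin over the generators in
    # FIFO order until every one of them has finished.
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue          # finished, so do not reschedule it
        queue.append(task)

output = []
run([print_column(0, ["one", "two", "three"], output),
     print_column(1, ["circle", "triangle"], output),
     print_column(2, ["red"], output)])
print("\n".join(output))
```

The lines come out interleaved one per column, and a finished
generator simply drops out of the queue, much like a finished tasklet.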


Communication

Stackless is cooperative so there's little need for locks and other
synchronization between tasklets.  You could pass data across
tasklets with standard Python data structures but that gives you bad
habits (eg, things which look wrong in other thread libraries).

Instead, use a "channel", which is the rough equivalent of a
Queue.Queue.


A tasklet can "send" and "receive" across a channel.  The tasklet
blocks in the send until after another tasklet receives it.  A
receiving tasklet blocks until it gets something.


import stackless

chan = stackless.channel()

def send_something():
     print "before sending"
     chan.send("Sending out an SOS")
     print "after sending"

def receive_something():
     print "before receiving"
     msg = chan.receive()
     print "received", repr(msg)

stackless.tasklet(send_something)()
stackless.tasklet(receive_something)()
stackless.run()


The output when I run this is


% spython send.py
before sending
before receiving
received 'Sending out an SOS'
after sending
%

If I reverse the tasklet order so the last few lines become


stackless.tasklet(receive_something)()
stackless.tasklet(send_something)()
stackless.run()


then the output is

% spython send.py
before receiving
before sending
received 'Sending out an SOS'
after sending
%

You can see the first two lines of output are swapped because the
tasklets' start order was swapped.

So far you haven't seen anything unusual.  You don't even need threads
to reproduce these examples, only a simple task loop and some
mechanical translation from stackless.schedule() to the yield
statement.  The following should also not be surprising.

I'll start a counter in another tasklet and pass each value through a
channel.


import stackless

def counter(chan, start, end):
     print "Starting counter"
     while start < end:
         print "Sending value", start
         chan.send(start)
         start = start + 1
     print "Counter finished"

def main():
     chan = stackless.channel()
     # Start a new tasklet and put it on the end of the task queue
     stackless.tasklet(counter)(chan, 0, 3)
     print "Created new tasklet"

     while 1:
         # Read from the channel.  If the channel is empty this
         # yields control to another tasklet
         val = chan.receive()
         print "Received", val

if __name__ == "__main__":
     stackless.tasklet(main)()
     stackless.run()


The output looks correct


% spython counter.py
Created new tasklet
Starting counter
Sending value 0
Received 0
Sending value 1
Received 1
Sending value 2
Received 2
Counter finished
%

until you realize that the main loop

     while 1:
         # Read from the channel.  If the channel is empty this
         # yields control to another tasklet
         val = chan.receive()
         print "Received", val

should never end.  What happened?

Two things occurred.  First, "stackless.run()" exits when it
detects a deadlock.  In this case the main tasklet was blocked on
receive and there were no runnable tasklets.

After "run" returns, the program is done so Python does its normal
garbage collection.  Stackless cleans up tasklets by raising a
TaskletExit on every tasklet blocked on a receive.

Stackless adds TaskletExit to the standard library as
"exceptions.TaskletExit".  It's a subclass of SystemExit.  I've
modified the example to clarify the details.


import stackless
import exceptions

def counter(chan, start, end):
     print "Starting counter"
     while start < end:
         print "Sending value", start
         chan.send(start)
         start = start + 1
     print "Counter finished"

def main():
     chan = stackless.channel()
     stackless.tasklet(counter)(chan, 0, 3)
     print "Created new tasklet"

     while 1:
         try:
             val = chan.receive()
         except exceptions.TaskletExit:
             print "Caught TaskletExit"
             raise
         print "Received", val

if __name__ == "__main__":
     stackless.tasklet(main)()
     stackless.run()
     print "Program finished"


% spython counter.py
Created new tasklet
Starting counter
Sending value 0
Received 0
Sending value 1
Received 1
Sending value 2
Received 2
Counter finished
Program finished
Caught TaskletExit
%

Everything I've shown so far can be done with the standard CPython
using a yield statement^H^H^H^H^H^H^H^H^Hexpression and a bit of
mechanical overhead.  What makes Stackless interesting?

Most of the Stackless users at this point would say "game programming"
or "simulation work" or "actor-based programming".  For example, each
game element -- player, spaceship, bullet, door, bunny rabbit -- might
be its own threadlet.  Stackless easily supports having hundreds of
thousands of threadlets running, as shown by the MMO game "EVE
Online."  Programming these sorts of systems with Stackless ends up
being more natural for most people.

However, I'm not going to talk about that.  I'm not a game programmer.
I tried learning enough about that to give a few examples but gave up
when I figured out I also needed to learn some graphics APIs to
demonstrate the results.  If you want to learn more about it, try
Grant Olson's excellent tutorial "Introduction to Concurrent
Programming with Stackless Python" at
   http://members.verizon.net/olsongt/stackless/why_stackless.html

Instead, I'll talk about why I find Stackless to be interesting.

Stackless lets you get around the C stack.  During a task switch it
swaps the execution stack of one task for that of another.  The stack
being swapped can be many frames deep.  By comparison, a "yield"
suspends only one frame.
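
A small sketch of what "one frame" means in plain Python.  The
function names are made up for illustration.  A "yield" only suspends
the function it appears in, so a helper cannot pause its caller; every
level in between has to pass the value along by hand.

```python
def emit(value):
    # Because of the yield, calling emit() only builds a generator
    # object.  It cannot suspend the function that called it.
    yield value

def events():
    emit("lost")              # created and discarded, nothing is yielded
    for v in emit("relayed"):
        yield v               # each level must re-yield explicitly
    yield "direct"

result = list(events())
print(result)
```

Only "relayed" and "direct" come out.  The first call to emit() never
runs its body at all.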


Here's an example of processing XML using SAX events.

import xml.sax

xml_text = "<cities><city>Dallas</city><city>Santa Fe</city></cities>"

class DebugContent(xml.sax.ContentHandler):
     def startElement(self, tag, attrib):
         print "start", repr(tag), dict(attrib.items())
     def characters(self, text):
         print "text", repr(text)
     def endElement(self, tag):
         print "end", repr(tag)

xml.sax.parseString(xml_text, DebugContent())


% spython itersax.py
start u'cities' {}
start u'city' {}
text u'Dallas'
end u'city'
start u'city' {}
text u'Santa Fe'
end u'city'
end u'cities'
%

The parser uses a few layers.  parseString() uses make_parser() to get
an XML reader.  On my system this is an instance of
xml.sax.expatreader.ExpatParser

 >>> xml.sax.make_parser()
<xml.sax.expatreader.ExpatParser instance at 0x206c88>
 >>>

which is a wrapper around the expat C library.  As expat processes the
XML it calls back to Python methods.

import xml.sax
import traceback

xml_text = "<cities><city>Dallas</city><city>Santa Fe</city></cities>"

class DebugContent(xml.sax.ContentHandler):
     def startElement(self, tag, attrib):
         traceback.print_stack()
         raise SystemExit()

xml.sax.parseString(xml_text, DebugContent())


% spython itersax.py
   File "itersax.py", line 11, in <module>
     xml.sax.parseString(xml_text, DebugContent())
   File "/Users/dalke/stackless/lib/python2.5/xml/sax/__init__.py", line 49, in parseString
     parser.parse(inpsrc)
   File "/Users/dalke/stackless/lib/python2.5/xml/sax/expatreader.py", line 107, in parse
     xmlreader.IncrementalParser.parse(self, source)
   File "/Users/dalke/stackless/lib/python2.5/xml/sax/xmlreader.py", line 123, in parse
     self.feed(buffer)
   File "/Users/dalke/stackless/lib/python2.5/xml/sax/expatreader.py", line 207, in feed
     self._parser.Parse(data, isFinal)
   File "/Users/dalke/cvses/stackless/Modules/pyexpat.c", line 604, in StartElement
     rv = call_with_frame(getcode(StartElement, "StartElement", __LINE__),
   File "/Users/dalke/stackless/lib/python2.5/xml/sax/expatreader.py", line 301, in start_element
     self._cont_handler.startElement(name, AttributesImpl(attrs))
   File "itersax.py", line 8, in startElement
     traceback.print_stack()
%


Most people find it hard to process XML through callbacks.  It feels
backwards.  Another style is to transform this into a "pull-parser"
and iterate through events.  For example,

for (event, args) in iterParseString(xml_text):
     if event == "startElement":
         tag, attrib = args
         print "start", repr(tag), dict(attrib.items())
     elif event == "characters":
         print "text", repr(args)
     elif event == "endElement":
         print "end", repr(args)


I'll use Stackless to convert the callback-based expat parser into an
iterative pull-parser.

import xml.sax
import stackless

xml_text = "<cities><city>Dallas</city><city>Santa Fe</city></cities>"

class StacklessContentHandler(xml.sax.ContentHandler):
     def __init__(self, chan):
         self.chan = chan
     def startElement(self, tag, attrib):
         self.chan.send( ("startElement", (tag, attrib)) )
     def characters(self, text):
         self.chan.send( ("characters", text) )
     def endElement(self, tag):
         self.chan.send( ("endElement", tag) )

def iterParseString(text):
     chan = stackless.channel()
     handler = StacklessContentHandler(chan)
     stackless.tasklet(xml.sax.parseString)(text, handler)
     while 1:
         yield chan.receive()

def main():
     for (event, args) in iterParseString(xml_text):
         if event == "startElement":
             tag, attrib = args
             print "start", repr(tag), dict(attrib.items())
         elif event == "characters":
             print "text", repr(args)
         elif event == "endElement":
             print "end", repr(args)

if __name__ == "__main__":
     stackless.tasklet(main)()
     stackless.run()



% spython itersax.py
start u'cities' {}
start u'city' {}
text u'Dallas'
end u'city'
start u'city' {}
text u'Santa Fe'
end u'city'
end u'cities'
%

(BTW, see ElementTree for a pull parser)
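
For reference, a minimal sketch of that ElementTree pull parser on the
same document.  It uses xml.etree.ElementTree.iterparse from the
standard library; the io.BytesIO wrapper and the "seen" list are only
there so the example is self-contained.  Note that iterparse reports
"start" and "end" events for elements and exposes the text through the
element rather than as separate character events.

```python
import io
import xml.etree.ElementTree as ET

xml_bytes = b"<cities><city>Dallas</city><city>Santa Fe</city></cities>"

seen = []
for event, elem in ET.iterparse(io.BytesIO(xml_bytes),
                                events=("start", "end")):
    seen.append((event, elem.tag))
    # The element's text is only guaranteed complete at its "end" event
    if event == "end" and elem.tag == "city":
        print("city: " + elem.text)
```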

I wrote


def iterParseString(text):
     chan = stackless.channel()
     handler = StacklessContentHandler(chan)
     stackless.tasklet(xml.sax.parseString)(text, handler)
     while 1:
         yield chan.receive()

so it would be easier to understand coming from a standard Python
background.  In practice you can simplify it a bit.  A channel
implements the iterator protocol so the following also works


def iterParseString(text):
     chan = stackless.channel()
     handler = StacklessContentHandler(chan)
     stackless.tasklet(xml.sax.parseString)(text, handler)
     return chan


XXX It looks like gc is happier with this approach.  Explore and
explain.

   So what?

What I've shown so far is pretty standard for any sort of multiple
threading system.  You can implement the same code, with a few
renamings, using the normal Python threading library.

    Stackless      Threading
    =========      =========
     tasklet        thread
     channel        queue
       .send          .put
       .receive       .get

You need to be a bit more careful because system threads aren't
garbage collected and there is no deadlock detection.  To compare
these two, here's the itersax code rewritten to use a file instead of
text.  The first version uses Stackless and the other the threading
library.


=======================================================
import xml.sax
import stackless

xml_filename = "/Users/dalke/Music/iTunes/iTunes Music Library.xml"

class StacklessContentHandler(xml.sax.ContentHandler):
     def __init__(self, chan):
         self.chan = chan
     def startDocument(self):
         self.chan.send( ("startDocument", None) )
     def startElement(self, tag, attrib):
         self.chan.send( ("startElement", (tag, attrib)) )
     def characters(self, text):
         self.chan.send( ("characters", text) )
     def endElement(self, tag):
         self.chan.send( ("endElement", tag) )
     def endDocument(self):
         self.chan.send( ("endDocument", None) )

def iterParse(filename):
     chan = stackless.channel()
     handler = StacklessContentHandler(chan)
     stackless.tasklet(xml.sax.parse)(filename, handler)
     while 1:
         data = chan.receive()
         yield data
         if data[0] == "endDocument":
             break

def main():
     element_count = 0
     for (event, args) in iterParse(xml_filename):
         if event == "startElement":
             element_count += 1
     print "There were", element_count, "elements"

if __name__ == "__main__":
     stackless.tasklet(main)()
     stackless.run()

=======================================================

import xml.sax
import threading
import Queue
import time

xml_filename = "/Users/dalke/Music/iTunes/iTunes Music Library.xml"

class ThreadedContentHandler(xml.sax.ContentHandler):
     def __init__(self, queue):
         self.queue = queue
     def startDocument(self):
         self.queue.put( ("startDocument", None) )
     def startElement(self, tag, attrib):
         self.queue.put( ("startElement", (tag, attrib)) )
     def characters(self, text):
         self.queue.put( ("characters", text) )
     def endElement(self, tag):
         self.queue.put( ("endElement", tag) )
     def endDocument(self):
         self.queue.put( ("endDocument", None) )

def iterParse(filename):
     queue = Queue.Queue()
     handler = ThreadedContentHandler(queue)
     th = threading.Thread(target=xml.sax.parse,
                           args=(filename, handler))
     th.setDaemon(True)
     th.start()

     while 1:
         data = queue.get()
         yield data
         if data[0] == "endDocument":
             break

def main():
     element_count = 0
     for (event, args) in iterParse(xml_filename):
         if event == "startElement":
             element_count += 1
     print "There were", element_count, "elements"

if __name__ == "__main__":
     main()

=======================================================

The first point is that the stackless code is faster -- much faster --
than the threaded code.  Stackless is all in user space and doesn't do
as much as threads do.

How much faster?  With my iTunes library file (6MB and 670K
events)

threaded version: 93 seconds (94.9s, 94.5s, 93.1s)
stackless version: 20 seconds (20.8s, 19.9s, 20.4s)
low-level SAX interface: 8.4 seconds  (8.36s, 8.57s, 8.49s)

The last uses a do-nothing ContentHandler and is the fastest you can
get using the SAX API from Python.  For faster XML parsing use
cElementTree.  That takes about 1 3/4 seconds on my machine because
building the data structure is done at the C level and does not have
the Python callback overhead.
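
Here is a sketch of how a baseline like the low-level SAX number can
be measured.  The base xml.sax.ContentHandler already ignores every
event, so it serves as the do-nothing handler.  The small in-memory
document is a stand-in I made up so the example is self-contained; it
is not the iTunes file, and the elapsed time it prints says nothing
about the figures above.

```python
import time
import xml.sax

# Made-up test document: 1000 small elements inside one root element
xml_bytes = b"<cities>" + b"<city>Dallas</city>" * 1000 + b"</cities>"

start = time.time()
# The default ContentHandler methods are all no-ops
xml.sax.parseString(xml_bytes, xml.sax.ContentHandler())
elapsed = time.time() - start
print("do-nothing SAX parse took %.4f seconds" % elapsed)
```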

XXX insert results from other OSes here

XXX mention using collections.deque vs. Queue.Queue
   (deque+lock is faster?  See Santiago Gala's emails.)
   (NOTE: get rid of DTD lookup at front!)
with battery, 131s.  battery and Queue.Queue(1), 254 but ended with
    Unhandled exception in thread started by
    Error in sys.excepthook:

    Original exception was:
???

Here are simplified versions of a couple of file formats I use.  This
is a PDB file used for 3D macromolecular structures. Here I'm only
showing the ATOM records.  The actual format is much more complicated,
and in practice most people use a modified version of the PDB structure.

===
ATOM      1  C           1      -0.426  -0.115  -0.147  1.00  0.00
ATOM      2  O           1      -0.599   1.244  -0.481  1.00  0.00
ATOM      3  H           1      -0.750  -0.738  -0.981  1.00  0.00
ATOM      4  H           1      -1.022  -0.351   0.735  1.00  0.00
ATOM      5  H           1      -1.642   1.434  -0.689  1.00  0.00
ATOM      6  C           1       1.047  -0.383   0.147  1.00  0.00
ATOM      7  H           1       1.370   0.240   0.981  1.00  0.00
ATOM      8  H           1       1.642  -0.147  -0.735  1.00  0.00
ATOM      9  H           1       1.180  -1.434   0.405  1.00  0.00
===

Here's a pretty standard way of parsing the file format.  In this case
I'll find the min and max values for the x coordinate.  That's not
useful for real purposes but it's enough for now.

def pdb_x_bounds(infile):
     minx = maxx = None
     for line in infile:
         if line[:6] == "ATOM  ":
             x = float(line[30:38])
             if minx is None:
                 minx = maxx = x
             else:
                 if x < minx: minx = x
                 if x > maxx: maxx = x
         else:
             raise TypeError("Not an ATOM record: %r" % (line,))
     return (minx, maxx)


Here is the same structure in the SD file format.

===
ethanol.pdb
OpenBabel0123D

   9  8  0  0  0  0  0  0  0  0999 V2000
    -0.4260   -0.1150   -0.1470 C   0  0  0  0  0
    -0.5990    1.2440   -0.4810 O   0  0  0  0  0
    -0.7500   -0.7380   -0.9810 H   0  0  0  0  0
    -1.0220   -0.3510    0.7350 H   0  0  0  0  0
    -1.6420    1.4340   -0.6890 H   0  0  0  0  0
     1.0470   -0.3830    0.1470 C   0  0  0  0  0
     1.3700    0.2400    0.9810 H   0  0  0  0  0
     1.6420   -0.1470   -0.7350 H   0  0  0  0  0
     1.1800   -1.4340    0.4050 H   0  0  0  0  0
   3  1  1  0  0  0
   2  1  1  0  0  0
   1  6  1  0  0  0
   1  4  1  0  0  0
   5  2  1  0  0  0
   8  6  1  0  0  0
   6  9  1  0  0  0
   6  7  1  0  0  0
M  END
===

A bounding range function for this is

import re
_verify_counts_line = re.compile(
     r"(  \d| \d\d|\d\d\d){10}....V2000").match

def sdf_x_bounds(infile):
     readline = iter(infile).next
     readline(); readline(); readline() # Skip the first 3 lines

     counts_line = readline()
     if not _verify_counts_line(counts_line):
         raise TypeError("Invalid counts line in SD file: %r" %
                         (counts_line,))
     num_atoms = int(counts_line[:3])
     minx = maxx = None
     while num_atoms:
         line = readline()
         num_atoms -= 1
         x = float(line[:10])
         if minx is None:
             minx = maxx = x
         else:
             if x < minx: minx = x
             if x > maxx: maxx = x
     return (minx, maxx)


XXX Note on the use of iter(file).next() vs. file.readline(); I prefer
the former even though it's less obvious.

If I have a PDB file I can pass it to the PDB function, and if I have
an SD file I can pass it to the SD reader.  What if I have an unknown
file format?  The usual solutions are to ask the user "what format is
this?", guess from the filename extension, or write a special
"sniffer" routine.

The above functions, like most of the parsers I wrote, do some format
validation.  Pass in an obviously wrong format and they raise an
exception: the explicit raises I do, or the implicit ones from the
float() conversion or StopIteration.
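
A minimal sketch of those implicit failures, assuming a recent stock
Python.  The why_it_fails() helper is a name I made up for the
example.  It shows which exception class stops a parser when a field
is not a number or when the input runs out of lines.

```python
def why_it_fails(action):
    # Run the callable and report which exception class stopped it.
    try:
        action()
    except Exception as err:   # StopIteration is an Exception subclass
        return type(err).__name__
    return None

failures = [
    why_it_fails(lambda: float("root:x:0")),   # field is not a number
    why_it_fails(lambda: next(iter([]))),       # ran out of lines
]
print(failures)
```

Both are ordinary Exception subclasses, so a sniffer that catches
Exception sees either one as "wrong format".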

You could use the parsers as validators.  For example

formats = ( ("pdb", pdb_x_bounds),
             ("sdf", sdf_x_bounds) )

def sniff_format(infile):
     possible_formats = []
     for (typename, parser) in formats:
         infile.seek(0)
         try:
             parser(infile)
         except Exception:
             continue
         possible_formats.append(typename)
     return possible_formats

for filename in ("ethanol.pdb", "ethanol.sdf", "/etc/passwd"):
     print "Sniffing", repr(filename), "==", sniff_format(open(filename))


with output

Sniffing 'ethanol.pdb' == ['pdb']
Sniffing 'ethanol.sdf' == ['sdf']
Sniffing '/etc/passwd' == []

This is pretty useful but there are some concerns.  I seek(0) (really
should tell() then seek to the tell()), which only works for some
input streams.  If the input is a network socket or other streaming
connection I could copy the input to a local file or StringIO
instance, converting the stream to a seekable file.  Or I can use
something like my ReseekFile.
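
The copy-to-memory option can be sketched as follows.  OneWayStream
and make_seekable() are hypothetical names for this example, and the
sketch is not how ReseekFile works.  It reads the whole input, so it
only suits inputs small enough to hold in memory.

```python
import io

class OneWayStream(object):
    # Hypothetical stand-in for a socket-like input: read-only, no seek()
    def __init__(self, data):
        self._lines = iter(data.splitlines(True))
    def read(self):
        return b"".join(self._lines)

def make_seekable(stream):
    # Copy everything into an in-memory file object that supports seek()
    return io.BytesIO(stream.read())

infile = make_seekable(OneWayStream(b"line 1\nline 2\n"))
first_pass = infile.readlines()
infile.seek(0)                      # rewind for the next parser
second_pass = infile.readlines()
print(first_pass == second_pass)
```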

A PDB file may contain several hundred thousand atoms.  The
pdb_x_bounds function reads all of the ATOM records before returning.
If I repurpose the function as a sniffer I don't need it to do all
that work.  I would rather just have it read enough so there's a high
likelihood of being in the PDB format and not some other format.

In this case I know that reading the first 5 lines is enough so I can
make a new iterable wrapper which raises a new exception after reading
5 lines.  (3 lines is not enough because the 1st 3 lines of the SD
parser are ignored.)


class LimitReached(Exception):
     pass

class LimitedReader(object):
     def __init__(self, infile, limit=5):
         self._next = iter(infile).next
         self.limit = limit
         self.count = 0
     def __iter__(self):
         return self
     def next(self):
         if self.count >= self.limit:
             raise LimitReached
         # count will be wrong after a StopIteration
         self.count += 1
         return self._next()


Remember, this is used in a context like

     while num_atoms:
         line = readline()
         num_atoms -= 1

When the line count limit is reached the LimitedReader raises a new
exception class which is not handled by any of the parsers, stopping
the parse at that point.
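
A self-contained sketch of that behavior.  The reader is restated here
with both the Python 3 and Python 2 iterator method names so it runs
on a stock interpreter, and count_lines() is a made-up stand-in for a
parser that knows nothing about LimitReached.

```python
class LimitReached(Exception):
    pass

class LimitedReader(object):
    def __init__(self, infile, limit=5):
        self._it = iter(infile)
        self.limit = limit
        self.count = 0
    def __iter__(self):
        return self
    def __next__(self):
        if self.count >= self.limit:
            raise LimitReached
        self.count += 1
        return next(self._it)
    next = __next__       # Python 2 spelling of the same method

def count_lines(infile):
    # Stand-in parser: loops over its input, unaware of any limit
    n = 0
    for line in infile:
        n += 1
    return n

lines = ["line %d\n" % i for i in range(100)]
reader = LimitedReader(lines, limit=5)
try:
    count_lines(reader)
    outcome = "finished"
except LimitReached:
    outcome = "limit reached"
print("%s after %d lines" % (outcome, reader.count))
```

The exception passes straight through the parser's for loop because
the loop only swallows StopIteration.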

I change the sniffer driver to allow that special exception to mean
"good enough"


def sniff_format(infile):
     possible_formats = []
     for (typename, parser) in formats:
         infile.seek(0)
         try:
             parser(LimitedReader(infile))
         except LimitReached:
             pass
         except Exception:
             continue
         possible_formats.append(typename)
     return possible_formats

for filename in ("ethanol.pdb", "ethanol.sdf", "/etc/passwd"):
     print "Sniffing", repr(filename), "==", sniff_format(open(filename))


This approach is about the best you can do with normal Python.  It's
decent but still a bit ugly because of the seek(0).  The algorithm I
would like is to start each parser in parallel.  For each line of
input, pass the line to each parser.  If a parser fails because the
input is in the wrong format then remove it from the list of active
parsers.  When one parser remains, and if enough lines have been read,
then that's the likely format.

If the parsers were written as a state machine taking characters or
lines as input then that's trivial to implement.  But most people have
a hard time dealing with state machines.  A frame/block oriented
programming language, like Python with its for loops, try blocks, and
so on seems to fit people's mental models better.
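
To show what the line-at-a-time approach looks like without Stackless,
here is a rough sketch where each parser is rewritten as a generator
that receives lines through send().  All the names here are
illustrative, and passwd_lines() is a made-up second format.  The
driver is trivial, but note the cost: the parsers had to be turned
inside out so that every read becomes a "line = yield".

```python
def pdb_lines():
    # Accepts one line per send(); raises on anything but an ATOM record
    while True:
        line = yield
        if line[:6] != "ATOM  ":
            raise TypeError("Not an ATOM record: %r" % (line,))

def passwd_lines():
    # Hypothetical second format: exactly 7 colon-separated fields
    while True:
        line = yield
        if len(line.split(":")) != 7:
            raise TypeError("Not a passwd record: %r" % (line,))

def sniff_lines(lines, parsers, max_lines=5):
    active = {}
    for name, factory in parsers:
        gen = factory()
        next(gen)                  # advance to the first "line = yield"
        active[name] = gen
    for i, line in enumerate(lines):
        if i >= max_lines or not active:
            break
        for name in list(active):
            try:
                active[name].send(line)
            except Exception:
                del active[name]   # wrong format, drop this parser
    return sorted(active)

parsers = [("pdb", pdb_lines), ("passwd", passwd_lines)]
print(sniff_lines(["ATOM      1  C\n", "ATOM      2  O\n"], parsers))
```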

(This is a reason I think people have a harder time using the SAX
event model than the iterative model I showed earlier, even though the
resulting code looks almost identical.  XXX is this true? elaborate?
remove comment?)

In the standard Python implementation a function can't do much to the
stack.  It can create a new frame by using a new block or calling a
function, it can pop a frame using a 'return', it can use a 'yield' to
defer the current function frame, or it can throw an exception, to be
caught several frames up.

Stackless gives one more way to manipulate the stack.  It can act like
a more powerful 'yield' statement to pause the execution of multiple
frames in the stack, put them aside, go to another stack, and continue
working from some other point.

That's exactly what I want here.

I need to be a bit careful because I only control the top and the
bottom of the stack.  The middle part is the parser.  Here's the
wrapper I use at the bottom of the stack, which can catch exceptions
and report them back to the "results" channel.

import stackless
import exceptions

def parser_wrapper(input, results, typename, parser):
     try:
         parser(input)
     except LimitReached:
         results.send( ("unfinished", typename, None) )
     except Exception, err:
         results.send( ("error", typename, err) )
     else:
         results.send( ("success", typename, None) )


I'll start a new tasklet for each parser, like this

     input = stackless.channel()
     results = stackless.channel()
     for (typename, parser) in formats:
         stackless.tasklet(parser_wrapper)(input, results,
                                           typename, parser)

The input channel gets each line of the input.  Remember that a
channel is also an iterator.  The parsers only use the iterator
interface so there's no need to write an adapter.

Even though the tasklets have been created they have not yet executed.
They are queued up, waiting.  I'll let them run now, which will take
each of the parsers up to the first readline.

     stackless.schedule()

Each tasklet is now blocked waiting for input.next(), which is the
next line of input.

The main driver code looks like this

     for line in infile:
         balance = -input.balance
         if balance == 0:
             break
         if balance == 1:
             input.send_exception(LimitReached)
             break
         input.send_sequence([line]*balance)
         process_results()

"balance" is the number of tasklets waiting to write to a channel
minus the number of tasklets waiting to read from it.  In this case
the first time through the balance is -2 because there are two parsers
waiting for the next line.

If there are no tasklets left then there's no need to continue.

If there's one tasklet remaining then assume it's the correct format
and stop.  To get nice cleanup I'll raise a LimitReached exception in
the tasklet.  This will be caught in the parser_wrapper and converted
to an "unfinished" message on the results channel.

Otherwise I'll put 'N' copies of the input line in the 'input'
channel, once for each waiting tasklet.  The "send_sequence" is faster
than doing the loop manually (XXX according to the documentation).

Each tasklet gets a line of input and processes it.

How does data get back from the parser code to the driver code?  If
the parser throws an exception or finishes then the parser_wrapper
catches it and sends data to the "results" channel.  The driver code
needs to check that channel to see if any results are pending.  Here I
encapsulated it within the "process_results" function, which I've
written as a local function.


     possible_formats = []

     def process_results():
         while results.balance:
             status, typename, err = results.receive()
             if status == "error":
                 pass
             else:
                 possible_formats.append(typename)

There are three ways for the main driver loop to finish:
   - 0 tasklets remain (unknown format)
   - 1 tasklet remains (assume that that defines the format)
   - the input file has no more lines

I need to handle the last case correctly.  The parsers are waiting for
the next line, which they read via the iterator protocol.  Therefore I need to
raise a StopIteration exception to tell each one that there are no
more lines.  This is an excellent place to use the "else:" clause of a
for loop.

     for line in infile:
         balance = -input.balance
         if balance == 0:
             break
         if balance == 1:
             input.send_exception(LimitReached)
             break
         input.send_sequence([line]*balance)
         process_results()
     else:
         for i in range(-input.balance):
             input.send_exception(StopIteration)
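
For anyone who hasn't used it, the "else:" clause of a for loop runs
only when the loop ends without hitting a "break".  Here's a small
plain-Python illustration (no Stackless needed; the function name and
the sample data are made up for this sketch):

```python
def first_blank_line(lines):
    """Return the index of the first blank line, or -1 if there is none."""
    for i, line in enumerate(lines):
        if not line.strip():
            found = i
            break          # leaving via break skips the else: clause
    else:
        # Only reached when the loop ran out of lines without a break
        found = -1
    return found

print(first_blank_line(["header", "", "body"]))    # a break happened
print(first_blank_line(["no", "blanks", "here"]))  # the else: clause ran
```

The driver loop relies on the same behavior: the StopIteration
messages are sent only when the input ran out, not when the loop
stopped early.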


Channels do have a "close()" method which looks like the right
solution.  However, "close" raises a TaskletExit exception in each
tasklet trying to read from the closed channel, which breaks the
iterator protocol.

I think that when a channel is closed then iter(channel).next() should
raise a StopIteration exception instead of TaskletExit.

(NOTE: the above code is wrong -- it should also send StopIteration on
any future attempt to use the channel, and not just the first time.)


It may be that some processes send an "error" message while others
send "success", depending on if they need more input lines or not.
The parser_wrapper catches those cases and forwards the results
through the "results" channel.  The driver code needs to process those
messages before returning the list of likely format types.

     process_results()
     return possible_formats


Putting the pieces together the main driver code is

def sniff_format(infile):
     input = stackless.channel()
     results = stackless.channel()
     for (typename, parser) in formats:
         stackless.tasklet(parser_wrapper)(input, results,
                                           typename, parser)

     # Let all of the parsers get to a readline()
     # This depends on Stackless' round-robin behavior
     stackless.schedule()

     possible_formats = []

     def process_results():
         while results.balance:
             status, typename, err = results.receive()
             if status == "error":
                 pass
             else:
                 possible_formats.append(typename)

     for line in infile:
         balance = -input.balance
         if balance == 0:
             break
         if balance == 1:
             input.send_exception(LimitReached)
             break
         input.send_sequence([line]*balance)
         process_results()
     else:
         for i in range(-input.balance):
             input.send_exception(StopIteration)

     process_results()
     return possible_formats


I'll try it out with my test code

def test_sniff():
     for filename in ("ethanol.pdb", "ethanol.sdf", "/etc/passwd"):
         possible_formats = sniff_format(open(filename))
         print "Sniffing", repr(filename), "==", possible_formats

if __name__ == "__main__":
     stackless.tasklet(test_sniff)()
     stackless.run()


Sniffing 'ethanol.pdb' == ['pdb']
Sniffing 'ethanol.sdf' == ['sdf']
Sniffing '/etc/passwd' == ['sdf']


That's strange -- /etc/passwd is sniffed as 'sdf' format?  What's
wrong?

With a bit of debugging I found the answer.  The first line is not in
PDB format so it's removed from the list of possibilities.  Only one
option is left -- sdf -- so the driver code exits the main loop.  The
SD format parser after all reads the first three lines before doing
any checking.  Those lines could contain anything.

I need to parse 4 and hopefully 5 lines to have a better likelihood of
my sniffer being correct.  That's an easy change.

     for lineno, line in enumerate(infile):
         balance = -input.balance
         if balance == 0:
             break
         if balance == 1 and lineno >= 5:
             input.send_exception(LimitReached)
             break
         input.send_sequence([line]*balance)
         process_results()


Sniffing 'ethanol.pdb' == ['pdb']
Sniffing 'ethanol.sdf' == ['sdf']
Sniffing '/etc/passwd' == []


This algorithm may not suffice for you.  Perhaps a more sensitive test
is to read 5 lines after there's only one parser left.  I can think of
other variations which may be more appropriate, depending on the data.
The point is to show how you can use stackless for this.


XXX insert visual depiction of what happened? XXX

== Cooperative multitasking ==

Here's a rather silly function

def count_to_a_big_number():
     x = 10000000
     result = 0
     while x:
         result += x
         x -= 1
     return result

It takes about 17 seconds for my laptop to compute 50000005000000.

Suppose you want to run other code while the compute task is
processing.  In a preemptive system there's no problem: the thread
library swaps things out correctly.

But Stackless is a cooperative threading system.  It can't force a
context switch on running code (well, technically it can through hooks
in Python's run-time, but it's recommended that you not do that, in
part because of the higher need to handle critical sections explicitly).

If you have access to the code you may be able to modify it and add
"stackless.schedule()" calls.  This tells Stackless to suspend
execution on the current stack and go to the next tasklet in the
queue.  Here's the simplest way to change the above code to give up
its timeslice about every 0.2 seconds.

def count_to_a_big_number():
     x = 10000000
     result = 0
     while x:
         if x % 100000 == 0:
             stackless.schedule()
         result += x
         x -= 1
     return result

When I tried it my runtime went from 17 to 22 seconds because of the
extra calls to do the modulo ("%") calculations.  In nearly all cases
you can rework the code to minimize the overhead, though it may require
more code and more thought.  For example, the following takes about
14.3 seconds.  It's faster probably because xrange is faster than
doing the "-=1" in Python.


def count_to_a_big_number():
     x = 10000000
     result = 0
     N = 100000
     for a in xrange(x//N):
         for b in xrange(N):
             result += x
             x -= 1
         stackless.schedule()
     while x:
         result += x
         x -= 1
     return result

and for this case a very precocious schoolkid can figure out the
analytic and trivial exact solution.
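
The closed form in question is the sum of the first n integers,
n*(n+1)/2.  A quick sketch to check it against the loop's answer (the
function name is made up here):

```python
def sum_to(n):
    """Sum of 1 + 2 + ... + n using the closed form, no loop needed."""
    return n * (n + 1) // 2

print(sum_to(10000000))   # same value the loop computes
```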

It does take some thought and testing to get the balance between
compute and scheduling correct.  If you are doing a lot of
compute-based calculations you'll probably want to stay with regular
threads or switch to multiple processes, which can be parallelized
across multiple processors and machines.
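
If you want to experiment with how often to yield without tying the
code to Stackless, one option is to pass the yield function in as a
parameter.  This is only a sketch: "count_with_yield" and its
arguments are made-up names, and under Stackless you would presumably
pass stackless.schedule as the yield function.

```python
def count_with_yield(limit, chunk, yield_func):
    """Sum limit + (limit-1) + ... + 1, calling yield_func after each chunk."""
    x = limit
    result = 0
    while x:
        # Do at most `chunk` additions before giving up the timeslice
        for _ in range(min(chunk, x)):
            result += x
            x -= 1
        yield_func()
    return result

yields = []   # stand-in for a real scheduler: just record each yield
total = count_with_yield(1000, 100, lambda: yields.append(1))
print("%d after %d yields" % (total, len(yields)))
```

Keeping the yield function as an argument also makes it easy to time
the same loop with a do-nothing function to see how much the
scheduling itself costs.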

== Fun with recursion ==

Python has a default recursion limit of 1000.  Using everyone's
favorite recursive algorithm:

def factorial(n):
     if n <= 1:
         return 1
     return n * factorial(n-1)

print factorial(1000)

gives a long traceback and the message

RuntimeError: maximum recursion depth exceeded
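
The limit can be inspected at run time with sys.getrecursionlimit()
(and raised with sys.setrecursionlimit(), at the risk of overflowing
the C stack).  A small sketch in plain Python, with a made-up helper
name; it catches RuntimeError, which also covers the RecursionError
subclass that newer Python versions raise:

```python
import sys

def factorial(n):
    if n <= 1:
        return 1
    return n * factorial(n - 1)

def fits_in_stack(n):
    """Return True if factorial(n) completes within the recursion limit."""
    try:
        factorial(n)
    except RuntimeError:
        return False
    return True

limit = sys.getrecursionlimit()
print("recursion limit: %d" % limit)
print(fits_in_stack(10))
print(fits_in_stack(limit + 10))
```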

Stackless lets you allocate new stacks as needed so here are a couple
of functions which call a given function in a new tasklet and return
the result like a normal function call.

import stackless

def call_wrapper(f, args, kwargs, result_channel):
     try:
         result_channel.send(f(*args, **kwargs))
     except Exception, e:
         result_channel.send_exception(e)

def call(f, *args, **kwargs):
     result_channel = stackless.channel()
     stackless.tasklet(call_wrapper)(f, args, kwargs, result_channel)
     return result_channel.receive()

The "call_wrapper" is the base of the new stack.  It calls the
function and puts the result or the exception back into the results
channel.  Here's the quick replacement for the factorial function

def factorial(n):
     if n <= 1:
         return 1
     return n * call(factorial, n-1)

print factorial(1000)

In case you are curious, that's

402387260077093773543702433923003985719374864210714632543799
910429938512398629020592044208486969404800479988610197196058
631666872994808558901323829669944590997424504087073759918823
627727188732519779505950995276120874975462497043601418278094
646496291056393887437886487337119181045825783647849977012476
632889835955735432513185323958463075557409114262417474349347
553428646576611667797396668820291207379143853719588249808126
867838374559731746136085379534524221586593201928090878297308
431392844403281231558611036976801357304216168747609675871348
312025478589320767169132448426236131412508780208000261683151
027341827977704784635868170164365024153691398281264810213092
761244896359928705114964975419909342221566832572080821333186
116811553615836546984046708975602900950537616475847728421889
679646244945160765353408198901385442487984959953319101723355
556602139450399736280750137837615307127761926849034352625200
015888535147331611702103968175921510907788019393178114194545
257223865541461062892187960223838971476088506276862967146674
697562911234082439208160153780889893964518263243671616762179
168909779911903754031274622289988005195444414282012187361745
992642956581746628302955570299024324153181617210465832036786
906117260158783520751516284225540265170483304226143974286933
061690897968482590125458327168226458066526769958652682272807
075781391858178889652208164348344825993266043367660176999612
831860788386150279465955131156552036093988180612138558600301
435694527224206344631797460594682573103790084024432438465657
245014402821885252470935190620929023136493273497565513958720
559654228749774011413346962715422845862377387538230483865688
976461927383814900140767310446640259899490222221765904339901
886018566526485061799702356193897017860040811889729918311021
171229845901641921068884387121855646124960798722908519296819
372388642614839657382291123125024186649353143970137428531926
649875337218940694281434118520158014123344828015051399694290
153483077644569099073152433278288269864602789864321139083506
217095002597389863554277196742822248757586765752344220207573
630569498825087968928162753848863396909959826280956121450994
871701244516461260379029309120889086942028510640182154399457
156805941872748998094254742173582401063677404595741785160829
230135358081840096996372524230560855903700624271243416909004
153690105933983835777939410970027753472000000000000000000000
000000000000000000000000000000000000000000000000000000000000
000000000000000000000000000000000000000000000000000000000000
000000000000000000000000000000000000000000000000000000000000
000000000000000000000000000000000000000000000000

There is some overhead for doing all of this stack manipulation.  I
wrote two functions, "factorial" and "factorial_stackless" and ran
both of them under Stackless python to compute 998! one thousand
times.  I also ran "factorial" under the normal Python.  The best-of-3
run times are:

factorial           on CPython          = 1.4  s
factorial           on Stackless Python = 2.6  s
factorial_stackless on Stackless Python = 6.75 s

Using Stackless gives you additional function call overhead compared
to normal Python, even in programs which don't use stackless.
Manipulating stacks, probably because of the additional send and
receive function calls and the channel and tasklet creation, adds a
lot more overhead.
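
One way to collect best-of-3 numbers like these is the standard
timeit module.  This is a sketch, not necessarily how the timings
above were measured; "best_of" is a made-up helper, and it uses 200!
rather than 998! to stay well clear of the recursion limit.

```python
import timeit

def factorial(n):
    if n <= 1:
        return 1
    return n * factorial(n - 1)

def best_of(func, arg, repeat=3, number=1000):
    """Smallest total time, in seconds, over `repeat` runs of `number` calls."""
    timer = timeit.Timer(lambda: func(arg))
    return min(timer.repeat(repeat=repeat, number=number))

print("factorial: %.3f s" % best_of(factorial, 200, number=100))
```

Taking the minimum rather than the mean is the usual choice because
the slower runs mostly measure interference from other processes.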


== Blocking ==

Because Stackless is cooperative (non-preemptive), it does not handle
blocking.  If a function call computes without ever yielding control
or is blocked by a system call then all of stackless blocks and no
other tasklets work.  Here's an example showing that the two sleep
tasklets are executed in series and not in parallel.


import time
import stackless

start_time = time.time()

def sleep_report(delay):
     time.sleep(delay)
     print "Elapsed time after", delay, "second delay:",
     print time.time() - start_time

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
stackless.run()


Elapsed time after 5 second delay: 5.00160193443
Elapsed time after 10 second delay: 15.0034680367


Stackless does work in combination with normal threads so one solution
is to defer all blocking calls to another thread.  Here I'll show an
example creating a new thread for each blocking request.  You might
want to use a thread pool to defer thread creation overhead.

import time
import stackless
import threading

start_time = time.time()

_thread_count = 0

class CallInThread(threading.Thread):
     def __init__(self, f, args, kwargs, results):
         threading.Thread.__init__(self)
         self.f = f
         self.args = args
         self.kwargs = kwargs
         self.results = results
     def run(self):
         # Call f directly; Thread.run() would discard its return value
         try:
             result = self.f(*self.args, **self.kwargs)
         except Exception, err:
             self.results.send_exception(err)
         else:
             self.results.send(result)
         print "Thread", self.args, "ends at", time.time() - start_time

def call_in_thread(f, *args, **kwargs):
     global _thread_count
     results = stackless.channel()
     _thread_count += 1
     try:
         CallInThread(f, args, kwargs, results).start()
         return results.receive()
     finally:
         _thread_count -= 1

def sleep_report(delay):
     call_in_thread(time.sleep, delay)
     print "Elapsed time after", delay, "second delay:",
     print time.time() - start_time

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
while 1:
     stackless.run()
     if _thread_count == 0:
         break
     time.sleep(0.02)

print "Finished at", time.time()-start_time


Here is the output to show that it works.

Elapsed time after 5 second delay: 5.00126981735
Thread (5,) ends at 5.00324678421
Elapsed time after 10 second delay: 10.0125999451
Finished at 10.0150208473
Thread (10,) ends at 10.0165588856
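
The thread pool mentioned above might look roughly like the following.
It's a plain-Python sketch with made-up names ("TinyPool", "call") and
it uses a Queue for the reply; a Stackless version would presumably
hand the result back on a channel instead, as CallInThread does.

```python
import threading
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2

class TinyPool(object):
    """A fixed set of worker threads that run submitted calls."""
    def __init__(self, num_workers):
        self.jobs = queue.Queue()
        for _ in range(num_workers):
            worker = threading.Thread(target=self._work)
            worker.daemon = True      # don't keep the process alive
            worker.start()

    def _work(self):
        while True:
            f, args, kwargs, reply = self.jobs.get()
            try:
                reply.put((True, f(*args, **kwargs)))
            except Exception as err:
                reply.put((False, err))

    def call(self, f, *args, **kwargs):
        """Run f in a worker thread and wait for its result."""
        reply = queue.Queue(1)
        self.jobs.put((f, args, kwargs, reply))
        ok, value = reply.get()
        if ok:
            return value
        raise value                   # re-raise the worker's exception

pool = TinyPool(2)
print(pool.call(pow, 2, 10))
```

The threads are created once, so each blocking request only pays for a
queue put and get rather than a thread start.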


Threads and stackless don't mix perfectly.  The Stackless "run"
function exits when it detects a deadlock.  The computations are
pushed to another thread so all of the Stackless tasklets are blocked.
Stackless interprets blocked tasklets as deadlock.  To handle that I
wrote a main loop around "run" checking that neither tasks nor threads
are pending.  This is a busy-wait, which is rather ugly.  Your process
may sip CPU even if doing nothing.

A more elegant solution is to switch to asynchronous calls where
possible, and only use threads when there's no other solution.  This
is the approach that asyncore, Twisted, and XXX use.

A downside of that approach is you feel like you're programming
inside-out.  Every time something can block, or wait for data, you
have to schedule the next step via a callback.  It's learnable, but
it's a big hassle for most people.

One way to overcome that inside-out feeling is to hide most of it
using continuations.  That's the computer science term for what
Stackless does.  Make the API look like a normal blocking call but
have the implementation use Stackless so that other tasklets can run.

I'll show this using an implementation of the "sleep" function.
Suppose I'm making fried spam and hard-boiled eggs.  To make fried
spam (don't try this at home -- I'm making it all up as I go along):

   * put cut spam on griddle
   * wait 2 minutes
   * flip spam and fry other side
   * wait 1 more minute
   * put spam on plate
   * done

To boil an egg:

   * place egg in boiling water
   * wait 2 minutes
   * put egg on plate
   * done

So to make spam and eggs:

   * set out a plate
   * fry the spam
   * boil the egg
   * serve
   * done

Here's how to implement it sequentially, although I'm using a few
cosmetic changes to make the result look more like how the Stackless
version will be.

import time

start_time = time.time()
def when():
     delta = "%.1f" % (time.time() - start_time,)
     return "(at %s 'minutes')" % (delta,)

sleep = time.sleep

class Plate(list):
     send = list.append
     def receive(self):
         return self.pop(0)

def fry_spam(plate):
     print "frying spam for 2 minutes", when()
     sleep(2)
     print "frying other side for 1 minute", when()
     sleep(1)
     print "putting spam on plate", when()
     plate.send("fried spam")

def boil_egg(plate):
     print "boiling egg for 2 minutes", when()
     sleep(2)
     print "putting egg on plate", when()
     plate.send("boiled egg")

def spam_and_eggs():
     plate = Plate()
     fry_spam(plate)
     boil_egg(plate)
     print "Plate now has", plate.receive(), when()
     print "Plate now has", plate.receive(), when()

if __name__ == "__main__":
     spam_and_eggs()


When I run it I get

frying spam for 2 minutes (at 0.0 'minutes')
frying other side for 1 minute (at 2.0 'minutes')
putting spam on plate (at 3.0 'minutes')
boiling egg for 2 minutes (at 3.0 'minutes')
putting egg on plate (at 5.0 'minutes')
Plate now has fried spam (at 5.0 'minutes')
Plate now has boiled egg (at 5.0 'minutes')

This algorithm takes 5 minutes even though doing the tasks in parallel
only takes 3 minutes.

The thread solution is also simple.  The only differences are a couple
of extra imports, a change to the Plate class, and the code to start
each thread.

import time
import threading
import Queue

start_time = time.time()
def when():
     delta = "%.1f" % (time.time() - start_time,)
     return "(at %s 'minutes')" % (delta,)

sleep = time.sleep

class Plate(Queue.Queue):
     send = Queue.Queue.put
     receive = Queue.Queue.get

def fry_spam(plate):
     print "frying spam for 2 minutes", when()
     sleep(2)
     print "frying other side for 1 minute", when()
     sleep(1)
     print "putting spam on plate", when()
     plate.send("fried spam")

def boil_egg(plate):
     print "boiling egg for 2 minutes", when()
     sleep(2)
     print "putting egg on plate", when()
     plate.send("boiled egg")

def spam_and_eggs():
     plate = Plate()
     threading.Thread(target=fry_spam, args=(plate,)).start()
     threading.Thread(target=boil_egg, args=(plate,)).start()
     print "Plate now has", plate.receive(), when()
     print "Plate now has", plate.receive(), when()

if __name__ == "__main__":
     spam_and_eggs()

The output is a bit confusing because the receive shifts execution
partway through the print statement.  XXX fix for slide

frying spam for 2 minutes (at 0.0 'minutes')
boiling egg for 2 minutes (at 0.0 'minutes')
Plate now has frying other side for 1 minute (at 2.0 'minutes')
putting egg on plate (at 2.0 'minutes')
boiled egg (at 2.0 'minutes')
Plate now has putting spam on plate (at 3.0 'minutes')
fried spam (at 3.0 'minutes')


I won't show how to implement this in Twisted as I don't have enough
practice with that system.  The fry_spam part might look something
like the following.

class FrySpam(object):
     def __init__(self, plate):
         self.plate = plate
     def step_1(self):
         print "frying spam for 2 minutes", when()
         callLater(2, self.step_2)
     def step_2(self):
         print "frying other side for 1 minute", when()
         callLater(1, self.step_3)
     def step_3(self):
         print "putting spam on plate", when()
         self.plate.send("fried spam")
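
To make the callback style concrete without installing Twisted, here
is a toy event loop that supplies a "callLater" and drives the three
steps.  It is NOT Twisted's reactor: "ToyReactor" is a made-up class
with a simulated clock, so nothing actually waits, and the plate is
just a list.

```python
import heapq
import itertools

class ToyReactor(object):
    """Runs callbacks in time order using a simulated clock."""
    def __init__(self):
        self.now = 0.0
        self._pending = []
        self._order = itertools.count()   # tie-breaker for equal times

    def callLater(self, delay, func):
        entry = (self.now + delay, next(self._order), func)
        heapq.heappush(self._pending, entry)

    def run(self):
        while self._pending:
            # Jump the clock forward to the next scheduled callback
            self.now, _, func = heapq.heappop(self._pending)
            func()

class FrySpam(object):
    def __init__(self, reactor, plate):
        self.reactor = reactor
        self.plate = plate
    def step_1(self):
        print("frying spam for 2 minutes (at %.1f)" % self.reactor.now)
        self.reactor.callLater(2, self.step_2)
    def step_2(self):
        print("frying other side for 1 minute (at %.1f)" % self.reactor.now)
        self.reactor.callLater(1, self.step_3)
    def step_3(self):
        print("putting spam on plate (at %.1f)" % self.reactor.now)
        self.plate.append("fried spam")

reactor = ToyReactor()
plate = []
reactor.callLater(0, FrySpam(reactor, plate).step_1)
reactor.run()
```

Notice how the single fry_spam function has been cut into one method
per wait; that is the inside-out feeling described earlier.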


Okay, now how do I do this with Stackless?  The simplest way is to
busy-wait, like this:

import time
import stackless
import heapq

start_time = time.time()
def when():
     delta = "%.1f" % (time.time() - start_time,)
     return "(at %s 'minutes')" % (delta,)

def sleep(t):
     wakeup_time = time.time() + t
     while time.time() < wakeup_time:
         stackless.schedule()

def fry_spam(plate):
     print "frying spam for 2 minutes", when()
     sleep(2)
     print "frying other side for 1 minute", when()
     sleep(1)
     print "putting spam on plate", when()
     plate.send("fried spam")

def boil_egg(plate):
     print "boiling egg for 2 minutes", when()
     sleep(2)
     print "putting egg on plate", when()
     plate.send("boiled egg")

def spam_and_eggs():
     plate = stackless.channel()
     stackless.tasklet(fry_spam)(plate)
     stackless.tasklet(boil_egg)(plate)
     for _ in range(2):
         food = plate.receive()
         print "Plate now has", food, when()

if __name__ == "__main__":
     stackless.tasklet(spam_and_eggs)()
     stackless.run()


The key part is

def sleep(t):
     wakeup_time = time.time() + t
     while time.time() < wakeup_time:
         stackless.schedule()

(you might want to use

def sleep(t):
     wakeup_time = time.time() + t
     while 1:
         stackless.schedule()
         if time.time() >= wakeup_time:
             break

in case you want sleep() to always yield)

The code works but it still has that busy-wait, which I don't like.
What can I do about it?

I'll start by making it someone else's problem.  I'll have a
centralized sleep manager which does the busy waiting for me.
Here's the SEP ("someone else's problem") implementation of sleep.

import heapq

sleep_queue = []

def sleep(t):
     wakeup_time = time.time() + t
     wakeup_channel = stackless.channel()
     heapq.heappush(sleep_queue, (wakeup_time, wakeup_channel) )
     wakeup_channel.receive()


It's still someone's problem.  Here's the someone, which is run as a
tasklet.

def sleep_scheduler():
     while 1:
         while sleep_queue:
             now = time.time()
             wakeup_time, wakeup_channel = sleep_queue[0]
             if now >= wakeup_time:
                 heapq.heappop(sleep_queue)
                 wakeup_channel.send(0)
             else:
                 break

         stackless.schedule()


and scheduled like this

if __name__ == "__main__":
     stackless.tasklet(spam_and_eggs)()
     stackless.tasklet(sleep_scheduler)()
     stackless.run()

The code still busy waits, and I've added a new problem.  The
sleep_scheduler never finishes so "run" never finishes.  My code no
longer exits.

In talking things over with people on the Stackless mailing list, I
think the problem is that "run" isn't quite smart enough.  As I showed
earlier it doesn't know about threads, and here it doesn't know that
some tasklets shouldn't be counted as running.  In looking through the
archives the "run" function was once known as "run_watchdog".  It
isn't really meant to be the main loop of your program.

The recommendation is that you should implement your own main loop for
non-trivial tasks in Stackless.  I'll develop variations of that
through the rest of this talk.

I'll call it "run_all" to make it distinct from the Stackless version.
The simplest implementation of "run_all" is something like

def run_all():
     while 1:
         stackless.schedule()
         if <<no stackless jobs are running>>:
             break

I used << >> here because there are many ways to do the test.  One is
to look at the number of running tasklets.  The main tasklet is always
runnable and the sleep_scheduler is also always running.  If the
number of runnable tasklets is 2 or less then run_all can exit.

     if stackless.getruncount() <= 2:
         break

(BTW, I test after the schedule() to make sure all tasklets have a
chance to run.)

If you're writing an application you probably want a user command to
shut things down.  That might look like

_finished = False
def exit():
     global _finished
     _finished = True

def run_all():
     global _finished
     _finished = False
     while 1:
         stackless.schedule()
         if _finished:
             break

and have a tasklet call "exit" when it's time to finish.  What I
decided to do for this talk was define a new tasklet creation function
called "main_tasklet".  It works like "stackless.tasklet" except that
my run_all function will not exit until all of the "main" tasklets are
finished.

main_tasklet_count = 0

def main_tasklet(f):
     def main_wrapper(*args, **kwargs):
         global main_tasklet_count
         main_tasklet_count += 1
         try:
             f(*args, **kwargs)
         finally:
             main_tasklet_count -= 1
     return stackless.tasklet(main_wrapper)

The code is a bit tricky because it defines a function in one tasklet
which will be run in another tasklet.  The main reason for this was to
keep the same API as "stackless.tasklet".
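
The counting half of that wrapper can be tried on its own, without
Stackless.  In this sketch "counted" and "active_count" are made-up
names and the wrapped functions are called directly instead of being
turned into tasklets; the point is only that try/finally restores the
count on both normal return and exceptions.

```python
active_count = 0

def counted(f):
    """Wrap f so active_count tracks how many wrapped calls are in progress."""
    def wrapper(*args, **kwargs):
        global active_count
        active_count += 1
        try:
            return f(*args, **kwargs)
        finally:
            # Runs on normal return and on exceptions alike
            active_count -= 1
    return wrapper

def report():
    return active_count

def fail():
    raise ValueError("boom")

print(counted(report)())   # the count as seen from inside the call
try:
    counted(fail)()
except ValueError:
    pass
print(active_count)        # the count after the failed call
```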

The rest of the code is

def run_all():
     while 1:
         stackless.schedule()
         if main_tasklet_count == 0:
             break

if __name__ == "__main__":
     main_tasklet(spam_and_eggs)()
     stackless.tasklet(sleep_scheduler)()
     run_all()


I run this and

frying spam for 2 minutes (at 0.0 'minutes')
boiling egg for 2 minutes (at 0.0 'minutes')
frying other side for 1 minute (at 2.0 'minutes')
putting egg on plate (at 2.0 'minutes')
Plate now has boiled egg (at 2.0 'minutes')
putting spam on plate (at 3.0 'minutes')
Plate now has fried spam (at 3.0 'minutes')


it exits.  That's nice, but it still busy-waits because the
sleep_scheduler busy-waits.

What I want to do is use time.sleep() to block the system if there are
no running tasklets (other than the sleep_scheduler) until the next
tasklet is ready to wake up.  There are two likely places to put the
sleep: "run_all" and "sleep_scheduler".

Either works fine for this case and I originally put the sleep in
run_all.  Afterwards I found a conflict when I added support for I/O.  I
wanted to use asyncore's timeout option to schedule the wakeup and
because of the implementation that meant the asyncore loop cannot run
in the main tasklet.  Why?  Stackless does not allow the main tasklet
to block on a channel, which I used to let asyncore communicate with
the tasklets waiting for data.


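I need a way to know if any tasklets are running.  I can use the
"stackless.getruncount()" function to get that number, but it will
always be at least 2: one for the main tasklet and one for the
sleep_scheduler.  Here's the sleep_scheduler with the sleep added.
It takes new requests, as (wakeup_time, wakeup_channel) pairs, from a
"sleep_channel".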

def sleep_scheduler():
     while 1:
         while sleep_channel.balance > 0:
             heapq.heappush(sleep_queue, sleep_channel.receive())

         # Process wakeup requests
         while sleep_queue:
             now = time.time()
             wakeup_time, wakeup_channel = sleep_queue[0]
             if now >= wakeup_time:
                 heapq.heappop(sleep_queue)
                 wakeup_channel.send(0)
             else:
                 break

         # Figure out if I need to sleep.
         timeout = None

         # If there are any jobs running (besides the main tasklet and
         # this one) then do not sleep
         if stackless.getruncount() > 2:
             timeout = 0
         else:
             # any tasklets to wake up?
             if sleep_queue:
                 timeout = max(0, sleep_queue[0][0] - time.time())

         # Process any async requests
         if timeout is None:
             # Nothing running and nothing sleeping.
             raise AssertionError("deadlock detected")
         else:
             time.sleep(timeout)

         # Let everyone else do something
         stackless.schedule()
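
The timeout decision is easier to test when pulled out into a pure
function.  This is a sketch with made-up names ("next_timeout",
"other_runnable"); the caller would pass in the result of comparing
stackless.getruncount() against 2.  The time to sleep is the earliest
wakeup time minus the current time, clamped at zero so an overdue
entry never produces a negative sleep.

```python
import heapq

def next_timeout(sleep_queue, other_runnable, now):
    """Return seconds to sleep, 0 for "don't sleep", or None for deadlock."""
    if other_runnable:
        return 0                      # someone has work to do right now
    if sleep_queue:
        earliest_wakeup = sleep_queue[0][0]   # a heap keeps the minimum first
        return max(0, earliest_wakeup - now)
    return None                       # nothing running and nothing sleeping

queue_of_sleepers = []
heapq.heappush(queue_of_sleepers, (105.0, "spam"))
heapq.heappush(queue_of_sleepers, (102.5, "egg"))
print(next_timeout(queue_of_sleepers, False, 100.0))
```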



If you use a library with many blocking calls then I suggest you put
calls to the library in its own thread and pass messages between the
stackless thread (containing all the tasklets) and the library thread.
There's a complication because Stackless doesn't know if any of the
threads have sent data back to the tasklets.  I couldn't find a better
solution than using a small timeout delay for that case, like 0.05
seconds.  The time will depend in part on your requirements.
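
Here is the shape of that arrangement in plain Python, using two
Queues for the messages.  Everything in it is an assumption made for
the sketch: the function names, the use of None as a shutdown signal,
and str.upper() standing in for the blocking library call.  In a
Stackless program the polling loop would give the tasklets a turn
where the comment says so.

```python
import threading
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2

def library_thread(requests, replies):
    """Stand-in for the thread that owns the blocking library."""
    while True:
        item = requests.get()          # blocks only this thread
        if item is None:               # None is this sketch's shutdown signal
            break
        replies.put(item.upper())      # pretend this was a slow library call

def collect_replies(replies, expected, poll_delay=0.05):
    """Poll for replies, waking up every poll_delay seconds."""
    results = []
    while len(results) < expected:
        try:
            results.append(replies.get(timeout=poll_delay))
        except queue.Empty:
            pass   # nothing yet; a Stackless main loop would run tasklets here
    return results

def demo():
    requests, replies = queue.Queue(), queue.Queue()
    worker = threading.Thread(target=library_thread, args=(requests, replies))
    worker.start()
    for word in ("spam", "eggs"):
        requests.put(word)
    results = collect_replies(replies, 2)
    requests.put(None)
    worker.join()
    return results

print(demo())
```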

Some code is available in both blocking and non-blocking form.  The
most notable of these is networking code.  The Python standard library
includes "asyncore" for doing asynchronous I/O.

Here is the example code from the documentation for asyncore


import asyncore, socket

class http_client(asyncore.dispatcher):

     def __init__(self, host, path):
         asyncore.dispatcher.__init__(self)
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.connect( (host, 80) )
         self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path

     def handle_connect(self):
         pass

     def handle_close(self):
         self.close()

     def handle_read(self):
         print self.recv(8192)

     def writable(self):
         return (len(self.buffer) > 0)

     def handle_write(self):
         sent = self.send(self.buffer)
         self.buffer = self.buffer[sent:]

c = http_client('www.python.org', '/')

asyncore.loop()


Compare that to the "normal" solution which is

import urllib2
f = urllib2.urlopen("http://www.python.org/")
print f.read()

The advantage of course with the asyncore solution is that it can
handle multiple connections.  (I'll ignore other complications like
asyncore having less memory overhead and urllib2 having better
support for HTTP headers and proxies.  For better protocol support in
an asynchronous library see Twisted or XXX.)

By the way, there is a slight problem with the above example.  The  
initial request says

         self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path

when it should say

         self.buffer = 'GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host)

Without the "Host:" in the request header a web server which hosts
several sites on one address won't know which site the request is for.
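
As a small sketch, the request string can be built by a helper so the
header is never forgotten ("build_request" is a made-up name, not part
of asyncore):

```python
def build_request(host, path):
    """Return a minimal HTTP/1.0 GET request that includes a Host: header."""
    return "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (path, host)

print(repr(build_request("www.python.org", "/")))
```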


Suppose I want to get the content from 3 top-level web pages and
compute the md5 hash of the reversed text.  It's silly, but I chose it
as something where the code has to fetch and store all the data before
doing anything.

Here's one solution

import urllib2
import time
import hashlib

def open_toplevel(host):
     url = "http://" + host + "/"
     return urllib2.urlopen(url)

def fetch_and_reverse(host):
     t1 = time.time()
     s = open_toplevel(host).read()[::-1]
     dt = time.time() - t1
     digest = hashlib.md5(s).hexdigest()
     print "hash of %r/ = %s in %.2f s" % (host, digest, dt)

start_time = time.time()
fetch_and_reverse("www.python.org")
fetch_and_reverse("docs.python.org")
fetch_and_reverse("planet.python.org")
print "total time: %.2f s" % (time.time()-start_time,)

hash of 'www.python.org'/ = 471c97ee4266932e80c36a0103d33863 in 0.90 s
hash of 'docs.python.org'/ = 24bce4dc223cc5c9047d0c978d41e701 in 0.60 s
hash of 'planet.python.org'/ = 8408cea2f6bf53b0827473c567c29b67 in 1.10 s
total time: 2.60 s


I wrote the "open_toplevel" function so it fits in with the code I'm
about to develop using stackless.  I want the result to look like

import urllib2
import time
import hashlib
import slib

... stackless / asyncore interface code here ...

def open_toplevel(hostname):
     ... new code here, returns a file-like object implementing "read()" ...

def fetch_and_reverse(host):
     t1 = time.time()
     s = open_toplevel(host).read()[::-1]
     dt = time.time() - t1
     digest = hashlib.md5(s).hexdigest()
     print "hash of %r/ = %s in %.2f s" % (host, digest, dt)

start_time = time.time()
slib.main_tasklet(fetch_and_reverse)("www.python.org")
slib.main_tasklet(fetch_and_reverse)("docs.python.org")
slib.main_tasklet(fetch_and_reverse)("planet.python.org")
slib.run_all()
print "total time: %.2f s" % (time.time()-start_time,)

where "slib" is short for "Stackless library" and contains the code
I've been developing.  Sadly, I couldn't think of a better name.

The "main_tasklet" code is the same as earlier.

import stackless
import asyncore

main_tasklet_count = 0

def main_tasklet(f):
     def main_wrapper(*args, **kwargs):
         global main_tasklet_count
         main_tasklet_count += 1
         try:
             f(*args, **kwargs)
         finally:
             main_tasklet_count -= 1
     return stackless.tasklet(main_wrapper)

I've changed the scheduler to use asyncore.  For now I'm not going to
worry about sleeping tasks but in a bit I'll show how that works with
the timeout.

def scheduler():
     while 1:
         # If there are any jobs running (besides the main tasklet and
         # this one) then do not sleep.  Else sleep for up to 30 seconds
         timeout = 30
         if stackless.getruncount() > 2:
             timeout = 0

         asyncore.loop(timeout=timeout, count=1)

         # Let everyone else do something
         stackless.schedule()

I'm using a default timeout of "30" because that's what asyncore.loop
uses.


I've moved the scheduler initialization into the run_all function so
user code doesn't need to start it.  More robust code should make sure
that multiple calls to run_all don't add multiple schedulers.


def run_all():
     stackless.tasklet(scheduler)()
     while 1:
         stackless.schedule()
         if main_tasklet_count == 0:
             break

That's it for "slib".  There's nothing new here other than a call
to "asyncore.loop" every time through the tasklet queue.

The API I'm trying to replace is

def open_toplevel(hostname):
     ... new code here ...

This creates an HTTP connection to the given host and returns an
object which implements the "read()" functionality of a file.

If you look at the asyncore example http_client you'll see that once
the connection is set up the only asynchronous part is in the
"handle_read" method and "handle_close".  (The class implements a
"handle_write" method but that's only to send a data buffer created in
the constructor.)

Instead of printing that data my new "handle_read" will send it to a
new "read_channel", passed in via the constructor.  When asyncore says
the connection is closed, my handle_close turns around and closes the
read_channel.

import socket

class stackless_http_client(asyncore.dispatcher):
     def __init__(self, host, path, read_channel):
         asyncore.dispatcher.__init__(self)
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.connect( (host, 80) )
         self.buffer = 'GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host)
         self.read_channel = read_channel

     def handle_connect(self):
         pass

     def handle_close(self):
         self.read_channel.close()
         self.close()

     def handle_read(self):
         self.read_channel.send(self.recv(8192))

     def writable(self):
         return (len(self.buffer) > 0)

     def handle_write(self):
         sent = self.send(self.buffer)
         self.buffer = self.buffer[sent:]
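
The send/close contract between the dispatcher and whoever reads the channel can be tried out without Stackless or a network.  This is only a rough sketch under loud assumptions: it is Python 3, it uses a thread and a buffered queue.Queue where Stackless uses tasklets and an unbuffered channel, and "QueueChannel" and "producer" are hypothetical names.  It shows the shape of the protocol (send chunks, close, iterate until closed) and nothing more.

```python
import queue
import threading

_CLOSED = object()  # sentinel marking the end of the stream

class QueueChannel(object):
    """Buffered stand-in for a channel: send, close, iterate."""
    def __init__(self):
        self._queue = queue.Queue()

    def send(self, value):
        self._queue.put(value)

    def close(self):
        self._queue.put(_CLOSED)

    def __iter__(self):
        while True:
            item = self._queue.get()  # blocks until data arrives
            if item is _CLOSED:
                return
            yield item

def producer(channel, chunks):
    # Plays the role of handle_read (send) and handle_close (close).
    for chunk in chunks:
        channel.send(chunk)
    channel.close()

channel = QueueChannel()
chunks = [b"HTTP/1.0 200 OK\r\n\r\n", b"hello ", b"world"]
worker = threading.Thread(target=producer, args=(channel, chunks))
worker.start()
data = b"".join(channel)  # consumes until the channel is closed
worker.join()
print(data)
```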

I need a weakly file-like object that implements "read()" by reading
all the data from the read_channel.  That's simple:

class StacklessFileAdapter(object):
     def __init__(self, read_channel):
         self.read_channel = read_channel
     def read(self):
         # XXX not quite right!
         return "".join(self.read_channel)

(Remember that channels implement the iterator protocol so "".join() works.)

It's actually too simple.  The stackless_http_client code is a bit
lower-level than urllib2.urlopen and it does not process the HTTP
response headers.  I have to do a bit more processing to skip those
headers:

class StacklessFileAdapter(object):
     def __init__(self, read_channel):
         self.read_channel = read_channel
     def read(self):
         s = "".join(self.read_channel)
         # Skip the http response headers
         i = s.index("\r\n\r\n")
         return s[i+4:]
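
One thing to watch: "index" raises ValueError if the blank line which ends the headers never arrives.  As a hedged sketch, written for Python 3 where socket data is "bytes" (an assumption; the code above is Python 2), "partition" splits at the first blank line and lets the caller decide what to do when the separator is missing.  The helper name "split_http_response" is hypothetical.

```python
def split_http_response(raw):
    """Split a raw HTTP response into (header_block, body).

    Returns (raw, b"") when the blank line ending the headers is
    missing, rather than raising as index() would.
    """
    head, sep, body = raw.partition(b"\r\n\r\n")
    if not sep:
        return raw, b""
    return head, body

response = (b"HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\n"
            b"hello\r\n\r\nworld")
headers, body = split_http_response(response)
print(headers)
print(body)
```

Only the first blank line counts as the separator, so a body which itself contains "\r\n\r\n" comes through intact.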

And the last bit to put it all together is:

def open_toplevel(hostname):
     read_channel = stackless.channel()
     stackless.tasklet(stackless_http_client)(hostname, "/", read_channel)
     return StacklessFileAdapter(read_channel)

Note that this is not a "main_tasklet".  I reserve that for tasklets
which must finish before the program exits.

Let's give it a whirl:

hash of 'docs.python.org'/ = 24bce4dc223cc5c9047d0c978d41e701 in 0.54 s
hash of 'www.python.org'/ = 471c97ee4266932e80c36a0103d33863 in 0.66 s
hash of 'planet.python.org'/ = 8408cea2f6bf53b0827473c567c29b67 in 1.17 s
total time: 1.17 s

You can see that the fastest fetch finished first and that all three
fetches overlapped: the total time is that of the slowest fetch, not
the sum of the three.

I can add sleep support pretty easily.  It's the same "sleep" function
from earlier and most of the code from "sleep_scheduler".  If a task
is sleeping then use that as an upper limit for the asyncore.loop
timeout.  Of course if there are other running tasks then don't sleep
for that long.

def scheduler():
     while 1:
         while sleep_channel.balance > 0:
             heapq.heappush(sleep_queue, sleep_channel.receive())

         # Process wakeup requests
         while sleep_queue:
             now = time.time()
             wakeup_time, wakeup_channel = sleep_queue[0]
             if now >= wakeup_time:
                 heapq.heappop(sleep_queue)
                 wakeup_channel.send(0)
             else:
                 break

         # Figure out if I need to sleep.
         timeout = None

         # If there are any jobs running (besides the main tasklet and
         # this one) then do not sleep
         if stackless.getruncount() > 2:
             timeout = 0
         else:
             # any tasklets to wake up?  Make sure timeout is non-negative
             if sleep_queue:
                 timeout = max(0, sleep_queue[0][0] - time.time())

         # Process any async requests
         if timeout is None:
             asyncore.loop(count=1)
         else:
             if not asyncore.socket_map:
                 time.sleep(timeout)
             else:
                 asyncore.loop(timeout=timeout, count=1)

         # Let everyone else do something
         stackless.schedule()
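
The timeout decision in the middle of that loop is easier to check when it is pulled out into a function with no side effects.  This is a sketch only, in Python 3 and with no Stackless or asyncore involved; "choose_timeout" and its parameters are hypothetical names.  "runnable_count" stands in for stackless.getruncount() and "now" for time.time().

```python
import heapq

def choose_timeout(runnable_count, sleep_queue, now):
    """Return the timeout to hand to the polling call.

    None means no sleeper sets a limit, so use the poller's default.
    sleep_queue is a heap of (wakeup_time, payload) tuples.
    """
    if runnable_count > 2:
        # Other tasklets are ready to run, so only poll.
        return 0
    if sleep_queue:
        # Sleep no later than the earliest wakeup, never negative.
        return max(0, sleep_queue[0][0] - now)
    return None

sleepers = []
heapq.heappush(sleepers, (105.0, "b"))
heapq.heappush(sleepers, (101.5, "a"))

print(choose_timeout(3, sleepers, 100.0))  # busy: poll only
print(choose_timeout(2, sleepers, 100.0))  # idle: wait for first sleeper
print(choose_timeout(2, [], 100.0))        # idle, nobody sleeping
```

Because the heap keeps the earliest wakeup time at index 0, the function never needs to scan the whole queue.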


To test it out I hacked my fetch_and_reverse function

def fetch_and_reverse(host):
     t1 = time.time()
     s = open_toplevel(host).read()[::-1]
     if host.startswith("docs"):
         slib.sleep(5)
     dt = time.time() - t1
     digest = hashlib.md5(s).hexdigest()
     print "hash of %r/ = %s in %.2f s" % (host, digest, dt)


hash of 'www.python.org'/ = 471c97ee4266932e80c36a0103d33863 in 0.66 s
hash of 'planet.python.org'/ = 8e517e6ebbba8dc4f05328f3b9cc990c in 1.07 s
hash of 'docs.python.org'/ = 24bce4dc223cc5c9047d0c978d41e701 in 5.50 s
total time: 5.50 s

To double check that the code was sleeping I stuck some print
statements in the scheduler's main loop.  Yep, it was. :)

If you're really observant you'll see that the hash for
planet.python.org changed.  It was updated while I wrote this document.


The stackless code implements very little of the functionality in
urllib2.  It's really more an implementation of the socket class than
an HTTP interface, with StacklessFileAdapter more like the result of
socket.makefile().

Richard Tew wrote a "stacklesssocket.py" module which implements the
same API as the standard library socket module using stackless and
asyncore.  Getting the details right is a bit tricky and I won't cover
them here.  It uses the same principles I've already covered so you
should be able to figure out most of it from the source code.

That module implements its own scheduler using a busy-wait loop which
polls for 0.05 seconds then yields control.  I don't like the
busy-wait so I'll replace the function which starts that scheduler and
have it use my own code.

import stacklesssocket
from stacklesssocket import stdsocket as socket

def slib_socket(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
     new_socket = socket.socket(family, type, proto)
     return stacklesssocket.stacklesssocket(new_socket)

stacklesssocket.socket = slib_socket

(The stacklesssocket module imports the "socket" module as "stdsocket"
because it also implements the function named "socket".  I access the
socket module through stacklesssocket because of a neat trick I'll do
in a bit.)

In the Python world, replacing code in another module during run-time
like this is sometimes called "monkey patching".  It can lead to
subtle errors.  In this case you'll need to make sure you import
"slib" before getting "stacklesssocket.socket" as otherwise you'll get
the original form of that function.
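
The ordering problem is ordinary Python name binding, and it can be shown without Stackless.  In this small Python 3 sketch a throwaway module object named "demo" (hypothetical, built with types.ModuleType) stands in for stacklesssocket, and "early_binding" plays the part of a name fetched before the patch was applied.

```python
import types

# A throwaway module object stands in for stacklesssocket here.
demo = types.ModuleType("demo")

def original():
    return "original"

def patched():
    return "patched"

demo.socket = original

# Like "from demo import socket": this copies the current value.
early_binding = demo.socket

# The monkey patch rebinds the attribute on the module...
demo.socket = patched

# ...so lookups through the module see the patch, while names bound
# before the patch still point at the original function.
print(demo.socket())
print(early_binding())
```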

Here's how you might write an HTTP request using the socket module

import sys
import socket

def main():
     soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     soc.connect( ("www.python.org", 80) )
     soc.send("GET / HTTP/1.0\r\nHost: www.python.org\r\n\r\n")
     while 1:
         buf = soc.recv(1024)
         if not buf:
             break
         sys.stdout.write(buf)

main()

and here's the same using stacklesssocket

import sys
# must import slib before stacklesssocket
import slib
import stacklesssocket as socket

def main():
     soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     soc.connect( ("www.python.org", 80) )
     soc.send("GET / HTTP/1.0\r\nHost: www.python.org\r\n\r\n")
     while 1:
         buf = soc.recv(1024)
         if not buf:
             break
         sys.stdout.write(buf)

slib.main_tasklet(main)()
slib.run_all()

The stacklesssocket module implements the same API as the standard
socket module which means I can take the monkeypatch one step further
and *replace* the system socket with the stackless one, like this:

import sys
# must import slib before stacklesssocket
import slib
import stacklesssocket
sys.modules["socket"] = stacklesssocket

import socket  # this gets stacklesssocket from sys.modules

def main():
     soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     soc.connect( ("www.python.org", 80) )
     soc.send("GET / HTTP/1.0\r\nHost: www.python.org\r\n\r\n")
     while 1:
         buf = soc.recv(1024)
         if not buf:
             break
         sys.stdout.write(buf)

slib.main_tasklet(main)()
slib.run_all()


It's a neat trick but why is it useful?  Because every other Python
module which uses socket will now use stacklesssocket.  Fetching URLs
through urllib2.urlopen is automatically compatible with Stackless, as
are pop3, nntp, and other protocols.

To make the code a bit simpler I'll add the following to my "slib"
module somewhere after the slib_socket code I added earlier:

import sys
def use_monkeypatch():
     sys.modules["socket"] = stacklesssocket

Remember the "normal" way to fetch web documents I gave some time ago?

import urllib2
import time
import hashlib

def open_toplevel(host):
     url = "http://" + host + "/"
     return urllib2.urlopen(url)

def fetch_and_reverse(host):
     t1 = time.time()
     s = open_toplevel(host).read()[::-1]
     dt = time.time() - t1
     digest = hashlib.md5(s).hexdigest()
     print "hash of %r/ = %s in %.2f s" % (host, digest, dt)

start_time = time.time()
fetch_and_reverse("www.python.org")
fetch_and_reverse("docs.python.org")
fetch_and_reverse("planet.python.org")
print "total time: %.2f s" % (time.time()-start_time,)

I can switch to stackless trivially like this

import slib
import urllib2
import time
import hashlib

def open_toplevel(host):
     url = "http://" + host + "/"
     return urllib2.urlopen(url)

def fetch_and_reverse(host):
     t1 = time.time()
     s = open_toplevel(host).read()[::-1]
     dt = time.time() - t1
     digest = hashlib.md5(s).hexdigest()
     print "hash of %r/ = %s in %.2f s" % (host, digest, dt)

start_time = time.time()
slib.main_tasklet(fetch_and_reverse)("www.python.org")
slib.main_tasklet(fetch_and_reverse)("docs.python.org")
slib.main_tasklet(fetch_and_reverse)("planet.python.org")
slib.run_all()
print "total time: %.2f s" % (time.time()-start_time,)

but as you can see from the output the fetches still run one after
another because the read() doesn't know how to cooperate with Stackless.

hash of 'www.python.org'/ = 471c97ee4266932e80c36a0103d33863 in 1.10 s
hash of 'docs.python.org'/ = 24bce4dc223cc5c9047d0c978d41e701 in 0.76 s
hash of 'planet.python.org'/ = 8e517e6ebbba8dc4f05328f3b9cc990c in 1.11 s
total time: 2.97 s

Now I'll add the one call to monkeypatch the socket module with
stacklesssocket

import slib
# must monkeypatch before any other module imports "socket"
slib.use_monkeypatch()

import urllib2
import time
import hashlib

def open_toplevel(host):
     url = "http://" + host + "/"
     return urllib2.urlopen(url)

def fetch_and_reverse(host):
     t1 = time.time()
     s = open_toplevel(host).read()[::-1]
     dt = time.time() - t1
     digest = hashlib.md5(s).hexdigest()
     print "hash of %r/ = %s in %.2f s" % (host, digest, dt)

start_time = time.time()
slib.main_tasklet(fetch_and_reverse)("www.python.org")
slib.main_tasklet(fetch_and_reverse)("docs.python.org")
slib.main_tasklet(fetch_and_reverse)("planet.python.org")
slib.run_all()
print "total time: %.2f s" % (time.time()-start_time,)

and *poof*

hash of 'docs.python.org'/ = 24bce4dc223cc5c9047d0c978d41e701 in 0.71 s
hash of 'www.python.org'/ = 471c97ee4266932e80c36a0103d33863 in 1.03 s
hash of 'planet.python.org'/ = 8e517e6ebbba8dc4f05328f3b9cc990c in 1.27 s
total time: 1.69 s

XXX had to patch stacklesssocket.py to do
_fileobject = stdsocket._fileobject

One caution: stacklesssocket.getaddrinfo uses the original getaddrinfo
function, which blocks.  There's no asynchronous version of the
function, so if you want a fully non-blocking app you'll need to push
those calls into a system thread using something like the
call_in_thread function.  You'll also need to modify the scheduler so
that while functions are running in another thread the maximum timeout
is, say, 0.05 seconds or so.  I can't figure out any simple way for a
thread to notify the blocking asyncore loop that it has finished and
that a tasklet should run.  The only way I can see is a complex one:
some sort of loopback socket connection that the thread thumps when
it's finished.
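
Here's roughly what that loopback idea could look like.  This is a sketch under assumptions, not something I've wired into slib: it is Python 3, it calls select.select directly where the real code would sit inside the asyncore loop, and "run_blocking_call", "wait_for_worker" and "slow_lookup" are hypothetical names.  socket.socketpair() supplies the connected pair of sockets.

```python
import select
import socket
import threading
import time

def run_blocking_call(func, wake_socket, results):
    """Run func in a worker thread, then thump the wake-up socket."""
    results.append(func())
    wake_socket.send(b"x")  # one byte is enough to wake select()

def wait_for_worker(func, timeout=5.0):
    # A connected pair of sockets: the worker writes, the loop reads.
    reader, writer = socket.socketpair()
    results = []
    worker = threading.Thread(
        target=run_blocking_call, args=(func, writer, results))
    worker.start()
    try:
        # Stand-in for the blocking event loop: it sleeps until the
        # reader is readable instead of polling every 0.05 seconds.
        readable, _, _ = select.select([reader], [], [], timeout)
        if readable:
            reader.recv(1)  # drain the notification byte
        worker.join()
        return bool(readable), results
    finally:
        reader.close()
        writer.close()

def slow_lookup():
    time.sleep(0.1)  # pretend this is a blocking getaddrinfo call
    return "done"

woke, results = wait_for_worker(slow_lookup)
print(woke, results)
```

In a real scheduler the reading end would be registered alongside the other sockets, so the loop could keep its long timeout and still wake up as soon as the thread finishes.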

=== Conclusion ===

Most people who currently use Stackless got into it for game
programming, or more generically for programming "actors" in a
simulation.  Stackless is more general purpose than that.  It's a type
of threading package and can replace a lot of what you might otherwise
use system threads for.  Because it's more light-weight than threads
it's easier to use for programming constructs you wouldn't otherwise
attempt in normal Python with threads.

Getting started in Stackless is an effort.  It's a different way of
thinking for most people and it doesn't have an easy learning curve.
Unlike system threads, doing "normal" things like network I/O requires
a decent understanding of how the pieces go together.

I think I've explained enough of it now to help, but it could be made
easier.  With a bit more work, perhaps a few weeks, the "slib" module
could be finished up and made into a package so people can more easily
start writing Stackless code with very few changes to existing code.

XXX Hmmm, I don't like this conclusion.  Need to think about what my
point is here.


					Andrew
					dalke at dalkescientific.com


