[Stackless] stackless and system threads

Andrew Dalke dalke at dalkescientific.com
Fri Feb 2 21:27:59 CET 2007


I'm working on my presentation for PyCon.  I've gotten to the
section where I'm talking about blocking, and ways to handle
blocking calls.

I want to show how to make Stackless work with system threads,
for example when calling functions that have no non-blocking alternative.
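Setting Stackless aside, the underlying pattern is simple: run the blocking call in a worker thread, then hand back either its return value or its exception. The sketch below uses only the Python 3 standard library. A `queue.Queue` stands in for the Stackless channel, and the helper name `run_blocking` is hypothetical. The worker captures the function's return value itself, because `threading.Thread.run()` discards whatever the target returns.

```python
import queue
import threading


def run_blocking(f, *args, **kwargs):
    """Call f(*args, **kwargs) in a worker thread and return its result.

    Hypothetical helper: a queue.Queue stands in for the Stackless channel,
    and the calling thread simply blocks on it. It shows only how a result
    or an exception is handed back, not cooperative scheduling.
    """
    results = queue.Queue(maxsize=1)

    def worker():
        # Thread.run() discards the target's return value, so capture it here.
        try:
            value = f(*args, **kwargs)
        except Exception as err:
            results.put((False, err))   # hand the exception back
        else:
            results.put((True, value))  # hand the return value back

    threading.Thread(target=worker).start()
    ok, payload = results.get()         # blocks until the worker is done
    if ok:
        return payload
    raise payload                       # re-raise in the caller's thread


print(run_blocking(pow, 2, 10))  # prints 1024
```

In a Stackless program, the difficulty is the `results.get()` line. Only the calling tasklet should block there, not the whole scheduler.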

My first attempt (plus diagnostic instrumentation) is this.
Note that it does not work correctly.

import time
import stackless
import threading

start_time = time.time()

class CallInThread(threading.Thread):
     def __init__(self, f, args, kwargs, results):
         threading.Thread.__init__(self)
         self.f, self.args, self.kwargs = f, args, kwargs
         self.results = results
     def run(self):
         # Thread.run() returns None, so call f directly to get its result
         try:
             result = self.f(*self.args, **self.kwargs)
         except Exception, err:
             self.results.send_exception(err.__class__, *err.args)
         else:
             self.results.send(result)
         print "Thread", self.args, "ends at", time.time() - start_time

def call_in_thread(f, *args, **kwargs):
     results = stackless.channel()
     CallInThread(f, args, kwargs, results).start()
     return results.receive()

def sleep_report(delay):
     call_in_thread(time.sleep, delay)
     print "Elapsed time after", delay, "second delay:",
     print time.time() - start_time

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
stackless.run()
print "Finished at", time.time()-start_time

When I run it I get the following output, which is not
what I had hoped for.

Elapsed time after 5 second delay: 5.00184106827
Finished at 5.00350213051
Thread (5,) ends at 5.00540995598
Thread (10,) ends at 10.0165581703

The short version is that the scheduler doesn't know about
the threads, so it stops when it sees a deadlock that doesn't
actually exist.  In detail, this output tells me that:

   stackless starts two tasklets
   each starts a thread calling time.sleep
   both tasklets block on their respective 'results' channel
   the 5 second thread finishes sleeping and sends the result back to its tasklet
   there is a context shift and the main thread now has control
   the 5 second tasklet gets the data and reports its elapsed time
     "Elapsed time after 5 second delay: 5.00184106827"
   the 5 second tasklet finishes

   At this point the 5 second thread has not yet exited
   At this point the 10 second thread is still sleeping
   At this point the 10 second tasklet is still waiting for a result from the thread

   Stackless notices that all tasklets are blocked (there is only 1 tasklet)
   This causes stackless.run() to return
   The main tasklet finishes after printing
     "Finished at 5.00350213051"

   Control goes back to the 5 second thread, which finishes after printing
     "Thread (5,) ends at 5.00540995598"

   At this point the 10 second thread is still sleeping
   At this point the 10 second tasklet is suspended.  If revived, it would
         be waiting for a result from the thread

   The main thread exits.
   The 10 second thread is not a daemon thread, so it keeps the process alive while it sleeps.

   The 10 second thread finally wakes up and sends the result to the results channel.
   The 10 second thread exits, printing
     "Thread (10,) ends at 10.0165581703"

   Stackless garbage-collects any remaining tasklets, and the 10 second tasklet disappears.


What I'm looking for is a way to tell Stackless that some channels
are fed by system threads, so even though all tasklets are blocked
there's still the possibility of data coming in.

Is there a way to do that?

I tried starting a tasklet in the other thread but that gave
me a bus error.  Here's what I tried.

import time
import stackless
import threading

start_time = time.time()

class CallInThread(threading.Thread):
     def __init__(self, f, args, kwargs, results):
         threading.Thread.__init__(self)
         self.f, self.args, self.kwargs = f, args, kwargs
         self.results = results
     def _run(self):
         try:
             result = self.f(*self.args, **self.kwargs)
         except Exception, err:
             self.results.send_exception(err.__class__, *err.args)
         else:
             self.results.send(result)
         print "Thread", self.args, "ends at", time.time() - start_time
     def run(self):
         # Runs in the new system thread: create a tasklet there
         stackless.tasklet(self._run)()

def call_in_thread(f, *args, **kwargs):
     results = stackless.channel()
     CallInThread(f, args, kwargs, results).start()
     return results.receive()

def sleep_report(delay):
     call_in_thread(time.sleep, delay)
     print "Elapsed time after", delay, "second delay:",
     print time.time() - start_time

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
stackless.run()
print "Finished at", time.time()-start_time


The only solution I could figure out was to have a
counter of the number of running threads

_thread_count = 0

with the number tracked like this

def call_in_thread(f, *args, **kwargs):
     global _thread_count
     results = stackless.channel()
     _thread_count += 1
     try:
         CallInThread(f, args, kwargs, results).start()
         return results.receive()
     finally:
         _thread_count -= 1


and a busy loop around stackless.run() like this

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
while 1:
     stackless.run()
     print "thread count", _thread_count
     if _thread_count == 0:
         break
     time.sleep(0.02)

Here's the output I get, which is as I expect.

Elapsed time after 5 second delay: 5.00111889839
thread count 1
Thread (5,) ends at 5.00368189812
Elapsed time after 10 second delay: 10.0116138458
thread count 0
Finished at 10.0135469437
Thread (10,) ends at 10.0145258904


While it works, I don't like the busy loop.
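One possible way to remove the fixed 0.02 second poll is to have each worker thread signal a `threading.Condition` when its result is ready. The main loop then sleeps until that signal arrives. The sketch below is untested against Stackless and uses only the Python 3 standard library. The names `CompletionSignal`, `notify_done`, `snapshot`, `wait_past` and `demo` are hypothetical. It keeps a running count of completions rather than a flag, so a completion that lands just before the main loop starts waiting is not lost.

```python
import threading
import time


class CompletionSignal:
    """Hypothetical helper: lets a main loop sleep until a worker thread
    reports a finished call, instead of waking every 0.02 seconds to poll.

    A running count of completions is kept, so a notification that arrives
    before the main loop starts waiting is not lost.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._completed = 0

    def notify_done(self):
        # Called from a worker thread when its result is ready.
        with self._cond:
            self._completed += 1
            self._cond.notify_all()

    def snapshot(self):
        with self._cond:
            return self._completed

    def wait_past(self, seen, timeout=None):
        # Block until more than `seen` completions have been reported;
        # returns the current count (still `seen` if the timeout expired).
        with self._cond:
            self._cond.wait_for(lambda: self._completed > seen, timeout)
            return self._completed


def demo(delays=(0.05, 0.1)):
    done_signal = CompletionSignal()
    results = []

    def worker(delay):
        time.sleep(delay)            # the blocking call
        results.append(delay)        # stand-in for handing back the result
        done_signal.notify_done()

    seen = done_signal.snapshot()    # take the snapshot before starting work
    for d in delays:
        threading.Thread(target=worker, args=(d,)).start()
    while len(results) < len(delays):       # stand-in for "_thread_count != 0"
        seen = done_signal.wait_past(seen)  # sleeps; no fixed polling interval
    return sorted(results)


print(demo())  # prints [0.05, 0.1]
```

Mapped onto the loop above, the idea would be:

1. Take one snapshot before the loop.
2. Keep calling stackless.run() and checking _thread_count as before.
3. Replace time.sleep(0.02) with seen = done_signal.wait_past(seen).

If a cross-thread channel send() blocks until the receiving tasklet runs, CallInThread.run would need to call notify_done() before send(), not after. Otherwise the main loop would never wake up to run the receiver.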

Any ideas?

					Andrew
					dalke at dalkescientific.com


_______________________________________________
Stackless mailing list
Stackless at stackless.com
http://www.stackless.com/mailman/listinfo/stackless
