[Stackless] stackless and system threads
Andrew Dalke
dalke at dalkescientific.com
Fri Feb 2 21:27:59 CET 2007
I'm working on my presentation for PyCon. I've gotten to the
section where I'm talking about blocking, and ways to handle
blocking calls.
I want to show how to have Stackless work with system threads,
for example when calling functions that have no non-blocking alternative.
My first attempt (plus diagnostic instrumentation) is this.
Note that it does not work correctly.

import time
import stackless
import threading

start_time = time.time()

class CallInThread(threading.Thread):
    # Run f(*args, **kwargs) in a system thread and send the result
    # (or the exception) back over a stackless channel.
    def __init__(self, f, args, kwargs, results):
        threading.Thread.__init__(self)
        self.f = f
        self.args = args
        self.kwargs = kwargs
        self.results = results

    def run(self):
        try:
            # Thread.run() discards the target's return value, so call
            # the function directly to get the result.
            result = self.f(*self.args, **self.kwargs)
        except Exception, err:
            # send_exception takes the exception class plus its arguments.
            self.results.send_exception(type(err), *err.args)
        else:
            self.results.send(result)
        print "Thread", self.args, "ends at", time.time() - start_time

def call_in_thread(f, *args, **kwargs):
    results = stackless.channel()
    CallInThread(f, args, kwargs, results).start()
    # Block this tasklet until the thread sends something back.
    return results.receive()

def sleep_report(delay):
    call_in_thread(time.sleep, delay)
    print "Elapsed time after", delay, "second delay:",
    print time.time() - start_time

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
stackless.run()
print "Finished at", time.time()-start_time

When I run it I get the following output, which is not
what I had hoped for.
Elapsed time after 5 second delay: 5.00184106827
Finished at 5.00350213051
Thread (5,) ends at 5.00540995598
Thread (10,) ends at 10.0165581703
The short version is that the scheduler doesn't know about
the threads, so it stops when it sees what looks like a deadlock
but isn't one. In detail, this output tells me that:
- stackless starts two tasklets
- each starts a thread calling time.sleep
- both tasklets block on their respective 'results' channel
- the 5 second thread finishes sleeping and sends the result
  back to its tasklet
- there is a context switch and the main thread now has control
- the 5 second tasklet gets the data and reports its elapsed time:
    "Elapsed time after 5 second delay: 5.00184106827"
- the 5 second tasklet finishes
- at this point the 5 second thread has not yet exited
- at this point the 10 second thread is still sleeping
- at this point the 10 second tasklet is still waiting for a result
  from its thread
- Stackless notices that all tasklets are blocked (only the 10
  second tasklet is left)
- this causes stackless.run() to return
- the main task finishes after printing
    "Finished at 5.00350213051"
- control goes back to the 5 second thread, which finishes after
  printing
    "Thread (5,) ends at 5.00540995598"
- at this point the 10 second thread is still sleeping
- at this point the 10 second tasklet is suspended; if revived it
  would be waiting for a result from the thread
- the main thread exits
- the 10 second thread is not a daemon thread, so the process keeps
  running while it sleeps
- the 10 second thread finally wakes up and sends the result to the
  results channel
- the 10 second thread exits, printing
    "Thread (10,) ends at 10.0165581703"
- Stackless garbage-collects any remaining tasklets, and the 10 second
  tasklet disappears
What I'm looking for is a way to tell Stackless that some channels
are held by system threads so even though all tasklets are blocked
there's still the possibility of data coming in.
Is there a way to do that?
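
To make the request concrete, here is a toy model written for Python 3. It is not Stackless and not a proposed Stackless API; `ToyScheduler`, `wait_for_threads`, `inbox` and `outstanding` are all hypothetical names. Tasks are plain generators that yield a blocking function; the scheduler runs that function in a system thread and counts how many thread results are still pending. When every task is blocked, a scheduler that ignores the threads returns, while one that knows about them waits on a thread-safe queue instead.

```python
import collections
import queue
import threading
import time


class ToyScheduler:
    """Hypothetical generator-based stand-in for a tasklet scheduler.

    A task is a generator that yields a zero-argument function.  The
    scheduler runs that function in a system thread and later sends the
    return value back into the generator.
    """

    def __init__(self, wait_for_threads):
        self.wait_for_threads = wait_for_threads
        self.runnable = collections.deque()  # (generator, value to send in)
        self.inbox = queue.Queue()           # results posted by threads
        self.outstanding = 0                 # threads whose result is pending

    def spawn(self, gen):
        self.runnable.append((gen, None))

    def _start_thread(self, gen, func):
        def worker():
            # Runs in the system thread; Queue.put is thread-safe.
            self.inbox.put((gen, func()))
        self.outstanding += 1  # only touched by the scheduler's own thread
        threading.Thread(target=worker, daemon=True).start()

    def _collect(self, block):
        # Move finished thread results onto the runnable queue.  If block
        # is True, wait for the first result; then take only what is ready.
        while True:
            try:
                gen, value = self.inbox.get(block=block)
            except queue.Empty:
                return
            self.outstanding -= 1
            self.runnable.append((gen, value))
            block = False

    def run(self):
        while True:
            self._collect(block=False)
            if not self.runnable:
                # Every task is blocked.  Ignoring the threads means
                # returning now; knowing about them means waiting.
                if self.wait_for_threads and self.outstanding:
                    self._collect(block=True)
                    continue
                return
            gen, value = self.runnable.popleft()
            try:
                func = gen.send(value)
            except StopIteration:
                continue  # this task has finished
            self._start_thread(gen, func)


def sleep_report(delay, log):
    yield lambda: time.sleep(delay)  # the blocking call runs in a thread
    log.append(delay)


early, waited = [], []

s1 = ToyScheduler(wait_for_threads=False)
s1.spawn(sleep_report(0.05, early))
s1.spawn(sleep_report(0.1, early))
s1.run()  # returns at once: both tasks are blocked on threads

s2 = ToyScheduler(wait_for_threads=True)
s2.spawn(sleep_report(0.05, waited))
s2.spawn(sleep_report(0.1, waited))
s2.run()  # returns only after both threads have delivered
```

The only difference between the two runs is the `wait_for_threads` flag: with it off, `run()` gives up as soon as both tasks are blocked, which is the kind of premature exit described above; with it on, the pending-thread count keeps the scheduler alive until every result has been consumed.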
I tried starting a tasklet in the other thread but that gave
me a bus error. Here's what I tried.

import time
import stackless
import threading

start_time = time.time()

class CallInThread(threading.Thread):
    def __init__(self, f, args, kwargs, results):
        threading.Thread.__init__(self, target=f, args=args,
                                  kwargs=kwargs)
        self.results = results

    def _run(self):
        try:
            result = super(CallInThread, self).run()
        except Exception, err:
            self.results.send_exception(err)
        else:
            self.results.send(result)
        print "Thread", self._Thread__args, "ends at", time.time() - start_time

    def run(self):
        stackless.tasklet(self._run)()

def call_in_thread(f, *args, **kwargs):
    results = stackless.channel()
    CallInThread(f, args, kwargs, results).start()
    return results.receive()

def sleep_report(delay):
    call_in_thread(time.sleep, delay)
    print "Elapsed time after", delay, "second delay:",
    print time.time() - start_time

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
stackless.run()
print "Finished at", time.time()-start_time

The only solution I could figure out was to have a
counter of the number of running threads:

_thread_count = 0

with the number tracked like this:

def call_in_thread(f, *args, **kwargs):
    global _thread_count
    results = stackless.channel()
    # Count the call as outstanding until its result has been received.
    _thread_count += 1
    try:
        CallInThread(f, args, kwargs, results).start()
        return results.receive()
    finally:
        _thread_count -= 1

and a busy loop around stackless.run() like this:

stackless.tasklet(sleep_report)(5)
stackless.tasklet(sleep_report)(10)
while 1:
    stackless.run()
    print "thread count", _thread_count
    if _thread_count == 0:
        break
    # Threads are still outstanding: pause briefly, then poll again.
    time.sleep(0.02)

Here's the output I get, which is as I expect.
Elapsed time after 5 second delay: 5.00111889839
thread count 1
Thread (5,) ends at 5.00368189812
Elapsed time after 10 second delay: 10.0116138458
thread count 0
Finished at 10.0135469437
Thread (10,) ends at 10.0145258904
While it works, I don't like the busy loop.
Any ideas?
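
One possible way to drop the fixed 0.02 second sleep is to replace the polling with a condition variable, sketched here in Python 3 under stated assumptions. `ThreadTracker`, `wait_for_progress`, `drive`, `start_worker` and `run_scheduler` are hypothetical names, and `run_scheduler` only stands in for `stackless.run()`. The sketch assumes a finished thread can hand off its result without the main thread running (here it posts into a `queue.Queue` that the scheduler stand-in drains); whether a Stackless channel send from another thread behaves that way is not established by this example.

```python
import queue
import threading
import time


class ThreadTracker:
    """Hypothetical helper: counts outstanding thread calls so the main
    loop can sleep until one finishes instead of polling."""

    def __init__(self):
        self._cond = threading.Condition()
        self._running = 0    # threads started but not yet finished
        self._finished = 0   # completions the main loop has not seen yet

    def started(self):
        with self._cond:
            self._running += 1

    def finished(self):
        # Called by a worker thread after it has handed off its result.
        with self._cond:
            self._running -= 1
            self._finished += 1
            self._cond.notify_all()

    def wait_for_progress(self):
        """Block until a thread finishes or none are left running.

        Returns True if some thread finished since the last call, i.e.
        the scheduler should run again to pick up the new result(s).
        """
        with self._cond:
            while self._finished == 0 and self._running > 0:
                self._cond.wait()
            progressed = self._finished > 0
            self._finished = 0
            return progressed


def drive(run_scheduler, tracker):
    # run_scheduler stands in for stackless.run(): it returns once every
    # task is blocked.  There is no fixed polling interval.
    while True:
        run_scheduler()
        if not tracker.wait_for_progress():
            break


# Demo with plain threads; a queue.Queue plays the part of the channel.
inbox = queue.Queue()
results = []
tracker = ThreadTracker()

def start_worker(delay):
    tracker.started()
    def work():
        time.sleep(delay)
        inbox.put(delay)    # hand off the result first...
        tracker.finished()  # ...then wake the main loop
    threading.Thread(target=work).start()

def run_scheduler():
    # Drain whatever results have arrived, then return ("all blocked").
    while True:
        try:
            results.append(inbox.get_nowait())
        except queue.Empty:
            return

start_worker(0.05)
start_worker(0.1)
drive(run_scheduler, tracker)
```

Compared with sleeping for 0.02 seconds between checks, the main thread here wakes as soon as a worker reports in and stops as soon as nothing is outstanding. Putting the result into the queue before calling `finished()` ensures that each wake-up is followed by a scheduler run that can actually see the new data.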
Andrew
dalke at dalkescientific.com
_______________________________________________
Stackless mailing list
Stackless at stackless.com
http://www.stackless.com/mailman/listinfo/stackless