[Stackless] Stackless for 2.1: Progress Report

Jeff Senn senn at maya.com
Tue May 29 22:41:58 CEST 2001


Jeff Rush <jrush at taupro.com> writes:

> Does anyone have/want to create some regression tests
> for Stackless?

Here's a start that includes the examples from the uthread page.  It is
not yet outfitted to check the output results (which is hard to do in
this case), but I do notice that there is an exception, and that the
output for the examples is suspiciously sequential (wasn't there a 2.0
problem that caused sequential execution?).

--
-Jas  


--snip------ test_thread.py-----------------

#! /usr/bin/env python
"""Test uthread (requires uthread.py)
"""
import uthread,sys
from test_support import verbose, TESTFN, unlink, TestFailed

def main():
    """Run the uthread examples one after another, in a fixed order."""
    # test_timer is defined in this file but is not part of this run list.
    for example in (test_print1, test_triangle, test_queue,
                    test_snailq, test_synch):
        example()
    
# Semaphore that printw() acquires and releases around each write.
sem = uthread.Semaphore() 
# Number of columns already used on the current output line; updated by
# printw() and reset by pwclean().
pwidth = 0 

def printw(str): 
    global pwidth 
    sem.acquire() 
    if pwidth + len(str) + 1 > 70: 
        print 
        pwidth = 0 
    print str, 
    sys.stdout.flush() 
    pwidth = pwidth + len(str) + 1 
    sem.release() 

def pwclean():
    """Terminate a partially written output line, if there is one.

    Does nothing when `pwidth` is already zero; otherwise prints a
    newline and resets `pwidth` to zero.
    """
    global pwidth
    if pwidth == 0:
        return
    print
    pwidth = 0

def talker(q):
    """Producer: print and enqueue the integers 0..99, then enqueue None.

    `q` only needs a put() method.  The final None is the end marker
    that listener() stops on.
    """
    for value in range(100):
        printw(repr(value))
        # Burn a little time between printing the value and queueing it.
        spins = 0
        for _ in range(20):
            spins = spins + 1
        q.put(value)
    q.put(None)

def listener(q):
    """Consumer: print every item taken from `q` until None arrives.

    Each item is printed as '[' + repr(item) + ']'.  The terminating
    None is printed too (as '[None]') before the loop ends, exactly as
    before.  `q` only needs a get() method.
    """
    while 1:
        item = q.get()
        printw('[' + repr(item) + ']')
        # Identity test: None is a singleton, and this avoids relying on
        # the item's own comparison behaviour.
        if item is None:
            break

def test_print1():
    """Start one counting uthread per letter A-H and run them all.

    Each thread prints its letter followed by 0..19 through printw().
    """
    def count(preamble):
        for step in range(20):
            printw(preamble + repr(step))
    for letter in 'ABCDEFGH':
        uthread.new(count, letter)
    uthread.run()
    pwclean()

def test_queue():
    """Run talker() and listener() against a shared uthread.Queue."""
    channel = uthread.Queue()
    # Same creation order as before: producer first, then consumer.
    for worker in (talker, listener):
        uthread.new(worker, channel)
    uthread.run()
    pwclean()

def test_snailq():
    """Run talker() and listener() against a shared uthread.SnailQueue."""
    channel = uthread.SnailQueue()
    # Same creation order as before: producer first, then consumer.
    for worker in (talker, listener):
        uthread.new(worker, channel)
    uthread.run()
    pwclean()

def test_triangle():
    """Start 20 uthreads that each sum 0..n and print 'n -> sum, '.

    n is int(uthread.random() * 2000) for each thread; uthread.random()
    presumably returns a float in [0, 1) -- confirm against uthread.py.
    """
    def triangle(n):
        # Deliberately summed term by term in a loop.
        total = 0
        for term in range(n + 1):
            total = total + term
        printw('%d -> %d, ' % (n, total))
    for _ in range(20):
        limit = int(uthread.random() * 2000)
        uthread.new(triangle, limit)
    uthread.run()
    pwclean()

def test_synch():
    """Start N uthreads that wait on a timer, then meet at a Synchronizer.

    Each thread waits on its own uthread.Timer, prints 'waiting', calls
    sync() on the shared uthread.Synchronizer(N), and prints 'done'.
    """
    def trySynchronizer(s, n):
        # Use uthread.Timer with this thread's own delay `n`.  The bare
        # names `Timer` and `x` are not defined when main() runs, so the
        # previous `Timer(x)` raised NameError.
        t = uthread.Timer(n)
        t.start()
        t.wait()
        printw('waiting')
        s.sync()
        printw('done')
    N = 20
    s = uthread.Synchronizer(N)
    for i in range(N):
        # A bare `random` is not imported in this file; use the same
        # uthread.random() that test_triangle uses, scaled to 0..5.
        # NOTE(review): assumes uthread.random() returns a float in
        # [0, 1) and that a float delay is what Timer expects -- confirm.
        uthread.new(trySynchronizer, s, uthread.random() * 5)
    uthread.run()
    pwclean()

def test_timer():
    """Start 20 uthreads that each wait on a uthread.Timer and print its delay.

    The module-level `x` is still assigned on every iteration, as before,
    so any code that reads it afterwards sees the last delay generated.
    """
    global x
    def tryTimer(n):
        t = uthread.Timer(n)
        t.start()
        t.wait()
        # Print this thread's own delay.  Reading the global `x` here
        # would presumably show only the last value assigned by the loop
        # below, since the threads are run after the loop finishes.
        printw('%.3g' % n)
    for i in range(20):
        # A bare `random` is not imported in this file; use the same
        # uthread.random() that test_triangle uses, scaled to 0..5.
        # NOTE(review): assumes uthread.random() returns a float in
        # [0, 1) -- confirm against uthread.py.
        x = uthread.random() * 5
        uthread.new(tryTimer, x)
    uthread.run()
    pwclean()

# Runs the examples whenever this module is executed or imported; there
# is no `if __name__ == "__main__"` guard.
main()



More information about the Stackless mailing list