[Stackless-checkins] CVS: slpdev/src/2.3/src/Stackless/Langley/stackless_fault __init__.py, NONE, 1.1 log.py, NONE, 1.1 pipe.py, NONE, 1.1 twistless.py, NONE, 1.1
Christian Tismer
tismer at centera.de
Mon Oct 11 01:30:27 CEST 2004
Update of /home/cvs/slpdev/src/2.3/src/Stackless/Langley/stackless_fault
In directory centera.de:/tmp/cvs-serv23004/src/2.3/src/Stackless/Langley/stackless_fault
Added Files:
__init__.py log.py pipe.py twistless.py
Log Message:
A problem submitted by Adam Langley.
Will see how to cure this :-)
--- NEW FILE: __init__.py ---
--- NEW FILE: log.py ---
import logging
# Module-wide logger used by debug()/info()/warning()/error().
# It stays None until init() assigns the configured logger.
logger = None
def init(name):
    """Configure the root logger and publish it as the module-level ``logger``.

    ``name`` is accepted but not used; the root logger is always the one
    configured.  Every call attaches one more StreamHandler using the
    '<time> <level> <message>' format, sets the level to NOTSET and logs
    'Started' at INFO level.
    """
    global logger
    root = logging.getLogger()
    stream = logging.StreamHandler()
    stream.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    root.addHandler(stream)
    root.setLevel(logging.NOTSET)
    logger = root
    root.info('Started')
# Log msg at DEBUG level through the module-wide logger; args and kwargs are
# passed straight through to logger.debug.  Fails with AttributeError while
# logger is still None, i.e. before init() has been called.
def debug (msg, *args, **kwargs):
logger.debug (msg, *args, **kwargs)
# Log msg at INFO level through the module-wide logger; args and kwargs are
# passed straight through to logger.info.  Fails with AttributeError while
# logger is still None, i.e. before init() has been called.
def info (msg, *args, **kwargs):
logger.info (msg, *args, **kwargs)
# Log msg at WARNING level through the module-wide logger; args and kwargs
# are passed straight through to logger.warning.  Fails with AttributeError
# while logger is still None, i.e. before init() has been called.
def warning (msg, *args, **kwargs):
logger.warning (msg, *args, **kwargs)
# Log msg at ERROR level through the module-wide logger; args and kwargs are
# passed straight through to logger.error.  Fails with AttributeError while
# logger is still None, i.e. before init() has been called.
def error (msg, *args, **kwargs):
logger.error (msg, *args, **kwargs)
--- NEW FILE: pipe.py ---
# vim: shiftwidth=8
# vim: softtabstop=8
import log
import struct
import stackless
import traceback
import twistless
import StringIO
# Type tags of the serialised format, numbered 0..7 in this order.
(LIST_START, DICT_START, SYMBOL, STREAM,
 NUM, CAP, LIST_END, DICT_END) = range(8)
class Cap(object):
    """Holder for a descriptor number, stored as ``fdnum``."""

    def __init__(self, fdnum):
        self.fdnum = fdnum
def serialise(x):
    """Encode ``x`` into the tagged binary format that Pipe decodes.

    Supported values are str (symbol), int/long (number), list, and dict
    with str keys; lists and dicts may nest.  Every item starts with a
    one-byte type tag:

    * symbol: SYMBOL, 2-byte big-endian length, then the symbol's bytes
    * number: NUM, then the value as an 8-byte big-endian unsigned integer
    * list:   LIST_START, the encoded items, LIST_END
    * dict:   DICT_START, alternating symbol key and value, DICT_END

    Returns the encoded string.  Raises TypeError for a value (or dict
    key) of an unsupported type.  Numbers must fit an unsigned 64-bit
    integer and symbols must be shorter than 65536 bytes.
    """
    out = StringIO.StringIO()

    def put_tag(tag):
        out.write(struct.pack('>B', tag))

    def do_list(items):
        put_tag(LIST_START)
        for item in items:
            do_obj(item)
        put_tag(LIST_END)

    def do_dict(mapping):
        put_tag(DICT_START)
        for (key, value) in mapping.items():
            do_sym(key)
            do_obj(value)
        put_tag(DICT_END)

    def do_sym(sym):
        # The parameter must not be called ``s``/``out``: it would hide the
        # output buffer and the writes would land on the symbol itself.
        if type(sym) != str:
            raise TypeError('symbol must be a str, got %r' % (type(sym),))
        data = sym.encode('utf8')
        put_tag(SYMBOL)
        # The reader expects a 2-byte length between the tag and the data.
        out.write(struct.pack('>H', len(data)))
        out.write(data)

    def do_num(n):
        if type(n) != int and type(n) != long:
            raise TypeError('number must be int or long, got %r' % (type(n),))
        put_tag(NUM)
        out.write(struct.pack('>Q', n))

    writers = {str: do_sym, int: do_num, long: do_num,
               dict: do_dict, list: do_list}

    def do_obj(obj):
        try:
            writer = writers[type(obj)]
        except KeyError:
            raise TypeError('cannot serialise %r' % (type(obj),))
        # Actually invoke the writer; looking it up alone emits nothing.
        writer(obj)

    do_obj(x)
    result = out.getvalue()
    out.close()
    return result
class Pipe(object):
    """Decode serialised objects arriving on ``fd`` in a background tasklet.

    ``fd`` must offer ``read(n)`` returning an object with ``wait()``,
    ``disconnect()`` and, for CAP items, ``recv_cap()``.  The tasklet
    started by the constructor decodes one top-level object and sends it
    to ``self.chan``; consumers obtain it through read().
    """

    def __init__(self, fd):
        self.fd = fd
        # Decoder for each tag that may start an item.
        self.__read_funcs = {LIST_START: self.__list_process,
                             DICT_START: self.__dict_process,
                             SYMBOL: self.__sym_process,
                             NUM: self.__num_process,
                             CAP: self.__cap_process}
        self.chan = twistless.Channel()
        self.tasklet = stackless.tasklet(self.__read_process)()

    def read(self):
        """Return the channel's deferred for the next decoded object."""
        return self.chan.recv()

    def __read_type(self):
        """Read the one-byte type tag that starts every item."""
        return struct.unpack('>B', self.fd.read(1).wait())[0]

    def __read_process(self):
        """Tasklet body: decode one object and pass it to the channel.

        Any failure is printed and the connection is disconnected.
        """
        try:
            tag = self.__read_type()
            fragment = self.__read_funcs[tag]()
            self.chan.send(fragment)
        except:
            # Top of the tasklet: report whatever went wrong and drop the
            # connection instead of letting the error escape.
            traceback.print_exc()
            self.fd.disconnect()

    def __list_process(self):
        """Decode items until LIST_END and return them as a list."""
        items = []
        while 1:
            tag = self.__read_type()
            if tag == LIST_END:
                return items
            items.append(self.__read_funcs[tag]())

    def __dict_process(self):
        """Decode symbol/value pairs until DICT_END and return a dict.

        Raises ValueError when a key is not a symbol.
        """
        result = {}
        while 1:
            tag = self.__read_type()
            if tag == DICT_END:
                return result
            if tag != SYMBOL:
                raise ValueError("Non symbol as key in dict")
            key = self.__read_funcs[tag]()
            tag = self.__read_type()
            result[key] = self.__read_funcs[tag]()

    def __sym_process(self):
        """Read a 2-byte big-endian length and return that many bytes."""
        length_data = self.fd.read(2).wait()
        length = struct.unpack('>H', length_data)[0]
        return self.fd.read(length).wait()

    def __num_process(self):
        """Read an 8-byte big-endian unsigned integer."""
        return struct.unpack('>Q', self.fd.read(8).wait())[0]

    def __cap_process(self):
        """Wrap whatever ``fd.recv_cap()`` returns in a Cap."""
        return Cap(self.fd.recv_cap())
--- NEW FILE: twistless.py ---
# vim: shiftwidth=8
# vim: softtabstop=8
import select
import socket
import os
import heapq
import time
import traceback
import errno
import stackless
import struct
import log
class Error(Exception):
    """Error type raised by this module's own checks."""
class Selectable(object):
    '''Base class for objects the Reactor calls when socket events happen.'''

    def __init__(self, sock):
        self.__sock = sock
        self.__read_wanted = False
        self.__write_wanted = False
        reactor.register(sock.fileno(), self)

    def __push_interest(self):
        'Tell the reactor which events are currently wanted for the socket'
        reactor.wait(self.__sock.fileno(),
                     self.__read_wanted, self.__write_wanted)

    def want_to_read(self, tf):
        'Turn read notification on (tf == True) or off (tf == False)'
        self.__read_wanted = tf
        self.__push_interest()

    def want_to_write(self, tf):
        'Turn write notification on (tf == True) or off (tf == False)'
        self.__write_wanted = tf
        self.__push_interest()

    def disconnected(self):
        'Unregister this file descriptor'
        reactor.unregister(self.__sock.fileno())
class Reactor(object):
    'A poll(2) loop which calls Selectable objects and runs timed callbacks'

    def __init__(self):
        self.poll = select.poll()
        self.fds = {}          # fd -> object providing do_read() / do_write()
        self.eventq = []       # heap of (deadline, timeout handle)
        self.timeout_id = 0    # first handle timeout_add() will try next
        self.timeouts = {}     # handle -> (func, args), or None once cancelled

    def timeout_add(self, timeout, func, *args):
        'Call @func after at least @timeout seconds with arguments @args. Returns a timeout handle'
        handle = self.timeout_id
        while handle in self.timeouts:
            handle += 1
        self.timeout_id = handle + 1
        if self.timeout_id > 2000000000:  # 2 billion
            self.timeout_id = 0
        self.timeouts[handle] = (func, args)
        heapq.heappush(self.eventq, (time.time() + timeout, handle))
        return handle

    def timeout_rm(self, id):
        'Cancel the timeout handle @id (as returned by timeout_add)'
        if id in self.timeouts:
            # Only mark it; iterate() drops the heap entry when it surfaces.
            self.timeouts[id] = None

    def iterate(self):
        'Perform one iteration of the event loop'
        import sys

        timeout = -1  # negative: poll blocks until an fd event arrives
        curtime = time.time()
        while self.eventq:
            (deadline, handle) = self.eventq[0]
            entry = self.timeouts[handle]
            if entry and deadline > curtime:
                # this timeout is the next one to happen
                timeout = deadline - curtime
                break
            # Due or cancelled.  Remove it BEFORE running the callback: the
            # callback may push an earlier deadline, and popping afterwards
            # would then discard that new entry instead of this one.
            heapq.heappop(self.eventq)
            del self.timeouts[handle]
            if entry:
                (func, args) = entry
                try:
                    func(*args)
                except:
                    # A failing callback must not stop the loop.
                    traceback.print_exc()
        try:
            events = self.poll.poll(timeout * 1000)
        except select.error:
            # sys.exc_info() keeps this clause valid on old and new Pythons.
            if sys.exc_info()[1].args[0] == errno.EINTR:
                return
            raise
        readable = (select.POLLIN | select.POLLHUP |
                    select.POLLERR | select.POLLNVAL)
        for (fd, event) in events:
            if event & readable:
                if not self.__dispatch(fd, 'do_read'):
                    continue
            if event & select.POLLOUT:
                self.__dispatch(fd, 'do_write')

    def __dispatch(self, fd, method_name):
        'Call the named handler for @fd; return False if @fd is no longer registered'
        try:
            obj = self.fds[fd]
        except KeyError:
            return False
        try:
            getattr(obj, method_name)()
        except:
            traceback.print_exc()
        return True

    def run(self):
        'Call iterate() forever, rescheduling the current tasklet before each iteration'
        while 1:
            stackless.getcurrent().insert()
            stackless.schedule()
            self.iterate()

    def register(self, fd, obj):
        'Register interest in a file descriptor. Don\'t call - use Selectable'
        assert isinstance(fd, int)
        assert not fd in self.fds
        self.fds[fd] = obj

    def unregister(self, fd):
        'Unregister interest in an fd. Don\'t call - use Selectable'
        assert fd in self.fds
        del self.fds[fd]
        try:
            self.poll.unregister(fd)
        except KeyError:
            # The fd was registered here but never handed to poll through
            # wait(); there is nothing to remove on the poll side.
            pass

    def wait(self, fd, readp, writep):
        'Set the interesting events for an fd. Don\'t call - use Selectable'
        assert fd in self.fds
        mask = 0
        if readp:
            mask |= select.POLLIN
        if writep:
            mask |= select.POLLOUT
        self.poll.register(fd, mask)
# Module-level Reactor instance; Selectable and Timeout objects register
# themselves with it.
reactor = Reactor ()
class BlockPoint(object):
    'Synchronisation point. Many threads can wait(). One can wake them all with wake(), or a single one with wake_one()'

    def __init__(self):
        self.q = []  # channels of blocked waiters, most recent first

    def wait(self):
        'Block the caller on a fresh channel until it is woken'
        parked = stackless.channel()
        self.q.insert(0, parked)
        parked.receive()

    def wake_one(self):
        'Wake the waiter that has been blocked the longest, if there is one'
        if not self.q:
            return
        self.q.pop().send(1)

    def wake(self):
        'Wake every current waiter; later arrivals wait for the next wake'
        waiting, self.q = self.q, []
        for chan in waiting:
            chan.send(1)
class TCPConnect (Selectable):
'Forms a TCP connection. connect() returns a Deferred which is a socket object or an integer errno'
def __init__ (self):
pass
def connect (self, host, port):
self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
self.sock.setblocking (False)
try:
self.sock.connect ((host, port))
except socket.error, (err, strerr):
if not err in [errno.EAGAIN, errno.EWOULDBLOCK, errno.EINPROGRESS]:
raise
Selectable.__init__ (self, self.sock)
self.want_to_write (True)
self.defer = Deferred ()
return self.defer
def do_write (self):
'Called by the reactor'
err = self.sock.getsockopt (socket.SOL_SOCKET, socket.SO_ERROR)
self.disconnected ()
if err:
self.sock.close ()
self.defer.callback (err)
else:
self.defer.callback (self.sock)
do_read = do_write
class TCPAcceptor(Selectable):
    '''Accepts TCP connections. __init__ will raise if there's a problem binding. accept() returns a deferred
    which callbacks with the (socket, address) pair returned by socket.accept()'''

    def __init__(self, port):
        self.q = []  # Deferreds of pending accept() calls, newest first
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.setblocking(False)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind(('0.0.0.0', port))
            self.sock.listen(1)
        except:
            # Setup failed: release the descriptor, then let the caller see
            # the original error.
            self.sock.close()
            raise
        Selectable.__init__(self, self.sock)
        self.want_to_read(True)

    def accept(self):
        'Queue a request for the next connection and return its Deferred'
        defer = Deferred()
        self.want_to_read(1)
        self.q.insert(0, defer)
        return defer

    def do_read(self):
        'Called by the reactor when the listening socket is readable'
        if len(self.q) == 0:
            # Nobody is waiting; stop polling until accept() is called.
            self.want_to_read(False)
            return
        try:
            con = self.sock.accept()
        except socket.error:
            # Best effort: the request stays queued for the next event.
            return
        self.q.pop().callback(con)
class TCPConnection (Selectable):
def __init__ (self, sock):
self.sock = sock
Selectable.__init__ (self, sock)
self.readq = [] # : [(length, Deferred)]
self.writeq = [] # : [(str, Deferred)]
self.writelen = 0
self.readpoint = BlockPoint ()
self.writepoint = BlockPoint ()
self.readbuf = ''
self.dead = False
def disconnect (self):
if self.dead:
return
self.disconnected ()
self.sock.close ()
self.dead = True
for (_, defer) in self.readq:
defer.callback ('')
for (_, defer) in self.writeq:
defer.callback (1)
def do_read (self):
'''Called by reactor'''
if len (self.readq) == 0:
self.want_to_read (False)
return
try:
buf = self.sock.recv (65535)
except socket.error, (err, errstr):
if not err in [errno.EAGAIN, errno.EWOULDBLOCK]:
self.disconnect ()
raise
return
if len (buf) == 0:
self.disconnect ()
self.readbuf += buf
while len (self.readq) > 0 and len (self.readbuf):
a = self.readq[-1]
log.debug ('Want to read: %s, %d', str (a), len (self.readbuf))
# a : (length {-1 == any length}, defer)
if a[0] == -1 or a[0] <= len (self.readbuf):
if a[0] == -1:
buf = self.readbuf
self.readbuf = ''
else:
buf = self.readbuf[:a[0]]
self.readbuf = self.readbuf[a[0]:]
self.readq.pop ()
a[1].callback (buf)
else:
break
def read (self, length = -1):
'''Called by a thread'''
defer = Deferred ()
self.want_to_read (True)
self.readq.insert (0, (length, defer))
return defer
def do_write (self):
'''Called by reactor'''
if len (self.writeq) == 0:
self.want_to_write (False)
return
while len (self.writeq):
(defer, buf) = self.writeq.pop ()
try:
n = self.sock.send (buf)
except socket.error, (err, errstr):
if not err in [errno.EAGAIN, errno.EWOULDBLOCK]:
self.disconnect ()
raise
self.writeq.append ((buf, defer))
return
self.writelen -= n
if n < len (buf):
self.writeq.append ((buf[n:], defer))
return
if defer:
defer.callback (1)
def write (self, buf):
'''Called by a thread'''
assert type(buf) == str
if self.dead:
raise Error ('Connection closed')
if self.writelen > 256000:
defer = Deferred ()
retdefer = defer
else:
defer = None
retdefer = DefSucceed (1)
self.writeq.insert (0, (defer, buf))
self.writelen += len (buf)
self.want_to_write (True)
return retdefer
class Deferred(object):
    '''A value that is delivered later through callback().'''

    def __init__(self):
        self.bp = BlockPoint()
        self.valset = False  # True iff self.val is valid
        self.val = None      # value
        self.cb = None       # callback function
        self.ext_bp = None   # external BlockPoint, used by select

    def callback(self, arg):
        '''Store @arg as the value, wake the waiters and run the callback.

        When an external BlockPoint is set, it is woken instead of the
        Deferred's own one.
        '''
        self.val = arg
        self.valset = True
        waker = self.ext_bp or self.bp
        waker.wake()
        if self.cb:
            self.cb(arg)

    def wait(self):
        '''Block until the value is set and return it.

        A value that is an Exception instance is only logged, and None is
        returned in its place.
        '''
        while not self.valset:
            self.bp.wait()
        result = self.val
        if not isinstance(result, Exception):
            return result
        log.debug('Deferred raising exception %s', repr(result))
        return

    def callback_add(self, func):
        'Install @func as the callback; call it at once if the value is already set'
        self.cb = func
        if self.valset:
            self.cb(self.val)

    def readyp(self):
        'True once the value has been set'
        return self.valset

    def external_bp_set(self, val):
        'Set (or clear, with None) the external BlockPoint'
        self.ext_bp = val
def wait(args):
    '''Block until one of the not-yet-ready entries of @args becomes ready.

    Entries may be Deferreds, None (skipped), or objects offering
    defer_get().  Returns -1 at once when no entry is pending; otherwise
    returns the index in @args of a pending entry that is ready after the
    wake-up (the last such one when several are).
    '''
    bp = BlockPoint()
    pending = []
    for (i, a) in enumerate(args):
        if isinstance(a, Deferred):
            d = a
        elif a == None:
            continue
        else:
            d = a.defer_get()
        if not d.readyp():
            pending.append((i, d))
            d.external_bp_set(bp)
    if not pending:
        return -1
    bp.wait()
    idx = -1
    for (i, d) in pending:
        if d.readyp():
            idx = i
        # Detach the shared BlockPoint again.
        d.external_bp_set(None)
    assert idx != -1
    return idx
class DefSucceed(Deferred):
    '''A Deferred that already holds its value when it is created.'''

    def __init__(self, val):
        # Deferred.__init__ is not called, so bp, cb and ext_bp are not set
        # on the instance.
        self.val = val
        self.valset = True

    def callback(self, arg):
        'Always raises Error; the value cannot be delivered again'
        raise Error('Not implimented')

    def wait(self):
        'Return the stored value without blocking'
        return self.val
class LineReader(object):
    '''Mix-in that assembles lines from the data delivered by self.read().'''

    def __init__(self):
        self.__linebuf = ''

    def line_read(self):
        '''Return the next line without its '\\n' or '\\r\\n' terminator.

        Calls self.read().wait() for more data until a full line is
        buffered; raises Error when that yields an empty result.
        '''
        line = self.__line_get()
        while line == None:
            data = self.read().wait()
            if len(data) == 0:
                raise Error('connection failed')
            self.__linebuf += data
            line = self.__line_get()
        return line

    def __line_get(self):
        'Take one complete line off the buffer, or return None if there is none'
        cut = self.__linebuf.find('\n')
        if cut == -1:
            return None
        line = self.__linebuf[:cut]
        self.__linebuf = self.__linebuf[cut + 1:]
        if line.endswith('\r'):
            line = line[:-1]
        return line
class Factory(object):
    '''Listen on @port and call @method with every accepted socket.'''

    def __init__(self, port, method):
        self.port = port
        self.method = method
        self.acceptor = TCPAcceptor(port)
        # The accept loop runs in a tasklet of its own.
        stackless.tasklet(self.__run)()

    def __run(self):
        'Accept loop: wait for a connection, then hand its socket to the method'
        while True:
            accepted = self.acceptor.accept().wait()
            (sock, addr) = accepted
            self.method(sock)
class Timeout(object):
    '''One-shot timer whose Deferred is called back with 1 when it fires.'''

    def __init__(self, time):
        self.fired = False
        self.defer = Deferred()
        # Handle returned by the reactor, needed by cancel().
        self.to = reactor.timeout_add(time, self.timeout)

    def timeout(self):
        'Called by the reactor when the time is up'
        self.fired = True
        self.defer.callback(1)

    def defer_get(self):
        'Return the Deferred that is completed when the timer fires'
        return self.defer

    def cancel(self):
        'Cancel the timer unless it has already fired'
        if not self.fired:
            reactor.timeout_rm(self.to)
# Internal exception that timed_execute() raises into the waiting tasklet;
# it carries (time, id(func)) as its arguments.
class _TimedOut (Exception):
pass
class TimedOut(Exception):
    '''Raised by timed_execute() when the call ran out of time.'''
def timed_execute (time, func, *args):
task = stackless.getcurrent ()
def timeout_cb (val):
task.raise_exception (_TimedOut, time, id (func)) # this may be wrong
timeout = Timeout (time)
timeout.defer_get ().callback_add (timeout_cb)
try:
func (*args)
timeout.cancel ()
except _TimedOut, (otime, oid):
timeout.cancel ()
if otime == time and oid == id (func):
raise TimedOut ()
raise
class Channel(object):
    '''FIFO hand-over of values from send() to the Deferreds of recv().'''

    def __init__(self):
        self.q = []        # values nobody has asked for yet, newest first
        self.waiters = []  # Deferreds of unanswered recv() calls, newest first

    def send(self, val):
        'Give @val to the oldest waiting receiver, or queue it'
        if self.waiters:
            self.waiters.pop().callback(val)
            return
        self.q.insert(0, val)

    def recv(self):
        'Return a Deferred for the oldest queued value, or for the next send()'
        if self.q:
            return DefSucceed(self.q.pop())
        pending = Deferred()
        self.waiters.insert(0, pending)
        return pending
if __name__ == '__main__':
def t():
sock = TCPConnect().connect ('127.0.0.1', 5678).wait ()
if type (sock) == int:
print 'error', sock
else:
print sock
stackless.tasklet (t) ()
stackless.schedule ()
reactor.run ()
_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins
More information about the Stackless-checkins
mailing list