[Stackless-checkins] CVS: slpdev/src/2.3/src/Stackless/Langley/stackless_fault __init__.py, NONE, 1.1 log.py, NONE, 1.1 pipe.py, NONE, 1.1 twistless.py, NONE, 1.1

Christian Tismer tismer at centera.de
Mon Oct 11 01:30:27 CEST 2004


Update of /home/cvs/slpdev/src/2.3/src/Stackless/Langley/stackless_fault
In directory centera.de:/tmp/cvs-serv23004/src/2.3/src/Stackless/Langley/stackless_fault

Added Files:
	__init__.py log.py pipe.py twistless.py 
Log Message:
A problem submitted by Adam Langley.
Will see how to cure this :-)

--- NEW FILE: __init__.py ---

--- NEW FILE: log.py ---
import logging

# Module-wide logger used by debug()/info()/warning()/error(); None until init() has run.
logger = None

def init(name):
	'''Set up the module-level logger and log a start-up message.

	Binds the global ``logger`` to the root logger, attaches a stream
	handler that formats records as "time level message", sets the level
	to NOTSET and logs 'Started' at INFO level.

	name -- accepted but not used by this function.
	'''
	global logger

	stream = logging.StreamHandler()
	stream.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))

	root = logging.getLogger()
	root.addHandler(stream)
	root.setLevel(logging.NOTSET)

	logger = root
	logger.info('Started')

def debug(msg, *args, **kwargs):
	'''Pass a DEBUG-level message and its arguments to the module logger.'''
	logger.debug(msg, *args, **kwargs)
def info(msg, *args, **kwargs):
	'''Pass an INFO-level message and its arguments to the module logger.'''
	logger.info(msg, *args, **kwargs)
def warning(msg, *args, **kwargs):
	'''Pass a WARNING-level message and its arguments to the module logger.'''
	logger.warning(msg, *args, **kwargs)
def error(msg, *args, **kwargs):
	'''Pass an ERROR-level message and its arguments to the module logger.'''
	logger.error(msg, *args, **kwargs)


--- NEW FILE: pipe.py ---
# vim: shiftwidth=8
# vim: softtabstop=8

import log
import struct
import stackless
import traceback
import twistless
import StringIO

# One-byte type tags of the wire format, numbered 0..7 in this order.
(LIST_START, DICT_START, SYMBOL, STREAM,
 NUM, CAP, LIST_END, DICT_END) = range(8)

class Cap(object):
	'''Holder for a received capability; Pipe wraps the result of fd.recv_cap() in one.'''

	def __init__(self, fdnum):
		'''Store fdnum unchanged on the instance.'''
		self.fdnum = fdnum

def serialise(x):
	'''Encode x into the tagged binary form that Pipe decodes.

	Supported values are str (symbol), int/long (number), list and dict
	(whose keys must be str); containers may nest.  Every value starts
	with a one-byte type tag:

	  symbol -- SYMBOL, a two-byte big-endian length, then the bytes
	  number -- NUM, then an eight-byte big-endian unsigned integer
	  list   -- LIST_START, the encoded items, LIST_END
	  dict   -- DICT_START, alternating symbol key / value, DICT_END

	Returns the encoded string.  Raises KeyError for a value of any other
	type and AssertionError for a non-str symbol or dict key.  Numbers
	must fit an unsigned 64-bit field and symbols a 16-bit length.
	'''
	out = StringIO.StringIO()

	def put_tag(tag):
		# every value on the wire is introduced by a single tag byte
		out.write(struct.pack('>B', tag))

	def do_list(items):
		put_tag(LIST_START)
		for item in items:
			do_obj(item)
		put_tag(LIST_END)

	def do_dict(d):
		put_tag(DICT_START)
		for (key, value) in d.items():
			# do_sym asserts that the key is a str
			do_sym(key)
			do_obj(value)
		put_tag(DICT_END)

	def do_sym(sym):
		# The parameter must not be called like the output buffer, or the
		# writes below would land on the symbol itself.
		assert type(sym) == str
		put_tag(SYMBOL)
		# the reader takes a two-byte length right after the tag
		out.write(struct.pack('>H', len(sym)))
		# sym is already a byte string, so it is written as-is
		out.write(sym)

	def do_num(n):
		assert type(n) == int or type(n) == long
		put_tag(NUM)
		out.write(struct.pack('>Q', n))

	def do_obj(obj):
		# look the handler up by exact type and actually call it
		handler = {str : do_sym, int : do_num, long : do_num,
		           dict : do_dict, list : do_list}[type(obj)]
		handler(obj)

	do_obj(x)
	result = out.getvalue()
	out.close()

	return result

class Pipe(object):
	'''Decodes one serialised object from a connection in its own tasklet.

	fd must provide read(n) returning an object with wait(), plus
	disconnect(); recv_cap() is needed only when a CAP tag arrives.
	Creating a Pipe starts a tasklet that reads a single object, sends
	it to self.chan and then disconnects fd.  Decoding errors are printed
	with traceback.print_exc() and nothing is sent.
	'''
	def __init__(self, fd):
		self.fd = fd
		# type tag -> method that decodes the value following that tag
		self.__read_funcs = { LIST_START : self.__list_process,
		                      DICT_START : self.__dict_process,
		                      SYMBOL     : self.__sym_process,
		                      NUM        : self.__num_process,
		                      CAP        : self.__cap_process }
		self.chan = twistless.Channel()
		self.tasklet = stackless.tasklet(self.__read_process)()

	def read(self):
		'''Return the channel's deferred for the decoded object.'''
		return self.chan.recv()

	def __read_type(self):
		'''Read and return the next one-byte type tag.'''
		return struct.unpack('>B', self.fd.read(1).wait())[0]

	def __read_process(self):
		'''Tasklet body: decode one object, publish it, then disconnect.'''
		try:
			tag = self.__read_type()
			fragment = self.__read_funcs[tag]()
			self.chan.send(fragment)
		except:
			# report the failure here; nothing is sent on the channel
			traceback.print_exc()
		self.fd.disconnect()

	def __list_process(self):
		'''Decode values until LIST_END and return them as a list.'''
		items = []

		while 1:
			tag = self.__read_type()
			if tag == LIST_END:
				return items
			items.append(self.__read_funcs[tag]())

	def __dict_process(self):
		'''Decode symbol-key/value pairs until DICT_END and return a dict.

		Raises ValueError when a key is not a symbol.
		'''
		d = {}

		while 1:
			tag = self.__read_type()
			if tag == DICT_END:
				return d
			if tag != SYMBOL:
				raise ValueError("Non symbol as key in dict")
			key = self.__read_funcs[tag]()
			tag = self.__read_type()
			value = self.__read_funcs[tag]()

			d[key] = value

	def __sym_process(self):
		'''Read a two-byte big-endian length, then that many bytes.'''
		length_data = self.fd.read(2).wait()
		length = struct.unpack('>H', length_data)[0]
		return self.fd.read(length).wait()

	def __num_process(self):
		'''Read an eight-byte big-endian unsigned integer.'''
		return struct.unpack('>Q', self.fd.read(8).wait())[0]

	def __cap_process(self):
		'''Wrap the result of fd.recv_cap() in a Cap.'''
		return Cap(self.fd.recv_cap())
	
	

--- NEW FILE: twistless.py ---
# vim: shiftwidth=8
# vim: softtabstop=8

import select
import socket
import os
import heapq
import time
import traceback
import errno
import stackless
import struct
import log

class Error(Exception):
	'''Exception raised by this module's own failure paths, e.g. a closed connection.'''

class Selectable(object):
	'''Base for objects the Reactor calls when their socket has events.'''

	def __init__(self, sock):
		'''Remember sock and register its file descriptor with the reactor.'''
		self.__sock = sock
		self.__waiting_on_read = False
		self.__waiting_on_write = False
		reactor.register(sock.fileno(), self)

	def __push_interest(self):
		'''Tell the reactor which events are currently wanted for this socket.'''
		fd = self.__sock.fileno()
		reactor.wait(fd, self.__waiting_on_read, self.__waiting_on_write)

	def want_to_read(self, tf):
		'''Switch read notification on (tf true) or off (tf false).'''
		self.__waiting_on_read = tf
		self.__push_interest()

	def want_to_write(self, tf):
		'''Switch write notification on (tf true) or off (tf false).'''
		self.__waiting_on_write = tf
		self.__push_interest()

	def disconnected(self):
		'''Unregister this socket's file descriptor from the reactor.'''
		fd = self.__sock.fileno()
		reactor.unregister(fd)


class Reactor(object):
	'''A poll(2) loop which calls Selectable objects and runs timed callbacks.

	fds maps a file descriptor to the object whose do_read()/do_write()
	is called for it.  eventq is a heap of (deadline, timeout id) pairs
	and timeouts maps each live id to (func, args), or to None once the
	timeout has been cancelled.
	'''
	def __init__(self):
		self.poll = select.poll()
		self.fds = {}
		self.eventq = []
		self.timeout_id = 0
		self.timeouts = {}

	def timeout_add(self, timeout, func, *args):
		'''Call func(*args) after at least timeout seconds.

		Returns a handle that can be passed to timeout_rm().
		'''
		tid = self.timeout_id
		# skip ids still in use (possible after the counter wraps)
		while tid in self.timeouts:
			tid += 1
		self.timeout_id = tid + 1
		if self.timeout_id > 2000000000: # 2 billion
			self.timeout_id = 0
		self.timeouts[tid] = (func, args)
		heapq.heappush(self.eventq, (time.time() + timeout, tid))

		return tid

	def timeout_rm(self, id):
		'''Cancel the timeout handle id (as returned by timeout_add).

		Unknown handles are ignored.
		'''
		if id in self.timeouts:
			# the heap entry stays; iterate() drops it when it surfaces
			self.timeouts[id] = None

	def iterate(self):
		'''Perform one iteration of the event loop.

		Runs every timeout that is due, then polls once and dispatches
		the reported events.  The poll blocks until the next pending
		timeout is due, or indefinitely when there is none.  Exceptions
		from callbacks are printed and otherwise ignored; a poll
		interrupted by a signal (EINTR) just ends the iteration.
		'''
		import sys

		timeout = None
		curtime = time.time()
		while self.eventq:
			(t, tid) = self.eventq[0]
			entry = self.timeouts[tid]
			if entry and t > curtime:
				# this timeout is the next one to happen
				timeout = t - curtime
				break
			# Due or cancelled.  Remove it before running the callback, so
			# that a callback which adds timeouts cannot make us pop the
			# wrong heap entry afterwards.
			heapq.heappop(self.eventq)
			del self.timeouts[tid]
			if entry:
				(func, args) = entry
				try:
					func(*args)
				except:
					traceback.print_exc()

		try:
			if timeout is None:
				events = self.poll.poll()
			else:
				events = self.poll.poll(timeout * 1000)
		except select.error:
			# exc_info() keeps this handler free of version-specific
			# "except X, e" / "except X as e" syntax
			if sys.exc_info()[1].args[0] == errno.EINTR:
				return
			raise
		readable = select.POLLIN | select.POLLHUP | select.POLLERR | select.POLLNVAL
		for (fd, event) in events:
			if event & readable:
				self.__dispatch(fd, 'do_read')
			if event & select.POLLOUT:
				# the read handler may have unregistered fd; then this is a no-op
				self.__dispatch(fd, 'do_write')

	def __dispatch(self, fd, method):
		'''Call the named handler of the object registered for fd, if any.'''
		try:
			obj = self.fds[fd]
		except KeyError:
			return
		try:
			getattr(obj, method)()
		except:
			traceback.print_exc()

	def run(self):
		'''Call iterate() forever, scheduling other tasklets in between.'''
		while 1:
			stackless.getcurrent().insert()
			stackless.schedule()
			self.iterate()

	def register(self, fd, obj):
		'''Register interest in a file descriptor. Don't call - use Selectable.'''
		assert isinstance(fd, int)
		assert not fd in self.fds
		self.fds[fd] = obj

	def unregister(self, fd):
		'''Unregister interest in an fd. Don't call - use Selectable.'''
		assert fd in self.fds
		del self.fds[fd]
		try:
			self.poll.unregister(fd)
		except KeyError:
			# wait() was never called for this fd, so poll never knew it
			pass

	def wait(self, fd, readp, writep):
		'''Set the interesting events for an fd. Don't call - use Selectable.'''
		assert fd in self.fds
		mask = 0
		if readp:
			mask |= select.POLLIN
		if writep:
			mask |= select.POLLOUT
		self.poll.register(fd, mask)

# The module-wide Reactor instance used by Selectable and Timeout.
reactor = Reactor ()

class BlockPoint(object):
	'''Synchronisation point for tasklets.

	Any number of tasklets may block in wait(); wake() releases all of
	them and wake_one() releases the one that has waited longest.
	'''

	def __init__(self):
		# channels of the blocked tasklets, newest at index 0
		self.q = []

	def wait(self):
		'''Block the calling tasklet until it is woken.'''
		ch = stackless.channel()
		self.q.insert(0, ch)
		ch.receive()

	def wake_one(self):
		'''Wake the longest-waiting tasklet; do nothing when none waits.'''
		if not self.q:
			return
		self.q.pop().send(1)

	def wake(self):
		'''Wake every waiting tasklet, emptying the queue first.'''
		pending, self.q = self.q, []

		for ch in pending:
			ch.send(1)

class TCPConnect(Selectable):
	'''Forms a TCP connection.

	connect() returns a Deferred whose value is the connected socket
	object on success or an integer errno on failure.
	'''
	def __init__(self):
		# registration with the reactor is postponed until connect()
		pass

	def connect(self, host, port):
		'''Start a non-blocking connect to (host, port) and return its Deferred.

		Raises socket.error when the connect fails immediately with
		anything other than EAGAIN, EWOULDBLOCK or EINPROGRESS.
		'''
		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.sock.setblocking(False)
		try:
			self.sock.connect((host, port))
		except socket.error, (err, strerr):
			if not err in [errno.EAGAIN, errno.EWOULDBLOCK, errno.EINPROGRESS]:
				# hard failure: do not leak the descriptor
				self.sock.close()
				raise
		self.defer = Deferred()
		Selectable.__init__(self, self.sock)
		# the reactor calls do_write once the connect has finished
		self.want_to_write(True)

		return self.defer

	def do_write(self):
		'''Called by the reactor; completes the Deferred with the outcome.'''
		err = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
		self.disconnected()
		if err:
			self.sock.close()
			self.defer.callback(err)
		else:
			self.defer.callback(self.sock)

	# read-side events (e.g. hang-up or error) are handled the same way
	do_read = do_write

class TCPAcceptor(Selectable):
	'''Listens on a TCP port and hands out accepted connections.

	The constructor raises if binding fails.  accept() returns a Deferred
	whose value is what socket.accept() returned.
	'''

	def __init__(self, port):
		'''Bind a non-blocking listening socket to 0.0.0.0:port and register it.'''
		listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.sock = listener
		listener.setblocking(False)
		listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		listener.bind(('0.0.0.0', port))
		listener.listen(1)
		Selectable.__init__(self, listener)
		self.want_to_read(True)
		# Deferreds handed out by accept(), newest at index 0
		self.q = []

	def accept(self):
		'''Queue a request for the next connection and return its Deferred.'''
		pending = Deferred()

		self.want_to_read(1)
		self.q.insert(0, pending)

		return pending

	def do_read(self):
		'''Called by the reactor when the listening socket is readable.'''
		if not self.q:
			# nobody is waiting: stop asking for read events
			self.want_to_read(False)
			return

		try:
			con = self.sock.accept()
		except socket.error, (err, errstr):
			return
		else:
			self.q.pop().callback(con)

class TCPConnection(Selectable):
	'''Deferred-based reads and writes on a connected socket.

	read() and write() are called by tasklets and return Deferreds;
	do_read() and do_write() are called by the reactor.  Once the
	connection is dead, pending reads complete with '' and pending
	writes with 1.
	'''
	def __init__(self, sock):
		self.sock = sock

		Selectable.__init__(self, sock)

		self.readq  = [] # : [(length, Deferred)], newest at index 0
		self.writeq = [] # : [(Deferred or None, str)], newest at index 0
		self.writelen = 0 # bytes queued but not yet sent
		self.readpoint  = BlockPoint()
		self.writepoint = BlockPoint()

		self.readbuf = '' # received data not yet handed to a reader

		self.dead = False

	def disconnect(self):
		'''Close the connection and release everybody waiting on it.'''
		if self.dead:
			return
		self.disconnected()
		self.sock.close()
		self.dead = True
		# Detach the queues first so no entry can be completed twice.
		(readq, self.readq) = (self.readq, [])
		(writeq, self.writeq) = (self.writeq, [])
		for (_, defer) in readq:
			defer.callback('')
		for (defer, _) in writeq:
			# writes below the buffering limit carry no Deferred
			if defer:
				defer.callback(1)

	def __take(self, length):
		'''Remove and return length bytes (everything for -1) from readbuf.

		Returns None when the buffer is empty or holds fewer than
		length bytes.
		'''
		if not self.readbuf:
			return None
		if length == -1:
			length = len(self.readbuf)
		elif length > len(self.readbuf):
			return None
		data = self.readbuf[:length]
		self.readbuf = self.readbuf[length:]
		return data

	def do_read(self):
		'''Called by reactor'''

		if len(self.readq) == 0:
			self.want_to_read(False)
			return

		try:
			buf = self.sock.recv(65535)
		except socket.error, (err, errstr):
			if not err in [errno.EAGAIN, errno.EWOULDBLOCK]:
				self.disconnect()
				raise
			return

		if len(buf) == 0:
			# end of stream: disconnect() has answered every reader
			self.disconnect()
			return

		self.readbuf += buf

		# serve readers oldest first for as long as the buffer suffices
		while self.readq and self.readbuf:
			(length, defer) = self.readq[-1]
			log.debug('Want to read: %s, %d', str((length, defer)), len(self.readbuf))
			# length == -1 means "whatever is available"
			data = self.__take(length)
			if data is None:
				break
			self.readq.pop()
			defer.callback(data)

	def read(self, length = -1):
		'''Called by a thread

		Returns a Deferred for length bytes, or for whatever arrives
		next when length is -1.  Data that is already buffered is
		handed out at once, and a dead connection yields ''.
		'''
		if not self.readq:
			# Nobody is ahead of us.  Use buffered data now instead of
			# waiting for a readable event that may never come.
			data = self.__take(length)
			if data is not None:
				return DefSucceed(data)
		if self.dead:
			return DefSucceed('')

		defer = Deferred()
		self.want_to_read(True)
		self.readq.insert(0, (length, defer))
		return defer

	def do_write(self):
		'''Called by reactor'''

		if len(self.writeq) == 0:
			self.want_to_write(False)
			return

		while self.writeq:
			(defer, buf) = self.writeq.pop()

			try:
				n = self.sock.send(buf)
			except socket.error, (err, errstr):
				# Put the entry back in its own (defer, buf) order, so a
				# retry sees it intact and disconnect() can wake its writer.
				self.writeq.append((defer, buf))
				if not err in [errno.EAGAIN, errno.EWOULDBLOCK]:
					self.disconnect()
					raise
				return

			self.writelen -= n
			if n < len(buf):
				# partial send: keep the remainder at the head of the queue
				self.writeq.append((defer, buf[n:]))
				return
			if defer:
				defer.callback(1)

	def write(self, buf):
		'''Called by a thread

		Queues buf for sending.  Returns an already-complete Deferred
		while at most 256000 bytes are queued; beyond that the Deferred
		completes only once buf has been sent.  Raises Error on a dead
		connection.
		'''

		assert type(buf) == str

		if self.dead:
			raise Error('Connection closed')
		if self.writelen > 256000:
			defer = Deferred()
			retdefer = defer
		else:
			defer = None
			retdefer = DefSucceed(1)

		self.writeq.insert(0, (defer, buf))
		self.writelen += len(buf)
		self.want_to_write(True)

		return retdefer

class Deferred(object):
	'''A value that becomes available later.

	callback() supplies the value, wait() blocks the calling tasklet
	until it is there, and callback_add() installs a function that is
	called with it.
	'''

	def __init__(self):
		self.bp = BlockPoint()  # tasklets blocked in wait()
		self.valset = False     # True iff self.val is valid
		self.val = None         # the delivered value
		self.cb = None          # callback function
		self.ext_bp = None      # external BlockPoint, used by select

	def callback(self, arg):
		'''Deliver arg: store it, wake one block point, then run the callback.'''
		self.val = arg
		self.valset = True
		# an external block point, when set, is woken instead of our own
		waker = self.ext_bp or self.bp
		waker.wake()
		if self.cb:
			self.cb(arg)

	def wait(self):
		'''Block until a value is set and return it.

		An Exception instance delivered as the value is logged and None
		is returned in its place; it is not raised.
		'''
		while not self.valset:
			self.bp.wait()
		if not isinstance(self.val, Exception):
			return self.val
		log.debug('Deferred raising exception %s', repr(self.val))
		return

	def callback_add(self, func):
		'''Install func as the callback; call it at once if the value is set.'''
		self.cb = func
		if not self.valset:
			return
		self.cb(self.val)

	def readyp(self):
		'''Return whether a value has been delivered.'''
		return self.valset

	def external_bp_set(self, val):
		'''Set (or clear with None) the external block point.'''
		self.ext_bp = val

def wait(args):
	'''Block until one of the not-yet-ready items in args becomes ready.

	Each item is a Deferred, None (skipped) or an object whose
	defer_get() returns a Deferred.  Returns -1 without blocking when no
	item is pending; otherwise returns the position in args of a ready
	item (the last such one among those that were pending).
	'''
	bp = BlockPoint()

	pending = []
	for (pos, item) in enumerate(args):
		if isinstance(item, Deferred):
			d = item
		elif item is None:
			continue
		else:
			d = item.defer_get()
		if d.readyp():
			continue
		pending.append((pos, d))
		# have the deferred wake our shared block point
		d.external_bp_set(bp)

	if not pending:
		return -1

	bp.wait()

	idx = -1
	for (pos, d) in pending:
		if d.readyp():
			idx = pos
		d.external_bp_set(None)

	assert idx != -1

	return idx

class DefSucceed(Deferred):
	'''A Deferred that already holds its value when it is created.'''

	def __init__(self, val):
		'''Store val and mark the deferred as ready.'''
		self.val = val
		self.valset = True

	def callback(self, arg):
		'''Not supported: always raises Error.'''
		raise Error('Not implimented')

	def wait(self):
		'''Return the stored value without blocking.'''
		return self.val

class LineReader(object):
	'''Adds line-at-a-time reading on top of a read() that returns a deferred.

	line_read() calls self.read().wait() for more data, so the object
	must provide a read() method.
	'''

	def __init__(self):
		# data received so far that has not been returned as a line
		self.__linebuf = ''

	def line_read(self):
		'''Return the next line without its '\\n' or '\\r\\n' terminator.

		Raises Error('connection failed') when read() delivers no data.
		'''
		line = self.__line_get()
		while line is None:
			data = self.read().wait()
			if len(data) == 0:
				raise Error('connection failed')
			self.__linebuf += data
			line = self.__line_get()
		return line

	def __line_get(self):
		'''Cut the first complete line out of the buffer, or return None.'''
		cut = self.__linebuf.find('\n')
		if cut == -1:
			return None

		line, self.__linebuf = self.__linebuf[:cut], self.__linebuf[cut+1:]

		# drop a carriage return that preceded the newline
		if line.endswith('\r'):
			line = line[:-1]

		return line

class Factory(object):
	'''Accepts connections on a port and passes each socket to a callable.'''

	def __init__(self, port, method):
		'''Create the acceptor for port and start the accepting tasklet.'''
		self.port = port
		self.method = method
		self.acceptor = TCPAcceptor(port)

		runner = stackless.tasklet(self.__run)
		runner()

	def __run(self):
		'''Tasklet body: call method(sock) for every accepted connection.'''
		while True:
			accepted = self.acceptor.accept()
			(sock, addr) = accepted.wait()
			self.method(sock)

class Timeout(object):
	'''A reactor timeout whose firing completes a Deferred with 1.'''

	def __init__(self, time):
		'''Schedule the timeout to fire after time seconds.'''
		self.to = reactor.timeout_add(time, self.timeout)
		self.defer = Deferred()
		self.fired = False

	def timeout(self):
		'''Reactor callback: mark as fired and complete the Deferred.'''
		self.fired = True
		self.defer.callback(1)

	def defer_get(self):
		'''Return the Deferred that completes when the timeout fires.'''
		return self.defer

	def cancel(self):
		'''Remove the timeout from the reactor unless it has already fired.'''
		if not self.fired:
			reactor.timeout_rm(self.to)

class _TimedOut(Exception):
	'''Internal exception that timed_execute injects into a tasklet when its timer fires.'''

class TimedOut(Exception):
	'''Raised by timed_execute when the call's own timer expired.'''

def timed_execute (time, func, *args):
	task = stackless.getcurrent ()
	def timeout_cb (val):
		task.raise_exception (_TimedOut, time, id (func))  # this may be wrong
	timeout = Timeout (time)
	timeout.defer_get ().callback_add (timeout_cb)

	try:
		func (*args)
		timeout.cancel ()
	except _TimedOut, (otime, oid):
		timeout.cancel ()
		if otime == time and oid == id (func):
			raise TimedOut ()
		raise

class Channel(object):
	'''Queue of values whose recv() returns a deferred.

	A value sent while a receiver is waiting goes straight to the
	longest-waiting one; otherwise it is stored until recv() is called.
	'''

	def __init__(self):
		self.q = []        # stored values, newest at index 0
		self.waiters = []  # deferreds of waiting receivers, newest at index 0

	def send(self, val):
		'''Deliver val to the longest-waiting receiver, or store it.'''
		if self.waiters:
			self.waiters.pop().callback(val)
			return
		self.q.insert(0, val)

	def recv(self):
		'''Return a deferred for the oldest stored value, or one that completes on the next send().'''
		if self.q:
			return DefSucceed(self.q.pop())

		pending = Deferred()
		self.waiters.insert(0, pending)

		return pending
		
# Manual test when run as a script: connect to 127.0.0.1:5678 from a
# tasklet and print the outcome.
if __name__ == '__main__':
	def t():
		# wait() delivers either the connected socket or an integer errno
		sock = TCPConnect().connect ('127.0.0.1', 5678).wait ()

		if type (sock) == int:
			print 'error', sock
		else:
			print sock
	
	stackless.tasklet (t) ()

	# Yield to the scheduler once, then hand control to the reactor loop,
	# which never returns.
	stackless.schedule ()
	reactor.run ()



_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins



More information about the Stackless-checkins mailing list