[Stackless] One-way channel pairs
Oren Tirosh
oren-slp at hishome.net
Sat Feb 7 14:25:53 CET 2004
Attached is a little module that defines the channel2() function. It is
similar to the os.pipe() function, returning a 2-tuple of a receive-only
channel and a send-only channel. This can help in debugging because the
receive-only side will raise an exception instead of blocking when there
are no more references to the sending side and vice versa.
This is impossible with a standard channel, because you cannot tell
whether a tasklet holds its reference to the channel for sending or for
receiving.
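The liveness check can be illustrated without Stackless at all. The sketch below is not the attached module: it is a minimal stand-in, assuming a plain collections.deque in place of a real channel, and the names ReceiveEnd, SendEnd and make_pair are hypothetical. Each end holds only a weak reference to the other, so an end can tell when its peer has been garbage collected. It relies on CPython's reference counting to clear the weak reference as soon as the last strong reference goes away.

```python
import weakref
from collections import deque


class ReceiveEnd:
    """Receive-only end (hypothetical name); the deque stands in for a channel."""

    def __init__(self, queue):
        self._queue = queue

    def receive(self):
        # The weak reference returns None once the sending end is gone.
        if self._otherside() is None:
            raise RuntimeError("Sending side of channel no longer exists")
        # A real channel would block here when nothing is queued.
        return self._queue.popleft()


class SendEnd:
    """Send-only end (hypothetical name)."""

    def __init__(self, queue):
        self._queue = queue

    def send(self, value):
        # The weak reference returns None once the receiving end is gone.
        if self._otherside() is None:
            raise RuntimeError("Receiving side of channel no longer exists")
        self._queue.append(value)


def make_pair():
    """Return (receive_end, send_end) sharing one queue, pipe-style."""
    queue = deque()
    rch, sch = ReceiveEnd(queue), SendEnd(queue)
    # Weak references only: neither end keeps the other alive.
    rch._otherside = weakref.ref(sch)
    sch._otherside = weakref.ref(rch)
    return rch, sch


rch, sch = make_pair()
sch.send("a")
print(rch.receive())  # a
del rch               # drop the last reference to the receiving end
try:
    sch.send("b")     # on CPython the peer is already gone at this point
except RuntimeError as exc:
    print(exc)
```

With a single shared channel object there is nothing comparable to check, since every holder keeps the same object alive whether it intends to send or to receive.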
Oren
-------------- next part --------------
""" channel2.py - wrap a stackless channel into a pair of send-only and
receive-only channels, somewhat similar to the os.pipe() call.

This should aid debugging by raising an exception when writing to a
channel with no potential receivers or reading from a channel with no
potential senders. Note that it is ok to send data into a channel which
has no waiting receivers at the moment, as long as there are still live
references to the receive side of the channel.

>>> from stackless import *
>>> from channel2 import *
>>> rch, sch = channel2()
>>> def rcv5(ch):
...     for i in range(5):
...         x = ch.receive()
...         print "received:", x
...
>>> t = tasklet(rcv5)
>>> ignore = t(rch)
>>> del rch
>>> sch.send('a')
received: a
>>> sch.send('b')
received: b
>>> sch.send('c')
received: c
>>> sch.send('d')
received: d
>>> sch.send('e')
received: e
>>> sch.send('f')
Traceback (most recent call last):
  File "<stdin>", line 1, in ?
  File "channel2.py", line 96, in send
    raise RuntimeError, "Receiving side of channel no longer exists."
RuntimeError: Receiving side of channel no longer exists.
"""

import stackless, weakref
__all__ = ['channel2']

class channel(stackless.channel):
    """ This channel subclass implements the Python Iterator protocol and
    a close method. These extensions have been discussed on the stackless
    list and, if I understand correctly, are going to be incorporated into
    stackless. """

    def __iter__(self):
        return self

    next = stackless.channel.receive

    def close(self):
        """ The real implementation should also prevent further writing
        to the channel and ensure that StopIteration will be raised for
        all receivers, not just one! """
        if self.queue:
            self.send_exception(StopIteration)
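
def channel2():
    """ channel2() -> receive_only_channel, send_only_channel """
    ch = channel()
    rch = receive_only_channel(ch)
    sch = send_only_channel(ch)
    # Each side holds only a weak reference to the other, so a side can
    # detect that its peer has been garbage collected.
    rch._otherside = weakref.ref(sch)
    sch._otherside = weakref.ref(rch)
    return rch, sch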

class receive_only_channel:
    def __init__(self, ch):
        self._channel = ch

    def receive(self):
        if self._otherside() is not None:
            return self._channel.receive()
        else:
            raise RuntimeError, "Sending side of channel no longer exists"

    def __iter__(self):
        return self

    next = receive

class send_only_channel:
    def __init__(self, ch):
        self._channel = ch

    def send(self, value):
        if self._otherside() is not None:
            return self._channel.send(value)
        else:
            raise RuntimeError, "Receiving side of channel no longer exists."

    def send_exception(self, value):
        if self._otherside() is not None:
            return self._channel.send_exception(value)
        else:
            raise RuntimeError, "Receiving side of channel no longer exists."

    def close(self):
        """ Close the channel explicitly """
        self._channel.close()
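
if __name__ == "__main__":
    import doctest
    import channel2
    # testmod() returns a (failures, tests) pair, which is always true as a
    # tuple, so check the failure count explicitly.
    failures, tests = doctest.testmod(channel2)
    if not failures:
        print "Self test passed."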