[Stackless] One-way channel pairs

Oren Tirosh oren-slp at hishome.net
Sat Feb 7 14:25:53 CET 2004


Attached is a little module that defines the channel2() function. It is
similar to the os.pipe() function, returning a 2-tuple of a receive-only 
channel and a send-only channel. This can help in debugging because the
receive-only side will raise an exception instead of blocking when there 
are no more references to the sending side and vice versa.

With the standard channel this kind of detection is impossible, because you
can't tell whether a tasklet is holding its reference to a channel for
sending or for receiving purposes.

Oren
-------------- next part --------------
""" channel2.py - wrap a stackless channel into a pair of send-only and
receive-only channels, somewhat similar to the os.pipe() call.  

This should aid debugging by raising an exception when writing to a
channel with no potential receivers or reading from a channel with no
potential senders. Note that it is ok to send data into a channel which
has no waiting receivers at the moment, as long as there are still live
references to the receive side of the channel. 

>>> from stackless import *
>>> from channel2 import *
>>> rch, sch = channel2()
>>> def rcv5(ch):
...   for i in range(5):
...     x = ch.receive()
...     print "received:", x
...
>>> t = tasklet(rcv5)
>>> ignore = t(rch)
>>> del rch
>>> sch.send('a')
received: a
>>> sch.send('b')
received: b
>>> sch.send('c')
received: c
>>> sch.send('d')
received: d
>>> sch.send('e')
received: e
>>> sch.send('f')
Traceback (most recent call last):
  File "<stdin>", line 1, in ?
  File "channel2.py", line 96, in send
    raise RuntimeError, "Receiving side of channel no longer exists."
RuntimeError: Receiving side of channel no longer exists.

"""

import stackless, weakref

__all__ = ['channel2']


class channel(stackless.channel):
    """ This channel subclass implements the Python Iterator protocol and
    a close method. These extensions have been discussed on the stackless
    list and, if I understand correctly, are going to be incorporated into
    stackless. """
    def __iter__(self):
        """ A channel is its own iterator. """
        return self

    # Iterating over the channel simply receives from it.
    next = stackless.channel.receive

    def close(self):
        """ Wake the receivers that are currently blocked on the channel
        by sending each of them a StopIteration exception.

        Only tasklets already waiting in receive() when close() is called
        are woken. The real implementation should also prevent further
        writing to the channel. """
        # Per the Stackless documentation a negative balance is the number
        # of tasklets blocked in receive(); a positive balance means senders
        # are blocked, and sending to them would block the caller instead.
        # range() of a non-positive number is empty, so nothing is sent
        # unless receivers are actually waiting.
        for _ in range(-self.balance):
            if self.balance >= 0:
                # A waiting receiver went away in the meantime; another
                # send_exception() would block instead of waking anybody.
                break
            self.send_exception(StopIteration)


def channel2():
    """ channel2() -> receive_only_channel, send_only_channel

    Create one underlying channel and wrap it in a receive-only end and
    a send-only end, returned in that order (like os.pipe()). Each end
    holds only a weak reference to the other, so an end can detect that
    its counterpart no longer exists. """
    shared = channel()
    receiver, sender = receive_only_channel(shared), send_only_channel(shared)
    # Weak references: neither end keeps the other one alive.
    sender._otherside = weakref.ref(receiver)
    receiver._otherside = weakref.ref(sender)
    return receiver, sender


class receive_only_channel:
    """ Receive-only end of a channel pair.

    Wraps the underlying channel and exposes only receiving operations
    plus the iterator protocol. The _otherside attribute is not set by
    __init__; channel2() assigns it a weak reference to the matching
    send-only end. """
    def __init__(self, ch):
        """ ch is the underlying channel object to receive from. """
        self._channel = ch

    def receive(self):
        """ Return the next value received from the underlying channel.

        Raises RuntimeError if the sending side no longer exists. The
        check is made once, before calling the underlying receive(). """
        if self._otherside() is None:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise RuntimeError("Sending side of channel no longer exists")
        return self._channel.receive()

    def __iter__(self):
        """ The receive-only end is its own iterator. """
        return self

    # Iterator protocol: next() for Python 2, __next__() for Python 3.
    next = receive
    __next__ = receive

        
class send_only_channel:
    """ Send-only end of a channel pair.

    Wraps the underlying channel and exposes only sending operations.
    The _otherside attribute is not set by __init__; channel2() assigns
    it a weak reference to the matching receive-only end. """
    def __init__(self, ch):
        """ ch is the underlying channel object to send into. """
        self._channel = ch

    def _check_receiver(self):
        """ Raise RuntimeError if the receiving side no longer exists. """
        if self._otherside() is None:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise RuntimeError("Receiving side of channel no longer exists.")

    def send(self, value):
        """ Send value through the underlying channel and return its result.

        Raises RuntimeError if the receiving side no longer exists. """
        self._check_receiver()
        return self._channel.send(value)

    def send_exception(self, value, *args):
        """ Send an exception through the underlying channel.

        value is the exception to send; any extra positional arguments
        are forwarded unchanged to the underlying send_exception().
        Raises RuntimeError if the receiving side no longer exists. """
        self._check_receiver()
        return self._channel.send_exception(value, *args)

    def close(self):
        """ Close the channel explicitly """
        self._channel.close()


if __name__ == "__main__":
    # Self test: run the doctests embedded in this module's docstrings.
    import doctest
    import channel2
    # testmod() returns a (failed, attempted) pair. A non-empty tuple is
    # always true, so test the failure count itself rather than the tuple.
    failed = doctest.testmod(channel2)[0]
    if not failed:
        # Single-argument parenthesised print prints the same text on
        # Python 2 and Python 3.
        print("Self test passed.")
-------------- next part --------------
_______________________________________________
Stackless mailing list
Stackless at stackless.com
http://www.stackless.com/mailman/listinfo/stackless


More information about the Stackless mailing list