[Stackless-checkins] r53839 - stackless/sandbox/examples/channelWithReceiveTimeout.py
richard.tew
python-checkins at python.org
Wed Feb 21 18:42:56 CET 2007
Author: richard.tew
Date: Wed Feb 21 18:42:51 2007
New Revision: 53839
Added:
stackless/sandbox/examples/channelWithReceiveTimeout.py
Log:
Added an example which creates a subclass of channel to timeout tasklets waiting to receive something on it.
Added: stackless/sandbox/examples/channelWithReceiveTimeout.py
==============================================================================
--- (empty file)
+++ stackless/sandbox/examples/channelWithReceiveTimeout.py Wed Feb 21 18:42:51 2007
@@ -0,0 +1,139 @@
+#
+# One way of timing out tasklets waiting on a channel.
+#
+# Author: Richard Tew <richard.m.tew at gmail.com>
+#
+# This code was written to serve as an example of Stackless Python usage.
+# Feel free to email me with any questions, comments, or suggestions for
+# improvement.
+#
+# Limitations of this approach:
+#
+# - There is no need to know the order the tasklets are waiting on the
+# channel because of the uniform expiration period so we can assume
+# that the first to expire will be the first in line on the channel
+# and we can remove it by sending an exception.
+#
+
+import time, weakref
+import stackless
+
+exitScheduler = False
+
+sleepingTasklets = []
+
+# Utility functions for tasklets to call.
+
+def Sleep(secondsToWait):
+ """ Put the current tasklet to sleep for a number of seconds. """
+ channel = stackless.channel()
+ endTime = time.time() + secondsToWait
+ sleepingTasklets.append((endTime, channel))
+ sleepingTasklets.sort()
+ # Block until we get sent an awakening notification.
+ channel.receive()
+
+# Scheduler running related functions.
+
+def ManageSleepingTasklets():
+ """ Awaken all tasklets which are due to be awakened. """
+ while not exitScheduler:
+ while len(sleepingTasklets):
+ endTime = sleepingTasklets[0][0]
+ if endTime > time.time():
+ break
+ channel = sleepingTasklets[0][1]
+ del sleepingTasklets[0]
+ # It does not matter what we send as it is not used.
+ channel.send(None)
+ stackless.schedule()
+
+
+class ChannelWaitExpiration(Exception):
+ pass
+
+class TimeLimitedReceiveChannel(stackless.channel):
+ def __init__(self, *args, **kwargs):
+ self.expirySeconds = 5.0
+ self.receiveAddTimes = []
+
+ stackless.channel.__init__(self, *args, **kwargs)
+
+ def receive(self):
+ if self.balance > 0:
+ # We can return from this immediately.
+ return stackless.channel.receive(self)
+
+ if self.balance == 0:
+ def ManageWaitingTasklets(c):
+ while c.balance < 0:
+ earliestExpirationTime = None
+ for timeAdded, tasklet in self.receiveAddTimes:
+ if not tasklet.blocked:
+ # This tasklet has already been removed but since
+ # it hasn't been scheduled yet it hasn't exited its
+ # call to our receive function and cleared out its
+ # entry. This can happen if the channel is configured
+ # to schedule waiting tasklets when they receive
+ # rather than running them immediately (i.e. via the
+ # 'preference' attribute).
+ continue
+ if timeAdded + self.expirySeconds < time.time():
+ # This tasklet is guaranteed to be the first tasklet
+ # on the channel.
+ self.send_exception(ChannelWaitExpiration)
+ else:
+ # Otherwise note the time at which the next expiring
+ # tasklet expires so we can sleep until then. Since
+ # all expiration delays are the same amount of time
+ # from the period of addition, we can guess the
+ # earliest we will need to check next.
+ earliestExpirationTime = timeAdded + self.expirySeconds
+ break
+ # Wait until the earliest we will need to next check.
+ if earliestExpirationTime is None:
+ Sleep(self.expirySeconds)
+ else:
+ Sleep(time.time() - earliestExpirationTime)
+ stackless.tasklet(ManageWaitingTasklets)(self)
+
+ t = time.time(), weakref.proxy(stackless.getcurrent())
+ self.receiveAddTimes.append(t)
+ self.receiveAddTimes.sort()
+
+ try:
+ ret = stackless.channel.receive(self)
+ self.receiveAddTimes.remove(t)
+ return ret
+ except:
+ # Any exception should be met with the same handling.
+ self.receiveAddTimes.remove(t)
+ raise
+
+def Run():
+ c = TimeLimitedReceiveChannel()
+
+ def Test(c):
+ global exitScheduler
+ t = time.time()
+ print "Waiting for %0.1f seconds" % c.expirySeconds
+ try:
+ c.receive()
+ except Exception, e:
+ if isinstance(e, ChannelWaitExpiration):
+ print "Exited receive, took %0.1f seconds" % (time.time() - t)
+ else:
+ print "Unexpected exit '%s', took %0.1f seconds" % (str(e), time.time() - t)
+ finally:
+ exitScheduler = True
+
+ stackless.tasklet(Test)(c)
+ stackless.tasklet(ManageSleepingTasklets)()
+
+ # This will run until there are no scheduled tasklets. But because we
+ # create one to manage the sleeping tasklets this will mean it will run
+ # indefinitely.
+ stackless.run()
+
+if __name__ == '__main__':
+ Run()
_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins
More information about the Stackless-checkins
mailing list