[Stackless-checkins] CVS: slpdev/src/2.3/dev/Stackless/module channelobject.c, 1.50, 1.51 scheduling.c, 1.107, 1.108
Christian Tismer
tismer at centera.de
Thu Jul 8 20:58:32 CEST 2004
Update of /home/cvs/slpdev/src/2.3/dev/Stackless/module
In directory centera.de:/tmp/cvs-serv9390/Stackless/module
Modified Files:
channelobject.c scheduling.c
Log Message:
added iterator support to channels,
and sending of sequences.
It works, there is just a case missing to
make it completely stackless.
Changed exceptions to StopIteration in certain places.
Very convenient for main, when consuming all of
a channel's data with list(ch)
Index: channelobject.c
===================================================================
RCS file: /home/cvs/slpdev/src/2.3/dev/Stackless/module/channelobject.c,v
retrieving revision 1.50
retrieving revision 1.51
diff -C2 -d -r1.50 -r1.51
*** a/channelobject.c 2 Jun 2004 23:03:42 -0000 1.50
--- b/channelobject.c 8 Jul 2004 18:58:29 -0000 1.51
***************
*** 454,460 ****
RUNTIME_ERROR("this tasklet does not like to be"
" blocked.", NULL);
! if (self->flags.closing)
! RUNTIME_ERROR("a closed/closing channel cannot"
! " block.", NULL);
slp_current_remove();
slp_channel_insert(self, source, dir);
--- 454,461 ----
RUNTIME_ERROR("this tasklet does not like to be"
" blocked.", NULL);
! if (self->flags.closing) {
! PyErr_SetNone(PyExc_StopIteration);
! return NULL;
! }
slp_current_remove();
slp_channel_insert(self, source, dir);
***************
*** 615,618 ****
--- 616,850 ----
+ /*********************************************************
+
+ Sequences in channels.
+
+ Channels can work as an iterator. This removes all
+ call overhead on the receiver side and is optimum.
+
+ There is still the sender who has to call a method to
+ transfer data.
+ But to my surprise, supporting a sequence protocol
+ for sending does exactly the desired job.
+
+ Example 1:
+
+ def sender(ch):
+ while condition:
+ # compute a bulk of results
+ ch.send_sequence(results)
+
+ def receiver(ch):
+ for each in ch:
+ # process each
+
+ Now, with a little layer, we can even avoid the list
+ creation, and use a generator to yield the results
+ into the channel. This is also helpful to overcome
+ the generator restriction of only one yield level.
+
+ Example 2:
+
+ def sender(ch, src):
+
+ def parser():
+ while more_to_parse:
+ # find next token
+ if is_simple_token:
+ yield token
+ else:
+ # got nested structure
+ ch.send_sequence(parser())
+
+ ch.send_sequence(parser())
+
+ *********************************************************/
+
+
+ /*
+ * iterator extension.
+ * This is probably the fastest way to run through a channel
+ * from Python.
+ */
+
+ static PyObject *
+ channel_iternext(PyChannelObject *self)
+ {
+ if (self->flags.closing && self->balance == 0) {
+ /* signal the end of the iteration */
+ return NULL;
+ }
+ return impl_channel_receive(self);
+ }
+
+ static PyObject *
+ channel_getiter(PyObject *self)
+ {
+ Py_INCREF(self);
+ return self;
+ }
+
+ /*
+ * sequence support for sending.
+ * This is now absolutely unbeatable, because you can
+ * let a generator yield into the channel, without
+ * any extra method call. :-)
+ */
+
+ static char channel_send_sequence__doc__[] =
+ "channel.send_sequence(seq) -- sed a stream of values\n\
+ over the channel. Combined with a generator, this is\n\
+ a very efficient way to build fast pipes.";
+
+ /*
+ * this is the straightforward and simple implementation,
+ * but here we have almost no speedup, since all switches
+ * are hard.
+ */
+
+ static PyObject *
+ _channel_send_sequence(PyChannelObject *self, PyObject *v)
+ {
+ PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;
+ PyObject *it;
+ int i;
+ PyObject *ret;
+
+ it = PyObject_GetIter(v);
+ if (it == NULL)
+ return NULL;
+
+ /* Run iterator to exhaustion. */
+ for (i = 0; ; i++) {
+ PyObject *item = PyIter_Next(it);
+ if (item == NULL) {
+ if (PyErr_Occurred())
+ goto error;
+ break;
+ }
+ ret = t->send(self, item);
+ Py_DECREF(item);
+ if (ret == NULL)
+ goto error;
+ Py_DECREF(ret);
+ }
+
+ Py_DECREF(it);
+ return PyInt_FromLong(i);
+
+ error:
+ Py_DECREF(it);
+ return NULL;
+
+ }
+
+ /*
+ * I tried to make this worker loop look like the simple
+ * implementation. The problem is to leave and enter
+ * the loop all the time. Hopefully the idea is still visible.
+ */
+
+ static PyObject *
+ channel_seq_callback(PyFrameObject *_f, PyObject *retval)
+ {
+ PyThreadState *ts;
+ PyCFrameObject *f = (PyCFrameObject *) _f;
+ PyChannelObject *ch;
+ PyChannel_HeapType *t;
+ PyObject *item;
+ int stage = f->n;
+
+ /* prolog to re-enter the loop */
+ if (stage == 1) {
+ item = retval;
+ goto back_with_data;
+ }
+ if (retval == NULL)
+ goto exit_frame;
+
+ Py_DECREF(retval);
+ retval = NULL;
+
+ if (stage == 2) {
+ goto back_from_send;
+ }
+
+ /* Run iterator to exhaustion. */
+ for (; ; f->i++) {
+ /* get the data */
+ STACKLESS_PROPOSE_ALL();
+ item = PyIter_Next(f->ob1);
+ if (item == NULL) {
+ if (PyErr_Occurred())
+ goto exit_frame;
+ break;
+ }
+ if (STACKLESS_UNWINDING(item)) {
+ stage = f->n = 1;
+ return item;
+ }
+
+ back_with_data:
+ /* send the data */
+ ch = (PyChannelObject *) f->ob2;
+ t = (PyChannel_HeapType *) ch->ob_type;
+ STACKLESS_PROPOSE_ALL();
+ retval = t->send(ch, item);
+ Py_DECREF(item);
+ if (retval == NULL)
+ goto exit_frame;
+ if (STACKLESS_UNWINDING(retval)) {
+ stage = f->n = 2;
+ return retval;
+ }
+ Py_DECREF(retval);
+ back_from_send:
+ ;
+ }
+
+ retval = PyInt_FromLong(f->i);
+ exit_frame:
+
+ /* epilog to return from the frame */
+ ts = PyThreadState_GET();
+ ts->frame = f->f_back;
+ Py_DECREF(f);
+ return retval;
+ }
+
+ static PyObject *
+ channel_send_sequence(PyChannelObject *self, PyObject *v)
+ {
+ STACKLESS_GETARG();
+ PyThreadState *ts = PyThreadState_GET();
+ PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;
+ PyObject *it;
+ PyCFrameObject *f;
+
+ if (!stackless)
+ return _channel_send_sequence(self, v);
+
+ it = PyObject_GetIter(v);
+ if (it == NULL)
+ return NULL;
+
+ f = slp_cframe_new(channel_seq_callback, 1);
+ if (f == NULL)
+ goto error;
+
+ f->ob1 = it;
+ Py_INCREF(self);
+ f->ob2 = (PyObject *) self;
+ f->i = 0;
+ f->n = 0;
+ ts->frame = (PyFrameObject *) f;
+ Py_INCREF(Py_None);
+ return STACKLESS_PACK(Py_None);
+ error:
+ Py_DECREF(it);
+ return NULL;
+ }
+
+
static char channel_close__doc__[] =
"channel.close() -- stops the channel from enlarging its queue.\n\
***************
*** 669,672 ****
--- 901,906 ----
{"open", (PCF)channel_open, METH_NOARGS,
channel_open__doc__},
+ {"send_sequence", (PCF)channel_send_sequence, METH_OS,
+ channel_send__doc__},
{NULL, NULL} /* sentinel */
};
***************
*** 709,714 ****
offsetof(PyChannelObject, chan_weakreflist),
/* tp_weaklistoffset */
! 0, /* tp_iter */
! 0, /* tp_iternext */
channel_methods, /* tp_methods */
channel_members, /* tp_members */
--- 943,948 ----
offsetof(PyChannelObject, chan_weakreflist),
/* tp_weaklistoffset */
! (getiterfunc)channel_getiter, /* tp_iter */
! (iternextfunc)channel_iternext, /* tp_iternext */
channel_methods, /* tp_methods */
channel_members, /* tp_members */
Index: scheduling.c
===================================================================
RCS file: /home/cvs/slpdev/src/2.3/dev/Stackless/module/scheduling.c,v
retrieving revision 1.107
retrieving revision 1.108
diff -C2 -d -r1.107 -r1.108
*** a/scheduling.c 6 Jun 2004 15:04:41 -0000 1.107
--- b/scheduling.c 8 Jul 2004 18:58:29 -0000 1.108
***************
*** 1015,1027 ****
if (blocked) {
/* main was blocked and nobody can send */
if (blocked < 0)
! RUNTIME_ERROR("the main tasklet is receiving"
! " without a sender available.", NULL);
else
! RUNTIME_ERROR("the main tasklet is sending"
! " without a receiver available.", NULL);
/* fall through to error handling */
! retval = NULL;
}
next = ts->st.main;
--- 1015,1031 ----
if (blocked) {
+ char *txt;
/* main was blocked and nobody can send */
if (blocked < 0)
! txt = "the main tasklet is receiving"
! " without a sender available.";
else
! txt = "the main tasklet is sending"
! " without a receiver available.";
! PyErr_SetString(PyExc_StopIteration, txt);
/* fall through to error handling */
! retval = slp_curexc_to_bomb();
! if (retval == NULL)
! return NULL;
}
next = ts->st.main;
_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins
More information about the Stackless-checkins
mailing list