[Stackless-checkins] CVS: slpdev/src/2.3/dev/Stackless/module channelobject.c, 1.50, 1.51 scheduling.c, 1.107, 1.108

Christian Tismer tismer at centera.de
Thu Jul 8 20:58:32 CEST 2004


Update of /home/cvs/slpdev/src/2.3/dev/Stackless/module
In directory centera.de:/tmp/cvs-serv9390/Stackless/module

Modified Files:
	channelobject.c scheduling.c 
Log Message:
Added iterator support to channels,
and sending of sequences.
It works; there is just one case missing to
make it completely stackless.
Changed exceptions to StopIteration in certain places.
This is very convenient for main when consuming all of
a channel's data with list(ch).

Index: channelobject.c
===================================================================
RCS file: /home/cvs/slpdev/src/2.3/dev/Stackless/module/channelobject.c,v
retrieving revision 1.50
retrieving revision 1.51
diff -C2 -d -r1.50 -r1.51
*** a/channelobject.c	2 Jun 2004 23:03:42 -0000	1.50
--- b/channelobject.c	8 Jul 2004 18:58:29 -0000	1.51
***************
*** 454,460 ****
  			RUNTIME_ERROR("this tasklet does not like to be"
  				      " blocked.", NULL);
! 		if (self->flags.closing)
! 			RUNTIME_ERROR("a closed/closing channel cannot"
! 				      " block.", NULL);
  		slp_current_remove();
  		slp_channel_insert(self, source, dir);
--- 454,461 ----
  			RUNTIME_ERROR("this tasklet does not like to be"
  				      " blocked.", NULL);
! 		if (self->flags.closing) {
! 			PyErr_SetNone(PyExc_StopIteration);
! 			return NULL;
! 		}
  		slp_current_remove();
  		slp_channel_insert(self, source, dir);
***************
*** 615,618 ****
--- 616,850 ----
  
  
+ /*********************************************************
+ 
+   Sequences in channels.
+ 
+   Channels can work as an iterator. This removes all
+   call overhead on the receiver side and is optimal.
+ 
+   There is still the sender, who has to call a method to
+   transfer data.
+   But to my surprise, supporting a sequence protocol
+   for sending does exactly the desired job.
+ 
+   Example 1:
+ 
+   def sender(ch):
+     while condition:
+       # compute a bulk of results
+       ch.send_sequence(results)
+ 
+   def receiver(ch):
+     for each in ch:
+       # process each
+ 
+   Now, with a little layer, we can even avoid the list
+   creation, and use a generator to yield the results
+   into the channel. This is also helpful to overcome
+   the generator restriction of only one yield level.
+ 
+   Example 2:
+ 
+   def sender(ch, src):
+ 
+     def parser():
+       while more_to_parse:
+         # find next token
+ 	if is_simple_token:
+ 	  yield token
+ 	else:
+ 	  # got nested structure
+ 	  ch.send_sequence(parser())
+ 
+     ch.send_sequence(parser())
+ 
+  *********************************************************/
+ 
+ 	
+ /*
+  * iterator extension.
+  * This is probably the fastest way to run through a channel
+  * from Python.
+  */
+ 
+ static PyObject *
+ channel_iternext(PyChannelObject *self)
+ {
+ 	if (self->flags.closing && self->balance == 0) {
+ 		/* signal the end of the iteration */
+ 		return NULL;
+ 	}
+ 	return impl_channel_receive(self);
+ }
+ 
+ static PyObject *
+ channel_getiter(PyObject *self)
+ {
+ 	Py_INCREF(self);
+ 	return self;
+ }
+ 
+ /*
+  * sequence support for sending.
+  * This is now absolutely unbeatable, because you can
+  * let a generator yield into the channel, without
+  * any extra method call. :-)
+  */
+ 
+ static char channel_send_sequence__doc__[] =
+ "channel.send_sequence(seq) -- sed a stream of values\n\
+ over the channel. Combined with a generator, this is\n\
+ a very efficient way to build fast pipes.";
+ 
+ /* 
+  * this is the straightforward and simple implementation,
+  * but here we have almost no speedup, since all switches
+  * are hard.
+  */
+ 
+ static PyObject *
+ _channel_send_sequence(PyChannelObject *self, PyObject *v)
+ {
+ 	PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;
+ 	PyObject *it;
+ 	int i;
+ 	PyObject *ret;
+ 
+ 	it = PyObject_GetIter(v);
+ 	if (it == NULL)
+ 		return NULL;
+ 
+ 	/* Run iterator to exhaustion. */
+ 	for (i = 0; ; i++) {
+ 		PyObject *item = PyIter_Next(it);
+ 		if (item == NULL) {
+ 			if (PyErr_Occurred())
+ 				goto error;
+ 			break;
+ 		}
+ 		ret = t->send(self, item);
+ 		Py_DECREF(item);
+ 		if (ret == NULL)
+ 			goto error;
+ 		Py_DECREF(ret);
+ 	}
+ 
+ 	Py_DECREF(it);
+ 	return PyInt_FromLong(i);
+ 
+   error:
+ 	Py_DECREF(it);
+ 	return NULL;
+ 
+ }
+ 
+ /*
+  * I tried to make this worker loop look like the simple
+  * implementation. The difficulty is that we have to leave and
+  * re-enter the loop all the time. Hopefully the idea is still visible.
+  */
+ 
+ static PyObject *
+ channel_seq_callback(PyFrameObject *_f, PyObject *retval)
+ {
+ 	PyThreadState *ts;
+ 	PyCFrameObject *f = (PyCFrameObject *) _f;
+ 	PyChannelObject *ch;
+ 	PyChannel_HeapType *t;
+ 	PyObject *item;
+ 	int stage = f->n;
+ 
+ 	/* prolog to re-enter the loop */
+ 	if (stage == 1) {
+ 		item = retval;
+ 		goto back_with_data;
+ 	}
+ 	if (retval == NULL)
+ 		goto exit_frame;
+ 
+ 	Py_DECREF(retval);
+ 	retval = NULL;
+ 
+ 	if (stage == 2) {
+ 		goto back_from_send;
+ 	}
+ 
+ 	/* Run iterator to exhaustion. */
+ 	for (; ; f->i++) {
+ 		/* get the data */
+ 		STACKLESS_PROPOSE_ALL();
+ 		item = PyIter_Next(f->ob1);
+ 		if (item == NULL) {
+ 			if (PyErr_Occurred())
+ 				goto exit_frame;
+ 			break;
+ 		}
+ 		if (STACKLESS_UNWINDING(item)) {
+ 			stage = f->n = 1;
+ 			return item;
+ 		}
+ 
+ back_with_data:
+ 		/* send the data */
+ 		ch = (PyChannelObject *) f->ob2;
+ 		t = (PyChannel_HeapType *) ch->ob_type;
+ 		STACKLESS_PROPOSE_ALL();
+ 		retval = t->send(ch, item);
+ 		Py_DECREF(item);
+ 		if (retval == NULL)
+ 			goto exit_frame;
+ 		if (STACKLESS_UNWINDING(retval)) {
+ 			stage = f->n = 2;
+ 			return retval;
+ 		}
+ 		Py_DECREF(retval);
+ back_from_send:
+ 		;
+ 	}
+ 
+ 	retval = PyInt_FromLong(f->i);
+ exit_frame:
+ 
+ 	/* epilog to return from the frame */
+ 	ts = PyThreadState_GET();
+ 	ts->frame = f->f_back;
+ 	Py_DECREF(f);
+ 	return retval;
+ }	
+ 
+ static PyObject *
+ channel_send_sequence(PyChannelObject *self, PyObject *v)
+ {
+ 	STACKLESS_GETARG();
+ 	PyThreadState *ts = PyThreadState_GET();
+ 	PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;
+ 	PyObject *it;
+ 	PyCFrameObject *f;
+ 
+ 	if (!stackless)
+ 		return _channel_send_sequence(self, v);
+ 
+ 	it = PyObject_GetIter(v);
+ 	if (it == NULL)
+ 		return NULL;
+ 
+ 	f = slp_cframe_new(channel_seq_callback, 1);
+ 	if (f == NULL)
+ 		goto error;
+ 
+ 	f->ob1 = it;
+ 	Py_INCREF(self);
+ 	f->ob2 = (PyObject *) self;
+ 	f->i = 0;
+ 	f->n = 0;
+ 	ts->frame = (PyFrameObject *) f;
+ 	Py_INCREF(Py_None);
+ 	return STACKLESS_PACK(Py_None);
+ error:
+ 	Py_DECREF(it);
+ 	return NULL;
+ }
+ 
+ 
  static char channel_close__doc__[] =
  "channel.close() -- stops the channel from enlarging its queue.\n\
***************
*** 669,672 ****
--- 901,906 ----
  	{"open",	    (PCF)channel_open,		  METH_NOARGS,
  	channel_open__doc__},
+ 	{"send_sequence",   (PCF)channel_send_sequence,	  METH_OS,
+ 	 channel_send__doc__},
  	{NULL,		    NULL}             /* sentinel */
  };
***************
*** 709,714 ****
  	offsetof(PyChannelObject, chan_weakreflist),
  						/* tp_weaklistoffset */
! 	0,					/* tp_iter */
! 	0,					/* tp_iternext */
  	channel_methods,			/* tp_methods */
  	channel_members,			/* tp_members */
--- 943,948 ----
  	offsetof(PyChannelObject, chan_weakreflist),
  						/* tp_weaklistoffset */
! 	(getiterfunc)channel_getiter,		/* tp_iter */
! 	(iternextfunc)channel_iternext,		/* tp_iternext */
  	channel_methods,			/* tp_methods */
  	channel_members,			/* tp_members */

Index: scheduling.c
===================================================================
RCS file: /home/cvs/slpdev/src/2.3/dev/Stackless/module/scheduling.c,v
retrieving revision 1.107
retrieving revision 1.108
diff -C2 -d -r1.107 -r1.108
*** a/scheduling.c	6 Jun 2004 15:04:41 -0000	1.107
--- b/scheduling.c	8 Jul 2004 18:58:29 -0000	1.108
***************
*** 1015,1027 ****
  
  		if (blocked) {
  			/* main was blocked and nobody can send */
  			if (blocked < 0)
! 				RUNTIME_ERROR("the main tasklet is receiving"
! 				    " without a sender available.", NULL);
  			else
! 				RUNTIME_ERROR("the main tasklet is sending"
! 				    " without a receiver available.", NULL);
  			/* fall through to error handling */
! 			retval = NULL;
  		}
  		next = ts->st.main;
--- 1015,1031 ----
  
  		if (blocked) {
+ 			char *txt;
  			/* main was blocked and nobody can send */
  			if (blocked < 0)
! 				txt = "the main tasklet is receiving"
! 				    " without a sender available.";
  			else
! 				txt = "the main tasklet is sending"
! 				    " without a receiver available.";
! 			PyErr_SetString(PyExc_StopIteration, txt);
  			/* fall through to error handling */
! 			retval = slp_curexc_to_bomb();
! 			if (retval == NULL)
! 				return NULL;
  		}
  		next = ts->st.main;


_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins



More information about the Stackless-checkins mailing list