[Stackless] how I solved my problems

Paul Sijben sijben at eemvalley.com
Fri Mar 2 16:24:17 CET 2007


well as it turned out one worker thread was not enough. So whenever a
message arrives it is now put in its own tasklet after which the worker
can wait for new input again. The tasklet will run to its end and will
be cleaned up later.

That solved my problems that as it turned out was that the worker thread
received a new request before the old one has finished, yet it needed to
process the new request in order to finish the first...

Oh the joys of parallel programming....


Paul Sijben wrote:
> Andres, Richard,
>
> thanks for the input!
> Indeed I have a multi-threaded UDP server going. I was running out of
> regular threads so I am porting it to stackless.
>
> What I learned over the past days is that unless you have a loop
> constantly doing a blocking(!) recvfrom on a socket you will lose the
> UDP packets that arrive when you are not actively listening. I sorted
> all of that out now by creating a couple of threads, so I have a
> thread that may block all it likes on the socket and a worker thread
> that actually holds the tasklets to handle the incoming messages.
>
> My current problem is a logic problem. Halfway through my test
> scenario the server calls itself. At some point a return on one of
> those calls does not seem to find a tasklet waiting for it.
>
> It is one of those painful things when you have something massively
> parallel....
>
> In the process I made the attached drop-in modules to replace the
> regular queue and threading as part of a quick porting action. (i.e.
> import muQueue as queue). This might be of use to people who have a
> less complex problem on their hands ;-)
>
> Paul
>
> Andrew Francis wrote:
>> Message: 6
>> Date: Wed, 28 Feb 2007 18:26:53 +0000
>> From: "Richard Tew" <richard.m.tew at gmail.com>
>> Subject: Re: [Stackless] stackless socket, curiouser
>> and curiouser
>>   
>>> To: "Paul Sijben" <sijben at eemvalley.com>
>>>     
>> Cc: stackless at stackless.com
>> Message-ID:
>> 	
>>   
>>> If it is more important to you to just get things
>>> done, you might be better off to look at using a
>>> networking framework which is known to
>>> work with Stackless.  I do not have any personal
>>> knowledge of any which do so without some other form
>>> of shenanigans involved.  However there are some
>>> people on the list with experience mixing Stackless
>>> with Twisted who might wish to help out if you choose
>>> to go that direction.
>>>     
>>
>> Paul, I am not sure what you are trying to do but it
>> seems to involve setting up a UDP server. I feel
>> packages like asyncore simply require too much work to
>> do even for the simplest of programmes.
>>
>> Here is a quick example I wrote based on the Twisted
>> Echo server example at
>> http://twistedmatrix.com/projects/core/documentation/howto/udp.html
>>
>> The server listens at port 9999. When a connection
>> comes in, a tasklet prints out connection information.
>> I feel the solution is relatively easy to follow.
>>
>> ~~~~
>>
>> #!/usr/bin/env python
>> """
>> UDPServer.py
>> Andrew Francis
>> March 1st, 2007
>>
>> Simple example of a UDP server using Twisted and
>> Stackless
>>
>> Based on the Echo server example at
>> http://twistedmatrix.com/projects/core/documentation/howto/udp.html
>>
>> <song>Rebellion - Arcade Fire</song>
>> """
>>
>>
>> import stackless
>> from twisted.internet.protocol import DatagramProtocol
>> from twisted.internet import reactor
>>
>> class Echo(DatagramProtocol):
>>     
>>     def __init__(self, requestChannel):
>>         self.channel = requestChannel
>>         return
>>     
>>     
>>     def datagramReceived(self, data, (host, port)):
>>         self.channel.send((data, host, port))
>>         self.transport.write(data, (host, port))
>>
>>
>> class EchoServer(object):
>>     def execute(self, port, requestChannel):
>>         reactor.listenUDP(port, Echo(requestChannel))
>>         reactor.run()
>>
>>
>> class EchoTasklet(object):
>>     def __init__(self, name, channel):
>>         self.channel = channel
>>         return
>>
>>     def execute(self):
>>         while (1):
>>             data, host, port = self.channel.receive()
>>             print "received %r from %s:%d" % (data,
>> host, port)
>>             stackless.schedule()
>>             
>>
>> if __name__ == "__main__":
>>     from twisted.internet import reactor
>>     
>>     """
>>     for simplicity - share the channel
>>     """
>>     requestChannel = stackless.channel()
>>     
>>     listener = EchoTasklet(requestChannel)
>>     server = EchoServer()
>>     
>>     stackless.tasklet(listener.execute)()
>>     stackless.tasklet(server.execute)(9999,
>> requestChannel)
>>     
>>     while (stackless.getruncount() > 1):
>>        stackless.schedule()
>>
>> Cheers,
>> Andrew
>>
>>
>>  
>> ____________________________________________________________________________________
>> The fish are biting. 
>> Get more visitors on your site using Yahoo! Search Marketing.
>> http://searchmarketing.yahoo.com/arp/sponsoredsearch_v2.php
>>   
>
> -- 
> Paul Sijben                    	tel: +31334566488
> Eemvalley Technology       	fax: +31334557523
> the Netherlands                	http://eemvalley.com    
>   
> ------------------------------------------------------------------------
>
> import channelWithReceiveTimeout
> from MercuryFundamentals import debug
> import time
>
> class Empty(Exception):
> 	def __init__(self):
> 		Exception.__init__(self) 
>
> class Queue:
> 	def __init__(self,len=None):
> 		self.__queue=channelWithReceiveTimeout.TimeLimitedReceiveChannel()
> 		debug("__init__%s"%self.__queue)
> 	def put(self,val):
> 		debug("putting %s in %s"%(val,self.__queue))
> 		self.__queue.send((time.time(),val))
>
> 	def get(self,number=None,timeout=None):
> 		debug("timeout %s getting from %s"%(timeout,self.__queue))
> 		(t,val)= self.__queue.receive(timeout)
> 		debug("received from %s data with %s delay"%(self.__queue,(time.time()-t)))
> 		print val
> 		return val
> 	
> ------------------------------------------------------------------------
>
> import stackless
>
> class Thread:
> 	def __init__(self):
> 		self.__running=False
> 		print "tasklet initing.....",
> 		self.__task=stackless.tasklet(self.run)
> 		print self.__task
> 	def start(self):
> 		print "tasklet starting....", self
> 		print "...2...."
> 		self.__task.setup()
> 		print "...3...."	
> 		if not self.__running:
> 			print "...4...."
> 			self.__task.insert()
> 			print "...5...."
> 			self.__running=True
> 	def join(self):
> 		print "------------------- killing tasklet",self
> 		try:
> 			self.__task.kill()
> 		except:
> 			print "############## got exception killing tasklet,%s, ignoring"%self
> 			pass
> 		
> ------------------------------------------------------------------------
>
> _______________________________________________
> Stackless mailing list
> Stackless at stackless.com
> http://www.stackless.com/mailman/listinfo/stackless

-- 
Paul Sijben                    	tel: +31334566488
Eemvalley Technology       	fax: +31334557523
the Netherlands                	http://eemvalley.com    

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://www.stackless.com/pipermail/stackless/attachments/20070302/acfd820a/attachment.htm>
-------------- next part --------------
_______________________________________________
Stackless mailing list
Stackless at stackless.com
http://www.stackless.com/mailman/listinfo/stackless


More information about the Stackless mailing list