[Stackless] How to "transfer" a tasklet to the current thread

Anselm Kruis a.kruis at science-computing.de
Mon Nov 11 20:22:51 CET 2013


Hi Kristján,

many thanks for the code. I have a very tight schedule, but at least I 
found some time to test your changes.


Am 11.11.2013 16:34, schrieb Kristján Valur Jónsson:
> Ok, I have made some commits to the 2.7 branch.
> I found some bugs which are probably the reason you were seeing a crash when not manually cloning the frames.
> I wasn't able to reproduce your crash, but at least I found something very broken which I fixed.

My test program was fairly simple. Sorry, I missed to post it.

---8<-------8<-------8<-------8<-------
import stackless

def a_function():
     print "Hello, World!"

task=stackless.tasklet(a_function)()
task.remove()
reducedTask = task.__reduce__()
task.bind(None)
task = reducedTask[0](*reducedTask[1])
task.__setstate__(reducedTask[2])
task.insert()
stackless.run()  # crash
---8<-------8<-------8<-------8<-------

Your commit

changeset:   82977:ee2e4345ecf5
branch:      2.7-slp
user:        Kristjan Valur Jonsson <sweskman at gmail.com>
date:        Mon Nov 11 14:50:14 2013 +0000
files:       Stackless/pickling/prickelpit.c
description:
fix slp_clone_frame().  It was completely broken.

indeed fixes this crash.

> I have also added the experimental bind_thread() method, and some unittests in test_thread.
> Can you see If it works as expected?

I did some quick and dirty tests. One test fails:

---8<-------8<-------8<-------8<-------
class BindThreadTest(TestCase):
     def setUp(self):
         super(BindThreadTest, self).setUp()
         self.taskletExecuted = False

     def create_tasklet(self, action, *args, **kw):
         self.tasklet = stackless.tasklet(action)(*args, **kw)

     def tasklet_action(self):
         self.taskletExecuted = True

     def testForeignThread_scheduled(self):
         theThread = threading.Thread(target=self.create_tasklet, 
args=(self.tasklet_action,))
         theThread.start()
         theThread.join()
         t = self.tasklet
         self.assertEqual(t.thread_id, theThread.ident)
         self.assertTrue(t.alive)
         self.assertFalse(t.paused)

         t.bind_thread()

         self.assertTrue(t.alive)
         self.assertFalse(t.paused)

         self.assertNotEqual(t.thread_id, theThread.ident)
         self.assertEqual(t.thread_id, thread.get_ident())
         stackless.run()
         self.assertTrue(self.taskletExecuted)  # fails
         self.assertFalse(t.alive)	       # fails too	
---8<-------8<-------8<-------8<-------

Perhaps we should allow bind_thread() only if a tasklet is paused. How 
can we prevent race conditions, if the tasklet to be bound to another 
thread gets scheduled on the originating thread?

And another point. Perhaps we could add an optional argument to 
bind_tread() to specify the target thread. If the argument is not given, 
bind to the current thread.

Regards
   Anselm


p.s. did you notice http://www.stackless.com/ticket/26?





>
> K
>
>> -----Original Message-----
>> From: stackless-bounces at stackless.com [mailto:stackless-
>> bounces at stackless.com] On Behalf Of Anselm Kruis
>> Sent: 11. nóvember 2013 09:07
>> To: stackless at stackless.com
>> Subject: Re: [Stackless] How to "transfer" a tasklet to the current thread
>>
>> Hi Kristján,
>>
>>
>> Am 08.11.2013 10:31, schrieb Kristján Valur Jónsson:
>>>
>>>
>>>> -----Original Message-----
>>> data and decides, that the tasklet shall be run on a worker thread.
>>>>
>>>> Problem: as far as I know, a tasklet bound to a thread (via cstate)
>>>> and this association can't be changed. The best we can do is to
>>>> create a new tasklet on the worker thread, that belongs to the worker
>> thread.
>>>
>>> This is right.  Although actually, for picklable tasklets (tasklets
>>> without a c state) this restriction Is artificial.
>>
>> Adding a method to switch the thread could be a useful extension.
>>
>>
>>>
>>>>        reducedTask = task.__reduce__()
>>>>        # raise RuntimeError, if task is alive but not paused
>>>>        task.bind(None)
>>>>
>>>>        if True:  # python will crash if set to False
>>>>            frameList = reducedTask[2][3]
>>>>            for i in range(len(frameList)):
>>>>                frame = frameList[i]
>>>>                if isinstance(frame, stackless.cframe):
>>>>                    reducedFrame = frame.__reduce__()
>>>>                    newFrame = reducedFrame[0](*reducedFrame[1])
>>>>                    newFrame.__setstate__(reducedFrame[2])
>>>>                    frameList[i] = newFrame
>>>>        # rebind the task
>>>>        task = reducedTask[0](*reducedTask[1])
>>>>        task.__setstate__(reducedTask[2])
>>>>        return task
>>>>
>>>
>>> Looks like you are doing recursive pickling of the tasklet by hand, but only
>> for the cframes, not regular frames.
>>> Why can't you just use
>>> return pickle.loads(pickle.dumps(task))?
>>
>> A simple pickle.loads(pickle.dumps(task)) creates a deep-copy of the tasklet.
>> There are two reasons to avoid the deep-copy:
>> 1. it is fairly expensive
>> 2. In this particular case the set of objects to be pickled could contain objects
>> which register themselves upon unpickling with other components of the
>> system and I don't want that to happen a second time.
>>
>> But there is really a good point in your question. In a very quick first test, I
>> observed only cframes in the frameList. If I understand you correctly,
>> frameList could contain regular frames too. I'll adapt my method. But not
>> today, a  customer is waiting.
>>
>>>
>>> We can discuss the possibility of transferring picklable tasklets
>>> between threads.  As long as a tasklet is soft-switchable it really doesn't
>> care which thread it runs on.  A "bind_to_thread() method could be
>> provided...
>>
>> Well, a method tasklet.bind_to_thread(thread) would be a sensible
>> extension.
>>
>>> But we can also think more of thread agnostic tasklets.
>>> We could even have a global run-queue of thread-agnostic tasklets that
>> threads could access....
>>>
>>> I'd like to understand your use case, however.  It looks as though you
>>> are unpickling tasklets on one thread, then a pool of worker threads are
>> accessing these tasklets and attempting to run them, is that right?  For that
>> purpose, a worker thread needs to "claim"
>>> the tasklet as its own.--
>>
>> It is really simple. A tasklet is part of the data, that defines an instance of a
>> particular workflow. A user continues a serialised
>> (pickled) flow by starting an executable with appropriate parameters in a
>> shell (Unix-shell Windows cmd). The executable unpickles the flow data
>> during command line procession (main-thread) and decides to display a GTK-
>> GUI for further interaction with the user. In this case the executable uses the
>> main thread for the GUI and one secondary thread for each flow.
>>
>> Cheers
>>     Anselm
>>
>> --
>>    Dipl. Phys. Anselm Kruis                       science + computing ag
>>    Senior Solution Architect                      Ingolstädter Str. 22
>>    email A.Kruis at science-computing.de             80807 München, Germany
>>    phone +49 89 356386 874  fax 737               www.science-computing.de
>> --
>> Vorstandsvorsitzender/Chairman of the board of management:
>> Gerd-Lothar Leonhart
>> Vorstand/Board of Management:
>> Dr. Bernd Finkbeiner, Michael Heinrichs,
>> Dr. Arno Steitz, Dr. Ingrid Zech
>> Vorsitzender des Aufsichtsrats/
>> Chairman of the Supervisory Board:
>> Philippe Miltin
>> Sitz/Registered Office: Tuebingen
>> Registergericht/Registration Court: Stuttgart
>> Registernummer/Commercial Register No.: HRB 382196
>>
>>
>> _______________________________________________
>> Stackless mailing list
>> Stackless at stackless.com
>> http://www.stackless.com/mailman/listinfo/stackless
>
>
>
> _______________________________________________
> Stackless mailing list
> Stackless at stackless.com
> http://www.stackless.com/mailman/listinfo/stackless
>

-- 
  Dipl. Phys. Anselm Kruis                       science + computing ag
  Senior Solution Architect                      Ingolstädter Str. 22
  email A.Kruis at science-computing.de             80807 München, Germany
  phone +49 89 356386 874  fax 737               www.science-computing.de
-- 
Vorstandsvorsitzender/Chairman of the board of management:
Gerd-Lothar Leonhart
Vorstand/Board of Management:
Dr. Bernd Finkbeiner, Michael Heinrichs, 
Dr. Arno Steitz, Dr. Ingrid Zech
Vorsitzender des Aufsichtsrats/
Chairman of the Supervisory Board:
Philippe Miltin
Sitz/Registered Office: Tuebingen
Registergericht/Registration Court: Stuttgart
Registernummer/Commercial Register No.: HRB 382196




More information about the Stackless mailing list