[From nobody Sat Mar 13 18:14:25 2010
Return-path: &lt;limsc@maya.com&gt;
Envelope-to: tismer@stackless.com
Received: from raven.maya.com ([192.70.254.20])
	by centera.de with esmtp (Exim 3.35 #1 (Debian)) id 1B1giX-0006nt-00
	for &lt;tismer@stackless.com&gt;; Fri, 12 Mar 2004 08:04:25 +0100
Received: from djslim (vpn7.prv.maya.com [10.20.10.7])
	by raven.maya.com (Postfix) with ESMTP id B1FD41481CB
	for &lt;tismer@stackless.com&gt;; Fri, 12 Mar 2004 01:51:33 -0500 (EST)
Message-ID: &lt;010801c407ff$abccb0f0$6401a8c0@djslim&gt;
From: &quot;Seung Chan Lim&quot; &lt;limsc@maya.com&gt;
To: &quot;Christian Tismer&quot; &lt;tismer@stackless.com&gt;
References: &lt;00c201c3fea9$e1e11aa0$6401a8c0@djslim&gt;
	&lt;40436208.8020700@stackless.com&gt;
	&lt;0f2d01c3ffd6$5f8f0ac0$95fe46c0@GIRLS&gt;
	&lt;4043F50F.4080604@stackless.com&gt;
	&lt;03dd01c4001b$2ccce900$6401a8c0@djslim&gt;
	&lt;4044F5E4.4020002@stackless.com&gt;
	&lt;10a601c4009a$60ff1570$95fe46c0@GIRLS&gt;
	&lt;40450A44.3050106@stackless.com&gt;
	&lt;10f701c4013f$f730bb70$95fe46c0@GIRLS&gt;
	&lt;40461080.40208@stackless.com&gt;
	&lt;12e801c402df$f615e890$95fe46c0@GIRLS&gt;
	&lt;404918E5.8010207@stackless.com&gt;
	&lt;13be01c40475$d8c9b900$95fe46c0@GIRLS&gt;
	&lt;404B74B5.6050209@stackless.com&gt;
	&lt;008e01c406ea$a86b3a50$95fe46c0@GIRLS&gt;
	&lt;404FBFC3.6090902@stackless.com&gt;
Subject: Re: Stackless crash
Date: Fri, 12 Mar 2004 02:00:14 -0500
MIME-Version: 1.0
Content-Type: multipart/mixed;
	boundary=&quot;----=_NextPart_000_0105_01C407D5.C294D9E0&quot;
X-Priority: 3
X-MSMail-Priority: Normal
X-Mailer: Microsoft Outlook Express 6.00.2800.1158
X-MimeOLE: Produced By Microsoft MimeOLE V6.00.2800.1165

This is a multi-part message in MIME format.

------=_NextPart_000_0105_01C407D5.C294D9E0
Content-Type: text/plain;
	charset=&quot;iso-8859-1&quot;
Content-Transfer-Encoding: 7bit

ok, I thik I have a better example ( hopefully due to the same bug )
that seems to crash consistently after creating 3, 4 files ( the publish
operation
goes up to about 2,3000)

see if you can get this one to crash with the debug build


slim

------=_NextPart_000_0105_01C407D5.C294D9E0
Content-Type: text/plain;
	name=&quot;stackless_crash2.py&quot;
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment;
	filename=&quot;stackless_crash2.py&quot;

&quot;&quot;&quot;


&quot;&quot;&quot;

#########################################################################=
######
#
#########################################################################=
######
import sys
import time
import stackless
import traceback
import asyncore
import thread
import socket
import bisect
import os
from heapq import heappop, heappush
from Queue import Queue





#########################################################################=
######
#
#########################################################################=
######




   =20
#########################################################################=
######
#
#########################################################################=
######
class Operation(object):
    LOWEST_PRIORITY =3D 5
    HIGHEST_PRIORITY =3D 1
   =20
    def __init__(self, params, priority):
        self.priority =3D priority
        self.params =3D params
       =20
    def __cmp__(self, other):
        assert(isinstance(other, Operation))
       =20
        return cmp(self.priority, other.priority)

    def __repr__(self):

        return &quot;&lt;Operation [%d]: %x&gt;&quot;%(self.priority,
                                       self.params[0])



class PriorityQueue(Queue):
   =20
    # Put a new item in the queue
    def _put(self, item):
        assert(isinstance(item, Operation))
        #heappush(self.queue, item)

        # print &gt;&gt; sys.stderr,  self.queue
        bisect.insort(self.queue, item)

        # self.queue.append(item)
       =20
    def _get(self):
       =20
        # return heappop(self.queue)
        return self.queue.pop(0)


   =20
class MicroThread(object):
    &quot;&quot;&quot;
    convention
    all methods unless prefixed by &quot;_&quot; will return immediately after =
queueing stuff
    all methods prefixed by &quot;_&quot; is to be used only be the object in its =
own thread
    event handler methods prefixed by &quot;on&quot; is an exception
    &quot;run&quot; is a special method that's also an exception
   =20
    &quot;&quot;&quot;
    __slots__ =3D [&quot;_queue&quot;, &quot;_id&quot;, &quot;_channel&quot;, &quot;_req_id&quot;, &quot;_executor&quot;,
                 &quot;_net&quot;, &quot;_ui&quot;, &quot;_fs&quot;]
   =20
    TERMINATE =3D 0x0000
   =20
    _req_id =3D 0
    def __init__(self, id):
        self._queue =3D PriorityQueue()
        self._id =3D id
        self._channel =3D stackless.channel()

       =20
    def run(self):
       =20
        if hasattr(self, &quot;onStart&quot;) and callable(self.onStart):
            self.onStart()
       =20
        while 1:
            try:
                try:
                    msg =3D self._queue.get(False)
                except:
                    # traceback.print_exc()
                    if hasattr(self, &quot;onIdle&quot;):
                        assert(callable(self.onIdle))
                        self.onIdle()
                else:
                    assert(isinstance(msg, Operation))
                    op =3D msg.params[0]
                   =20
                    if op =3D=3D MicroThread.TERMINATE:
                        if not self._handleTerminate():
                            break
                        # =
-----------------------------------------------------
                        else:
                            # print  &quot;(%s) Termination [ REJECTED =
]&quot;%(self._id)
                            # requeue so that it will try to terminat =
again=20
                            apply(self.queueOperation, msg.params, =
{&quot;priority&quot; : msg.priority})
                    else:
                        apply(self._handleOperation, msg.params)
            finally:
                stackless.schedule()
           =20
        if hasattr(self, &quot;onEnd&quot;) and callable(self.onEnd):
            self.onEnd()


    def _logTrace(self, *params):
       =20
        print &gt;&gt; sys.stderr,  &quot;=3D&quot; * 79
        print &gt;&gt; sys.stderr,  &quot;ERROR (%s): %s&quot;%(self._id, repr(params))
        print &gt;&gt; sys.stderr,  &quot;-&quot; * 79
        traceback.print_exc()

        print &gt;&gt; sys.stderr,  &quot;=3D&quot; * 79
                   =20
    def queueOperation(self, *args, **kw):
        priority =3D kw.get(&quot;priority&quot;, Operation.LOWEST_PRIORITY) # =
lowest priority
           =20
        assert(priority &gt; 0)
        req =3D Operation(args, priority)
       =20
        self._queue.put(req)


    def _handleOperation(self, op, args, req_id=3DNone):
        try:
            func =3D self.TASK_MAP[op]
        except (KeyError, AttributeError):
           =20
            print &gt;&gt; sys.stderr,  &quot;[ %s ] Unable to handle =
(%x)&quot;%(self._id, op)
        else:
            if req_id:
                args +=3D (req_id,)

            if hasattr(self, &quot;onHandle&quot;):
                assert(callable(self.onHandle))
                handle_not_allowed =3D self.onHandle(op, args, req_id)
            else:
                handle_not_allowed =3D False

            if not handle_not_allowed:
                try:
                    apply(func, (self, ) + args)
                except:
                    self._logTrace()
            else:
                print &gt;&gt; sys.stderr,  &quot;( %s ) Handling %x [ REJECTED =
]&quot;%(self._id, op)

    def _handleTerminate(self):
       =20
        return 0


    def linkNetwork(self, repository):
        self._net =3D repository


    def linkExecutor(self, executor):
        self._executor =3D executor


    def linkUserInterface(self, ui):
        self._ui =3D ui

    def linkFileSystem(self, fs):
        self._fs =3D fs
       =20
    def _generateReqId(self):

        MicroThread._req_id +=3D 1

        return MicroThread._req_id







class Ui(MicroThread):
    &quot;&quot;&quot;
    Responsible for all User Interaction
    &quot;&quot;&quot;
   =20
    __slots__ =3D [&quot;_shutting_down&quot;,
                 &quot;_windows&quot;,]
   =20
    def __init__(self):
        MicroThread.__init__(self, &quot;ui&quot;)
        self._windows =3D []
        self._shutting_down =3D False

    def createWindow(self, x, y, w, h):
        win =3D Window(self)
        win.show()
        self._windows.append(win)
       =20

    def destroyWindow(self, win):
        self._windows.remove(win)

       =20
    def shutDown(self):
        &quot;&quot;&quot;
        shut down sequence tells the executor to shut down
        and this is how the shutdown sequence initiates
        &quot;&quot;&quot;
       =20
        self._shutting_down =3D True
        self._executor.shutDown()

           =20
    def execute(self, op, *params):
        self._executor.queueOperation(op, params, self._generateReqId())


    def addStatusMessage(self, msg):
        print &gt;&gt; sys.stderr,  &quot;STATUS: %s&quot;%(msg)


    =
#########################################################################=
##
    def onStart(self):
        #self.createWindow(0,0,100,100)

        #thread.start_new_thread(self._startUiThread,())
        pass
   =20
    def onEnd(self):
        print &quot;&lt;&lt; UI Thread [ TERMINATED ]&quot;


    def onIdle(self):
   =20
        if 1:
             self.execute(Executor.PUBLISH, &quot;hello world&quot;)
           =20
           =20
        elif not self._shutting_down:
            # no windows? ok, shut down initiate!
            self.shutDown()
        else:
            time.sleep(0.1) # when we can't call Fl.wait, we need sleep
           =20
   =20
    =
#########################################################################=
##
    def _startUiThread(self):
        &quot;&quot;&quot;
        currently not being used
        &quot;&quot;&quot;
   =20
        print &quot;&gt;&gt; UI System Thread [ STARTED ]&quot;
       =20
        Fl.run()

        print &quot;&lt;&lt; UI System Thrad [ TERMINATED ]&quot;
       =20
        assert(not Fl.first_window())
       =20
        if not self._shutting_down:
            # no more windows, so shut down sould initiate
            self.shutDown()



    =
#########################################################################=
##
    ADD_STATUS_MESSAGE =3D 0x3000
   =20
    TASK_MAP =3D {ADD_STATUS_MESSAGE : addStatusMessage}






class Executor(MicroThread):
    &quot;&quot;&quot;
    Responsible for all Application Execution
    &quot;&quot;&quot;
   =20
    __slots__ =3D [&quot;_tasks&quot;,
                 &quot;_shutting_down&quot;]
   =20
    def __init__(self):
        MicroThread.__init__(self, &quot;executor&quot;)
        self._net =3D None
        self._ui =3D None
        self._tasks =3D {}
        self._shutting_down =3D False

    def shutDown(self):
        &quot;&quot;&quot;
        shutdown process involves queueing itself to death, but
        the real termination will only happen if all tasks it has
        been assigned to complete have been indeed completed

        once shut down process initiates, no more new tasks will
        be accepted

        new tasks mean operations with a request id
        &quot;&quot;&quot;
       =20
        if not self._shutting_down:
            self._shutting_down =3D True

            # terminating the executor has a priority equal to that
            # of the follow up messages, so that follow up messages
            # can lead to task completions which will allow
            # the executor to terminate eventually
            self.queueOperation(MicroThread.TERMINATE, priority=3D4)


    def sendStatusMessage(self, msg):
        self._ui.queueOperation(Ui.ADD_STATUS_MESSAGE, (msg, ))
       =20
    =
#########################################################################=
##
    def onEnd(self):
       =20
        print &gt;&gt; sys.stderr,  &quot;=3D&quot; * 79
        print &quot;&lt;&lt; Executor Thread [ TERMINATED]&quot;
       =20
        self._fs.shutDown()
        self._net.shutDown()

    def onHandle(self, op, args, req_id):
        if self._shutting_down:
            if req_id:
                assert(req_id not in self._tasks)
               =20
                return &quot;Rejecting op due to shut down&quot;



    =
#########################################################################=
##                   =20
    def _handleTerminate(self):

        if self._tasks:
           =20
            return &quot;Not yet&quot;
       =20
       =20
    def _publish(self, uu, req_id=3DNone):
        def _doPublish(ch, uu, req_id):
            while 1:
                self._net.queueOperation(Net.GETATTR, (uu, &quot;dataset&quot;), =
req_id)
               =20
                try:
                    dataset =3D ch.receive() # task switch
                except:
                    # if we get an exception that means that the get =
attr has
                    # failed... what's the best thing to do here?
                    # I guess that really depends on the task
                    # for now just retry the getattr
                    self.sendStatusMessage(&quot; ** Publish (%s) [ FAILED =
]&quot;%(req_id))
                    self.sendStatusMessage(&quot; ** Publish (%s) [ RETRYING =
]&quot;%(req_id))
                    continue
                # =
-------------------------------------------------------------
                else:
                    print &quot; &gt;&gt; got dataset of %d bytes&quot;%(len(dataset))
                    path =3D &quot;%d.txt&quot;%(req_id)
                    self._fs.queueOperation(FileSystem.OPEN, (path, =
&quot;w&quot;), req_id)

                    try:
                        fd =3D ch.receive()
                    except:
                        self.sendStatusMessage(&quot; ** Publish (%s) [ =
FAILED ]&quot;%(req_id))
                    else:
                        # print &gt;&gt; sys.stderr, &quot;Opened %s [ ID: %d =
]&quot;%(path, fd)
                        self._fs.queueOperation(FileSystem.WRITE, (fd, =
dataset), req_id)

                        try:
                            ch.receive()
                        except:
                            self.sendStatusMessage(&quot; ** Publish (%s) [ =
FAILED ]&quot;%(req_id))
                        else:
                            # print &gt;&gt; sys.stderr, &quot;Write complete [ ID: =
%d ]&quot;%(fd)

                            self._fs.queueOperation(FileSystem.CLOSE, =
(fd,), req_id)
                            ch.receive()
                           =20
                            del self._tasks[req_id]
                            self.sendStatusMessage(&quot; ** Publish (%s) [ =
COMPLETE (%s) ]&quot;%(req_id, path))
                    break
                # =
-------------------------------------------------------------
               =20
           =20
        # keep a map of request id to channel for waking it up later
        assert(not self._tasks.has_key(req_id))
        channel =3D stackless.channel()
        self._tasks[req_id] =3D channel
        self.sendStatusMessage(&quot; ** Publish (%s) [ BEGIN (%s) =
]&quot;%(req_id, uu))
        stackless.tasklet(_doPublish)(channel, uu, req_id)


       =20
    def _followThrough(self, reply, req_id):
   =20
        try:
            self._tasks[req_id].send(reply)
        except KeyError:
            print &quot;Error: unknown request id [ %d ]&quot;%(req_id)

    def _raiseException(self, exception, req_id):
        try:
            self._tasks[req_id].send_exception(exception)
        except KeyError:
            print &quot;Error: unknown request id [ %d ]&quot;%(req_id)
           =20
    =
#########################################################################=
##
    PUBLISH =3D 0x1000
    FOLLOW_THROUGH =3D 0x1001
    RAISE_EXCEPTION =3D 0x1002
   =20
    TASK_MAP =3D {PUBLISH : _publish,
                FOLLOW_THROUGH : _followThrough,
                RAISE_EXCEPTION : _raiseException}
   =20

class IoChannel(object):
    &quot;&quot;&quot;
    IO Channel is responsible for I/O interfacing at an abstract level.

    It provides locking/unlocking by a notion of occupation and also
    contains an activities table which keeps track of how much =
read/write
    is pending within an occupation period

    each time a channel is occupied the activities table for that=20
    &quot;&quot;&quot;
   =20
    __slots__ =3D [&quot;_req_id&quot;, &quot;_lock&quot;, &quot;onRead&quot;, &quot;onWrite&quot;, &quot;onError&quot;,
                 &quot;_thread_lock&quot;]

           =20
    def __init__(self):
        self._req_id =3D None
        self._lock =3D thread.allocate_lock()
        self._thread_lock =3D thread.allocate_lock()
       =20
    def occupy(self, req_id):
        &quot;&quot;&quot;
        cannot occupy a channel that is already occupied
        &quot;&quot;&quot;
        assert(self._req_id =3D=3D None)
        self._lock.acquire()
        self._req_id =3D req_id # current activity requester

       =20
    def release(self, req_id):
        &quot;&quot;&quot;
        only the original occupnt can release the channel
        &quot;&quot;&quot;
        if self._req_id =3D=3D req_id:
            # print &quot;Release lock %d&quot;%(req_id)
           =20
            self._req_id =3D None
            self._lock.release()
        else:

            raise &quot;Wrong request id&quot;

    def isIdle(self):
       =20
        return (not self._lock.locked())

               =20

class FsChannel(IoChannel):
    &quot;&quot;&quot;
    Asynchronous FileSystem Channel

    This is incomplete, obviously.=20

    The reason why this exists separate from Net is because select =
really
    isn't meant for asynchrnous file i/o

    All the _handle_X functions are called by the system I/O Thread, so =
they
    are not to be called manually
    &quot;&quot;&quot;
    __slots__ =3D [&quot;_owner&quot;, &quot;_write_buffer&quot;, &quot;_read_buffer&quot;, =
&quot;reached_eof&quot;,
                 &quot;_to_be_read&quot;, &quot;_mode&quot;, &quot;_dispatcher&quot;, &quot;_path&quot;, =
&quot;onEof&quot;,
                 &quot;_id&quot;]
    CHUNK_SIZE =3D 1024

   =20
    def __init__(self, owner, id, path=3DNone, mode=3DNone):
        IoChannel.__init__(self)
        self._owner =3D owner
        self._write_buffer =3D &quot;&quot;
        self._to_be_read =3D 0
        self._id =3D id
        self.reached_eof =3D False
       =20
        self._mode =3D mode or &quot;r&quot;
        self._path =3D &quot;&quot;
       =20
        if path:
            self.open(path, self._mode)

           =20
    def __repr__(self):

        return &quot;&quot;&quot;&lt;FsChannel #%d: %s&gt;&quot;&quot;&quot;%(self._id,
                                          self._path)

   =20
    def open(self, path, mode):
        dispatcher =3D asyncore.dispatcher()
       =20
        try:
            dispatcher.connected =3D 1
            if os.name =3D=3D &quot;posix&quot;:
                fd =3D os.open(path, os.O_NONBLOCK)
                fo =3D os.fdopen(fd, mode)
            else:
                fo =3D file(path, mode)
               =20
            dispatcher.set_socket(fo, map=3Dself._owner.files)
        except Exception, err:
            self._handle_error(err)
        else:
            dispatcher.handle_read =3D self._handle_read
            dispatcher.handle_write =3D self._handle_write
            dispatcher.writable =3D self.writable
            dispatcher.readable =3D self.readable

            self.close()

            self._dispatcher =3D dispatcher
            self._path =3D path


    def close(self):
        try:
            self._dispatcher.close()
        except AttributeError:
            pass
        else:
            del self._dispatcher


    def read(self, bytes=3D-1):
        =
#######################################################################
        self._thread_lock.acquire()
        self._to_be_read =3D bytes
        self._thread_lock.release()
        =
#######################################################################


    def write(self, bytes):
        =
#######################################################################
        self._thread_lock.acquire()
        self._write_buffer =3D&quot;%s%s&quot;%(self._write_buffer, bytes)
        self._thread_lock.release()
        =
#######################################################################

   =20
    def _handle_read(self):
        try:
            data =3D self._dispatcher.socket.read(self.CHUNK_SIZE)
        except Exception, err:
            self._handle_error(err)
        else:
            if not data:
                self._handle_eof()
            else:
                if hasattr(self, &quot;onRead&quot;):
                    self._to_be_read -=3D len(data)
                    assert(callable(self.onRead))
                    try:
                        if self.onRead(self, data, self._req_id):
                            self._to_be_read =3D 0
                    except:
                        self._owner._logTrace()
                =20
       =20
    def _handle_write(self):
        written =3D self.CHUNK_SIZE
       =20
        try:
            self._dispatcher.socket.write(self._write_buffer[:written])
        except Exception, err:
            self.handle_error(err)
        else:
            written_bytes =3D self._write_buffer[:written]
            self._write_buffer =3D self._write_buffer[written:]

            if hasattr(self, &quot;onWrite&quot;):
                assert(callable(self.onWrite))
                try:
                    if self.onWrite(self, written_bytes, self._req_id):
                        self._write_buffer =3D &quot;&quot;
                except:
                    self._owner._logTrace()
                   =20
       =20
    def _handle_eof(self):
        self.reached_eof =3D True
        self._to_be_read =3D 0
       =20
        if hasattr(self, &quot;onEof&quot;):
            assert(callable(self.onEof))
            try:
                self.onEof(self, self._req_id)
            except:
                self._owner._logTrace()
               =20


   =20
    def _handle_error(self, err):
        if hasattr(self, &quot;onError&quot;):
            assert(callable(self.onError))
            try:
                self.onErroR(self, err, self._req_id)
            except:
                self._owner._logTrace()
        else:
            self._owner._logTrace()
   =20
    def writable(self):
        &quot;&quot;&quot;
        writable will be called from the I/O system thread, but
        write will be called from the non system thread which means
        we need to use the lock for synchronization
        &quot;&quot;&quot;
       =20
        =
#######################################################################
        self._thread_lock.acquire()
        val =3D (len(self._write_buffer) &gt; 0)
        self._thread_lock.release()
        =
#######################################################################
   =20
        return val

    def readable(self):
        &quot;&quot;&quot;
        readable will be called from the I/O system thread, but
        read will be called from the non system thread which means
        we need to use the lock for synchronization
        &quot;&quot;&quot;
       =20
        =
#######################################################################
        self._thread_lock.acquire()
        val =3D not (self.reached_eof or (self._to_be_read =3D=3D 0))
        self._thread_lock.release()
        =
#######################################################################

        return val


class FileSystem(MicroThread):
    &quot;&quot;&quot;
    only one instance!
    &quot;&quot;&quot;
    __slots__ =3D [&quot;_file_pool&quot;,
                 &quot;_io_thread_id&quot;,
                 &quot;_shutting_down&quot;,
                 &quot;_activities&quot;]
    files =3D {}
   =20
    class Activity:
        __slots__ =3D [&quot;to_be_read&quot;,
                     &quot;to_be_written&quot;,
                     &quot;buffer&quot;]
        def __init__(self):
            self.to_be_read =3D 0
            self.to_be_written =3D 0
            self.buffer =3D &quot;&quot;
           =20
    def __init__(self):
        MicroThread.__init__(self, &quot;file system&quot;)
        self._ui =3D None
        self._executor =3D None
        self._shutting_down =3D False
        self._activities =3D {}
       =20
    def sendStatusMessage(self, msg):
        self._ui.queueOperation(Ui.ADD_STATUS_MESSAGE, (msg, ))

    def shutDown(self):
        self.queueOperation(MicroThread.TERMINATE)

       =20
    def followUp(self, *params):
        &quot;&quot;&quot;
        never executes anything it just follows up on requests
        &quot;&quot;&quot;

        # follow up messages should have a priority identical to the
        # termination priority of the executor
        self._executor.queueOperation(Executor.FOLLOW_THROUGH, params, =
priority=3D4)

    def raiseException(self, *params):
        &quot;&quot;&quot;
        &quot;&quot;&quot;

        self._executor.queueOperation(Executor.RAISE_EXCEPTION, params, =
priority=3D4)
       =20
    =
#########################################################################=
##
    def onStart(self):       =20
        self._file_pool =3D [FsChannel(self, 0)]
        self._io_thread_id =3D =
thread.start_new_thread(self._activateCommunication, ())
       =20

    def onEnd(self):
        # wait until all file channels have been closed
        while not self._shutting_down:
            pass
        print  &quot;&lt;&lt; FileSystem Thread [ TERMINATED ]&quot;


    =
#########################################################################=
##
    def _activateCommunication(self):
        print &quot;&gt;&gt; File I/O System Thread [ STARTED ]&quot;
       =20
        while self._file_pool:
            for f in FileSystem.files.values():
                if f.readable():
                    asyncore.read(f)
                if f.writable():
                    asyncore.write(f)
            else:
                time.sleep(0.1)

        self._shutting_down =3D True
        print &quot;&lt;&lt; File I/O System Thread [ TERMINATED ]&quot;

       =20
    def _handleTerminate(self):
        # close all open connections
        print &quot;Closing all %d open file =
channel(s)&quot;%(len(self._file_pool))
       =20
        while self._file_pool:
            con =3D self._file_pool.pop()
            con.close()

        assert(not self._file_pool)


    def _getIdleChannel(self):
        &quot;&quot;&quot;
        looks in the file channel pool for an idle channel, if none
        is found one is created and appended to the pool and then =
returned

        returns (channel_id, channel)
        &quot;&quot;&quot;
       =20
        file_pool_len =3D len(self._file_pool)
        file_pool_range =3D range(file_pool_len)
       =20
        for i in file_pool_range:
            chan =3D self._file_pool[i]
            # look for an unopen and idle channel

            if chan.isIdle() and not hasattr(chan, &quot;_dispatcher&quot;):
               =20
                return (i, chan)
            # =
-----------------------------------------------------------------
           =20
        # none is idle, so we create one
        chan =3D FsChannel(self, file_pool_len)
        self._file_pool.append(chan)
       =20
        return (file_pool_len, chan)

    def _open(self, path, mode, req_id):
        &quot;&quot;&quot;
        immediately returns after it finds/creates an idle file channel
        and opens it
        &quot;&quot;&quot;
       =20
        self.sendStatusMessage(&quot;OPEN: %s ( %s ) [ %s ]&quot;%(path,
                                                         mode, req_id))
        while 1:
            (chan_id, idle_chan) =3D self._getIdleChannel()
           =20
            if not idle_chan:
                stackless.schedule()
            else:
                print &quot;found idle file channel [ %s ]&quot;%(idle_chan)
                idle_chan.open(path, mode)
                self._activities[req_id] =3D self.Activity()
                self.followUp(chan_id, req_id)
                break
       =20
    def _close(self, chan_id, req_id):
        &quot;&quot;&quot;
        immediately returns after it finds and closes the channel with =
the
        given id
        &quot;&quot;&quot;
        try:
            chan =3D self._file_pool[chan_id]
        except IndexError, err:
            self.raiseException(err, req_id)
        else:
            self.sendStatusMessage(&quot;CLOSE: %s&quot;%(chan))
            chan.close()
            self.followUp(None, req_id)
           =20
    def _write(self, chan_id, bytes, req_id):
        &quot;&quot;&quot;
        immediately returns after it queues the bytes to be written
        through the file channel
        &quot;&quot;&quot;
       =20
        def bar(chan, data, req_id):
            # print &quot;wrote %d bytes&quot;%(len(data))
            self._activities[req_id].to_be_written -=3D len(data)
            #self._activities[req_id].buffer =3D =
&quot;%s%s&quot;%(self._activities[req_id].buffer,
            #                                          data)
            if self._activities[req_id].to_be_written =3D=3D 0:
                chan.release(req_id)
                #
                =
###############################################################
                buf =3D self._activities[req_id].buffer
                self._activities[req_id].buffer =3D &quot;&quot;
                self.followUp(buf, req_id)
           =20
        try:
            chan =3D self._file_pool[chan_id]
        except IndexError, err:
            self.raiseException(err, req_id)
        else:
            # print &quot;Writing to %s&quot;%(chan)
            assert(chan.isIdle())
            =
###################################################################
            #
            chan.occupy(req_id)
            chan.onWrite =3D bar
            self._activities[req_id].to_be_written +=3D len(bytes)
            chan.write(bytes)
               =20
    =
#########################################################################=
##

    OPEN =3D 0x3001
    #READ =3D 0x3002
    WRITE =3D 0x3003
    CLOSE =3D 0x3004
   =20
    TASK_MAP =3D {WRITE : _write,
                #READ : _read,
                OPEN : _open,
                CLOSE : _close,
                }

   =20
class TcpChannel(IoChannel):
    &quot;&quot;&quot;
    All the _handle_X functions are called by the system I/O Thread, so =
they
    are not to be called manually
    &quot;&quot;&quot;
    __slots__ =3D [&quot;_owner&quot;, &quot;_send_buffer&quot;, &quot;_recv_buffer&quot;, =
&quot;paused_receiving&quot;,
                 &quot;_dispatcher&quot;, &quot;_host&quot;, &quot;onClose&quot;, &quot;closed&quot;]
   =20
    CHUNK_SIZE =3D 1024
   =20
    def __init__(self, owner, host=3DNone):
        IoChannel.__init__(self)
        self._owner =3D owner
        self._send_buffer =3D &quot;&quot;
       =20
        self.closed =3D True
        self.paused_receiving =3D False
       =20
        if host:
            self.connect(host)
       =20
    def __repr__(self):

        return &quot;&lt;TcpChannel (%s:%d)&gt;&quot;%(self._host)
   =20
    def connect(self, host=3DNone):
        if host =3D=3D None:
            host =3D self._host
        else:
            self._host =3D host

        dispatcher =3D asyncore.dispatcher()
        dispatcher.create_socket(socket.AF_INET, socket.SOCK_STREAM)
       =20
        dispatcher.connect(host)
        dispatcher.handle_connect =3D self._handle_connect
        dispatcher.handle_read =3D self._handle_read
        dispatcher.handle_close =3D self._handle_close
        dispatcher.handle_write =3D self._handle_write
        dispatcher.writable =3D self.writable
        dispatcher.readable =3D self.readable
       =20
        self.disconnect()
       =20
        self._dispatcher =3D dispatcher
        self.closed =3D False

    def disconnect(self):

        try:
            self._dispatcher.close()
        except:
            pass
        else:
            del self._dispatcher
            self.closed =3D True
           =20

    def send(self, bytes):
        assert(self._lock.locked())
        self._send_buffer =3D bytes
           =20
           =20
    def writable(self):
       =20
        return (len(self._send_buffer) &gt; 0)
   =20
    def readable(self):
       =20
        return not (self.paused_receiving or self.closed)
   =20
    def _handle_connect(self):
        self._owner.sendStatusMessage(&quot; !! Connection [ CONNECTED (%s) =
]&quot;%(self))

       =20
    def _handle_read(self):
        try:
            data =3D self._dispatcher.recv(self.CHUNK_SIZE)
        except Exception, err:
            self._handle_error(err)
        else:

            if hasattr(self, &quot;onRead&quot;):
                assert(callable(self.onRead))
                try:
                    if self.onRead(self, data, self._req_id):
                        self.paused_receiving =3D True
                except:
                    self._owner._logTrace()
           =20
       =20
    def _handle_write(self):
        try:
            sent =3D =
self._dispatcher.send(self._send_buffer[:self.CHUNK_SIZE])
        except Exception, err:
            self._handle_error(err)
        else:
           =20
            if hasattr(self, &quot;onWrite&quot;):
                assert(callable(self.onWrite))
                try:
                    self.onWrite(self, self._send_buffer[:sent], =
self._req_id)
                except:
                    self._owner._logTrace()
           =20
            self._send_buffer =3D self._send_buffer[sent:]


    def _handle_close(self):
        &quot;&quot;&quot;
        The other side closes connection
        &quot;&quot;&quot;
        self.closed =3D True
       =20
        if hasattr(self, &quot;onClose&quot;):
            assert(callable(self.onClose))
            try:
                self.onClose(self, self._req_id)
            except:
                self._owner._logTrace()
       =20

    def _handle_error(self, err):
        if hasattr(self, &quot;onError&quot;):
            assert(callable(self.onError))
            try:
                self.onErroR(self, err, self._req_id)
            except:
                self._owner._logTrace()
               =20


class Net(MicroThread):
    &quot;&quot;&quot;
    Responsible for all Network I/O

    If you use this class, you must not instantiate more than one =
instance of it!
    &quot;&quot;&quot;
   =20
    __slots__ =3D [&quot;_connection_pool&quot;,
                 &quot;_io_thread_id&quot;,
                 &quot;_buffer&quot;]
   =20
    def __init__(self):
        MicroThread.__init__(self, &quot;network&quot;)
        self._ui =3D None
        self._executor =3D None
        self._buffer =3D {}
       =20
    def sendStatusMessage(self, msg):
        self._ui.queueOperation(Ui.ADD_STATUS_MESSAGE, (msg, ))

    def shutDown(self):
        self.queueOperation(MicroThread.TERMINATE)

       =20
    def raiseException(self, *params):
        &quot;&quot;&quot;
        &quot;&quot;&quot;

        self._executor.queueOperation(Executor.RAISE_EXCEPTION, params, =
priority=3D4)
       =20
    def followUp(self, *params):
        &quot;&quot;&quot;
        repository never executes anything it just follows up on =
requests
        &quot;&quot;&quot;

        # follow up messages should have a priority identical to the
        # termination priority of the executor
        self._executor.queueOperation(Executor.FOLLOW_THROUGH, params, =
priority=3D4)


    =
#########################################################################=
##
    def onStart(self):       =20
        self._connection_pool =3D [TcpChannel(self, (&quot;www.google.com&quot;, =
80))]
        self._io_thread_id =3D =
thread.start_new_thread(self._activateCommunication, ())
       =20

    def onEnd(self):
       =20
        print  &quot;&lt;&lt; Net Thread [ TERMINATED ]&quot;
        self._ui.queueOperation(MicroThread.TERMINATE)


    =
#########################################################################=
##
    def _activateCommunication(self):
        print &quot;&gt;&gt; Network I/O System Thread [ STARTED ]&quot;
       =20
        asyncore.loop(timeout=3D0.1)

        print &quot;&lt;&lt; Network I/O System Thread [ TERMINATED ]&quot;

       =20
    def _handleTerminate(self):
        # close all open connections
        print &quot;Closing all %d Net =
connection(s)&quot;%(len(self._connection_pool))
       =20
        while self._connection_pool:
            con =3D self._connection_pool.pop()
            con.disconnect()
           =20
        assert(not self._connection_pool)


    def _getIdleConnection(self):
        for conn in self._connection_pool:
            if conn.isIdle():
               =20
                return conn
           =20
        # none is idle
        return None
   =20
    def _getAttr(self, uu, attr, req_id):
        def foo(conn, data, req_id):
            print &quot;received %d bytes&quot;%(len(data))
            if data:
                self._buffer[req_id] +=3D data
           =20
        def bar(conn, data, req_id):
            print &quot;sent %d bytes&quot;%(len(data))
           =20
        def baz(conn, req_id):
           =20
            self.followUp(self._buffer[req_id], req_id)
            del self._buffer[req_id]
            conn.release(req_id)
            # print &quot;--&gt; trying to reconnect to %s&quot;%(repr(conn._host))
            # this gets called when the other party has closed the =
connection
            self.sendStatusMessage(&quot; !! Connection [ CLOSED (%s) =
]&quot;%(conn))
            self.sendStatusMessage(&quot; !! Connection [ RECONNECTING (%s) =
]&quot;%(conn))

            conn.connect()
       =20
        self.sendStatusMessage(&quot;GETATTR: %s ( %s ) [ %s ]&quot;%(uu,
                                                             attr, =
req_id))
        # look for idle connection
        while 1:
            idle_conn =3D self._getIdleConnection()

            if not idle_conn:
                # no connections available!
                stackless.schedule() # switch
            else:               =20
                # push get attr request with request id so that when the
                # request is fulfilled FOLLOW_THROUGH will happen
                print &quot;found idle connection [ %s ]&quot;%(idle_conn)
                idle_conn.occupy(req_id)
                idle_conn.onRead =3D foo
                idle_conn.onWrite =3D bar
                idle_conn.onClose =3D baz
                self._buffer[req_id] =3D &quot;&quot;
                idle_conn.send(&quot;GET / HTTP/1.0\r\n\r\n&quot;)
               =20
                break
            # =
-----------------------------------------------------------------
           =20
    =
#########################################################################=
##
    GETATTR =3D 0x2001
   =20
    TASK_MAP =3D {GETATTR : _getAttr,
                #CONNECT : _connect
                #SEND : _send
                #RECEIVE : _recv
                #CLOSE : _close
                #ACCEPT : _accept
                #BIND : _bind
                #LISTEN : _listen
                }






#########################################################################=
######
#
#########################################################################=
######
if __name__ =3D=3D &quot;__main__&quot;:

    e =3D Executor()
    r =3D Net()
    ui =3D Ui()
    fs =3D FileSystem()
   =20
    ui.linkExecutor(e)   =20
   =20
    e.linkNetwork(r)
    e.linkUserInterface(ui)
    e.linkFileSystem(fs)
   =20
    r.linkExecutor(e)
    r.linkUserInterface(ui)

    fs.linkExecutor(e)
    fs.linkUserInterface(ui)
   =20
    task_e =3D stackless.tasklet(e.run)()
    task_r =3D stackless.tasklet(r.run)()
    task_ui =3D stackless.tasklet(ui.run)()
    task_fs =3D stackless.tasklet(fs.run)()
   =20
    print &gt;&gt; sys.stderr,  &quot;=3D&quot; * 79
    print sys.version
   =20

    stackless.run()
   =20
   =20

------=_NextPart_000_0105_01C407D5.C294D9E0--



]