[Stackless] Stackless asynchronous file module - IOCP

Carlos Eduardo de Paula carlosedp at gmail.com
Sat Aug 18 00:15:15 CEST 2007


Fiddling around I found that on windows select() doesn't support file
descriptors so the other option was OverlappedIO also known as IOCP.

This made me learn a thousand things I thought would never know...
Ctypes, Windows internal APIs (kernel32 stuff), and even python
internals to see how its fileobject works.

After this all here it is, the module that mimics the internal python
file() object. Please test it and report any successes or failures. I
also made a script to test its usage.

Hope it's useful.
(full announcement in my blog: http://themindcaster.blogspot.com/)

The code is here too: http://dpaste.com/hold/17151/
and will be posted on Stackless Examples Project SVN

-------------------------------------------------------------------------------------------
#
# Stackless Asynchronous file module:
#
# Author: Carlos E. de Paula <carlosedp at gmail.com>
#
# This code was written to serve as an example of Stackless Python usage.
# Feel free to email me with any questions, comments, or suggestions for
# improvement.
#
# This is an asynchronous file class in order to have a file module replacement
# that uses channels and a windows async API to allow its methods to
# block just the calling tasklet not the entire interpreter.
#
#
import os
import time
import stackless
from ctypes import *
from ctypes.wintypes import HANDLE, ULONG, DWORD, BOOL, LPCSTR,
LPCWSTR, WinError

# Verify module compatibility
if os.name != 'nt':
    raise ImportError('This module has been implemented for windows systems.')

# Windows structures

class _US(Structure):
    _fields_ = [
        ("Offset",          DWORD),
        ("OffsetHigh",      DWORD),
    ]

class _U(Union):
    _fields_ = [
        ("s",               _US),
        ("Pointer",         c_void_p),
    ]

    _anonymous_ = ("s",)

class OVERLAPPED(Structure):
    _fields_ = [
        ("Internal",        POINTER(ULONG)),
        ("InternalHigh",    POINTER(ULONG)),
        ("u",               _U),
        ("hEvent",          HANDLE),

        # Custom fields.
        ("taskletID",       ULONG),
    ]

    _anonymous_ = ("u",)

# Windows kernel32 API

CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, POINTER(c_ulong), DWORD)
CreateIoCompletionPort.restype = HANDLE

GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
GetQueuedCompletionStatus.argtypes = (HANDLE, POINTER(DWORD), POINTER(c_ulong),
                                      POINTER(POINTER(OVERLAPPED)), DWORD)
GetQueuedCompletionStatus.restype = BOOL

ReadFile = windll.kernel32.ReadFile
ReadFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
POINTER(OVERLAPPED))
ReadFile.restype = BOOL

WriteFile = windll.kernel32.WriteFile
WriteFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
POINTER(OVERLAPPED))
WriteFile.restype = BOOL

CreateFileA = windll.kernel32.CreateFileA
CreateFileA.argtypes = (LPCSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
CreateFileA.restype = HANDLE

CreateFileW = windll.kernel32.CreateFileW
CreateFileW.argtypes = (LPCWSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
CreateFileW.restype = HANDLE

CloseHandle = windll.kernel32.CloseHandle
CloseHandle.argtypes = (HANDLE,)
CloseHandle.restype = BOOL

GetLastError = windll.kernel32.GetLastError

# Python API

pythonapi.PyErr_SetFromErrno.argtypes = (py_object,)
pythonapi.PyErr_SetFromErrno.restype = py_object

# Windows definitions

INVALID_HANDLE_VALUE    = 0xFFFFFFFF
NULL                    = c_ulong()

WAIT_TIMEOUT            = 0x102
ERROR_IO_PENDING        = 997
FILE_FLAG_RANDOM_ACCESS = 0x10000000
FILE_FLAG_OVERLAPPED    = 0x40000000

GENERIC_READ            = 0x80000000
GENERIC_WRITE           = 0x40000000
FILE_APPEND_DATA                = 0x00000004

FILE_SHARE_READ         = 0x00000001
FILE_SHARE_WRITE        = 0x00000002

OPEN_EXISTING           = 3
OPEN_ALWAYS             = 4
CREATE_ALWAYS           = 2

# ----------------------------------------------------------------------------

class resultsManager(object):
    """
    Manages the results sent by the CreateIoCompletionPort call.
    The resultsManager dequeues the IO requests and keeps a list of
pending handles.
    The taskletID is stored in OVERLAPPED structure so it can be
recalled and the
    signalling data sent via its channel unblocking the original tasklet.
    The handle is then removed from the dict.
    """
    def __init__(self, numThreads=NULL):
        self.running = True
        self.handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
                                             NULL, numThreads)
        if self.handle == 0:
            raise WinError()

        self.numThreads = numThreads
        stackless.tasklet(self.poll)()
        self.overlappedByID = {}

    def __del__(self):
        if self.handle is None:
            return
        self.overlappedByID.clear()
        CloseHandle(self.handle)

    def poll(self, timeout=1):
        while self.running and self.overlappedByID:
            numBytes = DWORD()
            completionKey = c_ulong()
            ovp = POINTER(OVERLAPPED)()

            ret = GetQueuedCompletionStatus(self.handle, byref(numBytes),
                                            byref(completionKey), byref(ovp),
                                            timeout)

            if not ovp and ret == 0:
                if GetLastError() == WAIT_TIMEOUT:
                    stackless.schedule()
                    continue

            if ovp.contents.taskletID in self.overlappedByID:
                #print ovp.contents.taskletID, " tasklet ID IN pool"
                c = self.overlappedByID[ovp.contents.taskletID]
            else:
                #print ovp.contents.taskletID, " tasklet ID NOT in pool"
                continue

            #print "sending data back to channel in ID", ovp.contents.taskletID
            c.send(numBytes)
            #print "sent data to channel in ID",
ovp.contents.taskletID, numBytes
            self.UnregisterChannelObject(ovp.contents.taskletID)

        self.running = False

    def RegisterChannelObject(self, ob, c):
        self.overlappedByID[ob] = c

    def UnregisterChannelObject(self, ob):
        if self.overlappedByID.has_key(ob):
            del self.overlappedByID[ob]


mng = resultsManager()

class stacklessfile(object):
    """
    stacklessfile(name[, mode[, buffering]]) -> stackless file object

    This class creates a new file module permitting nonblocking IO calls
    for tasklets using windows IOCP functionality.
    When a read or write operation is called, only the calling tasklet is
    blocked. In standard file module, the whole interpreter gets blocked
    until the operation is concluded.

    Open a file.  The mode can be 'r', 'w' or 'a' for reading (default),
    writing or appending.  The file will be created if it doesn't exist
    when opened for writing or appending; it will be truncated when
    opened for writing.  Add a 'b' to the mode for binary files.
    Add a '+' to the mode to allow simultaneous reading and writing.
    If the buffering argument is given, 0 means unbuffered, 1 means line
    buffered, and larger numbers specify the buffer size.
    Add a 'U' to mode to open the file for input with universal newline
    support.  Any line ending in the input file will be seen as a \'\\n\'
    in Python.  Also, a file so opened gains the attribute 'newlines';
    the value for this attribute is one of None (no newline read yet),
    \'\\r\', \'\\n\', \'\\r\\n\' or a tuple containing all the newline
types seen.
    """
    closed = True
    def __init__(self, name, mode='r', buffering=-1):
        """
        Initializes the file object and creates an internal file object.
        """
        if not self.closed:
            self.close()
        self.name = name
        self.mode = mode
        self.offset = 0
        self.iocpLinked = False
        self._check_manager()
        self.open_handle()
        self.closed = False

    def __repr__(self):
        return "<%s file '%s', mode '%s' at 0x%08X>" % ([ "open",
"closed" ][self.closed], self.name, self.mode, id(self))

    def __del__(self):
        self.close()

    def _check_manager(self):
        if not mng.running:
            stackless.tasklet(mng.poll)()
            mng.running = True
            #print "ERROR - Manager not running"

    def _check_still_open(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _ensure_iocp_association(self):
        if not self.iocpLinked:
            CreateIoCompletionPort(self.handle, mng.handle, NULL,
mng.numThreads)
            self.iocpLinked = True

    def close(self):
        """
        close() -> None or (perhaps) an integer.  Close the file.

        Sets data attribute .closed to True.  A closed file canno
        further I/O operations.  close() may be called more than
        error.  Some kinds of file objects (for example, opened b
        may return an exit status upon closing.
        """
        if not self.closed:
            CloseHandle(self.handle)
            del self.handle
            self.closed = True

    def open_handle(self):
        self.binary = 'b' in self.mode
        access = GENERIC_READ
        if 'w' in self.mode or ('r' in self.mode and '+' in self.mode):
            access |= GENERIC_WRITE
        if 'a' in self.mode:
            access |= FILE_APPEND_DATA

        share = FILE_SHARE_READ | FILE_SHARE_WRITE

        if 'w' in self.mode:
            disposition = CREATE_ALWAYS
        elif 'r' in self.mode and '+' in self.mode:
            disposition = OPEN_ALWAYS
        else:
            disposition = OPEN_EXISTING

        flags = FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_OVERLAPPED

        if isinstance(self.name, unicode):
            func = CreateFileW
        else:
            func = CreateFileA

        self.handle = func(self.name, access,
                              share, c_void_p(), disposition,
                              flags, NULL )

        if self.handle == INVALID_HANDLE_VALUE:
            raise WinError()

        self.iocpLinked = False

    def read(self, size=None):
        """
        read([size]) -> read at most size bytes, returned as a string.
        """
        self._check_still_open()
        maxBytesToRead = int(os.path.getsize(self.name)) - self.offset
        if (size is None) or (maxBytesToRead < size):
            size = maxBytesToRead

        bytesRead = DWORD()
        self.o = OVERLAPPED()
        self.o.Offset = self.offset
        self.o.taskletID = id(self)
        self.buffer = create_string_buffer(size)
        self.channel = stackless.channel()
        self._ensure_iocp_association()
        self._check_manager()

        #print self.o.taskletID, "ID on read", self.name
        #print "firing ReadFile", self.name

        r = ReadFile(self.handle, self.buffer,
                     size, byref(bytesRead), byref(self.o));
        #print "fired ReadFile", self.name

        if r == 0:
            if GetLastError() != ERROR_IO_PENDING:

pythonapi.PyErr_SetExcFromWindowsErrWithFilename(py_object(IOError),
                                                                 0,
c_char_p(self.name))
            mng.RegisterChannelObject(self.o.taskletID, self.channel)
            #print "blocked on channel",self.channel, self.name,
self.o.taskletID
            self.channel.receive()
            #print "returned from channel",self.channel, self.name,
self.o.taskletID

        self.offset += size
        return self.buffer[:size]

    def write(self, data):
        """
        write(str) -> None.  Write string str to file.
        """
        self._check_still_open()
        bytesToWrite = c_int()
        writeBufferPtr = c_char_p()
        bytesWritten = DWORD()
        self.o = OVERLAPPED()
        self.o.Offset = self.offset
        self.o.taskletID = id(self)
        self.channel = stackless.channel()
        self._ensure_iocp_association()
        self._check_manager()
        #print self.o.taskletID, "ID on write", self.name

        fmt = self.binary and "s#" or "t#"
        ret = pythonapi.PyArg_ParseTuple(py_object((data,)), c_char_p(fmt),
                                         byref(writeBufferPtr),
byref(bytesToWrite))
        if ret == 0:
            raise WinError()

        #print "firing WriteFile", self.name
        r = WriteFile(self.handle, writeBufferPtr,
                      bytesToWrite.value, byref(bytesWritten), byref(self.o))
        #print "fired WriteFile", self.name

        if r == 0:
            if GetLastError() != ERROR_IO_PENDING:

pythonapi.PyErr_SetExcFromWindowsErrWithFilename(py_object(IOError),
                                                                 0,
c_char_p(self.name))

            mng.RegisterChannelObject(self.o.taskletID, self.channel)
            #print "blocked on channel",self.channel, self.name,
self.o.taskletID
            written = self.channel.receive()
            #print "returned from channel",self.channel, self.name,
self.o.taskletID
        else:
            written = bytesWritten
        #print "Checking contents...", bytesToWrite.value,
written.value, self.name

        if bytesToWrite.value != written.value:
            # Check if the quantity of bytes sent has been written to the file
            #print self.o.taskletID, "size mismatch"
            raise WinError()

    def tell(self):
        """
        tell() -> current file position, an integer (may be a long integer).
        """
        return self.offset

    def seek(self, offset, whence=os.SEEK_SET):
        """
        seek(offset[, whence]) -> None.  Move to new file position.

        Argument offset is a byte count.  Optional argument whence defaults to
        0 (offset from start of file, offset should be >= 0); other values are 1
        (move relative to current position, positive or negative), and 2 (move
        relative to end of file, usually negative, although many platforms allow
        seeking beyond the end of a file).  If the file is opened in text mode,
        only offsets returned by tell() are legal.  Use of other offsets causes
        undefined behavior.
        Note that not all file objects are seekable.
        """
        self._check_still_open()
        if whence == os.SEEK_SET:
            self.offset = offset
        elif whence == os.SEEK_CUR:
            self.offset += offset
        elif whence == os.SEEK_END:
            raise RuntimeError("SEEK_END unimplemented")

    def flush(self):
        pass

    def isatty(self):
        """
        isatty() -> true or false.  True if the file is connected to a
tty device.
        """
        self._check_still_open()
        return False

    def isatty(self):
        self._check_still_open()
        return False


if __name__ == '__main__':
    import time
    import glob
    import os
    stdfile = file

    # On your stackless apps, use these 2 lines below
    from stacklessfile import stacklessfile as file
    open = file

    # Function to copy a file
    def copyfile(who, infile, out):
        st = time.time()
        f1 = file(infile, 'rb')
        f2 = file(out, 'wb')
        print "%s started reading %s ..." % (who, infile)
        a = f1.read()
        print "%s started writing %s -> %s ..." % (who, infile, out)
        f2.write(a)
        f1.close()
        f2.close()
        print "Finished tasklet %s (%s) in %s" % (who, infile, time.time()-st)

    # Creating two dummy files
    newfile = stdfile('test-small.txt','w')
    for x in xrange(10000):
        newfile.write(str(x))
    newfile.close()

    newfile2 = stdfile('test-big.txt','w')
    for x in xrange(500000):
        newfile2.write(str(x))
    newfile2.close()

    # Launching tasklets to perform the file copy
    for i in xrange(1,11):
        stackless.tasklet(copyfile)(i, 'test-big.txt','big%s.txt' % i)

    for i in xrange(1,21):
        stackless.tasklet(copyfile)(i, 'test-small.txt','sm%s.txt' % i)

    st = time.time()
    stackless.run()
    print "Total time is %s seconds." % (time.time() - st)

    # Cleanup all test files used
    for f in glob.glob('test*.txt'):
        os.unlink(f)
    for f in glob.glob('sm*.txt'):
        os.unlink(f)
    for f in glob.glob('big*.txt'):
        os.unlink(f)

-------------------------------------------------------------------------------------------


-- 
-------------------------------------------------------------------
Visit Stackless Examples Project
http://code.google.com/p/stacklessexamples/
Stackless Python - www.stackless.com
-------------------------------------------------------------------

_______________________________________________
Stackless mailing list
Stackless at stackless.com
http://stackless.com/cgi-bin/mailman/listinfo/stackless



More information about the Stackless mailing list