[Stackless] Stackless asynchronous file module - IOCP

Arnar Birgisson arnarbi at gmail.com
Sat Aug 18 02:37:54 CEST 2007


Beautiful!

Should there be a common interface/naming that modules such as this
should adhere to, to make it easy for people to do something like

try:
    from iocp import stacklessfile
except ImportError:
    from epollstuff import stacklessfile

etc. ?

I'd love to see such an "async io" module, with different
implementations included with Stackless and one central module that,
when imported, would select the most appropriate implementation for the
platform. Preferably also with sockets and timers.

Arnar

On 8/17/07, Carlos Eduardo de Paula <carlosedp at gmail.com> wrote:
> Fiddling around, I found that on Windows select() doesn't support file
> descriptors, so the other option was Overlapped I/O, also known as IOCP.
>
> This made me learn a thousand things I thought I would never know...
> ctypes, Windows internal APIs (kernel32 stuff), and even Python
> internals to see how its file object works.
>
> After all this, here it is: the module that mimics the internal Python
> file() object. Please test it and report any successes or failures. I
> also made a script to test its usage.
>
> Hope it's useful.
> (full announcement in my blog: http://themindcaster.blogspot.com/)
>
> The code is here too: http://dpaste.com/hold/17151/
> and will be posted on Stackless Examples Project SVN
>
> -------------------------------------------------------------------------------------------
> #
> # Stackless Asynchronous file module:
> #
> # Author: Carlos E. de Paula <carlosedp at gmail.com>
> #
> # This code was written to serve as an example of Stackless Python usage.
> # Feel free to email me with any questions, comments, or suggestions for
> # improvement.
> #
> # This is an asynchronous file class in order to have a file module replacement
> # that uses channels and a windows async API to allow its methods to
> # block just the calling tasklet not the entire interpreter.
> #
> #
> import os
> import time
> import stackless
> from ctypes import *
> from ctypes.wintypes import HANDLE, ULONG, DWORD, BOOL, LPCSTR,
> LPCWSTR, WinError
>
> # Verify module compatibility
> if os.name != 'nt':
>     raise ImportError('This module has been implemented for windows systems.')
>
> # Windows structures
>
> class _US(Structure):
>     _fields_ = [
>         ("Offset",          DWORD),
>         ("OffsetHigh",      DWORD),
>     ]
>
> class _U(Union):
>     _fields_ = [
>         ("s",               _US),
>         ("Pointer",         c_void_p),
>     ]
>
>     _anonymous_ = ("s",)
>
> class OVERLAPPED(Structure):
>     _fields_ = [
>         ("Internal",        POINTER(ULONG)),
>         ("InternalHigh",    POINTER(ULONG)),
>         ("u",               _U),
>         ("hEvent",          HANDLE),
>
>         # Custom fields.
>         ("taskletID",       ULONG),
>     ]
>
>     _anonymous_ = ("u",)
>
> # Windows kernel32 API
>
> CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
> CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, POINTER(c_ulong), DWORD)
> CreateIoCompletionPort.restype = HANDLE
>
> GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
> GetQueuedCompletionStatus.argtypes = (HANDLE, POINTER(DWORD), POINTER(c_ulong),
>                                       POINTER(POINTER(OVERLAPPED)), DWORD)
> GetQueuedCompletionStatus.restype = BOOL
>
> ReadFile = windll.kernel32.ReadFile
> ReadFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> POINTER(OVERLAPPED))
> ReadFile.restype = BOOL
>
> WriteFile = windll.kernel32.WriteFile
> WriteFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> POINTER(OVERLAPPED))
> WriteFile.restype = BOOL
>
> CreateFileA = windll.kernel32.CreateFileA
> CreateFileA.argtypes = (LPCSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> CreateFileA.restype = HANDLE
>
> CreateFileW = windll.kernel32.CreateFileW
> CreateFileW.argtypes = (LPCWSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> CreateFileW.restype = HANDLE
>
> CloseHandle = windll.kernel32.CloseHandle
> CloseHandle.argtypes = (HANDLE,)
> CloseHandle.restype = BOOL
>
> GetLastError = windll.kernel32.GetLastError
>
> # Python API
>
> pythonapi.PyErr_SetFromErrno.argtypes = (py_object,)
> pythonapi.PyErr_SetFromErrno.restype = py_object
>
> # Windows definitions
>
> INVALID_HANDLE_VALUE    = 0xFFFFFFFF
> NULL                    = c_ulong()
>
> WAIT_TIMEOUT            = 0x102
> ERROR_IO_PENDING        = 997
> FILE_FLAG_RANDOM_ACCESS = 0x10000000
> FILE_FLAG_OVERLAPPED    = 0x40000000
>
> GENERIC_READ            = 0x80000000
> GENERIC_WRITE           = 0x40000000
> FILE_APPEND_DATA                = 0x00000004
>
> FILE_SHARE_READ         = 0x00000001
> FILE_SHARE_WRITE        = 0x00000002
>
> OPEN_EXISTING           = 3
> OPEN_ALWAYS             = 4
> CREATE_ALWAYS           = 2
>
> # ----------------------------------------------------------------------------
>
> class resultsManager(object):
>     """
>     Manages the results sent by the CreateIoCompletionPort call.
>     The resultsManager dequeues the IO requests and keeps a list of
>     pending handles.
>     The taskletID is stored in the OVERLAPPED structure so it can be
>     recalled and the signalling data sent via its channel, unblocking
>     the original tasklet.
>     The handle is then removed from the dict.
>     """
>     def __init__(self, numThreads=NULL):
>         self.running = True
>         self.handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
>                                              NULL, numThreads)
>         if self.handle == 0:
>             raise WinError()
>
>         self.numThreads = numThreads
>         stackless.tasklet(self.poll)()
>         self.overlappedByID = {}
>
>     def __del__(self):
>         if self.handle is None:
>             return
>         self.overlappedByID.clear()
>         CloseHandle(self.handle)
>
>     def poll(self, timeout=1):
>         while self.running and self.overlappedByID:
>             numBytes = DWORD()
>             completionKey = c_ulong()
>             ovp = POINTER(OVERLAPPED)()
>
>             ret = GetQueuedCompletionStatus(self.handle, byref(numBytes),
>                                             byref(completionKey), byref(ovp),
>                                             timeout)
>
>             if not ovp and ret == 0:
>                 if GetLastError() == WAIT_TIMEOUT:
>                     stackless.schedule()
>                     continue
>
>             if ovp.contents.taskletID in self.overlappedByID:
>                 #print ovp.contents.taskletID, " tasklet ID IN pool"
>                 c = self.overlappedByID[ovp.contents.taskletID]
>             else:
>                 #print ovp.contents.taskletID, " tasklet ID NOT in pool"
>                 continue
>
>             #print "sending data back to channel in ID", ovp.contents.taskletID
>             c.send(numBytes)
>             #print "sent data to channel in ID",
> ovp.contents.taskletID, numBytes
>             self.UnregisterChannelObject(ovp.contents.taskletID)
>
>         self.running = False
>
>     def RegisterChannelObject(self, ob, c):
>         self.overlappedByID[ob] = c
>
>     def UnregisterChannelObject(self, ob):
>         if self.overlappedByID.has_key(ob):
>             del self.overlappedByID[ob]
>
>
> mng = resultsManager()
>
> class stacklessfile(object):
>     """
>     stacklessfile(name[, mode[, buffering]]) -> stackless file object
>
>     This class creates a new file module permitting nonblocking IO calls
>     for tasklets using windows IOCP functionality.
>     When a read or write operation is called, only the calling tasklet is
>     blocked. In standard file module, the whole interpreter gets blocked
>     until the operation is concluded.
>
>     Open a file.  The mode can be 'r', 'w' or 'a' for reading (default),
>     writing or appending.  The file will be created if it doesn't exist
>     when opened for writing or appending; it will be truncated when
>     opened for writing.  Add a 'b' to the mode for binary files.
>     Add a '+' to the mode to allow simultaneous reading and writing.
>     If the buffering argument is given, 0 means unbuffered, 1 means line
>     buffered, and larger numbers specify the buffer size.
>     Add a 'U' to mode to open the file for input with universal newline
>     support.  Any line ending in the input file will be seen as a \'\\n\'
>     in Python.  Also, a file so opened gains the attribute 'newlines';
>     the value for this attribute is one of None (no newline read yet),
>     \'\\r\', \'\\n\', \'\\r\\n\' or a tuple containing all the newline
>     types seen.
>     """
>     closed = True
>     def __init__(self, name, mode='r', buffering=-1):
>         """
>         Initializes the file object and creates an internal file object.
>         """
>         if not self.closed:
>             self.close()
>         self.name = name
>         self.mode = mode
>         self.offset = 0
>         self.iocpLinked = False
>         self._check_manager()
>         self.open_handle()
>         self.closed = False
>
>     def __repr__(self):
>         return "<%s file '%s', mode '%s' at 0x%08X>" % ([ "open",
> "closed" ][self.closed], self.name, self.mode, id(self))
>
>     def __del__(self):
>         self.close()
>
>     def _check_manager(self):
>         if not mng.running:
>             stackless.tasklet(mng.poll)()
>             mng.running = True
>             #print "ERROR - Manager not running"
>
>     def _check_still_open(self):
>         if self.closed:
>             raise ValueError("I/O operation on closed file")
>
>     def _ensure_iocp_association(self):
>         if not self.iocpLinked:
>             CreateIoCompletionPort(self.handle, mng.handle, NULL,
> mng.numThreads)
>             self.iocpLinked = True
>
>     def close(self):
>         """
>         close() -> None or (perhaps) an integer.  Close the file.
>
>         Sets data attribute .closed to True.  A closed file cannot be used for
>         further I/O operations.  close() may be called more than once without
>         error.  Some kinds of file objects (for example, opened by popen())
>         may return an exit status upon closing.
>         """
>         if not self.closed:
>             CloseHandle(self.handle)
>             del self.handle
>             self.closed = True
>
>     def open_handle(self):
>         self.binary = 'b' in self.mode
>         access = GENERIC_READ
>         if 'w' in self.mode or ('r' in self.mode and '+' in self.mode):
>             access |= GENERIC_WRITE
>         if 'a' in self.mode:
>             access |= FILE_APPEND_DATA
>
>         share = FILE_SHARE_READ | FILE_SHARE_WRITE
>
>         if 'w' in self.mode:
>             disposition = CREATE_ALWAYS
>         elif 'r' in self.mode and '+' in self.mode:
>             disposition = OPEN_ALWAYS
>         else:
>             disposition = OPEN_EXISTING
>
>         flags = FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_OVERLAPPED
>
>         if isinstance(self.name, unicode):
>             func = CreateFileW
>         else:
>             func = CreateFileA
>
>         self.handle = func(self.name, access,
>                               share, c_void_p(), disposition,
>                               flags, NULL )
>
>         if self.handle == INVALID_HANDLE_VALUE:
>             raise WinError()
>
>         self.iocpLinked = False
>
>     def read(self, size=None):
>         """
>         read([size]) -> read at most size bytes, returned as a string.
>         """
>         self._check_still_open()
>         maxBytesToRead = int(os.path.getsize(self.name)) - self.offset
>         if (size is None) or (maxBytesToRead < size):
>             size = maxBytesToRead
>
>         bytesRead = DWORD()
>         self.o = OVERLAPPED()
>         self.o.Offset = self.offset
>         self.o.taskletID = id(self)
>         self.buffer = create_string_buffer(size)
>         self.channel = stackless.channel()
>         self._ensure_iocp_association()
>         self._check_manager()
>
>         #print self.o.taskletID, "ID on read", self.name
>         #print "firing ReadFile", self.name
>
>         r = ReadFile(self.handle, self.buffer,
>                      size, byref(bytesRead), byref(self.o));
>         #print "fired ReadFile", self.name
>
>         if r == 0:
>             if GetLastError() != ERROR_IO_PENDING:
>
> pythonapi.PyErr_SetExcFromWindowsErrWithFilename(py_object(IOError),
>                                                                  0,
> c_char_p(self.name))
>             mng.RegisterChannelObject(self.o.taskletID, self.channel)
>             #print "blocked on channel",self.channel, self.name,
> self.o.taskletID
>             self.channel.receive()
>             #print "returned from channel",self.channel, self.name,
> self.o.taskletID
>
>         self.offset += size
>         return self.buffer[:size]
>
>     def write(self, data):
>         """
>         write(str) -> None.  Write string str to file.
>         """
>         self._check_still_open()
>         bytesToWrite = c_int()
>         writeBufferPtr = c_char_p()
>         bytesWritten = DWORD()
>         self.o = OVERLAPPED()
>         self.o.Offset = self.offset
>         self.o.taskletID = id(self)
>         self.channel = stackless.channel()
>         self._ensure_iocp_association()
>         self._check_manager()
>         #print self.o.taskletID, "ID on write", self.name
>
>         fmt = self.binary and "s#" or "t#"
>         ret = pythonapi.PyArg_ParseTuple(py_object((data,)), c_char_p(fmt),
>                                          byref(writeBufferPtr),
> byref(bytesToWrite))
>         if ret == 0:
>             raise WinError()
>
>         #print "firing WriteFile", self.name
>         r = WriteFile(self.handle, writeBufferPtr,
>                       bytesToWrite.value, byref(bytesWritten), byref(self.o))
>         #print "fired WriteFile", self.name
>
>         if r == 0:
>             if GetLastError() != ERROR_IO_PENDING:
>
> pythonapi.PyErr_SetExcFromWindowsErrWithFilename(py_object(IOError),
>                                                                  0,
> c_char_p(self.name))
>
>             mng.RegisterChannelObject(self.o.taskletID, self.channel)
>             #print "blocked on channel",self.channel, self.name,
> self.o.taskletID
>             written = self.channel.receive()
>             #print "returned from channel",self.channel, self.name,
> self.o.taskletID
>         else:
>             written = bytesWritten
>         #print "Checking contents...", bytesToWrite.value,
> written.value, self.name
>
>         if bytesToWrite.value != written.value:
>             # Check if the quantity of bytes sent has been written to the file
>             #print self.o.taskletID, "size mismatch"
>             raise WinError()
>
>     def tell(self):
>         """
>         tell() -> current file position, an integer (may be a long integer).
>         """
>         return self.offset
>
>     def seek(self, offset, whence=os.SEEK_SET):
>         """
>         seek(offset[, whence]) -> None.  Move to new file position.
>
>         Argument offset is a byte count.  Optional argument whence defaults to
>         0 (offset from start of file, offset should be >= 0); other values are 1
>         (move relative to current position, positive or negative), and 2 (move
>         relative to end of file, usually negative, although many platforms allow
>         seeking beyond the end of a file).  If the file is opened in text mode,
>         only offsets returned by tell() are legal.  Use of other offsets causes
>         undefined behavior.
>         Note that not all file objects are seekable.
>         """
>         self._check_still_open()
>         if whence == os.SEEK_SET:
>             self.offset = offset
>         elif whence == os.SEEK_CUR:
>             self.offset += offset
>         elif whence == os.SEEK_END:
>             raise RuntimeError("SEEK_END unimplemented")
>
>     def flush(self):
>         pass
>
>     def isatty(self):
>         """
>         isatty() -> true or false.  True if the file is connected to a
>         tty device.
>         """
>         self._check_still_open()
>         return False
>
>     def isatty(self):
>         self._check_still_open()
>         return False
>
>
> if __name__ == '__main__':
>     import time
>     import glob
>     import os
>     stdfile = file
>
>     # On your stackless apps, use these 2 lines below
>     from stacklessfile import stacklessfile as file
>     open = file
>
>     # Function to copy a file
>     def copyfile(who, infile, out):
>         st = time.time()
>         f1 = file(infile, 'rb')
>         f2 = file(out, 'wb')
>         print "%s started reading %s ..." % (who, infile)
>         a = f1.read()
>         print "%s started writing %s -> %s ..." % (who, infile, out)
>         f2.write(a)
>         f1.close()
>         f2.close()
>         print "Finished tasklet %s (%s) in %s" % (who, infile, time.time()-st)
>
>     # Creating two dummy files
>     newfile = stdfile('test-small.txt','w')
>     for x in xrange(10000):
>         newfile.write(str(x))
>     newfile.close()
>
>     newfile2 = stdfile('test-big.txt','w')
>     for x in xrange(500000):
>         newfile2.write(str(x))
>     newfile2.close()
>
>     # Launching tasklets to perform the file copy
>     for i in xrange(1,11):
>         stackless.tasklet(copyfile)(i, 'test-big.txt','big%s.txt' % i)
>
>     for i in xrange(1,21):
>         stackless.tasklet(copyfile)(i, 'test-small.txt','sm%s.txt' % i)
>
>     st = time.time()
>     stackless.run()
>     print "Total time is %s seconds." % (time.time() - st)
>
>     # Cleanup all test files used
>     for f in glob.glob('test*.txt'):
>         os.unlink(f)
>     for f in glob.glob('sm*.txt'):
>         os.unlink(f)
>     for f in glob.glob('big*.txt'):
>         os.unlink(f)
>
> -------------------------------------------------------------------------------------------
>
>
> --
> -------------------------------------------------------------------
> Visit Stackless Examples Project
> http://code.google.com/p/stacklessexamples/
> Stackless Python - www.stackless.com
> -------------------------------------------------------------------
>
> _______________________________________________
> Stackless mailing list
> Stackless at stackless.com
> http://stackless.com/cgi-bin/mailman/listinfo/stackless
>

_______________________________________________
Stackless mailing list
Stackless at stackless.com
http://stackless.com/cgi-bin/mailman/listinfo/stackless



More information about the Stackless mailing list