[Stackless] Stackless asynchronous file module - IOCP

Carlos Eduardo de Paula carlosedp at gmail.com
Sat Aug 18 03:48:17 CEST 2007


That's exactly what I want to do. This is the first step.
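
A central module of the kind Arnar describes in the quoted message could
pick a backend by trying imports in order. This is only a minimal sketch,
written for a current Python 3 interpreter rather than the Python 2 of the
module below. The helper name load_first_available is made up for
illustration; the module names "iocp" and "epollstuff" are taken from
Arnar's example and do not exist as real packages.

```python
import importlib


def load_first_available(candidates):
    """Import and return the first module in `candidates` that is importable."""
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ImportError:
            # This backend is missing (or refused to load) on the current
            # platform, so try the next one in the list.
            continue
    raise ImportError("no async IO backend found, tried: %s"
                      % ", ".join(candidates))
```

With such a helper, application code would call something like
load_first_available(["iocp", "epollstuff"]) and take stacklessfile from
whatever module comes back. A backend that raises ImportError itself on an
unsupported platform, as the module below does when os.name is not 'nt',
is skipped in the same way as a missing one.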

I plan on adding socket support on IOCP and sleep calls.

Then I'm going to add POSIX support via select and poll (I can test at
home on a Mac). This is where I will need help... maybe I can
use some stuff from Ed's work. :)
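
For the POSIX side, the basic building block would be a readiness check
with select(). The following is a rough sketch under a few assumptions: it
targets Python 3 on a POSIX system (on Windows, select() only accepts
sockets, which is why the module below uses IOCP), and the names
wait_readable and demo are invented for this example.

```python
import os
import select


def wait_readable(fd, timeout=0.0):
    """Return True if `fd` has data to read within `timeout` seconds."""
    readable, _, _ = select.select([fd], [], [], timeout)
    return bool(readable)


def demo():
    """Show the readiness state of a pipe before and after a write."""
    r, w = os.pipe()
    try:
        before = wait_readable(r)        # nothing written yet
        os.write(w, b"hello")
        after = wait_readable(r, 1.0)    # data is now pending
        data = os.read(r, 5)
        return before, after, data
    finally:
        os.close(r)
        os.close(w)
```

A polling tasklet could call a check like this with a short timeout, send
on the channel of whichever tasklet was waiting for that descriptor, and
call stackless.schedule() otherwise, much as the poll() method below does
for the completion port.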

The objective is to have a simple toolbox to allow async IO and helper
methods like sleep, semaphores, etc... like a uthread on steroids :D
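
One possible shape for the sleep helper is a queue of wake-up times kept in
a heap. This is a sketch only, in Python 3, and SleepQueue with its two
methods is a hypothetical name, not part of any existing module. It is
deliberately independent of Stackless: the "waiter" can be any object, and
in a Stackless version it would presumably be the channel the sleeping
tasklet is blocked on.

```python
import heapq
import itertools


class SleepQueue(object):
    """Keeps waiters ordered by the time at which they should wake up."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so entries with equal wake times never compare waiters.
        self._counter = itertools.count()

    def sleep_until(self, wake_time, waiter):
        """Register `waiter` to be woken at `wake_time`."""
        heapq.heappush(self._heap, (wake_time, next(self._counter), waiter))

    def pop_due(self, now):
        """Remove and return all waiters whose wake time is <= `now`."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due
```

A manager tasklet would call pop_due(time.time()) on each pass of its loop
and send on every channel it gets back.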

One question: is performance affected much by using ctypes? I mean,
would one gain a lot of speed from a pure C module? (I liked
ctypes... I had never used it before.)
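
One way to get a feel for the ctypes call overhead is to time the same C
function reached two ways. The sketch below (Python 3) calls the C API
function PyNumber_Absolute through ctypes.pythonapi and compares it with
the builtin abs(). The names c_abs, per_call_seconds and compare are made
up for this example, and the numbers only describe per-call overhead on
one machine, not the cost of real file I/O.

```python
import ctypes
import timeit

# PyNumber_Absolute(PyObject *o) is the C API function behind abs().
c_abs = ctypes.pythonapi.PyNumber_Absolute
c_abs.argtypes = (ctypes.py_object,)
c_abs.restype = ctypes.py_object


def per_call_seconds(func, number=100000):
    """Average wall-clock time of one func(-5) call, in seconds."""
    return timeit.timeit(lambda: func(-5), number=number) / number


def compare(number=100000):
    """Return the per-call time of the builtin and of the ctypes route."""
    return {
        "builtin": per_call_seconds(abs, number),
        "ctypes": per_call_seconds(c_abs, number),
    }
```

Since each ReadFile or WriteFile call hands a whole buffer to the kernel,
the fixed cost per call is likely to matter less here than it would for
many tiny calls, but measuring with the real workload is the only way to
be sure.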

Carlos

On 8/17/07, Arnar Birgisson <arnarbi at gmail.com> wrote:
> Beautiful!
>
> Should there be a common interface/naming that modules such as this
> should adhere to, to make it easy for people to do something like
>
> try:
>     from iocp import stacklessfile
> except ImportError:
>     from epollstuff import stacklessfile
>
> etc. ?
>
> I'd love to see such an "async io" module with different
> implementations included with stackless and one central module that
> when imported would select the most appropriate implementation for the
> platform. Preferable also with sockets and timers.
>
> Arnar
>
> On 8/17/07, Carlos Eduardo de Paula <carlosedp at gmail.com> wrote:
> > Fiddling around, I found that on Windows select() only works on sockets,
> > not on file descriptors, so the other option was overlapped I/O with an
> > I/O completion port (IOCP).
> >
> > This made me learn a thousand things I thought I would never know...
> > ctypes, Windows internal APIs (kernel32 stuff), and even Python
> > internals to see how its file object works.
> >
> > After all this, here it is: a module that mimics Python's built-in
> > file() object. Please test it and report any successes or failures. I
> > also made a script to test its usage.
> >
> > Hope it's useful.
> > (full announcement in my blog: http://themindcaster.blogspot.com/)
> >
> > The code is here too: http://dpaste.com/hold/17151/
> > and will be posted on Stackless Examples Project SVN
> >
> > -------------------------------------------------------------------------------------------
> > #
> > # Stackless Asynchronous file module:
> > #
> > # Author: Carlos E. de Paula <carlosedp at gmail.com>
> > #
> > # This code was written to serve as an example of Stackless Python usage.
> > # Feel free to email me with any questions, comments, or suggestions for
> > # improvement.
> > #
> > # This is an asynchronous file class in order to have a file module replacement
> > # that uses channels and a windows async API to allow its methods to
> > # block just the calling tasklet not the entire interpreter.
> > #
> > #
> > import os
> > import time
> > import stackless
> > from ctypes import *
> > from ctypes.wintypes import (HANDLE, ULONG, DWORD, BOOL, LPCSTR,
> >                              LPCWSTR, WinError)
> >
> > # Verify module compatibility
> > if os.name != 'nt':
> >     raise ImportError('This module has been implemented for windows systems.')
> >
> > # Windows structures
> >
> > class _US(Structure):
> >     _fields_ = [
> >         ("Offset",          DWORD),
> >         ("OffsetHigh",      DWORD),
> >     ]
> >
> > class _U(Union):
> >     _fields_ = [
> >         ("s",               _US),
> >         ("Pointer",         c_void_p),
> >     ]
> >
> >     _anonymous_ = ("s",)
> >
> > class OVERLAPPED(Structure):
> >     _fields_ = [
> >         ("Internal",        POINTER(ULONG)),
> >         ("InternalHigh",    POINTER(ULONG)),
> >         ("u",               _U),
> >         ("hEvent",          HANDLE),
> >
> >         # Custom fields.
> >         ("taskletID",       ULONG),
> >     ]
> >
> >     _anonymous_ = ("u",)
> >
> > # Windows kernel32 API
> >
> > CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
> > CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, POINTER(c_ulong), DWORD)
> > CreateIoCompletionPort.restype = HANDLE
> >
> > GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
> > GetQueuedCompletionStatus.argtypes = (HANDLE, POINTER(DWORD), POINTER(c_ulong),
> >                                       POINTER(POINTER(OVERLAPPED)), DWORD)
> > GetQueuedCompletionStatus.restype = BOOL
> >
> > ReadFile = windll.kernel32.ReadFile
> > ReadFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> >                      POINTER(OVERLAPPED))
> > ReadFile.restype = BOOL
> >
> > WriteFile = windll.kernel32.WriteFile
> > WriteFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> >                       POINTER(OVERLAPPED))
> > WriteFile.restype = BOOL
> >
> > CreateFileA = windll.kernel32.CreateFileA
> > CreateFileA.argtypes = (LPCSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> > CreateFileA.restype = HANDLE
> >
> > CreateFileW = windll.kernel32.CreateFileW
> > CreateFileW.argtypes = (LPCWSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> > CreateFileW.restype = HANDLE
> >
> > CloseHandle = windll.kernel32.CloseHandle
> > CloseHandle.argtypes = (HANDLE,)
> > CloseHandle.restype = BOOL
> >
> > GetLastError = windll.kernel32.GetLastError
> >
> > # Python API
> >
> > pythonapi.PyErr_SetFromErrno.argtypes = (py_object,)
> > pythonapi.PyErr_SetFromErrno.restype = py_object
> >
> > # Windows definitions
> >
> > INVALID_HANDLE_VALUE    = 0xFFFFFFFF
> > NULL                    = c_ulong()
> >
> > WAIT_TIMEOUT            = 0x102
> > ERROR_IO_PENDING        = 997
> > FILE_FLAG_RANDOM_ACCESS = 0x10000000
> > FILE_FLAG_OVERLAPPED    = 0x40000000
> >
> > GENERIC_READ            = 0x80000000
> > GENERIC_WRITE           = 0x40000000
> > FILE_APPEND_DATA                = 0x00000004
> >
> > FILE_SHARE_READ         = 0x00000001
> > FILE_SHARE_WRITE        = 0x00000002
> >
> > OPEN_EXISTING           = 3
> > OPEN_ALWAYS             = 4
> > CREATE_ALWAYS           = 2
> >
> > # ----------------------------------------------------------------------------
> >
> > class resultsManager(object):
> >     """
> >     Manages the results delivered through the I/O completion port.
> >     The resultsManager dequeues completed IO requests and keeps a dict of
> >     pending channels keyed by taskletID. The taskletID is stored in the
> >     OVERLAPPED structure so that, on completion, the matching channel can
> >     be looked up and the result sent through it, unblocking the original
> >     tasklet. The entry is then removed from the dict.
> >     """
> >     def __init__(self, numThreads=NULL):
> >         self.running = True
> >         self.handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
> >                                              NULL, numThreads)
> >         if self.handle == 0:
> >             raise WinError()
> >
> >         self.numThreads = numThreads
> >         stackless.tasklet(self.poll)()
> >         self.overlappedByID = {}
> >
> >     def __del__(self):
> >         if self.handle is None:
> >             return
> >         self.overlappedByID.clear()
> >         CloseHandle(self.handle)
> >
> >     def poll(self, timeout=1):
> >         while self.running and self.overlappedByID:
> >             numBytes = DWORD()
> >             completionKey = c_ulong()
> >             ovp = POINTER(OVERLAPPED)()
> >
> >             ret = GetQueuedCompletionStatus(self.handle, byref(numBytes),
> >                                             byref(completionKey), byref(ovp),
> >                                             timeout)
> >
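> >             if not ovp and ret == 0:
> >                 if GetLastError() == WAIT_TIMEOUT:
> >                     stackless.schedule()
> >                     continue
> >                 # No packet was dequeued and this is not a timeout: report
> >                 # the failure instead of dereferencing the NULL ovp below.
> >                 raise WinError()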
> >
> >             if ovp.contents.taskletID in self.overlappedByID:
> >                 #print ovp.contents.taskletID, " tasklet ID IN pool"
> >                 c = self.overlappedByID[ovp.contents.taskletID]
> >             else:
> >                 #print ovp.contents.taskletID, " tasklet ID NOT in pool"
> >                 continue
> >
> >             #print "sending data back to channel in ID", ovp.contents.taskletID
> >             c.send(numBytes)
> >             #print "sent data to channel in ID", ovp.contents.taskletID, numBytes
> >             self.UnregisterChannelObject(ovp.contents.taskletID)
> >
> >         self.running = False
> >
> >     def RegisterChannelObject(self, ob, c):
> >         self.overlappedByID[ob] = c
> >
> >     def UnregisterChannelObject(self, ob):
> >         if self.overlappedByID.has_key(ob):
> >             del self.overlappedByID[ob]
> >
> >
> > mng = resultsManager()
> >
> > class stacklessfile(object):
> >     """
> >     stacklessfile(name[, mode[, buffering]]) -> stackless file object
> >
> >     This class creates a new file module permitting nonblocking IO calls
> >     for tasklets using windows IOCP functionality.
> >     When a read or write operation is called, only the calling tasklet is
> >     blocked. In standard file module, the whole interpreter gets blocked
> >     until the operation is concluded.
> >
> >     Open a file.  The mode can be 'r', 'w' or 'a' for reading (default),
> >     writing or appending.  The file will be created if it doesn't exist
> >     when opened for writing or appending; it will be truncated when
> >     opened for writing.  Add a 'b' to the mode for binary files.
> >     Add a '+' to the mode to allow simultaneous reading and writing.
> >     If the buffering argument is given, 0 means unbuffered, 1 means line
> >     buffered, and larger numbers specify the buffer size.
> >     Add a 'U' to mode to open the file for input with universal newline
> >     support.  Any line ending in the input file will be seen as a \'\\n\'
> >     in Python.  Also, a file so opened gains the attribute 'newlines';
> >     the value for this attribute is one of None (no newline read yet),
> >     \'\\r\', \'\\n\', \'\\r\\n\' or a tuple containing all the newline
> >     types seen.
> >     """
> >     closed = True
> >     def __init__(self, name, mode='r', buffering=-1):
> >         """
> >         Initializes the file object and creates an internal file object.
> >         """
> >         if not self.closed:
> >             self.close()
> >         self.name = name
> >         self.mode = mode
> >         self.offset = 0
> >         self.iocpLinked = False
> >         self._check_manager()
> >         self.open_handle()
> >         self.closed = False
> >
> >     def __repr__(self):
> >         state = ["open", "closed"][self.closed]
> >         return "<%s file '%s', mode '%s' at 0x%08X>" % (
> >             state, self.name, self.mode, id(self))
> >
> >     def __del__(self):
> >         self.close()
> >
> >     def _check_manager(self):
> >         if not mng.running:
> >             stackless.tasklet(mng.poll)()
> >             mng.running = True
> >             #print "ERROR - Manager not running"
> >
> >     def _check_still_open(self):
> >         if self.closed:
> >             raise ValueError("I/O operation on closed file")
> >
> >     def _ensure_iocp_association(self):
> >         if not self.iocpLinked:
> >             CreateIoCompletionPort(self.handle, mng.handle, NULL,
> >                                    mng.numThreads)
> >             self.iocpLinked = True
> >
> >     def close(self):
> >         """
> >         close() -> None or (perhaps) an integer.  Close the file.
> >
> >         Sets data attribute .closed to True.  A closed file cannot be used for
> >         further I/O operations.  close() may be called more than once without
> >         error.  Some kinds of file objects (for example, opened by popen())
> >         may return an exit status upon closing.
> >         """
> >         if not self.closed:
> >             CloseHandle(self.handle)
> >             del self.handle
> >             self.closed = True
> >
> >     def open_handle(self):
> >         self.binary = 'b' in self.mode
> >         access = GENERIC_READ
> >         if 'w' in self.mode or ('r' in self.mode and '+' in self.mode):
> >             access |= GENERIC_WRITE
> >         if 'a' in self.mode:
> >             access |= FILE_APPEND_DATA
> >
> >         share = FILE_SHARE_READ | FILE_SHARE_WRITE
> >
> >         if 'w' in self.mode:
> >             disposition = CREATE_ALWAYS
> >         elif 'a' in self.mode or ('r' in self.mode and '+' in self.mode):
> >             disposition = OPEN_ALWAYS
> >         else:
> >             disposition = OPEN_EXISTING
> >
> >         flags = FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_OVERLAPPED
> >
> >         if isinstance(self.name, unicode):
> >             func = CreateFileW
> >         else:
> >             func = CreateFileA
> >
> >         self.handle = func(self.name, access,
> >                               share, c_void_p(), disposition,
> >                               flags, NULL )
> >
> >         if self.handle == INVALID_HANDLE_VALUE:
> >             raise WinError()
> >
> >         self.iocpLinked = False
> >
> >     def read(self, size=None):
> >         """
> >         read([size]) -> read at most size bytes, returned as a string.
> >         """
> >         self._check_still_open()
> >         maxBytesToRead = int(os.path.getsize(self.name)) - self.offset
> >         if (size is None) or (maxBytesToRead < size):
> >             size = maxBytesToRead
> >
> >         bytesRead = DWORD()
> >         self.o = OVERLAPPED()
> >         self.o.Offset = self.offset
> >         self.o.taskletID = id(self)
> >         self.buffer = create_string_buffer(size)
> >         self.channel = stackless.channel()
> >         self._ensure_iocp_association()
> >         self._check_manager()
> >
> >         #print self.o.taskletID, "ID on read", self.name
> >         #print "firing ReadFile", self.name
> >
> >         r = ReadFile(self.handle, self.buffer,
> >                      size, byref(bytesRead), byref(self.o))
> >         #print "fired ReadFile", self.name
> >
> >         if r == 0:
> >             if GetLastError() != ERROR_IO_PENDING:
> >                 pythonapi.PyErr_SetExcFromWindowsErrWithFilename(
> >                     py_object(IOError), 0, c_char_p(self.name))
> >             mng.RegisterChannelObject(self.o.taskletID, self.channel)
> >             #print "blocked on channel", self.channel, self.name, self.o.taskletID
> >             self.channel.receive()
> >             #print "returned from channel", self.channel, self.name, self.o.taskletID
> >
> >         self.offset += size
> >         return self.buffer[:size]
> >
> >     def write(self, data):
> >         """
> >         write(str) -> None.  Write string str to file.
> >         """
> >         self._check_still_open()
> >         bytesToWrite = c_int()
> >         writeBufferPtr = c_char_p()
> >         bytesWritten = DWORD()
> >         self.o = OVERLAPPED()
> >         self.o.Offset = self.offset
> >         self.o.taskletID = id(self)
> >         self.channel = stackless.channel()
> >         self._ensure_iocp_association()
> >         self._check_manager()
> >         #print self.o.taskletID, "ID on write", self.name
> >
> >         fmt = self.binary and "s#" or "t#"
> >         ret = pythonapi.PyArg_ParseTuple(py_object((data,)), c_char_p(fmt),
> >                                          byref(writeBufferPtr),
> >                                          byref(bytesToWrite))
> >         if ret == 0:
> >             raise WinError()
> >
> >         #print "firing WriteFile", self.name
> >         r = WriteFile(self.handle, writeBufferPtr,
> >                       bytesToWrite.value, byref(bytesWritten), byref(self.o))
> >         #print "fired WriteFile", self.name
> >
> >         if r == 0:
> >             if GetLastError() != ERROR_IO_PENDING:
> >                 pythonapi.PyErr_SetExcFromWindowsErrWithFilename(
> >                     py_object(IOError), 0, c_char_p(self.name))
> >
> >             mng.RegisterChannelObject(self.o.taskletID, self.channel)
> >             #print "blocked on channel", self.channel, self.name, self.o.taskletID
> >             written = self.channel.receive()
> >             #print "returned from channel", self.channel, self.name, self.o.taskletID
> >         else:
> >             written = bytesWritten
> >         #print "Checking contents...", bytesToWrite.value, written.value, self.name
> >
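> >         if bytesToWrite.value != written.value:
> >             # Check if the quantity of bytes sent has been written to the file
> >             #print self.o.taskletID, "size mismatch"
> >             raise WinError()
> >
> >         # Advance the position so the next write does not overwrite this one.
> >         self.offset += written.value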
> >
> >     def tell(self):
> >         """
> >         tell() -> current file position, an integer (may be a long integer).
> >         """
> >         return self.offset
> >
> >     def seek(self, offset, whence=os.SEEK_SET):
> >         """
> >         seek(offset[, whence]) -> None.  Move to new file position.
> >
> >         Argument offset is a byte count.  Optional argument whence defaults to
> >         0 (offset from start of file, offset should be >= 0); other values are 1
> >         (move relative to current position, positive or negative), and 2 (move
> >         relative to end of file, usually negative, although many platforms allow
> >         seeking beyond the end of a file).  If the file is opened in text mode,
> >         only offsets returned by tell() are legal.  Use of other offsets causes
> >         undefined behavior.
> >         Note that not all file objects are seekable.
> >         """
> >         self._check_still_open()
> >         if whence == os.SEEK_SET:
> >             self.offset = offset
> >         elif whence == os.SEEK_CUR:
> >             self.offset += offset
> >         elif whence == os.SEEK_END:
> >             raise RuntimeError("SEEK_END unimplemented")
> >
> >     def flush(self):
> >         pass
> >
> >     def isatty(self):
> >         """
> >         isatty() -> true or false.  True if the file is connected to a
> >         tty device.
> >         """
> >         self._check_still_open()
> >         return False
> >
> >
> > if __name__ == '__main__':
> >     import time
> >     import glob
> >     import os
> >     stdfile = file
> >
> >     # On your stackless apps, use these 2 lines below
> >     from stacklessfile import stacklessfile as file
> >     open = file
> >
> >     # Function to copy a file
> >     def copyfile(who, infile, out):
> >         st = time.time()
> >         f1 = file(infile, 'rb')
> >         f2 = file(out, 'wb')
> >         print "%s started reading %s ..." % (who, infile)
> >         a = f1.read()
> >         print "%s started writing %s -> %s ..." % (who, infile, out)
> >         f2.write(a)
> >         f1.close()
> >         f2.close()
> >         print "Finished tasklet %s (%s) in %s" % (who, infile, time.time()-st)
> >
> >     # Creating two dummy files
> >     newfile = stdfile('test-small.txt','w')
> >     for x in xrange(10000):
> >         newfile.write(str(x))
> >     newfile.close()
> >
> >     newfile2 = stdfile('test-big.txt','w')
> >     for x in xrange(500000):
> >         newfile2.write(str(x))
> >     newfile2.close()
> >
> >     # Launching tasklets to perform the file copy
> >     for i in xrange(1,11):
> >         stackless.tasklet(copyfile)(i, 'test-big.txt','big%s.txt' % i)
> >
> >     for i in xrange(1,21):
> >         stackless.tasklet(copyfile)(i, 'test-small.txt','sm%s.txt' % i)
> >
> >     st = time.time()
> >     stackless.run()
> >     print "Total time is %s seconds." % (time.time() - st)
> >
> >     # Cleanup all test files used
> >     for f in glob.glob('test*.txt'):
> >         os.unlink(f)
> >     for f in glob.glob('sm*.txt'):
> >         os.unlink(f)
> >     for f in glob.glob('big*.txt'):
> >         os.unlink(f)
> >
> > -------------------------------------------------------------------------------------------
> >
> >
> > --
> > -------------------------------------------------------------------
> > Visit Stackless Examples Project
> > http://code.google.com/p/stacklessexamples/
> > Stackless Python - www.stackless.com
> > -------------------------------------------------------------------
> >
> > _______________________________________________
> > Stackless mailing list
> > Stackless at stackless.com
> > http://stackless.com/cgi-bin/mailman/listinfo/stackless
> >
>


