[Stackless] Stackless asynchronous file module - IOCP
Carlos Eduardo de Paula
carlosedp at gmail.com
Tue Aug 21 15:58:29 CEST 2007
I have committed the module to SVN and added it to the wiki for reference.
The wiki is: http://code.google.com/p/stacklessexamples/wiki/StacklessNonblockModules
And the direct link to the module is:
http://stacklessexamples.googlecode.com/svn/trunk/examples/fileIO/stacklessfileIOCP.py
Carlos
On 8/17/07, Carlos Eduardo de Paula <carlosedp at gmail.com> wrote:
> That's exactly what I want to do. This is the first step.
>
> I plan on adding socket support on IOCP and sleep calls.
>
> Then I'm going to add POSIX support via select and poll (I can test at
> home on a Mac). This is where I will need help... maybe I can
> use some stuff from Ed's work. :)
>
> The objective is to have a simple toolbox to allow async IO and helper
> methods like sleep, semaphores, etc... like a uthread on steroids :D
>
> One question: is performance affected much by using ctypes? I mean,
> would one gain a lot of speed from a pure C module? (I liked
> ctypes... I had never used it before.)
>
> Carlos
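
One way to get a rough feel for the ctypes question above is to time a trivial C call made through ctypes against the equivalent builtin. This is only a sketch under assumptions: it uses CPython's own PyLong_FromLong (reachable through ctypes.pythonapi) as a stand-in for a kernel32 call, it is written for Python 3, and the helper name measure is made up for the illustration. It measures per-call overhead only, not the cost of the I/O itself.

```python
import ctypes
import timeit

# PyLong_FromLong is part of the CPython C API; declaring argtypes/restype
# lets ctypes convert the argument and the returned object for us.
from_long = ctypes.pythonapi.PyLong_FromLong
from_long.argtypes = (ctypes.c_long,)
from_long.restype = ctypes.py_object


def measure(number=100000):
    """Return (seconds via ctypes, seconds via builtin) for `number` calls."""
    via_ctypes = timeit.timeit(lambda: from_long(5), number=number)
    via_builtin = timeit.timeit(lambda: int(5), number=number)
    return via_ctypes, via_builtin


ctypes_s, builtin_s = measure()
print("ctypes: %.4fs  builtin: %.4fs" % (ctypes_s, builtin_s))
```

Whatever the two numbers turn out to be on a given machine, they should be compared with the time one disk read or write takes before deciding that a C extension is worth the effort.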
>
> On 8/17/07, Arnar Birgisson <arnarbi at gmail.com> wrote:
> > Beautiful!
> >
> > Should there be a common interface/naming that modules such as this
> > should adhere to, to make it easy for people to do something like
> >
> > try:
> > from iocp import stacklessfile
> > except ImportError:
> > from epollstuff import stacklessfile
> >
> > etc. ?
> >
> > I'd love to see such an "async io" module with different
> > implementations included with stackless and one central module that
> > when imported would select the most appropriate implementation for the
> > platform. Preferably also with sockets and timers.
> >
> > Arnar
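
The "one central module" idea above could look roughly like the sketch below. It is an illustration only: the backend module names are hypothetical (no such modules are named in this thread beyond the iocp/epollstuff placeholders), and "json" is listed last purely so that the example runs on any Python 3 installation.

```python
import importlib


def pick_backend(candidates):
    """Return the first module in `candidates` that imports cleanly."""
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ImportError:
            # This backend is not available on this platform; try the next.
            continue
    raise ImportError("no async I/O backend available: %r" % (candidates,))


# Hypothetical backend names, most specific first. "json" stands in for a
# fallback that is always present, so the sketch can be run anywhere.
backend = pick_backend(["iocp_backend_hypothetical",
                        "epoll_backend_hypothetical",
                        "json"])
print(backend.__name__)
```

Each real backend would then expose the same names (for example a stacklessfile class), so calling code never needs to know which one was chosen.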
> >
> > On 8/17/07, Carlos Eduardo de Paula <carlosedp at gmail.com> wrote:
> > > Fiddling around, I found that on Windows select() doesn't support file
> > > descriptors, so the other option was overlapped I/O with an I/O
> > > completion port (IOCP).
> > >
> > > This made me learn a thousand things I thought I would never know:
> > > ctypes, Windows internal APIs (kernel32 stuff), and even Python
> > > internals to see how its file object works.
> > >
> > > After all this, here it is: the module that mimics Python's built-in
> > > file() object. Please test it and report any successes or failures. I
> > > also made a script to test its usage.
> > >
> > > Hope it's useful.
> > > (full announcement in my blog: http://themindcaster.blogspot.com/)
> > >
> > > The code is here too: http://dpaste.com/hold/17151/
> > > and will be posted on Stackless Examples Project SVN
> > >
> > > -------------------------------------------------------------------------------------------
> > > #
> > > # Stackless Asynchronous file module:
> > > #
> > > # Author: Carlos E. de Paula <carlosedp at gmail.com>
> > > #
> > > # This code was written to serve as an example of Stackless Python usage.
> > > # Feel free to email me with any questions, comments, or suggestions for
> > > # improvement.
> > > #
> > > # This is an asynchronous file class in order to have a file module replacement
> > > # that uses channels and a windows async API to allow its methods to
> > > # block just the calling tasklet not the entire interpreter.
> > > #
> > > #
> > > import os
> > > import time
> > > import stackless
> > > from ctypes import *
> > > from ctypes.wintypes import HANDLE, ULONG, DWORD, BOOL, LPCSTR, LPCWSTR, WinError
> > >
> > > # Verify module compatibility
> > > if os.name != 'nt':
> > >     raise ImportError('This module has been implemented for windows systems.')
> > >
> > > # Windows structures
> > >
> > > class _US(Structure):
> > >     _fields_ = [
> > >         ("Offset", DWORD),
> > >         ("OffsetHigh", DWORD),
> > >     ]
> > >
> > > class _U(Union):
> > >     _fields_ = [
> > >         ("s", _US),
> > >         ("Pointer", c_void_p),
> > >     ]
> > >
> > >     _anonymous_ = ("s",)
> > >
> > > class OVERLAPPED(Structure):
> > >     _fields_ = [
> > >         ("Internal", POINTER(ULONG)),
> > >         ("InternalHigh", POINTER(ULONG)),
> > >         ("u", _U),
> > >         ("hEvent", HANDLE),
> > >
> > >         # Custom fields.
> > >         ("taskletID", ULONG),
> > >     ]
> > >
> > >     _anonymous_ = ("u",)
> > >
> > > # Windows kernel32 API
> > >
> > > CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
> > > CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, POINTER(c_ulong), DWORD)
> > > CreateIoCompletionPort.restype = HANDLE
> > >
> > > GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
> > > GetQueuedCompletionStatus.argtypes = (HANDLE, POINTER(DWORD), POINTER(c_ulong),
> > > POINTER(POINTER(OVERLAPPED)), DWORD)
> > > GetQueuedCompletionStatus.restype = BOOL
> > >
> > > ReadFile = windll.kernel32.ReadFile
> > > ReadFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> > > POINTER(OVERLAPPED))
> > > ReadFile.restype = BOOL
> > >
> > > WriteFile = windll.kernel32.WriteFile
> > > WriteFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> > > POINTER(OVERLAPPED))
> > > WriteFile.restype = BOOL
> > >
> > > CreateFileA = windll.kernel32.CreateFileA
> > > CreateFileA.argtypes = (LPCSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> > > CreateFileA.restype = HANDLE
> > >
> > > CreateFileW = windll.kernel32.CreateFileW
> > > CreateFileW.argtypes = (LPCWSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> > > CreateFileW.restype = HANDLE
> > >
> > > CloseHandle = windll.kernel32.CloseHandle
> > > CloseHandle.argtypes = (HANDLE,)
> > > CloseHandle.restype = BOOL
> > >
> > > GetLastError = windll.kernel32.GetLastError
> > >
> > > # Python API
> > >
> > > pythonapi.PyErr_SetFromErrno.argtypes = (py_object,)
> > > pythonapi.PyErr_SetFromErrno.restype = py_object
> > >
> > > # Windows definitions
> > >
> > > INVALID_HANDLE_VALUE = 0xFFFFFFFF
> > > NULL = c_ulong()
> > >
> > > WAIT_TIMEOUT = 0x102
> > > ERROR_IO_PENDING = 997
> > > FILE_FLAG_RANDOM_ACCESS = 0x10000000
> > > FILE_FLAG_OVERLAPPED = 0x40000000
> > >
> > > GENERIC_READ = 0x80000000
> > > GENERIC_WRITE = 0x40000000
> > > FILE_APPEND_DATA = 0x00000004
> > >
> > > FILE_SHARE_READ = 0x00000001
> > > FILE_SHARE_WRITE = 0x00000002
> > >
> > > OPEN_EXISTING = 3
> > > OPEN_ALWAYS = 4
> > > CREATE_ALWAYS = 2
> > >
> > > # ----------------------------------------------------------------------------
> > >
> > > class resultsManager(object):
> > >     """
> > >     Manages the results sent by the CreateIoCompletionPort call.
> > >     The resultsManager dequeues the IO requests and keeps a list of
> > >     pending handles.
> > >     The taskletID is stored in OVERLAPPED structure so it can be
> > >     recalled and the signalling data sent via its channel, unblocking
> > >     the original tasklet.
> > >     The handle is then removed from the dict.
> > >     """
> > >     def __init__(self, numThreads=NULL):
> > >         self.running = True
> > >         self.handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
> > >                                              NULL, numThreads)
> > >         if self.handle == 0:
> > >             raise WinError()
> > >
> > >         self.numThreads = numThreads
> > >         stackless.tasklet(self.poll)()
> > >         self.overlappedByID = {}
> > >
> > >     def __del__(self):
> > >         if self.handle is None:
> > >             return
> > >         self.overlappedByID.clear()
> > >         CloseHandle(self.handle)
> > >
> > >     def poll(self, timeout=1):
> > >         while self.running and self.overlappedByID:
> > >             numBytes = DWORD()
> > >             completionKey = c_ulong()
> > >             ovp = POINTER(OVERLAPPED)()
> > >
> > >             ret = GetQueuedCompletionStatus(self.handle, byref(numBytes),
> > >                                             byref(completionKey), byref(ovp),
> > >                                             timeout)
> > >
> > >             if not ovp and ret == 0:
> > >                 if GetLastError() == WAIT_TIMEOUT:
> > >                     stackless.schedule()
> > >                     continue
> > >
> > >             if ovp.contents.taskletID in self.overlappedByID:
> > >                 #print ovp.contents.taskletID, " tasklet ID IN pool"
> > >                 c = self.overlappedByID[ovp.contents.taskletID]
> > >             else:
> > >                 #print ovp.contents.taskletID, " tasklet ID NOT in pool"
> > >                 continue
> > >
> > >             #print "sending data back to channel in ID", ovp.contents.taskletID
> > >             c.send(numBytes)
> > >             #print "sent data to channel in ID", ovp.contents.taskletID, numBytes
> > >             self.UnregisterChannelObject(ovp.contents.taskletID)
> > >
> > >         self.running = False
> > >
> > >     def RegisterChannelObject(self, ob, c):
> > >         self.overlappedByID[ob] = c
> > >
> > >     def UnregisterChannelObject(self, ob):
> > >         if self.overlappedByID.has_key(ob):
> > >             del self.overlappedByID[ob]
> > >
> > >
> > > mng = resultsManager()
> > >
> > > class stacklessfile(object):
> > >     """
> > >     stacklessfile(name[, mode[, buffering]]) -> stackless file object
> > >
> > >     This class creates a new file module permitting nonblocking IO calls
> > >     for tasklets using windows IOCP functionality.
> > >     When a read or write operation is called, only the calling tasklet is
> > >     blocked. In standard file module, the whole interpreter gets blocked
> > >     until the operation is concluded.
> > >
> > >     Open a file. The mode can be 'r', 'w' or 'a' for reading (default),
> > >     writing or appending. The file will be created if it doesn't exist
> > >     when opened for writing or appending; it will be truncated when
> > >     opened for writing. Add a 'b' to the mode for binary files.
> > >     Add a '+' to the mode to allow simultaneous reading and writing.
> > >     If the buffering argument is given, 0 means unbuffered, 1 means line
> > >     buffered, and larger numbers specify the buffer size.
> > >     Add a 'U' to mode to open the file for input with universal newline
> > >     support. Any line ending in the input file will be seen as a \'\\n\'
> > >     in Python. Also, a file so opened gains the attribute 'newlines';
> > >     the value for this attribute is one of None (no newline read yet),
> > >     \'\\r\', \'\\n\', \'\\r\\n\' or a tuple containing all the newline
> > >     types seen.
> > >     """
> > >     closed = True
> > >     def __init__(self, name, mode='r', buffering=-1):
> > >         """
> > >         Initializes the file object and creates an internal file object.
> > >         """
> > >         if not self.closed:
> > >             self.close()
> > >         self.name = name
> > >         self.mode = mode
> > >         self.offset = 0
> > >         self.iocpLinked = False
> > >         self._check_manager()
> > >         self.open_handle()
> > >         self.closed = False
> > >
> > >     def __repr__(self):
> > >         return "<%s file '%s', mode '%s' at 0x%08X>" % ([ "open",
> > >             "closed" ][self.closed], self.name, self.mode, id(self))
> > >
> > >     def __del__(self):
> > >         self.close()
> > >
> > >     def _check_manager(self):
> > >         if not mng.running:
> > >             stackless.tasklet(mng.poll)()
> > >             mng.running = True
> > >             #print "ERROR - Manager not running"
> > >
> > >     def _check_still_open(self):
> > >         if self.closed:
> > >             raise ValueError("I/O operation on closed file")
> > >
> > >     def _ensure_iocp_association(self):
> > >         if not self.iocpLinked:
> > >             CreateIoCompletionPort(self.handle, mng.handle, NULL,
> > >                                    mng.numThreads)
> > >             self.iocpLinked = True
> > >
> > >     def close(self):
> > >         """
> > >         close() -> None or (perhaps) an integer. Close the file.
> > >
> > >         Sets data attribute .closed to True. A closed file cannot be used for
> > >         further I/O operations. close() may be called more than once without
> > >         error. Some kinds of file objects (for example, opened by popen())
> > >         may return an exit status upon closing.
> > >         """
> > >         if not self.closed:
> > >             CloseHandle(self.handle)
> > >             del self.handle
> > >             self.closed = True
> > >
> > >     def open_handle(self):
> > >         self.binary = 'b' in self.mode
> > >         access = GENERIC_READ
> > >         if 'w' in self.mode or ('r' in self.mode and '+' in self.mode):
> > >             access |= GENERIC_WRITE
> > >         if 'a' in self.mode:
> > >             access |= FILE_APPEND_DATA
> > >
> > >         share = FILE_SHARE_READ | FILE_SHARE_WRITE
> > >
> > >         if 'w' in self.mode:
> > >             disposition = CREATE_ALWAYS
> > >         elif 'r' in self.mode and '+' in self.mode:
> > >             disposition = OPEN_ALWAYS
> > >         else:
> > >             disposition = OPEN_EXISTING
> > >
> > >         flags = FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_OVERLAPPED
> > >
> > >         if isinstance(self.name, unicode):
> > >             func = CreateFileW
> > >         else:
> > >             func = CreateFileA
> > >
> > >         self.handle = func(self.name, access,
> > >                            share, c_void_p(), disposition,
> > >                            flags, NULL )
> > >
> > >         if self.handle == INVALID_HANDLE_VALUE:
> > >             raise WinError()
> > >
> > >         self.iocpLinked = False
> > >
> > >     def read(self, size=None):
> > >         """
> > >         read([size]) -> read at most size bytes, returned as a string.
> > >         """
> > >         self._check_still_open()
> > >         maxBytesToRead = int(os.path.getsize(self.name)) - self.offset
> > >         if (size is None) or (maxBytesToRead < size):
> > >             size = maxBytesToRead
> > >
> > >         bytesRead = DWORD()
> > >         self.o = OVERLAPPED()
> > >         self.o.Offset = self.offset
> > >         self.o.taskletID = id(self)
> > >         self.buffer = create_string_buffer(size)
> > >         self.channel = stackless.channel()
> > >         self._ensure_iocp_association()
> > >         self._check_manager()
> > >
> > >         #print self.o.taskletID, "ID on read", self.name
> > >         #print "firing ReadFile", self.name
> > >
> > >         r = ReadFile(self.handle, self.buffer,
> > >                      size, byref(bytesRead), byref(self.o))
> > >         #print "fired ReadFile", self.name
> > >
> > >         if r == 0:
> > >             if GetLastError() != ERROR_IO_PENDING:
> > >                 pythonapi.PyErr_SetExcFromWindowsErrWithFilename(
> > >                     py_object(IOError), 0, c_char_p(self.name))
> > >             mng.RegisterChannelObject(self.o.taskletID, self.channel)
> > >             #print "blocked on channel",self.channel, self.name, self.o.taskletID
> > >             self.channel.receive()
> > >             #print "returned from channel",self.channel, self.name, self.o.taskletID
> > >
> > >         self.offset += size
> > >         return self.buffer[:size]
> > >
> > >     def write(self, data):
> > >         """
> > >         write(str) -> None. Write string str to file.
> > >         """
> > >         self._check_still_open()
> > >         bytesToWrite = c_int()
> > >         writeBufferPtr = c_char_p()
> > >         bytesWritten = DWORD()
> > >         self.o = OVERLAPPED()
> > >         self.o.Offset = self.offset
> > >         self.o.taskletID = id(self)
> > >         self.channel = stackless.channel()
> > >         self._ensure_iocp_association()
> > >         self._check_manager()
> > >         #print self.o.taskletID, "ID on write", self.name
> > >
> > >         fmt = self.binary and "s#" or "t#"
> > >         ret = pythonapi.PyArg_ParseTuple(py_object((data,)), c_char_p(fmt),
> > >                                          byref(writeBufferPtr),
> > >                                          byref(bytesToWrite))
> > >         if ret == 0:
> > >             raise WinError()
> > >
> > >         #print "firing WriteFile", self.name
> > >         r = WriteFile(self.handle, writeBufferPtr,
> > >                       bytesToWrite.value, byref(bytesWritten), byref(self.o))
> > >         #print "fired WriteFile", self.name
> > >
> > >         if r == 0:
> > >             if GetLastError() != ERROR_IO_PENDING:
> > >                 pythonapi.PyErr_SetExcFromWindowsErrWithFilename(
> > >                     py_object(IOError), 0, c_char_p(self.name))
> > >
> > >             mng.RegisterChannelObject(self.o.taskletID, self.channel)
> > >             #print "blocked on channel",self.channel, self.name, self.o.taskletID
> > >             written = self.channel.receive()
> > >             #print "returned from channel",self.channel, self.name, self.o.taskletID
> > >         else:
> > >             written = bytesWritten
> > >         #print "Checking contents...", bytesToWrite.value, written.value, self.name
> > >
> > >         if bytesToWrite.value != written.value:
> > >             # Check if the quantity of bytes sent has been written to the file
> > >             #print self.o.taskletID, "size mismatch"
> > >             raise WinError()
> > >
> > >     def tell(self):
> > >         """
> > >         tell() -> current file position, an integer (may be a long integer).
> > >         """
> > >         return self.offset
> > >
> > >     def seek(self, offset, whence=os.SEEK_SET):
> > >         """
> > >         seek(offset[, whence]) -> None. Move to new file position.
> > >
> > >         Argument offset is a byte count. Optional argument whence defaults to
> > >         0 (offset from start of file, offset should be >= 0); other values are 1
> > >         (move relative to current position, positive or negative), and 2 (move
> > >         relative to end of file, usually negative, although many platforms allow
> > >         seeking beyond the end of a file). If the file is opened in text mode,
> > >         only offsets returned by tell() are legal. Use of other offsets causes
> > >         undefined behavior.
> > >         Note that not all file objects are seekable.
> > >         """
> > >         self._check_still_open()
> > >         if whence == os.SEEK_SET:
> > >             self.offset = offset
> > >         elif whence == os.SEEK_CUR:
> > >             self.offset += offset
> > >         elif whence == os.SEEK_END:
> > >             raise RuntimeError("SEEK_END unimplemented")
> > >
> > >     def flush(self):
> > >         pass
> > >
> > >     def isatty(self):
> > >         """
> > >         isatty() -> true or false. True if the file is connected to a
> > >         tty device.
> > >         """
> > >         self._check_still_open()
> > >         return False
> > >
> > >
> > > if __name__ == '__main__':
> > >     import time
> > >     import glob
> > >     import os
> > >     stdfile = file
> > >
> > >     # On your stackless apps, use these 2 lines below
> > >     from stacklessfile import stacklessfile as file
> > >     open = file
> > >
> > >     # Function to copy a file
> > >     def copyfile(who, infile, out):
> > >         st = time.time()
> > >         f1 = file(infile, 'rb')
> > >         f2 = file(out, 'wb')
> > >         print "%s started reading %s ..." % (who, infile)
> > >         a = f1.read()
> > >         print "%s started writing %s -> %s ..." % (who, infile, out)
> > >         f2.write(a)
> > >         f1.close()
> > >         f2.close()
> > >         print "Finished tasklet %s (%s) in %s" % (who, infile, time.time()-st)
> > >
> > >     # Creating two dummy files
> > >     newfile = stdfile('test-small.txt','w')
> > >     for x in xrange(10000):
> > >         newfile.write(str(x))
> > >     newfile.close()
> > >
> > >     newfile2 = stdfile('test-big.txt','w')
> > >     for x in xrange(500000):
> > >         newfile2.write(str(x))
> > >     newfile2.close()
> > >
> > >     # Launching tasklets to perform the file copy
> > >     for i in xrange(1,11):
> > >         stackless.tasklet(copyfile)(i, 'test-big.txt','big%s.txt' % i)
> > >
> > >     for i in xrange(1,21):
> > >         stackless.tasklet(copyfile)(i, 'test-small.txt','sm%s.txt' % i)
> > >
> > >     st = time.time()
> > >     stackless.run()
> > >     print "Total time is %s seconds." % (time.time() - st)
> > >
> > >     # Cleanup all test files used
> > >     for f in glob.glob('test*.txt'):
> > >         os.unlink(f)
> > >     for f in glob.glob('sm*.txt'):
> > >         os.unlink(f)
> > >     for f in glob.glob('big*.txt'):
> > >         os.unlink(f)
> > >
> > > -------------------------------------------------------------------------------------------
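
The seek() method in the module above raises RuntimeError for SEEK_END. Below is a minimal sketch of how the offset bookkeeping could be completed, assuming the current file size is known (read() in the module already obtains it with os.path.getsize). The helper name new_offset is hypothetical and not part of the posted module; the sketch is plain Python with no Stackless or Windows dependency.

```python
import os


def new_offset(current, offset, whence, size):
    """Compute the position a seek() call would move to.

    current: present file offset; size: current file size in bytes.
    """
    if whence == os.SEEK_SET:
        result = offset
    elif whence == os.SEEK_CUR:
        result = current + offset
    elif whence == os.SEEK_END:
        # Relative to end of file, so offset is usually zero or negative.
        result = size + offset
    else:
        raise ValueError("invalid whence: %r" % (whence,))
    if result < 0:
        raise ValueError("negative seek position: %r" % (result,))
    return result


print(new_offset(10, 5, os.SEEK_CUR, 100))
print(new_offset(0, -10, os.SEEK_END, 100))
```

Inside the class, seek() could then assign the returned value to self.offset, passing os.path.getsize(self.name) as size.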
> > >
> > >
> > > --
> > > -------------------------------------------------------------------
> > > Visit Stackless Examples Project
> > > http://code.google.com/p/stacklessexamples/
> > > Stackless Python - www.stackless.com
> > > -------------------------------------------------------------------
> > >
> > > _______________________________________________
> > > Stackless mailing list
> > > Stackless at stackless.com
> > > http://stackless.com/cgi-bin/mailman/listinfo/stackless
> > >