[Stackless] Stackless asynchronous file module - IOCP
Carlos Eduardo de Paula
carlosedp at gmail.com
Sat Aug 18 03:48:17 CEST 2007
That's exactly what I want to do. This is the first step.
I plan on adding socket support on IOCP and sleep calls.
Then I'm going to add POSIX support via select and poll (I can test at
home on Mac). This is a place where I will need help... maybe I can
use some stuff from Ed's work. :)
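A select-based backend could start from something like the sketch below. It is written for Python 3 and only covers sockets, which select() accepts on every platform; `wait_readable` is a name invented here for illustration, and a real backend would presumably call it with a short timeout from a polling tasklet.

```python
import select
import socket

def wait_readable(socks, timeout):
    """Return the sockets from `socks` that have data ready within `timeout` seconds."""
    readable, _, _ = select.select(socks, [], [], timeout)
    return readable

# socketpair() gives two connected sockets, handy for a local demo.
a, b = socket.socketpair()
idle_before = wait_readable([a], 0)    # nothing has been sent yet
b.sendall(b"ping")
ready_after = wait_readable([a], 1.0)  # now `a` has data waiting
data = a.recv(4)
a.close()
b.close()
```

A timeout of 0 makes the call a pure poll, which is what a scheduler loop would want between tasklet switches.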
The objective is to have a simple toolbox that allows async IO and offers
helper methods like sleep, semaphores, etc... like a uthread on steroids :D
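To make the "sleep" helper concrete, here is a minimal sketch of a cooperative sleep. It assumes plain Python 3 generators as a stand-in for tasklets (so it runs without Stackless); `run` and `worker` are hypothetical names. With Stackless, the same heap would more likely hold channels that the scheduler sends on when the wake-up time arrives.

```python
import heapq
import time

def run(tasks):
    """Run generator-based tasks; each `yield seconds` is a cooperative sleep."""
    sleeping = []  # heap of (wake_time, sequence, task)
    seq = 0        # tie-breaker so the heap never compares generators
    for task in tasks:
        heapq.heappush(sleeping, (time.monotonic(), seq, task))
        seq += 1
    while sleeping:
        wake_time, _, task = heapq.heappop(sleeping)
        delay = wake_time - time.monotonic()
        if delay > 0:
            time.sleep(delay)  # nothing else is runnable, so really wait
        try:
            pause = next(task)  # run the task until its next sleep request
        except StopIteration:
            continue            # task finished
        heapq.heappush(sleeping, (time.monotonic() + pause, seq, task))
        seq += 1

log = []

def worker(name, pause):
    log.append(name + " start")
    yield pause
    log.append(name + " done")

run([worker("slow", 0.05), worker("fast", 0.01)])
```

The point of the design is that a sleeping task costs nothing but a heap entry; only the scheduler ever blocks, and only when no task is due.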
One question: is performance affected much by using ctypes? I mean,
would one gain a lot of speed from a pure C module? (I liked
ctypes... I had never used it before.)
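One way to get a rough number is to time the same C function called through ctypes and through its builtin wrapper. The sketch below does that with `PyNumber_Absolute` from the CPython C API, chosen only because it is reachable through `ctypes.pythonapi` on any platform; `per_call_seconds` is a hypothetical helper. It measures call overhead only, so it says nothing about calls that spend their time waiting on the disk.

```python
import ctypes
import timeit

# PyNumber_Absolute is the C function behind the builtin abs().
c_abs = ctypes.pythonapi.PyNumber_Absolute
c_abs.argtypes = (ctypes.py_object,)
c_abs.restype = ctypes.py_object

def per_call_seconds(func, calls=20000):
    """Average wall-clock seconds for one func(-5) call."""
    return timeit.timeit(lambda: func(-5), number=calls) / calls

builtin_cost = per_call_seconds(abs)
ctypes_cost = per_call_seconds(c_abs)
print("builtin abs: %.0f ns/call" % (builtin_cost * 1e9))
print("ctypes  abs: %.0f ns/call" % (ctypes_cost * 1e9))
```

If the difference turns out to be small next to the duration of a typical ReadFile or WriteFile, a C rewrite would probably buy little for this module.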
Carlos
On 8/17/07, Arnar Birgisson <arnarbi at gmail.com> wrote:
> Beautiful!
>
> Should there be a common interface/naming that modules such as this
> should adhere to, to make it easy for people to do something like
>
> try:
>     from iocp import stacklessfile
> except ImportError:
>     from epollstuff import stacklessfile
>
> etc. ?
>
> I'd love to see such an "async io" module with different
> implementations included with stackless and one central module that
> when imported would select the most appropriate implementation for the
> platform. Preferably also with sockets and timers.
>
> Arnar
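A central module along those lines could be little more than an import loop. This is only a sketch under assumptions: `load_first_available` is a hypothetical helper, the first module name is deliberately nonexistent to stand in for a platform-specific backend such as the `iocp` or `epollstuff` placeholders above, and the stdlib `select` module plays the role of the fallback.

```python
import importlib

def load_first_available(module_names):
    """Import and return the first module in `module_names` that can be imported."""
    for name in module_names:
        try:
            return importlib.import_module(name)
        except ImportError:
            continue  # backend not available on this platform, try the next
    raise ImportError("no async I/O backend available: %r" % (module_names,))

# The first name does not exist, so the loop falls through to "select".
backend = load_first_available(["stackless_iocp_backend_demo", "select"])
```

For this to work in practice, every backend would have to expose the same names (for example a `stacklessfile` class), which is exactly the common interface being asked about.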
>
> On 8/17/07, Carlos Eduardo de Paula <carlosedp at gmail.com> wrote:
> > Fiddling around I found that on Windows select() only works on sockets,
> > not on file descriptors, so the other option was overlapped I/O with
> > I/O completion ports (IOCP).
> >
> > This made me learn a thousand things I thought I would never know:
> > ctypes, Windows internal APIs (kernel32 stuff), and even Python
> > internals to see how its file object works.
> >
> > After all this, here it is: the module that mimics Python's built-in
> > file() object. Please test it and report any successes or failures. I
> > also made a script to test its usage.
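The core pattern of the module, parking the caller until a completion with a matching ID is dequeued, can be shown without Windows or Stackless. In this Python 3 sketch a `queue.Queue` stands in for the completion port and plain callbacks stand in for channels; `CompletionDispatcher` and its methods are hypothetical names, not part of the module.

```python
import queue

class CompletionDispatcher(object):
    """Routes completion events to whoever registered for that request ID."""

    def __init__(self):
        self.completions = queue.Queue()  # stand-in for the completion port
        self.waiters = {}                 # request ID -> callback

    def register(self, request_id, callback):
        self.waiters[request_id] = callback

    def poll(self):
        """Deliver every queued completion; return how many were delivered."""
        delivered = 0
        while True:
            try:
                request_id, num_bytes = self.completions.get_nowait()
            except queue.Empty:
                return delivered
            # Remove the waiter before calling it, so it may register again.
            callback = self.waiters.pop(request_id, None)
            if callback is not None:  # ignore completions nobody waits for
                callback(num_bytes)
                delivered += 1

results = []
dispatcher = CompletionDispatcher()
dispatcher.register(1, lambda n: results.append(("read", n)))
dispatcher.register(2, lambda n: results.append(("write", n)))
dispatcher.completions.put((2, 4096))  # completions may arrive out of order
dispatcher.completions.put((1, 512))
dispatcher.completions.put((99, 0))    # unknown ID, dropped
count = dispatcher.poll()
```

As far as can be read from the listing below, the module keys each request by id() of the file object, so this scheme allows one pending operation per file object at a time.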
> >
> > Hope it's useful.
> > (full announcement in my blog: http://themindcaster.blogspot.com/)
> >
> > The code is here too: http://dpaste.com/hold/17151/
> > and will be posted on Stackless Examples Project SVN
> >
> > -------------------------------------------------------------------------------------------
> > #
> > # Stackless Asynchronous file module:
> > #
> > # Author: Carlos E. de Paula <carlosedp at gmail.com>
> > #
> > # This code was written to serve as an example of Stackless Python usage.
> > # Feel free to email me with any questions, comments, or suggestions for
> > # improvement.
> > #
> > # This is an asynchronous file class in order to have a file module replacement
> > # that uses channels and a windows async API to allow its methods to
> > # block just the calling tasklet not the entire interpreter.
> > #
> > #
> > import os
> > import time
> > import stackless
> > from ctypes import *
> > from ctypes.wintypes import (HANDLE, ULONG, DWORD, BOOL, LPCSTR,
> >                              LPCWSTR, WinError)
> >
> > # Verify module compatibility
> > if os.name != 'nt':
> >     raise ImportError('This module has been implemented for windows systems.')
> >
> > # Windows structures
> >
> > class _US(Structure):
> >     _fields_ = [
> >         ("Offset", DWORD),
> >         ("OffsetHigh", DWORD),
> >     ]
> >
> > class _U(Union):
> >     _fields_ = [
> >         ("s", _US),
> >         ("Pointer", c_void_p),
> >     ]
> >
> >     _anonymous_ = ("s",)
> >
> > class OVERLAPPED(Structure):
> >     _fields_ = [
> >         ("Internal", POINTER(ULONG)),
> >         ("InternalHigh", POINTER(ULONG)),
> >         ("u", _U),
> >         ("hEvent", HANDLE),
> >
> >         # Custom fields.
> >         ("taskletID", ULONG),
> >     ]
> >
> >     _anonymous_ = ("u",)
> >
> > # Windows kernel32 API
> >
> > CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
> > CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, POINTER(c_ulong), DWORD)
> > CreateIoCompletionPort.restype = HANDLE
> >
> > GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
> > GetQueuedCompletionStatus.argtypes = (HANDLE, POINTER(DWORD), POINTER(c_ulong),
> > POINTER(POINTER(OVERLAPPED)), DWORD)
> > GetQueuedCompletionStatus.restype = BOOL
> >
> > ReadFile = windll.kernel32.ReadFile
> > ReadFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> > POINTER(OVERLAPPED))
> > ReadFile.restype = BOOL
> >
> > WriteFile = windll.kernel32.WriteFile
> > WriteFile.argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
> > POINTER(OVERLAPPED))
> > WriteFile.restype = BOOL
> >
> > CreateFileA = windll.kernel32.CreateFileA
> > CreateFileA.argtypes = (LPCSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> > CreateFileA.restype = HANDLE
> >
> > CreateFileW = windll.kernel32.CreateFileW
> > CreateFileW.argtypes = (LPCWSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)
> > CreateFileW.restype = HANDLE
> >
> > CloseHandle = windll.kernel32.CloseHandle
> > CloseHandle.argtypes = (HANDLE,)
> > CloseHandle.restype = BOOL
> >
> > GetLastError = windll.kernel32.GetLastError
> >
> > # Python API
> >
> > pythonapi.PyErr_SetFromErrno.argtypes = (py_object,)
> > pythonapi.PyErr_SetFromErrno.restype = py_object
> >
> > # Windows definitions
> >
> > # All bits set at pointer width: 0xFFFFFFFF on 32-bit Windows.
> > INVALID_HANDLE_VALUE = c_void_p(-1).value
> > NULL = c_ulong()
> >
> > WAIT_TIMEOUT = 0x102
> > ERROR_IO_PENDING = 997
> > FILE_FLAG_RANDOM_ACCESS = 0x10000000
> > FILE_FLAG_OVERLAPPED = 0x40000000
> >
> > GENERIC_READ = 0x80000000
> > GENERIC_WRITE = 0x40000000
> > FILE_APPEND_DATA = 0x00000004
> >
> > FILE_SHARE_READ = 0x00000001
> > FILE_SHARE_WRITE = 0x00000002
> >
> > OPEN_EXISTING = 3
> > OPEN_ALWAYS = 4
> > CREATE_ALWAYS = 2
> >
> > # ----------------------------------------------------------------------------
> >
> > class resultsManager(object):
> >     """
> >     Manages the results sent by the CreateIoCompletionPort call.
> >     The resultsManager dequeues the IO requests and keeps a dict of
> >     pending channels.
> >     The taskletID is stored in the OVERLAPPED structure so it can be
> >     recalled and the signalling data sent via its channel, unblocking
> >     the original tasklet.
> >     The entry is then removed from the dict.
> >     """
> >     def __init__(self, numThreads=NULL):
> >         self.running = True
> >         self.handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
> >                                              NULL, numThreads)
> >         # A HANDLE restype maps a NULL return to None, so test truthiness.
> >         if not self.handle:
> >             raise WinError()
> >
> >         self.numThreads = numThreads
> >         stackless.tasklet(self.poll)()
> >         self.overlappedByID = {}
> >
> >     def __del__(self):
> >         if self.handle is None:
> >             return
> >         self.overlappedByID.clear()
> >         CloseHandle(self.handle)
> >
> >     def poll(self, timeout=1):
> >         while self.running and self.overlappedByID:
> >             numBytes = DWORD()
> >             completionKey = c_ulong()
> >             ovp = POINTER(OVERLAPPED)()
> >
> >             ret = GetQueuedCompletionStatus(self.handle, byref(numBytes),
> >                                             byref(completionKey), byref(ovp),
> >                                             timeout)
> >
> >             if not ovp and ret == 0:
> >                 # Nothing was dequeued: either the wait timed out or the
> >                 # call itself failed. Only a timeout is safe to retry.
> >                 err = GetLastError()
> >                 if err != WAIT_TIMEOUT:
> >                     raise WinError(err)
> >                 stackless.schedule()
> >                 continue
> >
> >             taskletID = ovp.contents.taskletID
> >             if taskletID not in self.overlappedByID:
> >                 #print taskletID, " tasklet ID NOT in pool"
> >                 continue
> >
> >             #print taskletID, " tasklet ID IN pool"
> >             c = self.overlappedByID[taskletID]
> >             # Unregister before sending: the woken tasklet may start a new
> >             # operation under the same ID before control returns here.
> >             self.UnregisterChannelObject(taskletID)
> >             #print "sending data back to channel in ID", taskletID
> >             c.send(numBytes)
> >             #print "sent data to channel in ID", taskletID, numBytes
> >
> >         self.running = False
> >
> >     def RegisterChannelObject(self, ob, c):
> >         self.overlappedByID[ob] = c
> >
> >     def UnregisterChannelObject(self, ob):
> >         if ob in self.overlappedByID:
> >             del self.overlappedByID[ob]
> >
> >
> > mng = resultsManager()
> >
> > class stacklessfile(object):
> >     """
> >     stacklessfile(name[, mode[, buffering]]) -> stackless file object
> >
> >     This class creates a new file module permitting nonblocking IO calls
> >     for tasklets using windows IOCP functionality.
> >     When a read or write operation is called, only the calling tasklet is
> >     blocked. In standard file module, the whole interpreter gets blocked
> >     until the operation is concluded.
> >
> >     Open a file. The mode can be 'r', 'w' or 'a' for reading (default),
> >     writing or appending. The file will be created if it doesn't exist
> >     when opened for writing or appending; it will be truncated when
> >     opened for writing. Add a 'b' to the mode for binary files.
> >     Add a '+' to the mode to allow simultaneous reading and writing.
> >     If the buffering argument is given, 0 means unbuffered, 1 means line
> >     buffered, and larger numbers specify the buffer size.
> >     Add a 'U' to mode to open the file for input with universal newline
> >     support. Any line ending in the input file will be seen as a \'\\n\'
> >     in Python. Also, a file so opened gains the attribute 'newlines';
> >     the value for this attribute is one of None (no newline read yet),
> >     \'\\r\', \'\\n\', \'\\r\\n\' or a tuple containing all the newline
> >     types seen.
> >     """
> >     closed = True
> >     def __init__(self, name, mode='r', buffering=-1):
> >         """
> >         Initializes the file object and creates an internal file object.
> >         """
> >         if not self.closed:
> >             self.close()
> >         self.name = name
> >         self.mode = mode
> >         self.offset = 0
> >         self.iocpLinked = False
> >         self._check_manager()
> >         self.open_handle()
> >         self.closed = False
> >
> >     def __repr__(self):
> >         return "<%s file '%s', mode '%s' at 0x%08X>" % (
> >             ["open", "closed"][self.closed], self.name, self.mode, id(self))
> >
> >     def __del__(self):
> >         self.close()
> >
> >     def _check_manager(self):
> >         if not mng.running:
> >             stackless.tasklet(mng.poll)()
> >             mng.running = True
> >             #print "ERROR - Manager not running"
> >
> >     def _check_still_open(self):
> >         if self.closed:
> >             raise ValueError("I/O operation on closed file")
> >
> >     def _ensure_iocp_association(self):
> >         if not self.iocpLinked:
> >             CreateIoCompletionPort(self.handle, mng.handle, NULL,
> >                                    mng.numThreads)
> >             self.iocpLinked = True
> >
> >     def close(self):
> >         """
> >         close() -> None or (perhaps) an integer. Close the file.
> >
> >         Sets data attribute .closed to True. A closed file cannot be used for
> >         further I/O operations. close() may be called more than once without
> >         error. Some kinds of file objects (for example, opened by popen())
> >         may return an exit status upon closing.
> >         """
> >         if not self.closed:
> >             CloseHandle(self.handle)
> >             del self.handle
> >             self.closed = True
> >
> >     def open_handle(self):
> >         self.binary = 'b' in self.mode
> >         access = GENERIC_READ
> >         if 'w' in self.mode or ('r' in self.mode and '+' in self.mode):
> >             access |= GENERIC_WRITE
> >         if 'a' in self.mode:
> >             access |= FILE_APPEND_DATA
> >
> >         share = FILE_SHARE_READ | FILE_SHARE_WRITE
> >
> >         if 'w' in self.mode:
> >             disposition = CREATE_ALWAYS
> >         elif 'r' in self.mode and '+' in self.mode:
> >             disposition = OPEN_ALWAYS
> >         elif 'a' in self.mode:
> >             # Per the class docstring, append mode creates a missing file.
> >             disposition = OPEN_ALWAYS
> >         else:
> >             disposition = OPEN_EXISTING
> >
> >         flags = FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_OVERLAPPED
> >
> >         if isinstance(self.name, unicode):
> >             func = CreateFileW
> >         else:
> >             func = CreateFileA
> >
> >         self.handle = func(self.name, access,
> >                            share, c_void_p(), disposition,
> >                            flags, NULL)
> >
> >         if self.handle == INVALID_HANDLE_VALUE:
> >             raise WinError()
> >
> >         self.iocpLinked = False
> >
> >     def read(self, size=None):
> >         """
> >         read([size]) -> read at most size bytes, returned as a string.
> >         """
> >         self._check_still_open()
> >         maxBytesToRead = int(os.path.getsize(self.name)) - self.offset
> >         if (size is None) or (maxBytesToRead < size):
> >             size = maxBytesToRead
> >         if size <= 0:
> >             # At or past end of file: nothing to read.
> >             return ''
> >
> >         bytesRead = DWORD()
> >         self.o = OVERLAPPED()
> >         self.o.Offset = self.offset
> >         self.o.taskletID = id(self)
> >         self.buffer = create_string_buffer(size)
> >         self.channel = stackless.channel()
> >         self._ensure_iocp_association()
> >         self._check_manager()
> >
> >         #print self.o.taskletID, "ID on read", self.name
> >         #print "firing ReadFile", self.name
> >
> >         r = ReadFile(self.handle, self.buffer,
> >                      size, byref(bytesRead), byref(self.o))
> >         #print "fired ReadFile", self.name
> >
> >         if r == 0:
> >             # ERROR_IO_PENDING just means the read was queued.
> >             err = GetLastError()
> >             if err != ERROR_IO_PENDING:
> >                 pythonapi.PyErr_SetExcFromWindowsErrWithFilename(
> >                     py_object(IOError), err, c_char_p(self.name))
> >         mng.RegisterChannelObject(self.o.taskletID, self.channel)
> >         #print "blocked on channel", self.channel, self.name, self.o.taskletID
> >         self.channel.receive()
> >         #print "returned from channel", self.channel, self.name, self.o.taskletID
> >
> >         self.offset += size
> >         return self.buffer[:size]
> >
> >     def write(self, data):
> >         """
> >         write(str) -> None. Write string str to file.
> >         """
> >         self._check_still_open()
> >         bytesToWrite = c_int()
> >         writeBufferPtr = c_char_p()
> >         bytesWritten = DWORD()
> >         self.o = OVERLAPPED()
> >         self.o.Offset = self.offset
> >         self.o.taskletID = id(self)
> >         self.channel = stackless.channel()
> >         self._ensure_iocp_association()
> >         self._check_manager()
> >         #print self.o.taskletID, "ID on write", self.name
> >
> >         fmt = self.binary and "s#" or "t#"
> >         ret = pythonapi.PyArg_ParseTuple(py_object((data,)), c_char_p(fmt),
> >                                          byref(writeBufferPtr),
> >                                          byref(bytesToWrite))
> >         if ret == 0:
> >             raise WinError()
> >
> >         #print "firing WriteFile", self.name
> >         r = WriteFile(self.handle, writeBufferPtr,
> >                       bytesToWrite.value, byref(bytesWritten), byref(self.o))
> >         #print "fired WriteFile", self.name
> >
> >         if r == 0:
> >             # ERROR_IO_PENDING just means the write was queued.
> >             err = GetLastError()
> >             if err != ERROR_IO_PENDING:
> >                 pythonapi.PyErr_SetExcFromWindowsErrWithFilename(
> >                     py_object(IOError), err, c_char_p(self.name))
> >
> >             mng.RegisterChannelObject(self.o.taskletID, self.channel)
> >             #print "blocked on channel", self.channel, self.name, self.o.taskletID
> >             written = self.channel.receive()
> >             #print "returned from channel", self.channel, self.name, self.o.taskletID
> >         else:
> >             written = bytesWritten
> >         #print "Checking contents...", bytesToWrite.value, written.value, self.name
> >
> >         if bytesToWrite.value != written.value:
> >             # Check if the quantity of bytes sent has been written to the file
> >             #print self.o.taskletID, "size mismatch"
> >             raise WinError()
> >
> >         # Advance the position so consecutive writes do not overlap.
> >         self.offset += written.value
> >
> >     def tell(self):
> >         """
> >         tell() -> current file position, an integer (may be a long integer).
> >         """
> >         return self.offset
> >
> >     def seek(self, offset, whence=os.SEEK_SET):
> >         """
> >         seek(offset[, whence]) -> None. Move to new file position.
> >
> >         Argument offset is a byte count. Optional argument whence defaults to
> >         0 (offset from start of file, offset should be >= 0); other values are 1
> >         (move relative to current position, positive or negative), and 2 (move
> >         relative to end of file, usually negative, although many platforms allow
> >         seeking beyond the end of a file). If the file is opened in text mode,
> >         only offsets returned by tell() are legal. Use of other offsets causes
> >         undefined behavior.
> >         Note that not all file objects are seekable.
> >         """
> >         self._check_still_open()
> >         if whence == os.SEEK_SET:
> >             self.offset = offset
> >         elif whence == os.SEEK_CUR:
> >             self.offset += offset
> >         elif whence == os.SEEK_END:
> >             raise RuntimeError("SEEK_END unimplemented")
> >
> >     def flush(self):
> >         pass
> >
> >     def isatty(self):
> >         """
> >         isatty() -> true or false. True if the file is connected to a
> >         tty device.
> >         """
> >         self._check_still_open()
> >         return False
> >
> >
> > if __name__ == '__main__':
> >     import time
> >     import glob
> >     import os
> >     stdfile = file
> >
> >     # On your stackless apps, use these 2 lines below
> >     from stacklessfile import stacklessfile as file
> >     open = file
> >
> >     # Function to copy a file
> >     def copyfile(who, infile, out):
> >         st = time.time()
> >         f1 = file(infile, 'rb')
> >         f2 = file(out, 'wb')
> >         print "%s started reading %s ..." % (who, infile)
> >         a = f1.read()
> >         print "%s started writing %s -> %s ..." % (who, infile, out)
> >         f2.write(a)
> >         f1.close()
> >         f2.close()
> >         print "Finished tasklet %s (%s) in %s" % (who, infile, time.time()-st)
> >
> >     # Creating two dummy files
> >     newfile = stdfile('test-small.txt','w')
> >     for x in xrange(10000):
> >         newfile.write(str(x))
> >     newfile.close()
> >
> >     newfile2 = stdfile('test-big.txt','w')
> >     for x in xrange(500000):
> >         newfile2.write(str(x))
> >     newfile2.close()
> >
> >     # Launching tasklets to perform the file copy
> >     for i in xrange(1,11):
> >         stackless.tasklet(copyfile)(i, 'test-big.txt','big%s.txt' % i)
> >
> >     for i in xrange(1,21):
> >         stackless.tasklet(copyfile)(i, 'test-small.txt','sm%s.txt' % i)
> >
> >     st = time.time()
> >     stackless.run()
> >     print "Total time is %s seconds." % (time.time() - st)
> >
> >     # Cleanup all test files used
> >     for f in glob.glob('test*.txt'):
> >         os.unlink(f)
> >     for f in glob.glob('sm*.txt'):
> >         os.unlink(f)
> >     for f in glob.glob('big*.txt'):
> >         os.unlink(f)
> >
> > -------------------------------------------------------------------------------------------
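One detail worth noting in the listing: read() and write() assign the file position to `Offset` only, while the Windows OVERLAPPED structure splits a 64-bit position across `Offset` (low 32 bits) and `OffsetHigh` (high 32 bits). The sketch below shows the split on a cut-down structure so that it runs on any platform; `Overlapped`, `set_offset` and `get_offset` are illustrative names, not part of the module.

```python
import ctypes

class Overlapped(ctypes.Structure):
    # Only the two offset fields of the Windows OVERLAPPED structure.
    _fields_ = [("Offset", ctypes.c_uint32),
                ("OffsetHigh", ctypes.c_uint32)]

def set_offset(ov, position):
    """Store a 64-bit file position in the two 32-bit offset fields."""
    ov.Offset = position & 0xFFFFFFFF
    ov.OffsetHigh = position >> 32

def get_offset(ov):
    """Rebuild the 64-bit file position from the two fields."""
    return (ov.OffsetHigh << 32) | ov.Offset

ov = Overlapped()
set_offset(ov, 5 * 2**32 + 7)  # a position past the 4 GiB mark
```

For files under 4 GiB the listing's single assignment is enough, since `OffsetHigh` starts out as zero in a fresh structure.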
> >
> >
> > --
> > -------------------------------------------------------------------
> > Visit Stackless Examples Project
> > http://code.google.com/p/stacklessexamples/
> > Stackless Python - www.stackless.com
> > -------------------------------------------------------------------
> >
> > _______________________________________________
> > Stackless mailing list
> > Stackless at stackless.com
> > http://stackless.com/cgi-bin/mailman/listinfo/stackless
> >
>