[Stackless-checkins] CVS: slpdev/src/2.2/src/Lib/bsddb/test __init__.py, NONE, 1.1 test_all.py, NONE, 1.1 test_associate.py, NONE, 1.1 test_basics.py, NONE, 1.1 test_compat.py, NONE, 1.1 test_dbobj.py, NONE, 1.1 test_dbshelve.py, NONE, 1.1 test_dbtables.py, NONE, 1.1 test_env_close.py, NONE, 1.1 test_get_none.py, NONE, 1.1 test_join.py, NONE, 1.1 test_lock.py, NONE, 1.1 test_misc.py, NONE, 1.1 test_queue.py, NONE, 1.1 test_recno.py, NONE, 1.1 test_thread.py, NONE, 1.1

Christian Tismer tismer at centera.de
Sat May 1 02:54:08 CEST 2004


Update of /home/cvs/slpdev/src/2.2/src/Lib/bsddb/test
In directory centera.de:/home/tismer/slpdev/src/2.2/src/Lib/bsddb/test

Added Files:
	__init__.py test_all.py test_associate.py test_basics.py 
	test_compat.py test_dbobj.py test_dbshelve.py test_dbtables.py 
	test_env_close.py test_get_none.py test_join.py test_lock.py 
	test_misc.py test_queue.py test_recno.py test_thread.py 
Log Message:
added files

--- NEW FILE: __init__.py ---

--- NEW FILE: test_all.py ---
"""Run all test cases.
"""

import sys
import os
import unittest

# Command-line verbosity switch.  A bare 'verbose' argument turns chatty
# output on and the legacy 'silent' argument turns it back off.  Whichever
# flag is found is stripped from sys.argv so unittest.main() never sees it.
verbose = 0
try:
    sys.argv.remove('verbose')
except ValueError:
    pass
else:
    verbose = 1

try:  # take care of old flag, just in case
    sys.argv.remove('silent')
except ValueError:
    pass
else:
    verbose = 0


def print_versions():
    try:
        # For Pythons w/distutils pybsddb
        from bsddb3 import db
    except ImportError:
        # For Python 2.3
        from bsddb import db
    print
    print '-=' * 38
    print db.DB_VERSION_STRING
    print 'bsddb.db.version():   %s' % (db.version(), )
    print 'bsddb.db.__version__: %s' % db.__version__
    print 'bsddb.db.cvsid:       %s' % db.cvsid
    print 'python version:       %s' % sys.version
    print 'My pid:               %s' % os.getpid()
    print '-=' * 38


class PrintInfoFakeTest(unittest.TestCase):
    """Pseudo test case that only prints the version banner."""
    def testPrintVersions(self):
        # No assertions: this "test" exists so that the banner shows up when
        # the module's test_suite() is run.
        print_versions()


# This little hack is for when this module is run as main and all the
# other modules import it so they will still be able to get the right
# verbose setting.  It's confusing but it works.
#
# The other test modules do `from test_all import verbose`, i.e. they read
# the flag from the module object registered under the name 'test_all'.
# Importing that name here and assigning to it pushes the value computed
# above into that module object.
import test_all
test_all.verbose = verbose


def suite():
    """Build one TestSuite out of every bsddb test module.

    Each module listed below is imported by name and must expose a
    test_suite() callable; the suites are added in the order listed.
    """
    module_names = (
        'test_associate',
        'test_basics',
        'test_compat',
        'test_dbobj',
        'test_dbshelve',
        'test_dbtables',
        'test_env_close',
        'test_get_none',
        'test_join',
        'test_lock',
        'test_misc',
        'test_queue',
        'test_recno',
        'test_thread',
    )

    combined = unittest.TestSuite()
    for module_name in module_names:
        combined.addTest(__import__(module_name).test_suite())
    return combined


def test_suite():
    """Return this module's own suite: just the version-banner pseudo test."""
    info_tests = unittest.TestSuite()
    info_tests.addTest(unittest.makeSuite(PrintInfoFakeTest))
    return info_tests


# Script entry point: show the version banner once, then let unittest run
# the combined suite built by suite().
if __name__ == '__main__':
    print_versions()
    unittest.main(defaultTest='suite')

--- NEW FILE: test_associate.py ---
"""
TestCases for DB.associate.
"""

import sys, os, string
import tempfile
import time
from pprint import pprint

try:
    from threading import Thread, currentThread
    have_threads = 1
except ImportError:
    have_threads = 0

import unittest
from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbshelve
except ImportError:
    # For Python 2.3
    from bsddb import db, dbshelve


#----------------------------------------------------------------------


# Fixture records for the associate tests: integer record number ->
# (artist, title, genre).  The genre (index 2) is what the getGenre()
# callbacks use as the secondary key.  Exactly one record (6) has the genre
# "Blues"; the callbacks refuse to index it, which is why finish_test()
# expects len(musicdata)-1 secondary records and "Jazz" as the first
# secondary key.
musicdata = {
1 : ("Bad English", "The Price Of Love", "Rock"),
2 : ("DNA featuring Suzanne Vega", "Tom's Diner", "Rock"),
3 : ("George Michael", "Praying For Time", "Rock"),
4 : ("Gloria Estefan", "Here We Are", "Rock"),
5 : ("Linda Ronstadt", "Don't Know Much", "Rock"),
6 : ("Michael Bolton", "How Am I Supposed To Live Without You", "Blues"),
7 : ("Paul Young", "Oh Girl", "Rock"),
8 : ("Paula Abdul", "Opposites Attract", "Rock"),
9 : ("Richard Marx", "Should've Known Better", "Rock"),
10: ("Rod Stewart", "Forever Young", "Rock"),
11: ("Roxette", "Dangerous", "Rock"),
12: ("Sheena Easton", "The Lover In Me", "Rock"),
13: ("Sinead O'Connor", "Nothing Compares 2 U", "Rock"),
14: ("Stevie B.", "Because I Love You", "Rock"),
15: ("Taylor Dayne", "Love Will Lead You Back", "Rock"),
16: ("The Bangles", "Eternal Flame", "Rock"),
17: ("Wilson Phillips", "Release Me", "Rock"),
18: ("Billy Joel", "Blonde Over Blue", "Rock"),
19: ("Billy Joel", "Famous Last Words", "Rock"),
20: ("Billy Joel", "Lullabye (Goodnight, My Angel)", "Rock"),
21: ("Billy Joel", "The River Of Dreams", "Rock"),
22: ("Billy Joel", "Two Thousand Years", "Rock"),
23: ("Janet Jackson", "Alright", "Rock"),
24: ("Janet Jackson", "Black Cat", "Rock"),
25: ("Janet Jackson", "Come Back To Me", "Rock"),
26: ("Janet Jackson", "Escapade", "Rock"),
27: ("Janet Jackson", "Love Will Never Do (Without You)", "Rock"),
28: ("Janet Jackson", "Miss You Much", "Rock"),
29: ("Janet Jackson", "Rhythm Nation", "Rock"),
30: ("Janet Jackson", "State Of The World", "Rock"),
31: ("Janet Jackson", "The Knowledge", "Rock"),
32: ("Spyro Gyra", "End of Romanticism", "Jazz"),
33: ("Spyro Gyra", "Heliopolis", "Jazz"),
34: ("Spyro Gyra", "Jubilee", "Jazz"),
35: ("Spyro Gyra", "Little Linda", "Jazz"),
36: ("Spyro Gyra", "Morning Dance", "Jazz"),
37: ("Spyro Gyra", "Song for Lorraine", "Jazz"),
38: ("Yes", "Owner Of A Lonely Heart", "Rock"),
39: ("Yes", "Rhythm Of Love", "Rock"),
40: ("Cusco", "Dream Catcher", "New Age"),
41: ("Cusco", "Geronimos Laughter", "New Age"),
42: ("Cusco", "Ghost Dance", "New Age"),
43: ("Blue Man Group", "Drumbone", "New Age"),
44: ("Blue Man Group", "Endless Column", "New Age"),
45: ("Blue Man Group", "Klein Mandelbrot", "New Age"),
46: ("Kenny G", "Silhouette", "Jazz"),
47: ("Sade", "Smooth Operator", "Jazz"),
48: ("David Arkenstone", "Papillon (On The Wings Of The Butterfly)",
     "New Age"),
49: ("David Arkenstone", "Stepping Stars", "New Age"),
50: ("David Arkenstone", "Carnation Lily Lily Rose", "New Age"),
51: ("David Lanz", "Behind The Waterfall", "New Age"),
52: ("David Lanz", "Cristofori's Dream", "New Age"),
53: ("David Lanz", "Heartsounds", "New Age"),
54: ("David Lanz", "Leaves on the Seine", "New Age"),
}

#----------------------------------------------------------------------


class AssociateTestCase(unittest.TestCase):
    keytype = ''

    def setUp(self):
        self.filename = self.__class__.__name__ + '.db'
        homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try: os.mkdir(homeDir)
        except os.error: pass
        self.env = db.DBEnv()
        self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
                               db.DB_INIT_LOCK | db.DB_THREAD)

    def tearDown(self):
        self.closeDB()
        self.env.close()
        import glob
        files = glob.glob(os.path.join(self.homeDir, '*'))
        for file in files:
            os.remove(file)

    def addDataToDB(self, d):
        for key, value in musicdata.items():
            if type(self.keytype) == type(''):
                key = "%02d" % key
            d.put(key, string.join(value, '|'))

    def createDB(self):
        self.primary = db.DB(self.env)
        self.primary.open(self.filename, "primary", self.dbtype,
                          db.DB_CREATE | db.DB_THREAD)

    def closeDB(self):
        self.primary.close()

    def getDB(self):
        return self.primary

    def test01_associateWithDB(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_associateWithDB..." % \
                  self.__class__.__name__

        self.createDB()

        secDB = db.DB(self.env)
        secDB.set_flags(db.DB_DUP)
        secDB.open(self.filename, "secondary", db.DB_BTREE,
                   db.DB_CREATE | db.DB_THREAD)
        self.getDB().associate(secDB, self.getGenre)

        self.addDataToDB(self.getDB())

        self.finish_test(secDB)


    def test02_associateAfterDB(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_associateAfterDB..." % \
                  self.__class__.__name__

        self.createDB()
        self.addDataToDB(self.getDB())

        secDB = db.DB(self.env)
        secDB.set_flags(db.DB_DUP)
        secDB.open(self.filename, "secondary", db.DB_BTREE,
                   db.DB_CREATE | db.DB_THREAD)

        # adding the DB_CREATE flag will cause it to index existing records
        self.getDB().associate(secDB, self.getGenre, db.DB_CREATE)

        self.finish_test(secDB)


    def finish_test(self, secDB):
        if verbose:
            print "Primary key traversal:"
        c = self.getDB().cursor()
        count = 0
        rec = c.first()
        while rec is not None:
            if type(self.keytype) == type(''):
                assert string.atoi(rec[0])  # for primary db, key is a number
            else:
                assert rec[0] and type(rec[0]) == type(0)
            count = count + 1
            if verbose:
                print rec
            rec = c.next()
        assert count == len(musicdata) # all items accounted for


        if verbose:
            print "Secondary key traversal:"
        c = secDB.cursor()
        count = 0
        rec = c.first()
        assert rec[0] == "Jazz"
        while rec is not None:
            count = count + 1
            if verbose:
                print rec
            rec = c.next()
        # all items accounted for EXCEPT for 1 with "Blues" genre
        assert count == len(musicdata)-1

    def getGenre(self, priKey, priData):
        assert type(priData) == type("")
        if verbose:
            print 'getGenre key: %r data: %r' % (priKey, priData)
        genre = string.split(priData, '|')[2]
        if genre == 'Blues':
            return db.DB_DONOTINDEX
        else:
            return genre


#----------------------------------------------------------------------


class AssociateHashTestCase(AssociateTestCase):
    """Associate tests with a DB_HASH primary database."""
    dbtype = db.DB_HASH

class AssociateBTreeTestCase(AssociateTestCase):
    """Associate tests with a DB_BTREE primary database."""
    dbtype = db.DB_BTREE

class AssociateRecnoTestCase(AssociateTestCase):
    """Associate tests with a DB_RECNO primary database."""
    dbtype = db.DB_RECNO
    # Non-string keytype: addDataToDB() then keeps the integer record
    # numbers as keys instead of formatting them as strings.
    keytype = 0


#----------------------------------------------------------------------

class ShelveAssociateTestCase(AssociateTestCase):

    def createDB(self):
        self.primary = dbshelve.open(self.filename,
                                     dbname="primary",
                                     dbenv=self.env,
                                     filetype=self.dbtype)

    def addDataToDB(self, d):
        for key, value in musicdata.items():
            if type(self.keytype) == type(''):
                key = "%02d" % key
            d.put(key, value)    # save the value as is this time


    def getGenre(self, priKey, priData):
        assert type(priData) == type(())
        if verbose:
            print 'getGenre key: %r data: %r' % (priKey, priData)
        genre = priData[2]
        if genre == 'Blues':
            return db.DB_DONOTINDEX
        else:
            return genre


class ShelveAssociateHashTestCase(ShelveAssociateTestCase):
    """Shelve-backed associate tests with a DB_HASH primary."""
    dbtype = db.DB_HASH

class ShelveAssociateBTreeTestCase(ShelveAssociateTestCase):
    """Shelve-backed associate tests with a DB_BTREE primary."""
    dbtype = db.DB_BTREE

class ShelveAssociateRecnoTestCase(ShelveAssociateTestCase):
    """Shelve-backed associate tests with a DB_RECNO primary."""
    dbtype = db.DB_RECNO
    # Non-string keytype: addDataToDB() keeps the integer record numbers.
    keytype = 0


#----------------------------------------------------------------------

class ThreadedAssociateTestCase(AssociateTestCase):
    """Associate tests that fill the primary from two concurrent threads."""

    def addDataToDB(self, d):
        """Run writer1 and writer2 against `d` in parallel and wait for both."""
        workers = [Thread(target = self.writer1, args = (d, )),
                   Thread(target = self.writer2, args = (d, ))]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

    def writer1(self, d):
        """Write the musicdata records as '|'-joined strings."""
        wantStringKeys = type(self.keytype) == type('')
        for key, value in musicdata.items():
            if wantStringKeys:
                key = "%02d" % key
            d.put(key, '|'.join(value))

    def writer2(self, d):
        """Write 500 filler records keyed 'z100' .. 'z599'."""
        for x in range(100, 600):
            key = 'z%2d' % x
            # The value is the key repeated four times, '|'-separated.
            d.put(key, '|'.join([key] * 4))


class ThreadedAssociateHashTestCase(ShelveAssociateTestCase):
    """DB_HASH variant registered by test_suite() when threads are available.

    NOTE(review): despite the name this derives from ShelveAssociateTestCase,
    not ThreadedAssociateTestCase, so the two-thread addDataToDB() is never
    exercised.  Switching the base is not a drop-in fix: writer2() adds
    records that finish_test()'s record counts do not expect.
    """
    dbtype = db.DB_HASH

class ThreadedAssociateBTreeTestCase(ShelveAssociateTestCase):
    """DB_BTREE variant registered by test_suite() when threads are available.

    NOTE(review): despite the name this derives from ShelveAssociateTestCase,
    not ThreadedAssociateTestCase, so the two-thread addDataToDB() is never
    exercised.  Switching the base is not a drop-in fix: writer2() adds
    records that finish_test()'s record counts do not expect.
    """
    dbtype = db.DB_BTREE

class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase):
    """DB_RECNO variant registered by test_suite() when threads are available.

    NOTE(review): despite the name this derives from ShelveAssociateTestCase,
    not ThreadedAssociateTestCase, so the two-thread addDataToDB() is never
    exercised.  Switching the base is not a drop-in fix: writer2() adds
    records that finish_test()'s record counts do not expect.
    """
    dbtype = db.DB_RECNO
    # Non-string keytype: addDataToDB() keeps the integer record numbers.
    keytype = 0


#----------------------------------------------------------------------

def test_suite():
    """Return the associate test suite.

    The suite is empty when the underlying Berkeley DB is older than
    3.3.11; the Threaded* cases are only added when the threading module
    could be imported.
    """
    suite = unittest.TestSuite()
    if db.version() < (3, 3, 11):
        return suite

    cases = [
        AssociateHashTestCase,
        AssociateBTreeTestCase,
        AssociateRecnoTestCase,
        ShelveAssociateHashTestCase,
        ShelveAssociateBTreeTestCase,
        ShelveAssociateRecnoTestCase,
    ]
    if have_threads:
        cases = cases + [
            ThreadedAssociateHashTestCase,
            ThreadedAssociateBTreeTestCase,
            ThreadedAssociateRecnoTestCase,
        ]

    for case in cases:
        suite.addTest(unittest.makeSuite(case))
    return suite


# Script entry point: run only this module's suite.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_basics.py ---
"""
Basic TestCases for BTree and hash DBs, with and without a DBEnv, with
various DB flags, etc.
"""

import os
import sys
import errno
import shutil
import string
import tempfile
from pprint import pprint
import unittest

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db

from test_all import verbose

# Separator used by BasicTestCase.makeData() when building record values.
DASH = '-'


#----------------------------------------------------------------------

class VersionTestCase(unittest.TestCase):
    def test00_version(self):
        info = db.version()
        if verbose:
            print '\n', '-=' * 20
            print 'bsddb.db.version(): %s' % (info, )
            print db.DB_VERSION_STRING
            print '-=' * 20
        assert info == (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR,
                        db.DB_VERSION_PATCH)

#----------------------------------------------------------------------

class BasicTestCase(unittest.TestCase):
    """Common fixture and tests for the basic DB operations.

    Subclasses configure the run through the class attributes below; setUp()
    opens (and populates) a database accordingly and tearDown() removes it.
    """
    dbtype       = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags  = 0      # OR'ed with DB_CREATE when the DB is opened
    dbsetflags   = 0      # passed to DB.set_flags() before opening
    dbmode       = 0660   # file mode passed to DB.open()
    dbname       = None   # if set, passed as the second argument of DB.open()
    useEnv       = 0      # true: open the DB inside a DBEnv in 'db_home'
    envflags     = 0      # OR'ed with DB_CREATE when the DBEnv is opened
    envsetflags  = 0      # switched on via DBEnv.set_flags(..., 1)

    _numKeys      = 1002    # PRIVATE.  NOTE: must be an even value

    def setUp(self):
        if self.useEnv:
            homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
            self.homeDir = homeDir
            try:
                shutil.rmtree(homeDir)
            except OSError, e:
                # unix returns ENOENT, windows returns ESRCH
                if e.errno not in (errno.ENOENT, errno.ESRCH): raise
            os.mkdir(homeDir)
            try:
                self.env = db.DBEnv()
                self.env.set_lg_max(1024*1024)
                self.env.set_flags(self.envsetflags, 1)
                self.env.open(homeDir, self.envflags | db.DB_CREATE)
                tempfile.tempdir = homeDir
                self.filename = os.path.split(tempfile.mktemp())[1]
                tempfile.tempdir = None
            # Yes, a bare except is intended, since we're re-raising the exc.
            except:
                shutil.rmtree(homeDir)
                raise
        else:
            self.env = None
            self.filename = tempfile.mktemp()

        # create and open the DB
        self.d = db.DB(self.env)
        self.d.set_flags(self.dbsetflags)
        if self.dbname:
            self.d.open(self.filename, self.dbname, self.dbtype,
                        self.dbopenflags|db.DB_CREATE, self.dbmode)
        else:
            self.d.open(self.filename,   # try out keyword args
                        mode = self.dbmode,
                        dbtype = self.dbtype,
                        flags = self.dbopenflags|db.DB_CREATE)

        self.populateDB()


    def tearDown(self):
        """Close the database and remove everything setUp() created."""
        self.d.close()
        if self.env is None:
            # Stand-alone database: only its own file has to go.
            os.remove(self.filename)
            return

        self.env.close()
        # Removing the whole home directory takes the environment files and
        # the database file with it.  (An earlier approach, opening a new
        # DBEnv just to call remove() on the home directory and then
        # deleting the database file separately, is no longer used.)
        shutil.rmtree(self.homeDir)



    def populateDB(self, _txn=None):
        d = self.d

        for x in range(self._numKeys/2):
            key = '%04d' % (self._numKeys - x)  # insert keys in reverse order
            data = self.makeData(key)
            d.put(key, data, _txn)

        d.put('empty value', '', _txn)

        for x in range(self._numKeys/2-1):
            key = '%04d' % x  # and now some in forward order
            data = self.makeData(key)
            d.put(key, data, _txn)

        if _txn:
            _txn.commit()

        num = len(d)
        if verbose:
            print "created %d records" % num


    def makeData(self, key):
        """Return the record value for `key`: the key five times, DASH-joined."""
        copies = [key] * 5
        return DASH.join(copies)



    #----------------------------------------

    def test01_GetsAndPuts(self):
        # Exercises get/put/delete/get_both/stat on the populated DB, closes
        # and reopens the database half way through, and checks that the
        # data survived.  Expectations for the 'abcd' key depend on whether
        # the DB was configured with DB_DUP.
        d = self.d

        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_GetsAndPuts..." % self.__class__.__name__

        for key in ['0001', '0100', '0400', '0700', '0999']:
            data = d.get(key)
            if verbose:
                print data

        assert d.get('0321') == '0321-0321-0321-0321-0321'

        # By default non-existant keys return None...
        assert d.get('abcd') == None

        # ...but they raise exceptions in other situations.  Call
        # set_get_returns_none() to change it.
        try:
            d.delete('abcd')
        except db.DBNotFoundError, val:
            assert val[0] == db.DB_NOTFOUND
            if verbose: print val
        else:
            self.fail("expected exception")


        d.put('abcd', 'a new record')
        assert d.get('abcd') == 'a new record'

        # A second put on the same key: with DB_DUP get() is expected to
        # still return the first record, otherwise the new value.
        d.put('abcd', 'same key')
        if self.dbsetflags & db.DB_DUP:
            assert d.get('abcd') == 'a new record'
        else:
            assert d.get('abcd') == 'same key'


        # DB_NOOVERWRITE must refuse to touch an existing key...
        try:
            d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE)
        except db.DBKeyExistError, val:
            assert val[0] == db.DB_KEYEXIST
            if verbose: print val
        else:
            self.fail("expected exception")

        # ...and leave the stored value unchanged.
        if self.dbsetflags & db.DB_DUP:
            assert d.get('abcd') == 'a new record'
        else:
            assert d.get('abcd') == 'same key'


        # Flush, close and reopen the database; self.d is rebound so that
        # tearDown() closes the new handle.
        d.sync()
        d.close()
        del d

        self.d = db.DB(self.env)
        if self.dbname:
            self.d.open(self.filename, self.dbname)
        else:
            self.d.open(self.filename)
        d = self.d

        assert d.get('0321') == '0321-0321-0321-0321-0321'
        if self.dbsetflags & db.DB_DUP:
            assert d.get('abcd') == 'a new record'
        else:
            assert d.get('abcd') == 'same key'

        rec = d.get_both('0555', '0555-0555-0555-0555-0555')
        if verbose:
            print rec

        assert d.get_both('0555', 'bad data') == None

        # test default value
        data = d.get('bad key', 'bad data')
        assert data == 'bad data'

        # any object can pass through
        data = d.get('bad key', self)
        assert data == self

        s = d.stat()
        assert type(s) == type({})
        if verbose:
            print 'd.stat() returned this dictionary:'
            pprint(s)


    #----------------------------------------

    def test02_DictionaryMethods(self):
        # Exercises the mapping-style interface of DB: indexing, len(),
        # keys(), has_key(), items() and values().  populateDB() stored
        # exactly _numKeys records, which the length checks rely on.
        d = self.d

        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_DictionaryMethods..." % \
                  self.__class__.__name__

        for key in ['0002', '0101', '0401', '0701', '0998']:
            data = d[key]
            assert data == self.makeData(key)
            if verbose:
                print data

        assert len(d) == self._numKeys
        keys = d.keys()
        assert len(keys) == self._numKeys
        assert type(keys) == type([])

        # Adding a new key grows the DB by one record...
        d['new record'] = 'a new record'
        assert len(d) == self._numKeys+1
        keys = d.keys()
        assert len(keys) == self._numKeys+1

        # ...while assigning to it again is expected to leave the count
        # unchanged.
        d['new record'] = 'a replacement record'
        assert len(d) == self._numKeys+1
        keys = d.keys()
        assert len(keys) == self._numKeys+1

        if verbose:
            print "the first 10 keys are:"
            pprint(keys[:10])

        assert d['new record'] == 'a replacement record'

        assert d.has_key('0001') == 1
        assert d.has_key('spam') == 0

        # items() must be a list of (key, value) 2-tuples.
        items = d.items()
        assert len(items) == self._numKeys+1
        assert type(items) == type([])
        assert type(items[0]) == type(())
        assert len(items[0]) == 2

        if verbose:
            print "the first 10 items are:"
            pprint(items[:10])

        values = d.values()
        assert len(values) == self._numKeys+1
        assert type(values) == type([])

        if verbose:
            print "the first 10 values are:"
            pprint(values[:10])



    #----------------------------------------

    def test03_SimpleCursorStuff(self, get_raises_error=0, set_raises_error=0):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03_SimpleCursorStuff (get_error %s, set_error %s)..." % \
                  (self.__class__.__name__, get_raises_error, set_raises_error)

        if self.env and self.dbopenflags & db.DB_AUTO_COMMIT:
            txn = self.env.txn_begin()
        else:
            txn = None
        c = self.d.cursor(txn=txn)
        
        rec = c.first()
        count = 0
        while rec is not None:
            count = count + 1
            if verbose and count % 100 == 0:
                print rec
            try:
                rec = c.next()
            except db.DBNotFoundError, val:
                if get_raises_error:
                    assert val[0] == db.DB_NOTFOUND
                    if verbose: print val
                    rec = None
                else:
                    self.fail("unexpected DBNotFoundError")
            assert c.get_current_size() == len(c.current()[1]), "%s != len(%r)" % (c.get_current_size(), c.current()[1])
        
        assert count == self._numKeys


        rec = c.last()
        count = 0
        while rec is not None:
            count = count + 1
            if verbose and count % 100 == 0:
                print rec
            try:
                rec = c.prev()
            except db.DBNotFoundError, val:
                if get_raises_error:
                    assert val[0] == db.DB_NOTFOUND
                    if verbose: print val
                    rec = None
                else:
                    self.fail("unexpected DBNotFoundError")

        assert count == self._numKeys

        rec = c.set('0505')
        rec2 = c.current()
        assert rec == rec2
        assert rec[0] == '0505'
        assert rec[1] == self.makeData('0505')
        assert c.get_current_size() == len(rec[1])

        # make sure we get empty values properly
        rec = c.set('empty value')
        assert rec[1] == ''
        assert c.get_current_size() == 0
        
        try:
            n = c.set('bad key')
        except db.DBNotFoundError, val:
            assert val[0] == db.DB_NOTFOUND
            if verbose: print val
        else:
            if set_raises_error:
                self.fail("expected exception")
            if n != None:
                self.fail("expected None: %r" % (n,))

        rec = c.get_both('0404', self.makeData('0404'))
        assert rec == ('0404', self.makeData('0404'))

        try:
            n = c.get_both('0404', 'bad data')
        except db.DBNotFoundError, val:
            assert val[0] == db.DB_NOTFOUND
            if verbose: print val
        else:
            if get_raises_error:
                self.fail("expected exception")
            if n != None:
                self.fail("expected None: %r" % (n,))

        if self.d.get_type() == db.DB_BTREE:
            rec = c.set_range('011')
            if verbose:
                print "searched for '011', found: ", rec

            rec = c.set_range('011',dlen=0,doff=0)
            if verbose:
                print "searched (partial) for '011', found: ", rec
            if rec[1] != '': self.fail('expected empty data portion')

            ev = c.set_range('empty value')
            if verbose:
                print "search for 'empty value' returned", ev
            if ev[1] != '': self.fail('empty value lookup failed')

        c.set('0499')
        c.delete()
        try:
            rec = c.current()
        except db.DBKeyEmptyError, val:
            assert val[0] == db.DB_KEYEMPTY
            if verbose: print val
        else:
            self.fail('exception expected')

        c.next()
        c2 = c.dup(db.DB_POSITION)
        assert c.current() == c2.current()

        c2.put('', 'a new value', db.DB_CURRENT)
        assert c.current() == c2.current()
        assert c.current()[1] == 'a new value'

        c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5)
        assert c2.current()[1] == 'a newer value'

        c.close()
        c2.close()
        if txn:
            txn.commit()

        # time to abuse the closed cursors and hope we don't crash
        methods_to_test = {
            'current': (),
            'delete': (),
            'dup': (db.DB_POSITION,),
            'first': (),
            'get': (0,),
            'next': (),
            'prev': (),
            'last': (),
            'put':('', 'spam', db.DB_CURRENT),
            'set': ("0505",),
        }
        for method, args in methods_to_test.items():
            try:
                if verbose:
                    print "attempting to use a closed cursor's %s method" % \
                          method
                # a bug may cause a NULL pointer dereference...
                apply(getattr(c, method), args)
            except db.DBError, val:
                assert val[0] == 0
                if verbose: print val
            else:
                self.fail("no exception raised when using a buggy cursor's"
                          "%s method" % method)

        #
        # free cursor referencing a closed database, it should not barf:
        #
        oldcursor = self.d.cursor(txn=txn)
        self.d.close()

        # this would originally cause a segfault when the cursor for a
        # closed database was cleaned up.  it should not anymore.
        # SF pybsddb bug id 667343
        del oldcursor

    def test03b_SimpleCursorWithoutGetReturnsNone0(self):
        # same test but raise exceptions instead of returning None
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
                  self.__class__.__name__

        old = self.d.set_get_returns_none(0)
        assert old == 2
        self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1)

    def test03b_SimpleCursorWithGetReturnsNone1(self):
        # same test but raise exceptions instead of returning None
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
                  self.__class__.__name__

        old = self.d.set_get_returns_none(1)
        self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=1)


    def test03c_SimpleCursorGetReturnsNone2(self):
        # same test but raise exceptions instead of returning None
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03c_SimpleCursorStuffWithoutSetReturnsNone..." % \
                  self.__class__.__name__

        old = self.d.set_get_returns_none(1)
        assert old == 2
        old = self.d.set_get_returns_none(2)
        assert old == 1
        self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0)

    #----------------------------------------

    def test04_PartialGetAndPut(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test04_PartialGetAndPut..." % \
                  self.__class__.__name__

        key = "partialTest"
        data = "1" * 1000 + "2" * 1000
        d.put(key, data)
        assert d.get(key) == data
        assert d.get(key, dlen=20, doff=990) == ("1" * 10) + ("2" * 10)

        d.put("partialtest2", ("1" * 30000) + "robin" )
        assert d.get("partialtest2", dlen=5, doff=30000) == "robin"

        # There seems to be a bug in DB here...  Commented out the test for
        # now.
        ##assert d.get("partialtest2", dlen=5, doff=30010) == ""

        if self.dbsetflags != db.DB_DUP:
            # Partial put with duplicate records requires a cursor
            d.put(key, "0000", dlen=2000, doff=0)
            assert d.get(key) == "0000"

            d.put(key, "1111", dlen=1, doff=2)
            assert d.get(key) == "0011110"

    #----------------------------------------

    def test05_GetSize(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test05_GetSize..." % self.__class__.__name__

        for i in range(1, 50000, 500):
            key = "size%s" % i
            #print "before ", i,
            d.put(key, "1" * i)
            #print "after",
            assert d.get_size(key) == i
            #print "done"

    #----------------------------------------

    def test06_Truncate(self):
        if db.version() < (3,3):
            # truncate is a feature of BerkeleyDB 3.3 and above
            return

        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test99_Truncate..." % self.__class__.__name__

        d.put("abcde", "ABCDE");
        num = d.truncate()
        assert num >= 1, "truncate returned <= 0 on non-empty database"
        num = d.truncate()
        assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,)

#----------------------------------------------------------------------


class BasicBTreeTestCase(BasicTestCase):
    """Basic tests on a stand-alone BTree database."""
    dbtype = db.DB_BTREE


class BasicHashTestCase(BasicTestCase):
    """Basic tests on a stand-alone hash database."""
    dbtype = db.DB_HASH


class BasicBTreeWithThreadFlagTestCase(BasicTestCase):
    """Basic tests on a stand-alone BTree database opened with DB_THREAD."""
    dbtype = db.DB_BTREE
    dbopenflags = db.DB_THREAD


class BasicHashWithThreadFlagTestCase(BasicTestCase):
    """Basic tests on a stand-alone hash database opened with DB_THREAD."""
    dbtype = db.DB_HASH
    dbopenflags = db.DB_THREAD


class BasicBTreeWithEnvTestCase(BasicTestCase):
    """Basic tests on a BTree database living inside a DBEnv."""
    dbtype = db.DB_BTREE
    dbopenflags = db.DB_THREAD
    useEnv = 1
    # Environment with a memory pool and locking, but no transactions.
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK


class BasicHashWithEnvTestCase(BasicTestCase):
    """Basic tests on a hash database living inside a DBEnv."""
    dbtype = db.DB_HASH
    dbopenflags = db.DB_THREAD
    useEnv = 1
    # Environment with a memory pool and locking, but no transactions.
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK


#----------------------------------------------------------------------

class BasicTransactionTestCase(BasicTestCase):
    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
    useEnv = 1
    envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
                db.DB_INIT_TXN)
    envsetflags = db.DB_AUTO_COMMIT


    def tearDown(self):
        self.txn.commit()
        BasicTestCase.tearDown(self)


    def populateDB(self):
        txn = self.env.txn_begin()
        BasicTestCase.populateDB(self, _txn=txn)

        self.txn = self.env.txn_begin()



    def test06_Transactions(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test06_Transactions..." % self.__class__.__name__

        assert d.get('new rec', txn=self.txn) == None
        d.put('new rec', 'this is a new record', self.txn)
        assert d.get('new rec', txn=self.txn) == 'this is a new record'
        self.txn.abort()
        assert d.get('new rec') == None

        self.txn = self.env.txn_begin()

        assert d.get('new rec', txn=self.txn) == None
        d.put('new rec', 'this is a new record', self.txn)
        assert d.get('new rec', txn=self.txn) == 'this is a new record'
        self.txn.commit()
        assert d.get('new rec') == 'this is a new record'

        self.txn = self.env.txn_begin()
        c = d.cursor(self.txn)
        rec = c.first()
        count = 0
        while rec is not None:
            count = count + 1
            if verbose and count % 100 == 0:
                print rec
            rec = c.next()
        assert count == self._numKeys+1

        c.close()                # Cursors *MUST* be closed before commit!
        self.txn.commit()

        # flush pending updates
        try:
            self.env.txn_checkpoint (0, 0, 0)
        except db.DBIncompleteError:
            pass

        # must have at least one log file present:
        logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
        assert logs != None
        for log in logs:
            if verbose:
                print 'log file: ' + log

        self.txn = self.env.txn_begin()

    #----------------------------------------

    def test07_TxnTruncate(self):
        if db.version() < (3,3):
            # truncate is a feature of BerkeleyDB 3.3 and above
            return

        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test07_TxnTruncate..." % self.__class__.__name__

        d.put("abcde", "ABCDE");
        txn = self.env.txn_begin()
        num = d.truncate(txn)
        assert num >= 1, "truncate returned <= 0 on non-empty database"
        num = d.truncate(txn)
        assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,)
        txn.commit()

    #----------------------------------------

    def test08_TxnLateUse(self):
        txn = self.env.txn_begin()
        txn.abort()
        try:
            txn.abort()
        except db.DBError, e:
            pass
        else:
            raise RuntimeError, "DBTxn.abort() called after DB_TXN no longer valid w/o an exception"

        txn = self.env.txn_begin()
        txn.commit()
        try:
            txn.commit()
        except db.DBError, e:
            pass
        else:
            raise RuntimeError, "DBTxn.commit() called after DB_TXN no longer valid w/o an exception"


class BTreeTransactionTestCase(BasicTransactionTestCase):
    # Runs the BasicTransactionTestCase tests with dbtype set to DB_BTREE.
    dbtype = db.DB_BTREE

class HashTransactionTestCase(BasicTransactionTestCase):
    # Runs the BasicTransactionTestCase tests with dbtype set to DB_HASH.
    dbtype = db.DB_HASH



#----------------------------------------------------------------------

class BTreeRecnoTestCase(BasicTestCase):
    dbtype     = db.DB_BTREE
    dbsetflags = db.DB_RECNUM

    def test07_RecnoInBTree(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test07_RecnoInBTree..." % self.__class__.__name__

        rec = d.get(200)
        assert type(rec) == type(())
        assert len(rec) == 2
        if verbose:
            print "Record #200 is ", rec

        c = d.cursor()
        c.set('0200')
        num = c.get_recno()
        assert type(num) == type(1)
        if verbose:
            print "recno of d['0200'] is ", num

        rec = c.current()
        assert c.set_recno(num) == rec

        c.close()



class BTreeRecnoWithThreadFlagTestCase(BTreeRecnoTestCase):
    # Same tests as BTreeRecnoTestCase, with DB_THREAD as the open flag.
    dbopenflags = db.DB_THREAD

#----------------------------------------------------------------------

class BasicDUPTestCase(BasicTestCase):
    dbsetflags = db.DB_DUP

    def test08_DuplicateKeys(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test08_DuplicateKeys..." % \
                  self.__class__.__name__

        d.put("dup0", "before")
        for x in "The quick brown fox jumped over the lazy dog.".split():
            d.put("dup1", x)
        d.put("dup2", "after")

        data = d.get("dup1")
        assert data == "The"
        if verbose:
            print data

        c = d.cursor()
        rec = c.set("dup1")
        assert rec == ('dup1', 'The')

        next = c.next()
        assert next == ('dup1', 'quick')

        rec = c.set("dup1")
        count = c.count()
        assert count == 9

        next_dup = c.next_dup()
        assert next_dup == ('dup1', 'quick')

        rec = c.set('dup1')
        while rec is not None:
            if verbose:
                print rec
            rec = c.next_dup()

        c.set('dup1')
        rec = c.next_nodup()
        assert rec[0] != 'dup1'
        if verbose:
            print rec

        c.close()



class BTreeDUPTestCase(BasicDUPTestCase):
    # Runs the BasicDUPTestCase tests with dbtype set to DB_BTREE.
    dbtype = db.DB_BTREE

class HashDUPTestCase(BasicDUPTestCase):
    # Runs the BasicDUPTestCase tests with dbtype set to DB_HASH.
    dbtype = db.DB_HASH

class BTreeDUPWithThreadTestCase(BasicDUPTestCase):
    # BasicDUPTestCase configured for DB_BTREE with DB_THREAD as the open flag.
    dbopenflags = db.DB_THREAD
    dbtype = db.DB_BTREE

class HashDUPWithThreadTestCase(BasicDUPTestCase):
    # BasicDUPTestCase configured for DB_HASH with DB_THREAD as the open flag.
    dbopenflags = db.DB_THREAD
    dbtype = db.DB_HASH


#----------------------------------------------------------------------

class BasicMultiDBTestCase(BasicTestCase):
    dbname = 'first'

    def otherType(self):
        if self.dbtype == db.DB_BTREE:
            return db.DB_HASH
        else:
            return db.DB_BTREE

    def test09_MultiDB(self):
        d1 = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test09_MultiDB..." % self.__class__.__name__

        d2 = db.DB(self.env)
        d2.open(self.filename, "second", self.dbtype,
                self.dbopenflags|db.DB_CREATE)
        d3 = db.DB(self.env)
        d3.open(self.filename, "third", self.otherType(),
                self.dbopenflags|db.DB_CREATE)

        for x in "The quick brown fox jumped over the lazy dog".split():
            d2.put(x, self.makeData(x))

        for x in string.letters:
            d3.put(x, x*70)

        d1.sync()
        d2.sync()
        d3.sync()
        d1.close()
        d2.close()
        d3.close()

        self.d = d1 = d2 = d3 = None

        self.d = d1 = db.DB(self.env)
        d1.open(self.filename, self.dbname, flags = self.dbopenflags)
        d2 = db.DB(self.env)
        d2.open(self.filename, "second",  flags = self.dbopenflags)
        d3 = db.DB(self.env)
        d3.open(self.filename, "third", flags = self.dbopenflags)

        c1 = d1.cursor()
        c2 = d2.cursor()
        c3 = d3.cursor()

        count = 0
        rec = c1.first()
        while rec is not None:
            count = count + 1
            if verbose and (count % 50) == 0:
                print rec
            rec = c1.next()
        assert count == self._numKeys

        count = 0
        rec = c2.first()
        while rec is not None:
            count = count + 1
            if verbose:
                print rec
            rec = c2.next()
        assert count == 9

        count = 0
        rec = c3.first()
        while rec is not None:
            count = count + 1
            if verbose:
                print rec
            rec = c3.next()
        assert count == 52


        c1.close()
        c2.close()
        c3.close()

        d2.close()
        d3.close()



# Strange things happen if you try to use Multiple DBs per file without a
# DBEnv with MPOOL and LOCKing...

class BTreeMultiDBTestCase(BasicMultiDBTestCase):
    # Multi-database tests on DB_BTREE inside an environment opened with
    # DB_THREAD, DB_INIT_MPOOL and DB_INIT_LOCK.
    useEnv = 1
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
    dbopenflags = db.DB_THREAD
    dbtype = db.DB_BTREE

class HashMultiDBTestCase(BasicMultiDBTestCase):
    # Multi-database tests on DB_HASH inside an environment opened with
    # DB_THREAD, DB_INIT_MPOOL and DB_INIT_LOCK.
    useEnv = 1
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
    dbopenflags = db.DB_THREAD
    dbtype = db.DB_HASH


#----------------------------------------------------------------------
#----------------------------------------------------------------------

def test_suite():
    """Return one suite holding the tests of every concrete case class."""
    cases = [
        VersionTestCase,
        BasicBTreeTestCase,
        BasicHashTestCase,
        BasicBTreeWithThreadFlagTestCase,
        BasicHashWithThreadFlagTestCase,
        BasicBTreeWithEnvTestCase,
        BasicHashWithEnvTestCase,
        BTreeTransactionTestCase,
        HashTransactionTestCase,
        BTreeRecnoTestCase,
        BTreeRecnoWithThreadFlagTestCase,
        BTreeDUPTestCase,
        HashDUPTestCase,
        BTreeDUPWithThreadTestCase,
        HashDUPWithThreadTestCase,
        BTreeMultiDBTestCase,
        HashMultiDBTestCase,
    ]

    suite = unittest.TestSuite()
    for case in cases:
        suite.addTest(unittest.makeSuite(case))
    return suite


# When run as a script, let unittest build and run test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_compat.py ---
"""
Test cases adapted from the test_bsddb.py module in Python's
regression test suite.
"""

import sys, os, string
import unittest
import tempfile

from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, hashopen, btopen, rnopen
except ImportError:
    # For Python 2.3
    from bsddb import db, hashopen, btopen, rnopen


class CompatibilityTestCase(unittest.TestCase):
    # Exercises the btopen / hashopen / rnopen convenience functions
    # against a scratch file that is removed again after each test.
    def setUp(self):
        # mktemp() only returns a path; the tests create the file.
        self.filename = tempfile.mktemp()

    def tearDown(self):
        # Best-effort removal: a failure to delete the file is ignored.
        try:
            os.remove(self.filename)
        except os.error:
            pass


    def test01_btopen(self):
        self.do_bthash_test(btopen, 'btopen')

    def test02_hashopen(self):
        self.do_bthash_test(hashopen, 'hashopen')

    def test03_rnopen(self):
        # Record-number database: keys are integers starting at 1.
        data = string.split("The quick brown fox jumped over the lazy dog.")
        if verbose:
            print "\nTesting: rnopen"

        f = rnopen(self.filename, 'c')
        for x in range(len(data)):
            f[x+1] = data[x]

        getTest = (f[1], f[2], f[3])
        if verbose:
            print '%s %s %s' % getTest

        assert getTest[1] == 'quick', 'data mismatch!'

        rv = f.set_location(3)
        if rv != (3, 'brown'):
            self.fail('recno database set_location failed: '+repr(rv))

        # store a record number beyond the ones written so far
        f[25] = 'twenty-five'
        f.close()
        del f

        # reopen the existing file with flag 'w'
        f = rnopen(self.filename, 'w')
        f[20] = 'twenty'

        # a record number that was never written must raise KeyError
        def noRec(f):
            rec = f[15]
        self.assertRaises(KeyError, noRec, f)

        # a string key must raise TypeError here
        def badKey(f):
            rec = f['a string']
        self.assertRaises(TypeError, badKey, f)

        del f[3]

        # iterate until next() signals the end with KeyError
        rec = f.first()
        while rec:
            if verbose:
                print rec
            try:
                rec = f.next()
            except KeyError:
                break

        f.close()


    def test04_n_flag(self):
        # Opening with flag 'n' and closing again must simply succeed.
        f = hashopen(self.filename, 'n')
        f.close()


    def do_bthash_test(self, factory, what):
        # Shared body of the btopen and hashopen tests.
        #   factory -- the open function under test (btopen or hashopen)
        #   what    -- its name, used only in verbose output
        if verbose:
            print '\nTesting: ', what

        f = factory(self.filename, 'c')
        if verbose:
            print 'creation...'

        # truth test
        if f:
            if verbose: print "truth test: true"
        else:
            if verbose: print "truth test: false"

        f['0'] = ''
        f['a'] = 'Guido'
        f['b'] = 'van'
        f['c'] = 'Rossum'
        f['d'] = 'invented'
        # 'e' intentionally left out
        f['f'] = 'Python'
        if verbose:
            print '%s %s %s' % (f['a'], f['b'], f['c'])

        if verbose:
            print 'key ordering...'
        start = f.set_location(f.first()[0])
        if start != ('0', ''):
            self.fail("incorrect first() result: "+repr(start))
        # Walk forward until next() raises KeyError; at that point `rec`
        # still holds the last record fetched and must equal last().
        while 1:
            try:
                rec = f.next()
            except KeyError:
                assert rec == f.last(), 'Error, last <> last!'
                f.previous()
                break
            if verbose:
                print rec

        assert f.has_key('f'), 'Error, missing key!'

        # test that set_location() returns the next nearest key, value
        # on btree databases and raises KeyError on others.
        if factory == btopen:
            e = f.set_location('e')
            if e != ('f', 'Python'):
                self.fail('wrong key,value returned: '+repr(e))
        else:
            try:
                e = f.set_location('e')
            except KeyError:
                pass
            else:
                self.fail("set_location on non-existant key did not raise KeyError")

        f.sync()
        f.close()
        # truth test
        # (on the closed object this is expected to raise db.DBError)
        try:
            if f:
                if verbose: print "truth test: true"
            else:
                if verbose: print "truth test: false"
        except db.DBError:
            pass
        else:
            self.fail("Exception expected")

        del f

        if verbose:
            print 'modification...'
        f = factory(self.filename, 'w')
        f['d'] = 'discovered'

        if verbose:
            print 'access...'
        for key in f.keys():
            word = f[key]
            if verbose:
                print word

        # a missing key must raise KeyError
        def noRec(f):
            rec = f['no such key']
        self.assertRaises(KeyError, noRec, f)

        # an integer key must raise TypeError here
        def badKey(f):
            rec = f[15]
        self.assertRaises(TypeError, badKey, f)

        f.close()


#----------------------------------------------------------------------


def test_suite():
    """Return a suite built from the tests of CompatibilityTestCase."""
    suite = unittest.makeSuite(CompatibilityTestCase)
    return suite


# When run as a script, let unittest build and run test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_dbobj.py ---

import sys, os, string
import unittest
import glob

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbobj
except ImportError:
    # For Python 2.3
    from bsddb import db, dbobj


#----------------------------------------------------------------------

class dbobjTestCase(unittest.TestCase):
    """Verify that dbobj.DB and dbobj.DBEnv work properly"""
    db_home = 'db_home'
    db_name = 'test-dbobj.db'

    def setUp(self):
        # Make sure the environment directory exists next to the script.
        # The directory name comes from the db_home class attribute
        # (previously a duplicate 'db_home' literal).
        homeDir = os.path.join(os.path.dirname(sys.argv[0]), self.db_home)
        self.homeDir = homeDir
        try:
            os.mkdir(homeDir)
        except os.error:
            # ignored; typically the directory is already there
            pass

    def tearDown(self):
        # Drop our references to the database and environment objects,
        # then delete every file left in the environment directory.
        if hasattr(self, 'db'):
            del self.db
        if hasattr(self, 'env'):
            del self.env
        for path in glob.glob(os.path.join(self.homeDir, '*')):
            os.remove(path)

    def test01_both(self):
        # Subclass both wrappers and check that an overridden put() is
        # the one that gets called.
        class TestDBEnv(dbobj.DBEnv): pass
        class TestDB(dbobj.DB):
            def put(self, key, *args, **kwargs):
                # call our parent classes put method with an upper case key
                return dbobj.DB.put(self, key.upper(), *args, **kwargs)
        self.env = TestDBEnv()
        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
        self.db = TestDB(self.env)
        self.db.open(self.db_name, db.DB_HASH, db.DB_CREATE)
        self.db.put('spam', 'eggs')
        # the record must exist under the upper-cased key only
        assert self.db.get('spam') == None, \
               "overridden dbobj.DB.put() method failed [1]"
        assert self.db.get('SPAM') == 'eggs', \
               "overridden dbobj.DB.put() method failed [2]"
        self.db.close()
        self.env.close()

    def test02_dbobj_dict_interface(self):
        # Check the mapping-style special methods of dbobj.DB.
        self.env = dbobj.DBEnv()
        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
        self.db = dbobj.DB(self.env)
        self.db.open(self.db_name+'02', db.DB_HASH, db.DB_CREATE)
        # __setitem__
        self.db['spam'] = 'eggs'
        # __len__
        assert len(self.db) == 1
        # __getitem__
        assert self.db['spam'] == 'eggs'
        # __delitem__ (`del obj[key]`, not the __del__ finalizer)
        del self.db['spam']
        assert self.db.get('spam') == None, "dbobj __delitem__ failed"
        self.db.close()
        self.env.close()

#----------------------------------------------------------------------

def test_suite():
    """Return a suite built from the tests of dbobjTestCase."""
    suite = unittest.makeSuite(dbobjTestCase)
    return suite

# When run as a script, let unittest build and run test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_dbshelve.py ---
"""
TestCases for checking dbShelve objects.
"""

import sys, os, string
import tempfile, random
from pprint import pprint
from types import *
import unittest

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbshelve
except ImportError:
    # For Python 2.3
    from bsddb import db, dbshelve

from test_all import verbose


#----------------------------------------------------------------------

# We want the objects to be comparable so we can test dbshelve.values
# later on.
class DataClass:
    # Deliberately an old-style class: checkrec() expects the stored
    # objects to be of InstanceType.  Each instance carries a random
    # float in .value, and ordering is delegated to that float.

    def __init__(self):
        self.value = random.random()

    def __cmp__(self, other):
        # `other` is handed to cmp() unchanged, exactly as given.
        outcome = cmp(self.value, other)
        return outcome

class DBShelveTestCase(unittest.TestCase):
    def setUp(self):
        self.filename = tempfile.mktemp()
        self.do_open()

    def tearDown(self):
        self.do_close()
        try:
            os.remove(self.filename)
        except os.error:
            pass

    def populateDB(self, d):
        for x in string.letters:
            d['S' + x] = 10 * x           # add a string
            d['I' + x] = ord(x)           # add an integer
            d['L' + x] = [x] * 10         # add a list

            inst = DataClass()            # add an instance
            inst.S = 10 * x
            inst.I = ord(x)
            inst.L = [x] * 10
            d['O' + x] = inst


    # overridable in derived classes to affect how the shelf is created/opened
    def do_open(self):
        self.d = dbshelve.open(self.filename)

    # and closed...
    def do_close(self):
        self.d.close()



    def test01_basics(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_basics..." % self.__class__.__name__

        self.populateDB(self.d)
        self.d.sync()
        self.do_close()
        self.do_open()
        d = self.d

        l = len(d)
        k = d.keys()
        s = d.stat()
        f = d.fd()

        if verbose:
            print "length:", l
            print "keys:", k
            print "stats:", s

        assert 0 == d.has_key('bad key')
        assert 1 == d.has_key('IA')
        assert 1 == d.has_key('OA')

        d.delete('IA')
        del d['OA']
        assert 0 == d.has_key('IA')
        assert 0 == d.has_key('OA')
        assert len(d) == l-2

        values = []
        for key in d.keys():
            value = d[key]
            values.append(value)
            if verbose:
                print "%s: %s" % (key, value)
            self.checkrec(key, value)

        dbvalues = d.values()
        assert len(dbvalues) == len(d.keys())
        values.sort()
        dbvalues.sort()
        assert values == dbvalues

        items = d.items()
        assert len(items) == len(values)

        for key, value in items:
            self.checkrec(key, value)

        assert d.get('bad key') == None
        assert d.get('bad key', None) == None
        assert d.get('bad key', 'a string') == 'a string'
        assert d.get('bad key', [1, 2, 3]) == [1, 2, 3]

        d.set_get_returns_none(0)
        self.assertRaises(db.DBNotFoundError, d.get, 'bad key')
        d.set_get_returns_none(1)

        d.put('new key', 'new data')
        assert d.get('new key') == 'new data'
        assert d['new key'] == 'new data'



    def test02_cursors(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_cursors..." % self.__class__.__name__

        self.populateDB(self.d)
        d = self.d

        count = 0
        c = d.cursor()
        rec = c.first()
        while rec is not None:
            count = count + 1
            if verbose:
                print rec
            key, value = rec
            self.checkrec(key, value)
            rec = c.next()
        del c

        assert count == len(d)

        count = 0
        c = d.cursor()
        rec = c.last()
        while rec is not None:
            count = count + 1
            if verbose:
                print rec
            key, value = rec
            self.checkrec(key, value)
            rec = c.prev()

        assert count == len(d)

        c.set('SS')
        key, value = c.current()
        self.checkrec(key, value)
        del c



    def checkrec(self, key, value):
        x = key[1]
        if key[0] == 'S':
            assert type(value) == StringType
            assert value == 10 * x

        elif key[0] == 'I':
            assert type(value) == IntType
            assert value == ord(x)

        elif key[0] == 'L':
            assert type(value) == ListType
            assert value == [x] * 10

        elif key[0] == 'O':
            assert type(value) == InstanceType
            assert value.S == 10 * x
            assert value.I == ord(x)
            assert value.L == [x] * 10

        else:
            raise AssertionError, 'Unknown key type, fix the test'

#----------------------------------------------------------------------

class BasicShelveTestCase(DBShelveTestCase):
    # Creates the shelf as an explicit dbshelve.DBShelf object and opens
    # it with the dbtype / dbflags supplied by the concrete subclass.

    def do_open(self):
        shelf = dbshelve.DBShelf()
        self.d = shelf
        shelf.open(self.filename, self.dbtype, self.dbflags)

    def do_close(self):
        self.d.close()


class BTreeShelveTestCase(BasicShelveTestCase):
    # DBShelf on DB_BTREE, opened with DB_CREATE.
    dbflags = db.DB_CREATE
    dbtype = db.DB_BTREE


class HashShelveTestCase(BasicShelveTestCase):
    # DBShelf on DB_HASH, opened with DB_CREATE.
    dbflags = db.DB_CREATE
    dbtype = db.DB_HASH


class ThreadBTreeShelveTestCase(BasicShelveTestCase):
    # DBShelf on DB_BTREE, opened with DB_CREATE and DB_THREAD.
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_BTREE


class ThreadHashShelveTestCase(BasicShelveTestCase):
    # DBShelf on DB_HASH, opened with DB_CREATE and DB_THREAD.
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_HASH


#----------------------------------------------------------------------

class BasicEnvShelveTestCase(DBShelveTestCase):
    # Runs the shelf tests with the DBShelf living inside a DBEnv whose
    # home is a 'db_home' directory next to the running script.

    def do_open(self):
        script_dir = os.path.dirname(sys.argv[0])
        homeDir = os.path.join(script_dir, 'db_home')
        self.homeDir = homeDir
        try:
            os.mkdir(homeDir)
        except os.error:
            pass
        self.env = db.DBEnv()
        self.env.open(homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)

        # keep only the last path component of the scratch file name
        head, tail = os.path.split(self.filename)
        self.filename = tail
        self.d = dbshelve.DBShelf(self.env)
        self.d.open(self.filename, self.dbtype, self.dbflags)


    def do_close(self):
        # the shelf is closed before its environment
        self.d.close()
        self.env.close()


    def tearDown(self):
        # Close everything, then delete every file in the home directory.
        self.do_close()
        import glob
        pattern = os.path.join(self.homeDir, '*')
        for leftover in glob.glob(pattern):
            os.remove(leftover)



class EnvBTreeShelveTestCase(BasicEnvShelveTestCase):
    # DB_BTREE shelf in an environment; no extra environment flags.
    dbtype = db.DB_BTREE
    dbflags = db.DB_CREATE
    envflags = 0


class EnvHashShelveTestCase(BasicEnvShelveTestCase):
    # DB_HASH shelf in an environment; no extra environment flags.
    dbtype = db.DB_HASH
    dbflags = db.DB_CREATE
    envflags = 0


class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase):
    # DB_BTREE shelf in an environment, with DB_THREAD on both the
    # environment and the database.
    dbtype = db.DB_BTREE
    dbflags = db.DB_CREATE | db.DB_THREAD
    envflags = db.DB_THREAD


class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase):
    # DB_HASH shelf in an environment, with DB_THREAD on both the
    # environment and the database.
    dbtype = db.DB_HASH
    dbflags = db.DB_CREATE | db.DB_THREAD
    envflags = db.DB_THREAD


#----------------------------------------------------------------------
# TODO:  Add test cases for a DBShelf in a RECNO DB.


#----------------------------------------------------------------------

def test_suite():
    """Return one suite holding the tests of every shelve case class."""
    cases = [
        DBShelveTestCase,
        BTreeShelveTestCase,
        HashShelveTestCase,
        ThreadBTreeShelveTestCase,
        ThreadHashShelveTestCase,
        EnvBTreeShelveTestCase,
        EnvHashShelveTestCase,
        EnvThreadBTreeShelveTestCase,
        EnvThreadHashShelveTestCase,
    ]

    suite = unittest.TestSuite()
    for case in cases:
        suite.addTest(unittest.makeSuite(case))
    return suite


# When run as a script, let unittest build and run test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_dbtables.py ---
#!/usr/bin/env python
#
#-----------------------------------------------------------------------
# A test suite for the table interface built on bsddb.db
#-----------------------------------------------------------------------
#
# Copyright (C) 2000, 2001 by Autonomous Zone Industries
# Copyright (C) 2002 Gregory P. Smith
#
# March 20, 2000
#
# License:      This is free software.  You may use this software for any
#               purpose including modification/redistribution, so long as
#               this header remains intact and that you do not claim any
#               rights of ownership or authorship of this software.  This
#               software has been tested, but no warranty is expressed or
#               implied.
#
#   --  Gregory P. Smith <greg at electricrain.com>
#
# $Id: test_dbtables.py,v 1.1 2004/05/01 00:54:03 tismer Exp $

import sys, os, re
try:
    import cPickle
    pickle = cPickle
except ImportError:
    import pickle

import unittest
from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbtables
except ImportError:
    # For Python 2.3
    from bsddb import db, dbtables



#----------------------------------------------------------------------

class TableDBTestCase(unittest.TestCase):
    db_home = 'db_home'
    db_name = 'test-table.db'

    def setUp(self):
        # Make sure a 'db_home' directory exists next to the running
        # script and open the table database in it with create=1.
        self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        try:
            os.mkdir(self.homeDir)
        except os.error:
            pass
        self.tdb = dbtables.bsdTableDB(
            create=1, dbhome=self.homeDir, filename='tabletest.db')

    def tearDown(self):
        # Close the table database, then delete every file found in the
        # home directory.
        self.tdb.close()
        import glob
        pattern = os.path.join(self.homeDir, '*')
        for leftover in glob.glob(pattern):
            os.remove(leftover)

    def test01(self):
        # Store one pickled float in a single-column table and read it
        # back through Select().
        tabname = "test01"
        colname = 'cool numbers'
        tdb = self.tdb

        # start from a clean slate; a failing Drop is ignored
        try:
            tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        tdb.CreateTable(tabname, [colname])
        pickled = pickle.dumps(3.14159, 1)
        tdb.Insert(tabname, {colname: pickled})

        if verbose:
            tdb._db_print()

        rows = tdb.Select(tabname, [colname], conditions={colname: None})

        colval = pickle.loads(rows[0][colname])
        assert 3.141 < colval < 3.142


    def test02(self):
        tabname = "test02"
        col0 = 'coolness factor'
        col1 = 'but can it fly?'
        col2 = 'Species'
        testinfo = [
            {col0: pickle.dumps(8, 1), col1: 'no', col2: 'Penguin'},
            {col0: pickle.dumps(-1, 1), col1: 'no', col2: 'Turkey'},
            {col0: pickle.dumps(9, 1), col1: 'yes', col2: 'SR-71A Blackbird'}
        ]

        try:
            self.tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        self.tdb.CreateTable(tabname, [col0, col1, col2])
        for row in testinfo :
            self.tdb.Insert(tabname, row)

        values = self.tdb.Select(tabname, [col2],
            conditions={col0: lambda x: pickle.loads(x) >= 8})

        assert len(values) == 2
        if values[0]['Species'] == 'Penguin' :
            assert values[1]['Species'] == 'SR-71A Blackbird'
        elif values[0]['Species'] == 'SR-71A Blackbird' :
            assert values[1]['Species'] == 'Penguin'
        else :
            if verbose:
                print "values= %r" % (values,)
            raise "Wrong values returned!"

    def test03(self):
        tabname = "test03"
        try:
            self.tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        if verbose:
            print '...before CreateTable...'
            self.tdb._db_print()
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
        if verbose:
            print '...after CreateTable...'
            self.tdb._db_print()
        self.tdb.Drop(tabname)
        if verbose:
            print '...after Drop...'
            self.tdb._db_print()
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])

        try:
            self.tdb.Insert(tabname,
                            {'a': "",
                             'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
                             'f': "Zero"})
            assert 0
        except dbtables.TableDBError:
            pass

        try:
            self.tdb.Select(tabname, [], conditions={'foo': '123'})
            assert 0
        except dbtables.TableDBError:
            pass

        self.tdb.Insert(tabname,
                        {'a': '42',
                         'b': "bad",
                         'c': "meep",
                         'e': 'Fuzzy wuzzy was a bear'})
        self.tdb.Insert(tabname,
                        {'a': '581750',
                         'b': "good",
                         'd': "bla",
                         'c': "black",
                         'e': 'fuzzy was here'})
        self.tdb.Insert(tabname,
                        {'a': '800000',
                         'b': "good",
                         'd': "bla",
                         'c': "black",
                         'e': 'Fuzzy wuzzy is a bear'})

        if verbose:
            self.tdb._db_print()

        # this should return two rows
        values = self.tdb.Select(tabname, ['b', 'a', 'd'],
            conditions={'e': re.compile('wuzzy').search,
                        'a': re.compile('^[0-9]+$').match})
        assert len(values) == 2

        # now lets delete one of them and try again
        self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')})
        values = self.tdb.Select(
            tabname, ['a', 'd', 'b'],
            conditions={'e': dbtables.PrefixCond('Fuzzy')})
        assert len(values) == 1
        assert values[0]['d'] == None

        values = self.tdb.Select(tabname, ['b'],
            conditions={'c': lambda c: c == 'meep'})
        assert len(values) == 1
        assert values[0]['b'] == "bad"


    def test04_MultiCondSelect(self):
        # A Select() with several conditions that no single row meets
        # all at once must return nothing.
        tabname = "test04_MultiCondSelect"
        tdb = self.tdb

        try:
            tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])

        # inserting into the unknown column 'f' must be refused
        try:
            tdb.Insert(tabname,
                       {'a': "",
                        'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
                        'f': "Zero"})
        except dbtables.TableDBError:
            pass
        else:
            assert 0

        tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D",
                             'e': "E"})
        tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D",
                             'e': "-E"})
        tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-",
                             'e': "E-"})

        if verbose:
            tdb._db_print()

        # Regression test for sourceforge bug # 590449 (tracked down and
        # fixed by "Rob Tillotson (n9mtb)"): this select must return 0
        # rows.
        conditions = {'e': dbtables.ExactCond('E'),
                      'a': dbtables.ExactCond('A'),
                      'd': dbtables.PrefixCond('-')
                     }
        rows = tdb.Select(tabname, ['b', 'a', 'd'], conditions=conditions)
        assert len(rows) == 0, rows


    def test_CreateOrExtend(self):
        # CreateOrExtendTable must create a missing table and, when called
        # again, add any columns the table does not have yet.
        tabname = "test_CreateOrExtend"

        self.tdb.CreateOrExtendTable(
            tabname, ['name', 'taste', 'filling', 'alcohol content', 'price'])

        # 'is it Guinness?' is not a column yet, so this Insert must be
        # rejected.  Only TableDBError is caught and the failure is reported
        # from the else clause: a bare "except:" wrapped around an assert
        # swallows the AssertionError too, so the check could never fail.
        try:
            self.tdb.Insert(tabname,
                            {'taste': 'crap',
                             'filling': 'no',
                             'is it Guinness?': 'no'})
        except dbtables.TableDBError:
            pass
        else:
            self.fail("Insert should've failed due to bad column name")

        self.tdb.CreateOrExtendTable(tabname,
                                     ['name', 'taste', 'is it Guinness?'])

        # these should both succeed as the table should contain the union of
        # both sets of columns.
        self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no',
                                  'is it Guinness?': 'no'})
        self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes',
                                  'is it Guinness?': 'yes',
                                  'name': 'Guinness'})


    def test_CondObjs(self):
        # Use the dbtables condition classes (Cond, ExactCond, PrefixCond,
        # LikeCond) and a plain callable as Select conditions.
        table = "test_CondObjs"

        self.tdb.CreateTable(table, ['a', 'b', 'c', 'd', 'e', 'p'])

        # Rows deliberately leave different columns unset.
        rows = [
            {'a': "the letter A",
             'b': "the letter B",
             'c': "is for cookie"},
            {'a': "is for aardvark",
             'e': "the letter E",
             'c': "is for cookie",
             'd': "is for dog"},
            {'a': "the letter A",
             'e': "the letter E",
             'c': "is for cookie",
             'p': "is for Python"},
        ]
        for row in rows:
            self.tdb.Insert(table, row)

        # Prefix match on 'e': two rows carry "the letter E".
        by_prefix = {'e': dbtables.PrefixCond('the l')}
        found = self.tdb.Select(table, ['p', 'e'], conditions=by_prefix)
        assert len(found) == 2, found
        assert found[0]['e'] == found[1]['e'], found
        assert found[0]['p'] != found[1]['p'], found

        # LIKE match on 'a': only the aardvark row qualifies.
        by_like = {'a': dbtables.LikeCond('%aardvark%')}
        found = self.tdb.Select(table, ['d', 'a'], conditions=by_like)
        assert len(found) == 1, found
        assert found[0]['d'] == "is for dog", found
        assert found[0]['a'] == "is for aardvark", found

        # Every kind of condition at once, passed positionally, with all
        # columns requested (columns=None).
        def is_unset(s):
            return not s

        everything = {'b': dbtables.Cond(),
                      'e': dbtables.LikeCond('%letter%'),
                      'a': dbtables.PrefixCond('is'),
                      'd': dbtables.ExactCond('is for dog'),
                      'c': dbtables.PrefixCond('is for'),
                      'p': is_unset}
        found = self.tdb.Select(table, None, everything)
        assert len(found) == 1, found
        assert found[0]['d'] == "is for dog", found
        assert found[0]['a'] == "is for aardvark", found

    def test_Delete(self):
        # Delete must cope with rows that leave some columns unset.
        table = "test_Delete"
        self.tdb.CreateTable(table, ['x', 'y', 'z'])

        def x_starts_with_X():
            # Build a fresh condition dictionary for every call.
            return {'x': dbtables.PrefixCond('X')}

        # prior to 2001-05-09 there was a bug where Delete() would
        # fail if it encountered any rows that did not have values in
        # every column.
        # Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff at nic.fi)
        for row in [{'x': 'X1', 'y':'Y1'},
                    {'x': 'X2', 'y':'Y2', 'z': 'Z2'}]:
            self.tdb.Insert(table, row)

        self.tdb.Delete(table, conditions=x_starts_with_X())

        # Both rows matched the prefix, so nothing may be left to select.
        remaining = self.tdb.Select(table, ['y'],
                                    conditions=x_starts_with_X())
        assert len(remaining) == 0

    def test_Modify(self):
        # Modify must apply each mapping function to the selected rows only;
        # a mapping that returns None removes the value.
        table = "test_Modify"
        self.tdb.CreateTable(table, ['Name', 'Type', 'Access'])

        for row in [{'Name': 'Index to MP3 files.doc',
                     'Type': 'Word', 'Access': '8'},
                    {'Name': 'Nifty.MP3', 'Access': '1'},
                    {'Type': 'Unknown', 'Access': '0'}]:
            self.tdb.Insert(table, row)

        # Mapping functions: each receives the current column value (None
        # when unset) and returns the new one.
        def default_to_mp3(current):
            if current is None:
                return 'MP3'
            return current

        def bump(count):
            return str(int(count)+1)

        def drop(value):
            return None

        # The order of these three calls matters for the checks below.
        self.tdb.Modify(table,
                        conditions={'Access': dbtables.ExactCond('0')},
                        mappings={'Access': drop})
        self.tdb.Modify(table,
                        conditions={'Name': dbtables.LikeCond('%MP3%')},
                        mappings={'Type': default_to_mp3})
        self.tdb.Modify(table,
                        conditions={'Name': dbtables.LikeCond('%')},
                        mappings={'Access': bump})

        # Delete key in select conditions
        unknown = self.tdb.Select(
            table, None,
            conditions={'Type': dbtables.ExactCond('Unknown')})
        assert len(unknown) == 1, unknown
        assert unknown[0]['Name'] == None, unknown
        assert unknown[0]['Access'] == None, unknown

        # Modify value by select conditions
        nifty = self.tdb.Select(
            table, None,
            conditions={'Name': dbtables.ExactCond('Nifty.MP3')})
        assert len(nifty) == 1, nifty
        assert nifty[0]['Type'] == "MP3", nifty
        assert nifty[0]['Access'] == "2", nifty

        # Make sure change applied only to select conditions
        docs = self.tdb.Select(
            table, None, conditions={'Name': dbtables.LikeCond('%doc%')})
        assert len(docs) == 1, docs
        assert docs[0]['Type'] == "Word", docs
        assert docs[0]['Access'] == "9", docs


def test_suite():
    """Return a TestSuite wrapping the tests made from TableDBTestCase."""
    return unittest.TestSuite([unittest.makeSuite(TableDBTestCase)])


# When run as a script, hand control to unittest, which builds the tests by
# calling test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_env_close.py ---
"""TestCases for checking that it does not segfault when a DBEnv object
is closed before its DB objects.
"""

import os
import sys
import tempfile
import glob
import unittest

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db

from test_all import verbose

# We're going to get warnings in this module about trying to close the db when
# its env is already closed.  Let's just ignore those.
# The import is guarded so that the module still loads when no warnings
# module can be imported; in that case there is nothing to filter.
try:
    import warnings
except ImportError:
    pass
else:
    # Silence only RuntimeWarnings whose message starts with this text.
    warnings.filterwarnings('ignore',
                            message='DB could not be closed in',
                            category=RuntimeWarning)


#----------------------------------------------------------------------

class DBEnvClosedEarlyCrash(unittest.TestCase):
    def setUp(self):
        self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        try: os.mkdir(self.homeDir)
        except os.error: pass
        tempfile.tempdir = self.homeDir
        self.filename = os.path.split(tempfile.mktemp())[1]
        tempfile.tempdir = None

    def tearDown(self):
        files = glob.glob(os.path.join(self.homeDir, '*'))
        for file in files:
            os.remove(file)


    def test01_close_dbenv_before_db(self):
        dbenv = db.DBEnv()
        dbenv.open(self.homeDir,
                   db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL,
                   0666)

        d = db.DB(dbenv)
        d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)

        try:
            dbenv.close()
        except db.DBError:
            try:
                d.close()
            except db.DBError:
                return
            assert 0, \
                   "DB close did not raise an exception about its "\
                   "DBEnv being trashed"

        # XXX This may fail when using older versions of BerkeleyDB.
        # E.g. 3.2.9 never raised the exception.
        assert 0, "dbenv did not raise an exception about its DB being open"


    def test02_close_dbenv_delete_db_success(self):
        dbenv = db.DBEnv()
        dbenv.open(self.homeDir,
                   db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL,
                   0666)

        d = db.DB(dbenv)
        d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)

        try:
            dbenv.close()
        except db.DBError:
            pass  # good, it should raise an exception

        del d
        try:
            import gc
        except ImportError:
            gc = None
        if gc:
            # force d.__del__ [DB_dealloc] to be called
            gc.collect()


#----------------------------------------------------------------------

def test_suite():
    """Return a TestSuite wrapping the tests made from DBEnvClosedEarlyCrash."""
    return unittest.TestSuite([unittest.makeSuite(DBEnvClosedEarlyCrash)])


# When run as a script, hand control to unittest, which builds the tests by
# calling test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_get_none.py ---
"""
TestCases for checking set_get_returns_none.
"""

import sys, os, string
import tempfile
from pprint import pprint
import unittest

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db

from test_all import verbose


#----------------------------------------------------------------------

class GetReturnsNoneTestCase(unittest.TestCase):
    """Check both settings of DB.set_get_returns_none().

    One record is stored per character of string.letters.  That string is
    locale-dependent, so the expected record count is taken from its length
    instead of being hard-coded.
    """

    def setUp(self):
        """Pick a fresh temporary file name for the database."""
        self.filename = tempfile.mktemp()

    def tearDown(self):
        """Remove the database file, ignoring a failure to do so."""
        try:
            os.remove(self.filename)
        except os.error:
            pass


    def test01_get_returns_none(self):
        # With the flag on, a missing key and running a cursor off the end
        # both yield None.
        d = db.DB()
        d.open(self.filename, db.DB_BTREE, db.DB_CREATE)
        d.set_get_returns_none(1)

        for x in string.letters:
            d.put(x, x * 40)

        data = d.get('bad key')
        self.assert_(data is None)

        data = d.get('a')
        self.assertEqual(data, 'a'*40)

        count = 0
        c = d.cursor()
        rec = c.first()
        while rec:
            count = count + 1
            rec = c.next()

        self.assert_(rec is None)
        self.assertEqual(count, len(string.letters))

        c.close()
        d.close()


    def test02_get_raises_exception(self):
        # With the flag off, a missing key and running a cursor off the end
        # both raise instead of returning None.
        d = db.DB()
        d.open(self.filename, db.DB_BTREE, db.DB_CREATE)
        d.set_get_returns_none(0)

        for x in string.letters:
            d.put(x, x * 40)

        # The error raised for a missing key must be catchable both as
        # DBNotFoundError and as KeyError.
        self.assertRaises(db.DBNotFoundError, d.get, 'bad key')
        self.assertRaises(KeyError, d.get, 'bad key')

        data = d.get('a')
        self.assertEqual(data, 'a'*40)

        count = 0
        exceptionHappened = 0
        c = d.cursor()
        rec = c.first()
        while rec:
            count = count + 1
            try:
                rec = c.next()
            except db.DBNotFoundError:  # end of the records
                exceptionHappened = 1
                break

        # rec still holds the last record fetched before the exception.
        self.assert_(rec is not None)
        self.assert_(exceptionHappened)
        self.assertEqual(count, len(string.letters))

        c.close()
        d.close()

#----------------------------------------------------------------------

def test_suite():
    """Return the suite unittest.makeSuite builds from GetReturnsNoneTestCase."""
    suite = unittest.makeSuite(GetReturnsNoneTestCase)
    return suite


# When run as a script, hand control to unittest, which builds the tests by
# calling test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_join.py ---
"""TestCases for using the DB.join and DBCursor.join_item methods.
"""

import sys, os, string
import tempfile
import time
from pprint import pprint

try:
    from threading import Thread, currentThread
    have_threads = 1
except ImportError:
    have_threads = 0

import unittest
from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbshelve
except ImportError:
    # For Python 2.3
    from bsddb import db, dbshelve


#----------------------------------------------------------------------

# (key, data) pairs that JoinTestCase stores in its "primary" database:
# a product name mapped to a store name.
ProductIndex = [
    ('apple', "Convenience Store"),
    ('blueberry', "Farmer's Market"),
    ('shotgun', "S-Mart"),              # Aisle 12
    ('pear', "Farmer's Market"),
    ('chainsaw', "S-Mart"),             # "Shop smart.  Shop S-Mart!"
    ('strawberry', "Farmer's Market"),
]

# (key, data) pairs that JoinTestCase stores in its "secondary" database:
# a colour mapped to a product name.  Keys repeat on purpose (that database
# is opened with DB_DUP | DB_DUPSORT), and "peach" has no entry in
# ProductIndex.
ColorIndex = [
    ('blue', "blueberry"),
    ('red', "apple"),
    ('red', "chainsaw"),
    ('red', "strawberry"),
    ('yellow', "peach"),
    ('yellow', "pear"),
    ('black', "shotgun"),
]

class JoinTestCase(unittest.TestCase):
    keytype = ''

    def setUp(self):
        self.filename = self.__class__.__name__ + '.db'
        homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try: os.mkdir(homeDir)
        except os.error: pass
        self.env = db.DBEnv()
        self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK )

    def tearDown(self):
        self.env.close()
        import glob
        files = glob.glob(os.path.join(self.homeDir, '*'))
        for file in files:
            os.remove(file)

    def test01_join(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_join..." % \
                  self.__class__.__name__

        # create and populate primary index
        priDB = db.DB(self.env)
        priDB.open(self.filename, "primary", db.DB_BTREE, db.DB_CREATE)
        map(lambda t, priDB=priDB: apply(priDB.put, t), ProductIndex)

        # create and populate secondary index
        secDB = db.DB(self.env)
        secDB.set_flags(db.DB_DUP | db.DB_DUPSORT)
        secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE)
        map(lambda t, secDB=secDB: apply(secDB.put, t), ColorIndex)

        sCursor = None
        jCursor = None
        try:
            # lets look up all of the red Products
            sCursor = secDB.cursor()
            # Don't do the .set() in an assert, or you can get a bogus failure
            # when running python -O
            tmp = sCursor.set('red')
            assert tmp

            # FIXME: jCursor doesn't properly hold a reference to its
            # cursors, if they are closed before jcursor is used it
            # can cause a crash.
            jCursor = priDB.join([sCursor])

            if jCursor.get(0) != ('apple', "Convenience Store"):
                self.fail("join cursor positioned wrong")
            if jCursor.join_item() != 'chainsaw':
                self.fail("DBCursor.join_item returned wrong item")
            if jCursor.get(0)[0] != 'strawberry':
                self.fail("join cursor returned wrong thing")
            if jCursor.get(0):  # there were only three red items to return
                self.fail("join cursor returned too many items")
        finally:
            if jCursor:
                jCursor.close()
            if sCursor:
                sCursor.close()
            priDB.close()
            secDB.close()


def test_suite():
    """Return a TestSuite wrapping the tests made from JoinTestCase."""
    return unittest.TestSuite([unittest.makeSuite(JoinTestCase)])

--- NEW FILE: test_lock.py ---
"""
TestCases for testing the locking sub-system.
"""

import sys, os, string
import tempfile
import time
from pprint import pprint
from whrandom import random

try:
    from threading import Thread, currentThread
    have_threads = 1
except ImportError:
    have_threads = 0


import unittest
from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db


#----------------------------------------------------------------------

class LockingTestCase(unittest.TestCase):

    def setUp(self):
        homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try: os.mkdir(homeDir)
        except os.error: pass
        self.env = db.DBEnv()
        self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
                      db.DB_INIT_LOCK | db.DB_CREATE)


    def tearDown(self):
        self.env.close()
        import glob
        files = glob.glob(os.path.join(self.homeDir, '*'))
        for file in files:
            os.remove(file)


    def test01_simple(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_simple..." % self.__class__.__name__

        anID = self.env.lock_id()
        if verbose:
            print "locker ID: %s" % anID
        lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
        if verbose:
            print "Aquired lock: %s" % lock
        time.sleep(1)
        self.env.lock_put(lock)
        if verbose:
            print "Released lock: %s" % lock




    def test02_threaded(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_threaded..." % self.__class__.__name__

        threads = []
        threads.append(Thread(target = self.theThread,
                              args=(5, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def test03_set_timeout(self):
        # test that the set_timeout call works
        if hasattr(self.env, 'set_timeout'):
            self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
            self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
            self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
            self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)

    def theThread(self, sleepTime, lockType):
        name = currentThread().getName()
        if lockType ==  db.DB_LOCK_WRITE:
            lt = "write"
        else:
            lt = "read"

        anID = self.env.lock_id()
        if verbose:
            print "%s: locker ID: %s" % (name, anID)

        lock = self.env.lock_get(anID, "some locked thing", lockType)
        if verbose:
            print "%s: Aquired %s lock: %s" % (name, lt, lock)

        time.sleep(sleepTime)

        self.env.lock_put(lock)
        if verbose:
            print "%s: Released %s lock: %s" % (name, lt, lock)


#----------------------------------------------------------------------

def test_suite():
    """Return the locking tests, limited to 'test01*' without threads."""
    # makeSuite() selects methods by name prefix; 'test' is its default.
    if have_threads:
        prefix = 'test'
    else:
        prefix = 'test01'
    return unittest.TestSuite([unittest.makeSuite(LockingTestCase, prefix)])


# When run as a script, hand control to unittest, which builds the tests by
# calling test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_misc.py ---
"""Miscellaneous bsddb module test cases
"""

import os
import sys
import unittest

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbshelve
except ImportError:
    # For Python 2.3
    from bsddb import db, dbshelve

#----------------------------------------------------------------------

class MiscTestCase(unittest.TestCase):
    """Regression tests for assorted crashes in the bsddb module."""

    def setUp(self):
        """Choose the database file name and make sure db_home exists."""
        self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.filename = self.__class__.__name__ + '.db'
        try:
            os.mkdir(self.homeDir)
        except OSError:
            pass

    def tearDown(self):
        """Delete the database file and every file in db_home."""
        try:
            os.remove(self.filename)
        except OSError:
            pass
        import glob
        for path in glob.glob(os.path.join(self.homeDir, '*')):
            os.remove(path)

    def test01_badpointer(self):
        # Using a shelf after it was closed must raise DBError.
        shelf = dbshelve.open(self.filename)
        shelf.close()
        self.assertRaises(db.DBError, shelf.get, "foo")

    def test02_db_home(self):
        environment = db.DBEnv()
        # check for crash fixed when db_home is used before open()
        assert environment.db_home is None
        environment.open(self.homeDir, db.DB_CREATE)
        assert self.homeDir == environment.db_home


#----------------------------------------------------------------------


def test_suite():
    """Return the suite unittest.makeSuite builds from MiscTestCase."""
    suite = unittest.makeSuite(MiscTestCase)
    return suite


# When run as a script, hand control to unittest, which builds the tests by
# calling test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_queue.py ---
"""
TestCases for exercising a Queue DB.
"""

import sys, os, string
import tempfile
from pprint import pprint
import unittest

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db

from test_all import verbose


#----------------------------------------------------------------------

class SimpleQueueTestCase(unittest.TestCase):
    def setUp(self):
        self.filename = tempfile.mktemp()

    def tearDown(self):
        try:
            os.remove(self.filename)
        except os.error:
            pass


    def test01_basic(self):
        # Basic Queue tests using the deprecated DBCursor.consume method.

        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_basic..." % self.__class__.__name__

        d = db.DB()
        d.set_re_len(40)  # Queues must be fixed length
        d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)

        if verbose:
            print "before appends" + '-' * 30
            pprint(d.stat())

        for x in string.letters:
            d.append(x * 40)

        assert len(d) == 52

        d.put(100, "some more data")
        d.put(101, "and some more ")
        d.put(75,  "out of order")
        d.put(1,   "replacement data")

        assert len(d) == 55

        if verbose:
            print "before close" + '-' * 30
            pprint(d.stat())

        d.close()
        del d
        d = db.DB()
        d.open(self.filename)

        if verbose:
            print "after open" + '-' * 30
            pprint(d.stat())

        d.append("one more")
        c = d.cursor()

        if verbose:
            print "after append" + '-' * 30
            pprint(d.stat())

        rec = c.consume()
        while rec:
            if verbose:
                print rec
            rec = c.consume()
        c.close()

        if verbose:
            print "after consume loop" + '-' * 30
            pprint(d.stat())

        assert len(d) == 0, \
               "if you see this message then you need to rebuild " \
               "BerkeleyDB 3.1.17 with the patch in patches/qam_stat.diff"

        d.close()



    def test02_basicPost32(self):
        # Basic Queue tests using the new DB.consume method in DB 3.2+
        # (No cursor needed)

        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_basicPost32..." % self.__class__.__name__

        if db.version() < (3, 2, 0):
            if verbose:
                print "Test not run, DB not new enough..."
            return

        d = db.DB()
        d.set_re_len(40)  # Queues must be fixed length
        d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)

        if verbose:
            print "before appends" + '-' * 30
            pprint(d.stat())

        for x in string.letters:
            d.append(x * 40)

        assert len(d) == 52

        d.put(100, "some more data")
        d.put(101, "and some more ")
        d.put(75,  "out of order")
        d.put(1,   "replacement data")

        assert len(d) == 55

        if verbose:
            print "before close" + '-' * 30
            pprint(d.stat())

        d.close()
        del d
        d = db.DB()
        d.open(self.filename)
        #d.set_get_returns_none(true)

        if verbose:
            print "after open" + '-' * 30
            pprint(d.stat())

        d.append("one more")

        if verbose:
            print "after append" + '-' * 30
            pprint(d.stat())

        rec = d.consume()
        while rec:
            if verbose:
                print rec
            rec = d.consume()

        if verbose:
            print "after consume loop" + '-' * 30
            pprint(d.stat())

        d.close()



#----------------------------------------------------------------------

def test_suite():
    """Return the suite unittest.makeSuite builds from SimpleQueueTestCase."""
    suite = unittest.makeSuite(SimpleQueueTestCase)
    return suite


# When run as a script, hand control to unittest, which builds the tests by
# calling test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_recno.py ---
"""TestCases for exercising a Recno DB.
"""

import os
import sys
import errno
import tempfile
from pprint import pprint
import unittest

from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db

# The 52 ASCII letters, lower case first; the tests below append one record
# per letter.
letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'


#----------------------------------------------------------------------

class SimpleRecnoTestCase(unittest.TestCase):
    def setUp(self):
        self.filename = tempfile.mktemp()

    def tearDown(self):
        try:
            os.remove(self.filename)
        except OSError, e:
            if e.errno <> errno.EEXIST: raise

    def test01_basic(self):
        d = db.DB()
        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)

        for x in letters:
            recno = d.append(x * 60)
            assert type(recno) == type(0)
            assert recno >= 1
            if verbose:
                print recno,

        if verbose: print

        stat = d.stat()
        if verbose:
            pprint(stat)

        for recno in range(1, len(d)+1):
            data = d[recno]
            if verbose:
                print data

            assert type(data) == type("")
            assert data == d.get(recno)

        try:
            data = d[0]  # This should raise a KeyError!?!?!
        except db.DBInvalidArgError, val:
            assert val[0] == db.EINVAL
            if verbose: print val
        else:
            self.fail("expected exception")

        try:
            data = d[100]
        except KeyError:
            pass
        else:
            self.fail("expected exception")

        data = d.get(100)
        assert data == None

        keys = d.keys()
        if verbose:
            print keys
        assert type(keys) == type([])
        assert type(keys[0]) == type(123)
        assert len(keys) == len(d)

        items = d.items()
        if verbose:
            pprint(items)
        assert type(items) == type([])
        assert type(items[0]) == type(())
        assert len(items[0]) == 2
        assert type(items[0][0]) == type(123)
        assert type(items[0][1]) == type("")
        assert len(items) == len(d)

        assert d.has_key(25)

        del d[25]
        assert not d.has_key(25)

        d.delete(13)
        assert not d.has_key(13)

        data = d.get_both(26, "z" * 60)
        assert data == "z" * 60
        if verbose:
            print data

        fd = d.fd()
        if verbose:
            print fd

        c = d.cursor()
        rec = c.first()
        while rec:
            if verbose:
                print rec
            rec = c.next()

        c.set(50)
        rec = c.current()
        if verbose:
            print rec

        c.put(-1, "a replacement record", db.DB_CURRENT)

        c.set(50)
        rec = c.current()
        assert rec == (50, "a replacement record")
        if verbose:
            print rec

        rec = c.set_range(30)
        if verbose:
            print rec

        c.close()
        d.close()

        d = db.DB()
        d.open(self.filename)
        c = d.cursor()

        # put a record beyond the consecutive end of the recno's
        d[100] = "way out there"
        assert d[100] == "way out there"

        try:
            data = d[99]
        except KeyError:
            pass
        else:
            self.fail("expected exception")

        try:
            d.get(99)
        except db.DBKeyEmptyError, val:
            assert val[0] == db.DB_KEYEMPTY
            if verbose: print val
        else:
            self.fail("expected exception")

        rec = c.set(40)
        while rec:
            if verbose:
                print rec
            rec = c.next()

        c.close()
        d.close()

    def test02_WithSource(self):
        """
        A Recno file that is given a "backing source file" is essentially a
        simple ASCII file.  Normally each record is delimited by \n and so is
        just a line in the file, but you can set a different record delimiter
        if needed.
        """
        source = os.path.join(os.path.dirname(sys.argv[0]),
                              'db_home/test_recno.txt')
        f = open(source, 'w') # create the file
        f.close()

        d = db.DB()
        # This is the default value, just checking if both int
        d.set_re_delim(0x0A)
        d.set_re_delim('\n')  # and char can be used...
        d.set_re_source(source)
        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)

        data = "The quick brown fox jumped over the lazy dog".split()
        for datum in data:
            d.append(datum)
        d.sync()
        d.close()

        # get the text from the backing source
        text = open(source, 'r').read()
        text = text.strip()
        if verbose:
            print text
            print data
            print text.split('\n')

        assert text.split('\n') == data

        # open as a DB again
        d = db.DB()
        d.set_re_source(source)
        d.open(self.filename, db.DB_RECNO)

        d[3] = 'reddish-brown'
        d[8] = 'comatose'

        d.sync()
        d.close()

        text = open(source, 'r').read()
        text = text.strip()
        if verbose:
            print text
            print text.split('\n')

        assert text.split('\n') == \
             "The quick reddish-brown fox jumped over the comatose dog".split()

    def test03_FixedLength(self):
        d = db.DB()
        d.set_re_len(40)  # fixed length records, 40 bytes long
        d.set_re_pad('-') # sets the pad character...
        d.set_re_pad(45)  # ...test both int and char
        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)

        for x in letters:
            d.append(x * 35)    # These will be padded

        d.append('.' * 40)      # this one will be exact

        try:                    # this one will fail
            d.append('bad' * 20)
        except db.DBInvalidArgError, val:
            assert val[0] == db.EINVAL
            if verbose: print val
        else:
            self.fail("expected exception")

        c = d.cursor()
        rec = c.first()
        while rec:
            if verbose:
                print rec
            rec = c.next()

        c.close()
        d.close()


#----------------------------------------------------------------------


def test_suite():
    """Return the suite unittest.makeSuite builds from SimpleRecnoTestCase."""
    suite = unittest.makeSuite(SimpleRecnoTestCase)
    return suite


# When run as a script, hand control to unittest, which builds the tests by
# calling test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

--- NEW FILE: test_thread.py ---
"""TestCases for multi-threaded access to a DB.
"""

import os
import sys
import time
import errno
import shutil
import tempfile
from pprint import pprint
from whrandom import random

# Define True/False ourselves on interpreters where the names are missing.
try:
    True, False
except NameError:
    True = 1
    False = 0

# Separator used by makeData() when it builds a record value from a key.
DASH = '-'

# Threading support is optional: test_suite() checks have_threads and
# skips every threaded test case when the import fails.
try:
    from threading import Thread, currentThread
    have_threads = True
except ImportError:
    have_threads = False

import unittest
from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbutils
except ImportError:
    # For Python 2.3
    from bsddb import db, dbutils


#----------------------------------------------------------------------

class BaseThreadedTestCase(unittest.TestCase):
    dbtype       = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags  = 0
    dbsetflags   = 0
    envflags     = 0

    def setUp(self):
        if verbose:
            dbutils._deadlock_VerboseFile = sys.stdout

        homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try:
            os.mkdir(homeDir)
        except OSError, e:
            if e.errno <> errno.EEXIST: raise
        self.env = db.DBEnv()
        self.setEnvOpts()
        self.env.open(homeDir, self.envflags | db.DB_CREATE)

        self.filename = self.__class__.__name__ + '.db'
        self.d = db.DB(self.env)
        if self.dbsetflags:
            self.d.set_flags(self.dbsetflags)
        self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)

    def tearDown(self):
        self.d.close()
        self.env.close()
        shutil.rmtree(self.homeDir)

    def setEnvOpts(self):
        pass

    def makeData(self, key):
        return DASH.join([key] * 5)


#----------------------------------------------------------------------


class ConcurrentDataStoreBase(BaseThreadedTestCase):
    dbopenflags = db.DB_THREAD
    envflags    = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
    readers     = 0 # derived class should set
    writers     = 0
    records     = 1000

    def test01_1WriterMultiReaders(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_1WriterMultiReaders..." % \
                  self.__class__.__name__

        threads = []
        for x in range(self.writers):
            wt = Thread(target = self.writerThread,
                        args = (self.d, self.records, x),
                        name = 'writer %d' % x,
                        )#verbose = verbose)
            threads.append(wt)

        for x in range(self.readers):
            rt = Thread(target = self.readerThread,
                        args = (self.d, x),
                        name = 'reader %d' % x,
                        )#verbose = verbose)
            threads.append(rt)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def writerThread(self, d, howMany, writerNum):
        #time.sleep(0.01 * writerNum + 0.01)
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print "%s: creating records %d - %d" % (name, start, stop)

        for x in range(start, stop):
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)
            if verbose and x % 100 == 0:
                print "%s: records %d - %d finished" % (name, start, x)

        if verbose:
            print "%s: finished creating records" % name

##         # Each write-cursor will be exclusive, the only one that can update the DB...
##         if verbose: print "%s: deleting a few records" % name
##         c = d.cursor(flags = db.DB_WRITECURSOR)
##         for x in range(10):
##             key = int(random() * howMany) + start
##             key = '%04d' % key
##             if d.has_key(key):
##                 c.set(key)
##                 c.delete()

##         c.close()
        if verbose:
            print "%s: thread finished" % name

    def readerThread(self, d, readerNum):
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            count = 0
            rec = c.first()
            while rec:
                count += 1
                key, data = rec
                self.assertEqual(self.makeData(key), data)
                rec = c.next()
            if verbose:
                print "%s: found %d records" % (name, count)
            c.close()
            time.sleep(0.05)

        if verbose:
            print "%s: thread finished" % name


class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent Data Store scenario on a DB_BTREE database."""
    dbtype  = db.DB_BTREE
    writers = 2      # number of writer threads started by the test
    readers = 10     # number of reader threads started by the test
    records = 1000   # passed to each writer thread as howMany


class HashConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent Data Store scenario on a DB_HASH database."""
    dbtype  = db.DB_HASH
    writers = 2      # number of writer threads started by the test
    readers = 10     # number of reader threads started by the test
    records = 1000   # passed to each writer thread as howMany


#----------------------------------------------------------------------

class SimpleThreadedBase(BaseThreadedTestCase):
    dbopenflags = db.DB_THREAD
    envflags    = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
    readers = 5
    writers = 3
    records = 1000

    def setEnvOpts(self):
        self.env.set_lk_detect(db.DB_LOCK_DEFAULT)

    def test02_SimpleLocks(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_SimpleLocks..." % self.__class__.__name__

        threads = []
        for x in range(self.writers):
            wt = Thread(target = self.writerThread,
                        args = (self.d, self.records, x),
                        name = 'writer %d' % x,
                        )#verbose = verbose)
            threads.append(wt)
        for x in range(self.readers):
            rt = Thread(target = self.readerThread,
                        args = (self.d, x),
                        name = 'reader %d' % x,
                        )#verbose = verbose)
            threads.append(rt)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def writerThread(self, d, howMany, writerNum):
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print "%s: creating records %d - %d" % (name, start, stop)

        # create a bunch of records
        for x in xrange(start, stop):
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)

            if verbose and x % 100 == 0:
                print "%s: records %d - %d finished" % (name, start, x)

            # do a bit or reading too
            if random() <= 0.05:
                for y in xrange(start, x):
                    key = '%04d' % x
                    data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
                    self.assertEqual(data, self.makeData(key))

        # flush them
        try:
            dbutils.DeadlockWrap(d.sync, max_retries=12)
        except db.DBIncompleteError, val:
            if verbose:
                print "could not complete sync()..."

        # read them back, deleting a few
        for x in xrange(start, stop):
            key = '%04d' % x
            data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
            if verbose and x % 100 == 0:
                print "%s: fetched record (%s, %s)" % (name, key, data)
            self.assertEqual(data, self.makeData(key))
            if random() <= 0.10:
                dbutils.DeadlockWrap(d.delete, key, max_retries=12)
                if verbose:
                    print "%s: deleted record %s" % (name, key)

        if verbose:
            print "%s: thread finished" % name

    def readerThread(self, d, readerNum):
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            count = 0
            rec = dbutils.DeadlockWrap(c.first, max_retries=10)
            while rec:
                count += 1
                key, data = rec
                self.assertEqual(self.makeData(key), data)
                rec = dbutils.DeadlockWrap(c.next, max_retries=10)
            if verbose:
                print "%s: found %d records" % (name, count)
            c.close()
            time.sleep(0.05)

        if verbose:
            print "%s: thread finished" % name


class BTreeSimpleThreaded(SimpleThreadedBase):
    """Simple locking scenario on a DB_BTREE database."""
    dbtype = db.DB_BTREE


class HashSimpleThreaded(SimpleThreadedBase):
    """Simple locking scenario on a DB_HASH database."""
    dbtype = db.DB_HASH


#----------------------------------------------------------------------


class ThreadedTransactionsBase(BaseThreadedTestCase):
    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
    envflags    = (db.DB_THREAD |
                   db.DB_INIT_MPOOL |
                   db.DB_INIT_LOCK |
                   db.DB_INIT_LOG |
                   db.DB_INIT_TXN
                   )
    readers = 0
    writers = 0
    records = 2000
    txnFlag = 0

    def setEnvOpts(self):
        #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
        pass

    def test03_ThreadedTransactions(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03_ThreadedTransactions..." % \
                  self.__class__.__name__

        threads = []
        for x in range(self.writers):
            wt = Thread(target = self.writerThread,
                        args = (self.d, self.records, x),
                        name = 'writer %d' % x,
                        )#verbose = verbose)
            threads.append(wt)

        for x in range(self.readers):
            rt = Thread(target = self.readerThread,
                        args = (self.d, x),
                        name = 'reader %d' % x,
                        )#verbose = verbose)
            threads.append(rt)

        dt = Thread(target = self.deadlockThread)
        dt.start()

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        self.doLockDetect = False
        dt.join()

    def doWrite(self, d, name, start, stop):
        finished = False
        while not finished:
            try:
                txn = self.env.txn_begin(None, self.txnFlag)
                for x in range(start, stop):
                    key = '%04d' % x
                    d.put(key, self.makeData(key), txn)
                    if verbose and x % 100 == 0:
                        print "%s: records %d - %d finished" % (name, start, x)
                txn.commit()
                finished = True
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
                if verbose:
                    print "%s: Aborting transaction (%s)" % (name, val[1])
                txn.abort()
                time.sleep(0.05)

    def writerThread(self, d, howMany, writerNum):
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print "%s: creating records %d - %d" % (name, start, stop)

        step = 100
        for x in range(start, stop, step):
            self.doWrite(d, name, x, min(stop, x+step))

        if verbose:
            print "%s: finished creating records" % name
        if verbose:
            print "%s: deleting a few records" % name

        finished = False
        while not finished:
            try:
                recs = []
                txn = self.env.txn_begin(None, self.txnFlag)
                for x in range(10):
                    key = int(random() * howMany) + start
                    key = '%04d' % key
                    data = d.get(key, None, txn, db.DB_RMW)
                    if data is not None:
                        d.delete(key, txn)
                        recs.append(key)
                txn.commit()
                finished = True
                if verbose:
                    print "%s: deleted records %s" % (name, recs)
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
                if verbose:
                    print "%s: Aborting transaction (%s)" % (name, val[1])
                txn.abort()
                time.sleep(0.05)

        if verbose:
            print "%s: thread finished" % name

    def readerThread(self, d, readerNum):
        time.sleep(0.01 * readerNum + 0.05)
        name = currentThread().getName()

        for loop in range(5):
            finished = False
            while not finished:
                try:
                    txn = self.env.txn_begin(None, self.txnFlag)
                    c = d.cursor(txn)
                    count = 0
                    rec = c.first()
                    while rec:
                        count += 1
                        key, data = rec
                        self.assertEqual(self.makeData(key), data)
                        rec = c.next()
                    if verbose: print "%s: found %d records" % (name, count)
                    c.close()
                    txn.commit()
                    finished = True
                except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
                    if verbose:
                        print "%s: Aborting transaction (%s)" % (name, val[1])
                    c.close()
                    txn.abort()
                    time.sleep(0.05)

            time.sleep(0.05)

        if verbose:
            print "%s: thread finished" % name

    def deadlockThread(self):
        self.doLockDetect = True
        while self.doLockDetect:
            time.sleep(0.5)
            try:
                aborted = self.env.lock_detect(
                    db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
                if verbose and aborted:
                    print "deadlock: Aborted %d deadlocked transaction(s)" \
                          % aborted
            except db.DBError:
                pass


class BTreeThreadedTransactions(ThreadedTransactionsBase):
    """Transactional scenario on a DB_BTREE database, default txnFlag."""
    dbtype = db.DB_BTREE
    writers = 3      # number of writer threads started by the test
    readers = 5      # number of reader threads started by the test
    records = 2000   # passed to each writer thread as howMany

class HashThreadedTransactions(ThreadedTransactionsBase):
    """Transactional scenario on a DB_HASH database, default txnFlag."""
    dbtype = db.DB_HASH
    writers = 1      # number of writer threads started by the test
    readers = 5      # number of reader threads started by the test
    records = 2000   # passed to each writer thread as howMany

class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """Transactional scenario on a DB_BTREE database using DB_TXN_NOWAIT."""
    dbtype = db.DB_BTREE
    writers = 3      # number of writer threads started by the test
    readers = 5      # number of reader threads started by the test
    records = 2000   # passed to each writer thread as howMany
    # Handed to env.txn_begin() for every transaction.
    # NOTE(review): presumably makes lock requests fail instead of block;
    # confirm against the Berkeley DB documentation.
    txnFlag = db.DB_TXN_NOWAIT

class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """Transactional scenario on a DB_HASH database using DB_TXN_NOWAIT."""
    dbtype = db.DB_HASH
    writers = 1      # number of writer threads started by the test
    readers = 5      # number of reader threads started by the test
    records = 2000   # passed to each writer thread as howMany
    # Handed to env.txn_begin() for every transaction.
    # NOTE(review): presumably makes lock requests fail instead of block;
    # confirm against the Berkeley DB documentation.
    txnFlag = db.DB_TXN_NOWAIT


#----------------------------------------------------------------------

def test_suite():
    suite = unittest.TestSuite()

    if have_threads:
        suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
        suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
        suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
        suite.addTest(unittest.makeSuite(HashSimpleThreaded))
        suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
        suite.addTest(unittest.makeSuite(HashThreadedTransactions))
        suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
        suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))

    else:
        print "Threads not available, skipping thread tests."

    return suite


# When run directly as a script, hand the suite built by test_suite()
# to the unittest command-line runner.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


_______________________________________________
Stackless-checkins mailing list
Stackless-checkins at stackless.com
http://www.stackless.com/mailman/listinfo/stackless-checkins



More information about the Stackless-checkins mailing list