1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375 |
- """ Unit tests version 2.6.1.0 for adodbapi"""
- """
- adodbapi - A python DB API 2.0 interface to Microsoft ADO
- Copyright (C) 2002 Henrik Ekelund
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- Updates by Vernon Cole
- """
- import unittest
- import sys
- import datetime
- import decimal
- import copy
- import random
- import string
- try:
- import win32com.client
- win32 = True
- except ImportError:
- win32 = False
- # run the configuration module.
- import adodbapitestconfig as config # will set sys.path to find correct version of adodbapi
- # in our code below, all our switches are from config.whatever
- import tryconnection
- import adodbapi
- import adodbapi.apibase as api
- try:
- import adodbapi.ado_consts as ado_consts
- except ImportError: #we are doing a shortcut import as a module -- so
- try:
- import ado_consts
- except ImportError:
- from adodbapi import ado_consts
- def str2bytes(sval):
- return sval.encode("latin1")
- long = int
- def randomstring(length):
- return ''.join([random.choice(string.ascii_letters) for n in range(32)])
- class CommonDBTests(unittest.TestCase):
- "Self contained super-simple tests in easy syntax, should work on everything between mySQL and Oracle"
- def setUp(self):
- self.engine = 'unknown'
- def getEngine(self):
- return self.engine
- def getConnection(self):
- raise NotImplementedError #"This method must be overriden by a subclass"
- def getCursor(self):
- return self.getConnection().cursor()
- def testConnection(self):
- crsr=self.getCursor()
- assert crsr.__class__.__name__ == 'Cursor'
- def testErrorHandlerInherits(self):
- if not self.remote:
- conn=self.getConnection()
- mycallable=lambda connection,cursor,errorclass,errorvalue: 1
- conn.errorhandler=mycallable
- crsr=conn.cursor()
- assert crsr.errorhandler==mycallable,"Error handler on crsr should be same as on connection"
- def testDefaultErrorHandlerConnection(self):
- if not self.remote:
- conn=self.getConnection()
- del conn.messages[:]
- try:
- conn.close()
- conn.commit() #Should not be able to use connection after it is closed
- except:
- assert len(conn.messages)==1
- assert len(conn.messages[0])==2
- assert conn.messages[0][0]==api.ProgrammingError
-
- def testOwnErrorHandlerConnection(self):
- if self.remote: # ToDo: use "skip"
- return
- mycallable=lambda connection,cursor,errorclass,errorvalue: 1 #does not raise anything
- conn=self.getConnection()
- conn.errorhandler=mycallable
- conn.close()
- conn.commit() #Should not be able to use connection after it is closed
- assert len(conn.messages)==0
-
- conn.errorhandler=None #This should bring back the standard error handler
- try:
- conn.close()
- conn.commit() #Should not be able to use connection after it is closed
- except:
- pass
- #The Standard errorhandler appends error to messages attribute
- assert len(conn.messages)>0,"Setting errorhandler to none should bring back the standard error handler"
- def testDefaultErrorHandlerCursor(self):
- crsr=self.getConnection().cursor()
- if not self.remote:
- del crsr.messages[:]
- try:
- crsr.execute("SELECT abbtytddrf FROM dasdasd")
- except:
- assert len(crsr.messages)==1
- assert len(crsr.messages[0])==2
- assert crsr.messages[0][0]==api.DatabaseError
-
- def testOwnErrorHandlerCursor(self):
- if self.remote: # ToDo: should be a "skip"
- return
- mycallable=lambda connection,cursor,errorclass,errorvalue: 1 #does not raise anything
- crsr=self.getConnection().cursor()
- crsr.errorhandler=mycallable
- crsr.execute("SELECT abbtytddrf FROM dasdasd")
- assert len(crsr.messages)==0
-
- crsr.errorhandler=None #This should bring back the standard error handler
- try:
- crsr.execute("SELECT abbtytddrf FROM dasdasd")
- except:
- pass
- #The Standard errorhandler appends error to messages attribute
- assert len(crsr.messages)>0,"Setting errorhandler to none should bring back the standard error handler"
- def testUserDefinedConversions(self):
- if self.remote: ## Todo: should be a "skip"
- return
- try:
- duplicatingConverter=lambda aStringField: aStringField*2
- assert duplicatingConverter('gabba') == 'gabbagabba'
- self.helpForceDropOnTblTemp()
- conn=self.getConnection()
- # the variantConversions attribute should not exist on a normal connection object
- self.assertRaises(AttributeError, lambda x:conn.variantConversions[x],[2])
- if not self.remote:
- # create a variantConversions attribute on the connection
- conn.variantConversions = copy.copy(api.variantConversions)
- crsr=conn.cursor()
- tabdef = "CREATE TABLE xx_%s (fldData VARCHAR(100) NOT NULL, fld2 VARCHAR(20))" % config.tmp
- crsr.execute(tabdef)
- crsr.execute("INSERT INTO xx_%s(fldData,fld2) VALUES('gabba','booga')" % config.tmp)
- crsr.execute("INSERT INTO xx_%s(fldData,fld2) VALUES('hey','yo')" % config.tmp)
- # change converter for ALL adoStringTypes columns
- conn.variantConversions[api.adoStringTypes]=duplicatingConverter
- crsr.execute("SELECT fldData,fld2 FROM xx_%s ORDER BY fldData" % config.tmp)
- rows=crsr.fetchall()
- row = rows[0]
- self.assertEqual(row[0],'gabbagabba')
- row = rows[1]
- self.assertEqual(row[0],'heyhey')
- self.assertEqual(row[1],'yoyo')
- upcaseConverter=lambda aStringField: aStringField.upper()
- assert upcaseConverter('upThis') == 'UPTHIS'
- # now use a single column converter
- rows.converters[1] = upcaseConverter # convert second column
- self.assertEqual(row[0],'heyhey') # first will be unchanged
- self.assertEqual(row[1],'YO') # second will convert to upper case
- finally:
- try:
- del conn.variantConversions #Restore the default
- except: pass
- self.helpRollbackTblTemp()
- def testUserDefinedConversionForExactNumericTypes(self):
- # variantConversions is a dictionary of conversion functions
- # held internally in adodbapi.apibase
- #
- # !!! this test intentionally alters the value of what should be constant in the module
- # !!! no new code should use this example, to is only a test to see that the
- # !!! deprecated way of doing this still works. (use connection.variantConversions)
- #
- if not self.remote and sys.version_info < (3,0): ### Py3 need different test
- oldconverter = adodbapi.variantConversions[ado_consts.adNumeric] #keep old function to restore later
- # By default decimal and "numbers" are returned as decimals.
- # Instead, make numbers return as floats
- try:
- adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtFloat
- self.helpTestDataType("decimal(18,2)",'NUMBER',3.45,compareAlmostEqual=1)
- self.helpTestDataType("numeric(18,2)",'NUMBER',3.45,compareAlmostEqual=1)
- # now return strings
- adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtString
- self.helpTestDataType("numeric(18,2)",'NUMBER','3.45')
- # now a completly weird user defined convertion
- adodbapi.variantConversions[ado_consts.adNumeric] = lambda x: '!!This function returns a funny unicode string %s!!'%x
- self.helpTestDataType("numeric(18,2)",'NUMBER','3.45',
- allowedReturnValues=['!!This function returns a funny unicode string 3.45!!'])
- finally:
- # now reset the converter to its original function
- adodbapi.variantConversions[ado_consts.adNumeric]=oldconverter #Restore the original convertion function
- def helpTestDataType(self,sqlDataTypeString,
- DBAPIDataTypeString,
- pyData,
- pyDataInputAlternatives=None,
- compareAlmostEqual=None,
- allowedReturnValues=None):
- self.helpForceDropOnTblTemp()
- conn=self.getConnection()
- crsr=conn.cursor()
- tabdef= """
- CREATE TABLE xx_%s (
- fldId integer NOT NULL,
- fldData """ % config.tmp + sqlDataTypeString + ")\n"
- crsr.execute(tabdef)
-
- #Test Null values mapped to None
- crsr.execute("INSERT INTO xx_%s (fldId) VALUES (1)" % config.tmp)
-
- crsr.execute("SELECT fldId,fldData FROM xx_%s" % config.tmp)
- rs=crsr.fetchone()
- self.assertEqual(rs[1],None) #Null should be mapped to None
- assert rs[0]==1
- #Test description related
- descTuple=crsr.description[1]
- assert descTuple[0] in ['fldData','flddata'], 'was "%s" expected "%s"'%(descTuple[0],'fldData')
- if DBAPIDataTypeString=='STRING':
- assert descTuple[1] == api.STRING, 'was "%s" expected "%s"'%(descTuple[1],api.STRING.values)
- elif DBAPIDataTypeString == 'NUMBER':
- assert descTuple[1] == api.NUMBER, 'was "%s" expected "%s"'%(descTuple[1],api.NUMBER.values)
- elif DBAPIDataTypeString == 'BINARY':
- assert descTuple[1] == api.BINARY, 'was "%s" expected "%s"'%(descTuple[1],api.BINARY.values)
- elif DBAPIDataTypeString == 'DATETIME':
- assert descTuple[1] == api.DATETIME, 'was "%s" expected "%s"'%(descTuple[1],api.DATETIME.values)
- elif DBAPIDataTypeString == 'ROWID':
- assert descTuple[1] == api.ROWID, 'was "%s" expected "%s"'%(descTuple[1],api.ROWID.values)
- elif DBAPIDataTypeString == 'UUID':
- assert descTuple[1] == api.OTHER, 'was "%s" expected "%s"'%(descTuple[1],api.OTHER.values)
- else:
- raise NotImplementedError #"DBAPIDataTypeString not provided"
- #Test data binding
- inputs=[pyData]
- if pyDataInputAlternatives:
- inputs.extend(pyDataInputAlternatives)
- inputs = set(inputs) # removes redundant string==unicode tests
- fldId=1
- for inParam in inputs:
- fldId+=1
- try:
- crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (?,?)" % config.tmp, (fldId, inParam))
- except:
- if self.remote:
- for message in crsr.messages:
- print(message)
- else:
- conn.printADOerrors()
- raise
- crsr.execute("SELECT fldData FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId])
- rs=crsr.fetchone()
- if allowedReturnValues:
- allowedTypes = tuple([type(aRV) for aRV in allowedReturnValues])
- assert isinstance(rs[0],allowedTypes), \
- 'result type "%s" must be one of %s'%(type(rs[0]),allowedTypes)
- else:
- assert isinstance(rs[0] ,type(pyData)), \
- 'result type "%s" must be instance of %s'%(type(rs[0]),type(pyData))
- if compareAlmostEqual and DBAPIDataTypeString == 'DATETIME':
- iso1=adodbapi.dateconverter.DateObjectToIsoFormatString(rs[0])
- iso2=adodbapi.dateconverter.DateObjectToIsoFormatString(pyData)
- self.assertEqual(iso1, iso2)
- elif compareAlmostEqual:
- s = float(pyData)
- v = float(rs[0])
- assert abs(v-s)/s < 0.00001, \
- "Values not almost equal recvd=%s, expected=%f" %(rs[0],s)
- else:
- if allowedReturnValues:
- ok=False
- self.assertTrue(rs[0] in allowedReturnValues,
- 'Value "%s" not in %s' % (repr(rs[0]), allowedReturnValues))
- else:
- self.assertEqual(rs[0], pyData,
- 'Values are not equal recvd="%s", expected="%s"' %(rs[0],pyData))
- def testDataTypeFloat(self):
- self.helpTestDataType("real",'NUMBER',3.45,compareAlmostEqual=True)
- self.helpTestDataType("float",'NUMBER',1.79e37,compareAlmostEqual=True)
- def testDataTypeDecmal(self):
- self.helpTestDataType("decimal(18,2)",'NUMBER',3.45,
- allowedReturnValues=['3.45','3,45',decimal.Decimal('3.45')])
- self.helpTestDataType("numeric(18,2)",'NUMBER',3.45,
- allowedReturnValues=['3.45','3,45',decimal.Decimal('3.45')])
- self.helpTestDataType("decimal(20,2)",'NUMBER',444444444444444444,
- allowedReturnValues=['444444444444444444.00', '444444444444444444,00',
- decimal.Decimal('444444444444444444')])
- if self.getEngine() == 'MSSQL':
- self.helpTestDataType("uniqueidentifier",'UUID','{71A4F49E-39F3-42B1-A41E-48FF154996E6}',
- allowedReturnValues=['{71A4F49E-39F3-42B1-A41E-48FF154996E6}'])
- def testDataTypeMoney(self): #v2.1 Cole -- use decimal for money
- if self.getEngine() == 'MySQL':
- self.helpTestDataType("DECIMAL(20,4)",'NUMBER',decimal.Decimal('-922337203685477.5808'))
- elif self.getEngine() == 'PostgreSQL':
- self.helpTestDataType("money",'NUMBER',decimal.Decimal('-922337203685477.5808'),
- compareAlmostEqual=True,
- allowedReturnValues=[-922337203685477.5808,
- decimal.Decimal('-922337203685477.5808')])
- else:
- self.helpTestDataType("smallmoney",'NUMBER',decimal.Decimal('214748.02'))
- self.helpTestDataType("money",'NUMBER',decimal.Decimal('-922337203685477.5808'))
- def testDataTypeInt(self):
- if self.getEngine() != 'PostgreSQL':
- self.helpTestDataType("tinyint",'NUMBER',115)
- self.helpTestDataType("smallint",'NUMBER',-32768)
- if self.getEngine() not in ['ACCESS','PostgreSQL']:
- self.helpTestDataType("bit",'NUMBER',1) #Does not work correctly with access
- if self.getEngine() in ['MSSQL','PostgreSQL']:
- self.helpTestDataType("bigint",'NUMBER',3000000000,
- allowedReturnValues=[3000000000, int(3000000000)])
- self.helpTestDataType("int",'NUMBER',2147483647)
- def testDataTypeChar(self):
- for sqlDataType in ("char(6)","nchar(6)"):
- self.helpTestDataType(sqlDataType,'STRING','spam ',allowedReturnValues=['spam','spam','spam ','spam '])
- def testDataTypeVarChar(self):
- if self.getEngine() == 'MySQL':
- stringKinds = ["varchar(10)","text"]
- elif self.getEngine() == 'PostgreSQL':
- stringKinds = ["varchar(10)","text","character varying"]
- else:
- stringKinds = ["varchar(10)","nvarchar(10)","text","ntext"] #,"varchar(max)"]
- for sqlDataType in stringKinds:
- self.helpTestDataType(sqlDataType,'STRING','spam',['spam'])
-
- def testDataTypeDate(self):
- if self.getEngine() == 'PostgreSQL':
- dt = "timestamp"
- else:
- dt = "datetime"
- self.helpTestDataType(dt,'DATETIME',adodbapi.Date(2002,10,28),
- compareAlmostEqual=True)
- if self.getEngine() not in ['MySQL','PostgreSQL']:
- self.helpTestDataType("smalldatetime",'DATETIME',adodbapi.Date(2002,10,28),
- compareAlmostEqual=True)
- if tag != 'pythontime' and self.getEngine() not in ['MySQL','PostgreSQL']: # fails when using pythonTime
- self.helpTestDataType(dt,'DATETIME', adodbapi.Timestamp(2002,10,28,12,15,1),
- compareAlmostEqual=True)
- def testDataTypeBinary(self):
- binfld = str2bytes('\x07\x00\xE2\x40*')
- arv = [binfld, adodbapi.Binary(binfld), bytes(binfld)]
- if self.getEngine() == 'PostgreSQL':
- self.helpTestDataType("bytea",'BINARY',adodbapi.Binary(binfld),
- allowedReturnValues=arv)
- else:
- self.helpTestDataType("binary(5)",'BINARY',adodbapi.Binary(binfld),
- allowedReturnValues=arv)
- self.helpTestDataType("varbinary(100)",'BINARY',adodbapi.Binary(binfld),
- allowedReturnValues=arv)
- if self.getEngine() != 'MySQL':
- self.helpTestDataType("image",'BINARY',adodbapi.Binary(binfld),
- allowedReturnValues=arv)
- def helpRollbackTblTemp(self):
- self.helpForceDropOnTblTemp()
-
- def helpForceDropOnTblTemp(self):
- conn=self.getConnection()
- with conn.cursor() as crsr:
- try:
- crsr.execute("DROP TABLE xx_%s" % config.tmp)
- if not conn.autocommit:
- conn.commit()
- except:
- pass
- def helpCreateAndPopulateTableTemp(self,crsr):
- tabdef= """
- CREATE TABLE xx_%s (
- fldData INTEGER
- )
- """ % config.tmp
- try: #EAFP
- crsr.execute(tabdef)
- except api.DatabaseError: # was not dropped before
- self.helpForceDropOnTblTemp() # so drop it now
- crsr.execute(tabdef)
- for i in range(9): # note: this poor SQL code, but a valid test
- crsr.execute("INSERT INTO xx_%s (fldData) VALUES (%i)" % (config.tmp, i))
- # NOTE: building the test table without using parameter substitution
- def testFetchAll(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
- rs=crsr.fetchall()
- assert len(rs)==9
- #test slice of rows
- i = 3
- for row in rs[3:-2]: #should have rowid 3..6
- assert row[0]==i
- i+=1
- self.helpRollbackTblTemp()
- def testPreparedStatement(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.prepare("SELECT fldData FROM xx_%s" % config.tmp)
- crsr.execute(crsr.command) # remembes the one that was prepared
- rs=crsr.fetchall()
- assert len(rs)==9
- assert rs[2][0]==2
- self.helpRollbackTblTemp()
- def testWrongPreparedStatement(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.prepare("SELECT * FROM nowhere")
- crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) # should execute this one, not the prepared one
- rs=crsr.fetchall()
- assert len(rs)==9
- assert rs[2][0]==2
- self.helpRollbackTblTemp()
- def testIterator(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
- for i,row in enumerate(crsr): # using cursor as an iterator, rather than fetchxxx
- assert row[0]==i
- self.helpRollbackTblTemp()
-
- def testExecuteMany(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- seq_of_values = [ (111,) , (222,) ]
- crsr.executemany("INSERT INTO xx_%s (fldData) VALUES (?)" % config.tmp, seq_of_values)
- if crsr.rowcount==-1:
- print(self.getEngine()+" Provider does not support rowcount (on .executemany())")
- else:
- self.assertEqual( crsr.rowcount,2)
- crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
- rs=crsr.fetchall()
- assert len(rs)==11
- self.helpRollbackTblTemp()
-
- def testRowCount(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
- if crsr.rowcount == -1:
- #print("provider does not support rowcount on select")
- pass
- else:
- self.assertEqual( crsr.rowcount,9)
- self.helpRollbackTblTemp()
-
- def testRowCountNoRecordset(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("DELETE FROM xx_%s WHERE fldData >= 5" % config.tmp)
- if crsr.rowcount==-1:
- print(self.getEngine()+" Provider does not support rowcount (on DELETE)")
- else:
- self.assertEqual( crsr.rowcount,4)
- self.helpRollbackTblTemp()
-
- def testFetchMany(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
- rs=crsr.fetchmany(3)
- assert len(rs)==3
- rs=crsr.fetchmany(5)
- assert len(rs)==5
- rs=crsr.fetchmany(5)
- assert len(rs)==1 #Asked for five, but there is only one left
- self.helpRollbackTblTemp()
- def testFetchManyWithArraySize(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
- rs=crsr.fetchmany()
- assert len(rs)==1 #arraysize Defaults to one
- crsr.arraysize=4
- rs=crsr.fetchmany()
- assert len(rs)==4
- rs=crsr.fetchmany()
- assert len(rs)==4
- rs=crsr.fetchmany()
- assert len(rs)==0
- self.helpRollbackTblTemp()
- def testErrorConnect(self):
- conn = self.getConnection()
- kw = {}
- if 'proxy_host' in conn.kwargs:
- kw['proxy_host'] = conn.kwargs['proxy_host']
- conn.close()
- self.assertRaises(api.DatabaseError, self.db, 'not a valid connect string', kw)
- def testRowIterator(self):
- self.helpForceDropOnTblTemp()
- conn=self.getConnection()
- crsr=conn.cursor()
- tabdef= """
- CREATE TABLE xx_%s (
- fldId integer NOT NULL,
- fldTwo integer,
- fldThree integer,
- fldFour integer)
- """ % config.tmp
- crsr.execute(tabdef)
- inputs = [(2,3,4),(102,103,104)]
- fldId=1
- for inParam in inputs:
- fldId+=1
- try:
- crsr.execute("INSERT INTO xx_%s (fldId,fldTwo,fldThree,fldFour) VALUES (?,?,?,?)" % config.tmp,
- (fldId,inParam[0],inParam[1],inParam[2]))
- except:
- if self.remote:
- for message in crsr.messages:
- print(message)
- else:
- conn.printADOerrors()
- raise
- crsr.execute("SELECT fldTwo,fldThree,fldFour FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId])
- rec = crsr.fetchone()
- # check that stepping through an emulated row works
- for j in range(len(inParam)):
- assert rec[j] == inParam[j], 'returned value:"%s" != test value:"%s"'%(rec[j],inParam[j])
- # check that we can get a complete tuple from a row
- assert tuple(rec) == inParam, 'returned value:"%s" != test value:"%s"'%(repr(rec),repr(inParam))
- # test that slices of rows work
- slice1 = tuple(rec[:-1])
- slice2 = tuple(inParam[0:2])
- assert slice1 == slice2, 'returned value:"%s" != test value:"%s"'%(repr(slice1),repr(slice2))
- # now test named column retrieval
- assert rec['fldTwo'] == inParam[0]
- assert rec.fldThree == inParam[1]
- assert rec.fldFour == inParam[2]
- # test array operation
- # note that the fields vv vv vv are out of order
- crsr.execute("select fldThree,fldFour,fldTwo from xx_%s" % config.tmp)
- recs = crsr.fetchall()
- assert recs[1][0] == 103
- assert recs[0][1] == 4
- assert recs[1]['fldFour'] == 104
- assert recs[0,0] == 3
- assert recs[0,'fldTwo'] == 2
- assert recs[1,2] == 102
- for i in range(1):
- for j in range(2):
- assert recs[i][j] == recs[i,j]
- def testFormatParamstyle(self):
- self.helpForceDropOnTblTemp()
- conn=self.getConnection()
- conn.paramstyle = 'format' #test nonstandard use of paramstyle
- crsr=conn.cursor()
- tabdef= """
- CREATE TABLE xx_%s (
- fldId integer NOT NULL,
- fldData varchar(10),
- fldConst varchar(30))
- """ % config.tmp
- crsr.execute(tabdef)
- inputs = ['one','two','three']
- fldId=2
- for inParam in inputs:
- fldId+=1
- sql = "INSERT INTO xx_" + \
- config.tmp + \
- " (fldId,fldConst,fldData) VALUES (%s,'thi%s :may cause? trouble', %s)"
- try:
- crsr.execute(sql, (fldId,inParam))
- except:
- if self.remote:
- for message in crsr.messages:
- print(message)
- else:
- conn.printADOerrors()
- raise
- crsr.execute("SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE %s=fldID", [fldId])
- rec = crsr.fetchone()
- self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"' % (rec[0],inParam))
- self.assertEqual(rec[1], "thi%s :may cause? trouble")
- # now try an operation with a "%s" as part of a literal
- sel = "insert into xx_" + config.tmp + " (fldId,fldData) VALUES (%s,'four%sfive')"
- params = (20,)
- crsr.execute(sel,params)
- #test the .query implementation
- assert '(?,' in crsr.query, 'expected:"%s" in "%s"'%('(?,',crsr.query)
- #test the .command attribute
- assert crsr.command == sel, 'expected:"%s" but found "%s"' % (sel, crsr.command)
- #test the .parameters attribute
- if not self.remote: # parameter list will be altered in transit
- self.assertEqual(crsr.parameters, params)
- #now make sure the data made it
- crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=20" % config.tmp)
- rec = crsr.fetchone()
- self.assertEqual(rec[0], 'four%sfive')
- def testNamedParamstyle(self):
- self.helpForceDropOnTblTemp()
- conn=self.getConnection()
- crsr=conn.cursor()
- crsr.paramstyle = 'named' #test nonstandard use of paramstyle
- tabdef= """
- CREATE TABLE xx_%s (
- fldId integer NOT NULL,
- fldData varchar(10))
- """ % config.tmp
- crsr.execute(tabdef)
- inputs = ['four','five','six']
- fldId=10
- for inParam in inputs:
- fldId+=1
- try:
- crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" % config.tmp,
- {"f_Val":inParam,'Id':fldId})
- except:
- if self.remote:
- for message in crsr.messages:
- print(message)
- else:
- conn.printADOerrors()
- raise
- crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=:Id" % config.tmp, {'Id':fldId})
- rec = crsr.fetchone()
- self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
- # now a test with a ":" as part of a literal
- crsr.execute("insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp,{'xyz':30})
- crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
- rec = crsr.fetchone()
- self.assertEqual(rec[0], 'six:five')
- def testPyformatParamstyle(self):
- self.helpForceDropOnTblTemp()
- conn=self.getConnection()
- crsr=conn.cursor()
- crsr.paramstyle = 'pyformat' #test nonstandard use of paramstyle
- tabdef= """
- CREATE TABLE xx_%s (
- fldId integer NOT NULL,
- fldData varchar(10))
- """ % config.tmp
- crsr.execute(tabdef)
- inputs = ['four', 'five', 'six']
- fldId=10
- for inParam in inputs:
- fldId+=1
- try:
- crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (%%(Id)s,%%(f_Val)s)" % config.tmp,
- {"f_Val": inParam, 'Id': fldId})
- except:
- if self.remote:
- for message in crsr.messages:
- print(message)
- else:
- conn.printADOerrors()
- raise
- crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=%%(Id)s" % config.tmp, {'Id':fldId})
- rec = crsr.fetchone()
- self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
- # now a test with a "%" as part of a literal
- crsr.execute("insert into xx_%s (fldId,fldData) VALUES (%%(xyz)s,'six%%five')" % config.tmp,{'xyz': 30})
- crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
- rec = crsr.fetchone()
- self.assertEqual(rec[0], 'six%five')
- def testAutomaticParamstyle(self):
- self.helpForceDropOnTblTemp()
- conn=self.getConnection()
- conn.paramstyle = 'dynamic' #test nonstandard use of paramstyle
- crsr=conn.cursor()
- tabdef= """
- CREATE TABLE xx_%s (
- fldId integer NOT NULL,
- fldData varchar(10),
- fldConst varchar(30))
- """ % config.tmp
- crsr.execute(tabdef)
- inputs = ['one', 'two', 'three']
- fldId=2
- for inParam in inputs:
- fldId+=1
- try:
- crsr.execute("INSERT INTO xx_" + config.tmp + \
- " (fldId,fldConst,fldData) VALUES (?,'thi%s :may cause? troub:1e', ?)", (fldId,inParam))
- except:
- if self.remote:
- for message in crsr.messages:
- print(message)
- else:
- conn.printADOerrors()
- raise
- trouble = 'thi%s :may cause? troub:1e'
- crsr.execute("SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE ?=fldID", [fldId])
- rec = crsr.fetchone()
- self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
- self.assertEqual(rec[1], trouble)
- # inputs = [u'four',u'five',u'six']
- fldId=10
- for inParam in inputs:
- fldId+=1
- try:
- crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" % config.tmp,
- {"f_Val":inParam,'Id':fldId})
- except:
- if self.remote:
- for message in crsr.messages:
- print(message)
- else:
- conn.printADOerrors()
- raise
- crsr.execute("SELECT fldData FROM xx_%s WHERE :Id=fldID" % config.tmp, {'Id':fldId})
- rec = crsr.fetchone()
- self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
- # now a test with a ":" as part of a literal -- and use a prepared query
- ppdcmd = "insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp
- crsr.prepare(ppdcmd)
- crsr.execute(ppdcmd, {'xyz':30})
- crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
- rec = crsr.fetchone()
- self.assertEqual(rec[0], 'six:five')
- def testRollBack(self):
- conn = self.getConnection()
- crsr = conn.cursor()
- assert not crsr.connection.autocommit, 'Unexpected beginning condition'
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.connection.commit() # commit the first bunch
- crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
- selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
- crsr.execute(selectSql)
- rs = crsr.fetchall()
- assert len(rs) == 1
- self.conn.rollback()
- crsr.execute(selectSql)
- assert crsr.fetchone() == None, 'cursor.fetchone should return None if a query retrieves no rows'
- crsr.execute('SELECT fldData from xx_%s' % config.tmp)
- rs = crsr.fetchall()
- assert len(rs) == 9, 'the original records should still be present'
- self.helpRollbackTblTemp()
- def testCommit(self):
- try:
- con2 = self.getAnotherConnection()
- except NotImplementedError:
- return # should be "SKIP" for ACCESS
- assert not con2.autocommit, 'default should be manual commit'
- crsr = con2.cursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
- con2.commit()
- selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
- crsr.execute(selectSql)
- rs = crsr.fetchall()
- assert len(rs) == 1
- crsr.close()
- con2.close()
- conn = self.getConnection()
- crsr = self.getCursor()
- with conn.cursor() as crsr:
- crsr.execute(selectSql)
- rs = crsr.fetchall()
- assert len(rs) == 1
- assert rs[0][0] == 100
- self.helpRollbackTblTemp()
- def testAutoRollback(self):
- try:
- con2 = self.getAnotherConnection()
- except NotImplementedError:
- return # should be "SKIP" for ACCESS
- assert not con2.autocommit, 'unexpected beginning condition'
- crsr = con2.cursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
- selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
- crsr.execute(selectSql)
- rs = crsr.fetchall()
- assert len(rs) == 1
- crsr.close()
- con2.close()
- crsr = self.getCursor()
- try:
- crsr.execute(selectSql) # closing the connection should have forced rollback
- row = crsr.fetchone()
- except api.DatabaseError:
- row = None # if the entire table disappeared the rollback was perfect and the test passed
- assert row == None, 'cursor.fetchone should return None if a query retrieves no rows. Got %s' % repr(row)
- self.helpRollbackTblTemp()
- def testAutoCommit(self):
- try:
- ac_conn = self.getAnotherConnection({'autocommit': True})
- except NotImplementedError:
- return # should be "SKIP" for ACCESS
- crsr = ac_conn.cursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
- crsr.close()
- with self.getCursor() as crsr:
- selectSql = 'SELECT fldData from xx_%s' % config.tmp
- crsr.execute(selectSql) # closing the connection should _not_ have forced rollback
- rs = crsr.fetchall()
- assert len(rs) == 10, 'all records should still be present'
- ac_conn.close()
- self.helpRollbackTblTemp()
- def testSwitchedAutoCommit(self):
- try:
- ac_conn = self.getAnotherConnection()
- except NotImplementedError:
- return # should be "SKIP" for ACCESS
- ac_conn.autocommit = True
- crsr = ac_conn.cursor()
- self.helpCreateAndPopulateTableTemp(crsr)
- crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
- crsr.close()
- conn = self.getConnection()
- ac_conn.close()
- with self.getCursor() as crsr:
- selectSql = 'SELECT fldData from xx_%s' % config.tmp
- crsr.execute(selectSql) # closing the connection should _not_ have forced rollback
- rs = crsr.fetchall()
- assert len(rs) == 10, 'all records should still be present'
- self.helpRollbackTblTemp()
- def testExtendedTypeHandling(self):
- class XtendString(str):
- pass
- class XtendInt(int):
- pass
- class XtendFloat(float):
- pass
- xs = XtendString(randomstring(30))
- xi = XtendInt(random.randint(-100, 500))
- xf = XtendFloat(random.random())
- self.helpForceDropOnTblTemp()
- conn = self.getConnection()
- crsr = conn.cursor()
- tabdef = """
- CREATE TABLE xx_%s (
- s VARCHAR(40) NOT NULL,
- i INTEGER NOT NULL,
- f REAL NOT NULL)""" % config.tmp
- crsr.execute(tabdef)
- crsr.execute("INSERT INTO xx_%s (s, i, f) VALUES (?, ?, ?)" % config.tmp, (xs, xi, xf))
- crsr.close()
- conn = self.getConnection()
- with self.getCursor() as crsr:
- selectSql = 'SELECT s, i, f from xx_%s' % config.tmp
- crsr.execute(selectSql) # closing the connection should _not_ have forced rollback
- row = crsr.fetchone()
- self.assertEqual(row.s, xs)
- self.assertEqual(row.i, xi)
- self.assertAlmostEqual(row.f, xf)
- self.helpRollbackTblTemp()
- class TestADOwithSQLServer(CommonDBTests):
- def setUp(self):
- self.conn = config.dbSqlServerconnect(*config.connStrSQLServer[0], **config.connStrSQLServer[1])
- self.conn.timeout = 30 # turn timeout back up
- self.engine = 'MSSQL'
- self.db = config.dbSqlServerconnect
- self.remote = config.connStrSQLServer[2]
- def tearDown(self):
- try:
- self.conn.rollback()
- except:
- pass
- try:
- self.conn.close()
- except:
- pass
- self.conn=None
-
- def getConnection(self):
- return self.conn
- def getAnotherConnection(self, addkeys=None):
- keys = dict(config.connStrSQLServer[1])
- if addkeys:
- keys.update(addkeys)
- return config.dbSqlServerconnect(*config.connStrSQLServer[0], **keys)
- def testVariableReturningStoredProcedure(self):
- crsr=self.conn.cursor()
- spdef= """
- CREATE PROCEDURE sp_DeleteMeOnlyForTesting
- @theInput varchar(50),
- @theOtherInput varchar(50),
- @theOutput varchar(100) OUTPUT
- AS
- SET @theOutput=@theInput+@theOtherInput
- """
- try:
- crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting")
- self.conn.commit()
- except: #Make sure it is empty
- pass
- crsr.execute(spdef)
- retvalues=crsr.callproc('sp_DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
- assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
- assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
- assert retvalues[2]=='DodsworthAnne','%s is not "DodsworthAnne"'%repr(retvalues[2])
- self.conn.rollback()
-
- def testMultipleSetReturn(self):
- crsr=self.getCursor()
- self.helpCreateAndPopulateTableTemp(crsr)
-
- spdef= """
- CREATE PROCEDURE sp_DeleteMe_OnlyForTesting
- AS
- SELECT fldData FROM xx_%s ORDER BY fldData ASC
- SELECT fldData From xx_%s where fldData = -9999
- SELECT fldData FROM xx_%s ORDER BY fldData DESC
- """ % (config.tmp, config.tmp, config.tmp)
- try:
- crsr.execute("DROP PROCEDURE sp_DeleteMe_OnlyForTesting")
- self.conn.commit()
- except: #Make sure it is empty
- pass
- crsr.execute(spdef)
- retvalues=crsr.callproc('sp_DeleteMe_OnlyForTesting')
- row=crsr.fetchone()
- self.assertEqual(row[0], 0)
- assert crsr.nextset() == True, 'Operation should succeed'
- assert not crsr.fetchall(), 'Should be an empty second set'
- assert crsr.nextset() == True, 'third set should be present'
- rowdesc=crsr.fetchall()
- self.assertEqual(rowdesc[0][0],8)
- assert crsr.nextset() == None,'No more return sets, should return None'
- self.helpRollbackTblTemp()
- def testDatetimeProcedureParameter(self):
- crsr=self.conn.cursor()
- spdef= """
- CREATE PROCEDURE sp_DeleteMeOnlyForTesting
- @theInput DATETIME,
- @theOtherInput varchar(50),
- @theOutput varchar(100) OUTPUT
- AS
- SET @theOutput = CONVERT(CHARACTER(20), @theInput, 0) + @theOtherInput
- """
- try:
- crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting")
- self.conn.commit()
- except: #Make sure it is empty
- pass
- crsr.execute(spdef)
- result = crsr.callproc('sp_DeleteMeOnlyForTesting', [adodbapi.Timestamp(2014,12,25,0,1,0), 'Beep', ' ' * 30])
- assert result[2] == 'Dec 25 2014 12:01AM Beep', 'value was="%s"' % result[2]
- self.conn.rollback()
- def testIncorrectStoredProcedureParameter(self):
- crsr=self.conn.cursor()
- spdef= """
- CREATE PROCEDURE sp_DeleteMeOnlyForTesting
- @theInput DATETIME,
- @theOtherInput varchar(50),
- @theOutput varchar(100) OUTPUT
- AS
- SET @theOutput = CONVERT(CHARACTER(20), @theInput) + @theOtherInput
- """
- try:
- crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting")
- self.conn.commit()
- except: #Make sure it is empty
- pass
- crsr.execute(spdef)
- # calling the sproc with a string for the first parameter where a DateTime is expected
- result = tryconnection.try_operation_with_expected_exception(
- (api.DataError,api.DatabaseError),
- crsr.callproc,
- ['sp_DeleteMeOnlyForTesting'],
- {'parameters': ['this is wrong', 'Anne', 'not Alice']}
- )
- if result[0]: # the expected exception was raised
- assert '@theInput' in str(result[1]) or 'DatabaseError' in str(result), \
- 'Identifies the wrong erroneous parameter'
- else:
- assert result[0], result[1] # incorrect or no exception
- self.conn.rollback()
- class TestADOwithAccessDB(CommonDBTests):
- def setUp(self):
- self.conn = config.dbAccessconnect(*config.connStrAccess[0], **config.connStrAccess[1])
- self.conn.timeout = 30 # turn timeout back up
- self.engine = 'ACCESS'
- self.db = config.dbAccessconnect
- self.remote = config.connStrAccess[2]
- def tearDown(self):
- try:
- self.conn.rollback()
- except:
- pass
- try:
- self.conn.close()
- except:
- pass
- self.conn=None
-
- def getConnection(self):
- return self.conn
- def getAnotherConnection(self, addkeys=None):
- raise NotImplementedError('Jet cannot use a second connection to the database')
- def testOkConnect(self):
- c = self.db(*config.connStrAccess[0], **config.connStrAccess[1])
- assert c != None
- c.close()
-
- class TestADOwithMySql(CommonDBTests):
- def setUp(self):
- self.conn = config.dbMySqlconnect(*config.connStrMySql[0], **config.connStrMySql[1])
- self.conn.timeout = 30 # turn timeout back up
- self.engine = 'MySQL'
- self.db = config.dbMySqlconnect
- self.remote = config.connStrMySql[2]
- def tearDown(self):
- try:
- self.conn.rollback()
- except:
- pass
- try:
- self.conn.close()
- except:
- pass
- self.conn=None
- def getConnection(self):
- return self.conn
- def getAnotherConnection(self, addkeys=None):
- keys = dict(config.connStrMySql[1])
- if addkeys:
- keys.update(addkeys)
- return config.dbMySqlconnect(*config.connStrMySql[0], **keys)
- def testOkConnect(self):
- c = self.db(*config.connStrMySql[0], **config.connStrMySql[1])
- assert c != None
- # def testStoredProcedure(self):
- # crsr=self.conn.cursor()
- # try:
- # crsr.execute("DROP PROCEDURE DeleteMeOnlyForTesting")
- # self.conn.commit()
- # except: #Make sure it is empty
- # pass
- # spdef= """
- # DELIMITER $$
- # CREATE PROCEDURE DeleteMeOnlyForTesting (onein CHAR(10), twoin CHAR(10), OUT theout CHAR(20))
- # DETERMINISTIC
- # BEGIN
- # SET theout = onein //|| twoin;
- # /* (SELECT 'a small string' as result; */
- # END $$
- # """
- #
- # crsr.execute(spdef)
- #
- # retvalues=crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
- # print 'return value (mysql)=',repr(crsr.returnValue) ###
- # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
- # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
- # assert retvalues[2]=='DodsworthAnne','%s is not "DodsworthAnne"'%repr(retvalues[2])
- #
- # try:
- # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting")
- # self.conn.commit()
- # except: #Make sure it is empty
- # pass
- class TestADOwithPostgres(CommonDBTests):
- def setUp(self):
- self.conn = config.dbPostgresConnect(*config.connStrPostgres[0], **config.connStrPostgres[1])
- self.conn.timeout = 30 # turn timeout back up
- self.engine = 'PostgreSQL'
- self.db = config.dbPostgresConnect
- self.remote = config.connStrPostgres[2]
- def tearDown(self):
- try:
- self.conn.rollback()
- except:
- pass
- try:
- self.conn.close()
- except:
- pass
- self.conn=None
- def getConnection(self):
- return self.conn
- def getAnotherConnection(self, addkeys=None):
- keys = dict(config.connStrPostgres[1])
- if addkeys:
- keys.update(addkeys)
- return config.dbPostgresConnect(*config.connStrPostgres[0], **keys)
- def testOkConnect(self):
- c = self.db(*config.connStrPostgres[0], **config.connStrPostgres[1])
- assert c != None
- # def testStoredProcedure(self):
- # crsr=self.conn.cursor()
- # spdef= """
- # CREATE OR REPLACE FUNCTION DeleteMeOnlyForTesting (text, text)
- # RETURNS text AS $funk$
- # BEGIN
- # RETURN $1 || $2;
- # END;
- # $funk$
- # LANGUAGE SQL;
- # """
- #
- # crsr.execute(spdef)
- # retvalues = crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
- # ### print 'return value (pg)=',repr(crsr.returnValue) ###
- # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
- # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
- # assert retvalues[2]=='Dodsworth Anne','%s is not "Dodsworth Anne"'%repr(retvalues[2])
- # self.conn.rollback()
- # try:
- # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting")
- # self.conn.commit()
- # except: #Make sure it is empty
- # pass
- class TimeConverterInterfaceTest(unittest.TestCase):
- def testIDate(self):
- assert self.tc.Date(1990,2,2)
- def testITime(self):
- assert self.tc.Time(13,2,2)
- def testITimestamp(self):
- assert self.tc.Timestamp(1990,2,2,13,2,1)
- def testIDateObjectFromCOMDate(self):
- assert self.tc.DateObjectFromCOMDate(37435.7604282)
- def testICOMDate(self):
- assert hasattr(self.tc,'COMDate')
- def testExactDate(self):
- d=self.tc.Date(1994,11,15)
- comDate=self.tc.COMDate(d)
- correct=34653.0
- assert comDate == correct,comDate
-
- def testExactTimestamp(self):
- d=self.tc.Timestamp(1994,11,15,12,0,0)
- comDate=self.tc.COMDate(d)
- correct=34653.5
- self.assertEqual( comDate ,correct)
-
- d=self.tc.Timestamp(2003,5,6,14,15,17)
- comDate=self.tc.COMDate(d)
- correct=37747.593946759262
- self.assertEqual( comDate ,correct)
- def testIsoFormat(self):
- d=self.tc.Timestamp(1994,11,15,12,3,10)
- iso=self.tc.DateObjectToIsoFormatString(d)
- self.assertEqual(str(iso[:19]) , '1994-11-15 12:03:10')
-
- dt=self.tc.Date(2003,5,2)
- iso=self.tc.DateObjectToIsoFormatString(dt)
- self.assertEqual(str(iso[:10]), '2003-05-02')
-
- if config.doMxDateTimeTest:
- import mx.DateTime
- class TestMXDateTimeConverter(TimeConverterInterfaceTest):
- def setUp(self):
- self.tc = api.mxDateTimeConverter()
-
- def testCOMDate(self):
- t=mx.DateTime.DateTime(2002,6,28,18,15,2)
- cmd=self.tc.COMDate(t)
- assert cmd == t.COMDate()
-
- def testDateObjectFromCOMDate(self):
- cmd=self.tc.DateObjectFromCOMDate(37435.7604282)
- t=mx.DateTime.DateTime(2002,6,28,18,15,0)
- t2=mx.DateTime.DateTime(2002,6,28,18,15,2)
- assert t2>cmd>t
-
- def testDate(self):
- assert mx.DateTime.Date(1980,11,4)==self.tc.Date(1980,11,4)
- def testTime(self):
- assert mx.DateTime.Time(13,11,4)==self.tc.Time(13,11,4)
- def testTimestamp(self):
- t=mx.DateTime.DateTime(2002,6,28,18,15,1)
- obj=self.tc.Timestamp(2002,6,28,18,15,1)
- assert t == obj
- import time
- class TestPythonTimeConverter(TimeConverterInterfaceTest):
- def setUp(self):
- self.tc=api.pythonTimeConverter()
-
- def testCOMDate(self):
- mk = time.mktime((2002,6,28,18,15,1, 4,31+28+31+30+31+28,-1))
- t=time.localtime(mk)
- # Fri, 28 Jun 2002 18:15:01 +0000
- cmd=self.tc.COMDate(t)
- assert abs(cmd - 37435.7604282) < 1.0/24,"%f more than an hour wrong" % cmd
- def testDateObjectFromCOMDate(self):
- cmd=self.tc.DateObjectFromCOMDate(37435.7604282)
- t1=time.gmtime(time.mktime((2002,6,28,0,14,1, 4,31+28+31+30+31+28,-1)))
- #there are errors in the implementation of gmtime which we ignore
- t2=time.gmtime(time.mktime((2002,6,29,12,14,2, 4,31+28+31+30+31+28,-1)))
- assert t1<cmd<t2, '"%s" should be about 2002-6-28 12:15:01'%repr(cmd)
-
- def testDate(self):
- t1=time.mktime((2002,6,28,18,15,1, 4,31+28+31+30+31+30,0))
- t2=time.mktime((2002,6,30,18,15,1, 4,31+28+31+30+31+28,0))
- obj=self.tc.Date(2002,6,29)
- assert t1< time.mktime(obj)<t2,obj
- def testTime(self):
- self.assertEqual( self.tc.Time(18,15,2),time.gmtime(18*60*60+15*60+2))
- def testTimestamp(self):
- t1=time.localtime(time.mktime((2002,6,28,18,14,1, 4,31+28+31+30+31+28,-1)))
- t2=time.localtime(time.mktime((2002,6,28,18,16,1, 4,31+28+31+30+31+28,-1)))
- obj=self.tc.Timestamp(2002,6,28,18,15,2)
- assert t1< obj <t2,obj
- class TestPythonDateTimeConverter(TimeConverterInterfaceTest):
- def setUp(self):
- self.tc = api.pythonDateTimeConverter()
-
- def testCOMDate(self):
- t=datetime.datetime( 2002,6,28,18,15,1)
- # Fri, 28 Jun 2002 18:15:01 +0000
- cmd=self.tc.COMDate(t)
- assert abs(cmd - 37435.7604282) < 1.0/24,"more than an hour wrong"
-
- def testDateObjectFromCOMDate(self):
- cmd = self.tc.DateObjectFromCOMDate(37435.7604282)
- t1 = datetime.datetime(2002,6,28,18,14,1)
- t2 = datetime.datetime(2002,6,28,18,16,1)
- assert t1 < cmd < t2, cmd
- tx = datetime.datetime(2002,6,28,18,14,1,900000) # testing that microseconds don't become milliseconds
- c1 = self.tc.DateObjectFromCOMDate(self.tc.COMDate(tx))
- assert t1 < c1 < t2, c1
- def testDate(self):
- t1=datetime.date(2002,6,28)
- t2=datetime.date(2002,6,30)
- obj=self.tc.Date(2002,6,29)
- assert t1< obj <t2,obj
- def testTime(self):
- self.assertEqual( self.tc.Time(18,15,2).isoformat()[:8],'18:15:02')
- def testTimestamp(self):
- t1=datetime.datetime(2002,6,28,18,14,1)
- t2=datetime.datetime(2002,6,28,18,16,1)
- obj=self.tc.Timestamp(2002,6,28,18,15,2)
- assert t1< obj <t2,obj
- suites=[]
- suites.append( unittest.makeSuite(TestPythonDateTimeConverter,'test'))
- if config.doMxDateTimeTest:
- suites.append( unittest.makeSuite(TestMXDateTimeConverter,'test'))
- if config.doTimeTest:
- suites.append( unittest.makeSuite(TestPythonTimeConverter,'test'))
- if config.doAccessTest:
- suites.append( unittest.makeSuite(TestADOwithAccessDB,'test'))
- if config.doSqlServerTest:
- suites.append( unittest.makeSuite(TestADOwithSQLServer,'test'))
- if config.doMySqlTest:
- suites.append( unittest.makeSuite(TestADOwithMySql,'test'))
- if config.doPostgresTest:
- suites.append( unittest.makeSuite(TestADOwithPostgres,'test'))
- class cleanup_manager(object):
- def __enter__(self):
- pass
- def __exit__(self, exc_type, exc_val, exc_tb):
- config.cleanup(config.testfolder, config.mdb_name)
- suite=unittest.TestSuite(suites)
- if __name__ == '__main__':
- mysuite = copy.deepcopy(suite)
- with cleanup_manager():
- defaultDateConverter = adodbapi.dateconverter
- print(__doc__)
- print("Default Date Converter is %s" %(defaultDateConverter,))
- dateconverter = defaultDateConverter
- tag = 'datetime'
- unittest.TextTestRunner().run(mysuite)
- if config.iterateOverTimeTests:
- for test, dateconverter, tag in (
- (config.doTimeTest,api.pythonTimeConverter, 'pythontime'),
- (config.doMxDateTimeTest, api.mxDateTimeConverter, 'mx')):
- if test:
- mysuite = copy.deepcopy(suite) # work around a side effect of unittest.TextTestRunner
- adodbapi.adodbapi.dateconverter = dateconverter()
- print("Changed dateconverter to ")
- print(adodbapi.adodbapi.dateconverter)
- unittest.TextTestRunner().run(mysuite)
|