adodbapitest.py 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375
  1. """ Unit tests version 2.6.1.0 for adodbapi"""
  2. """
  3. adodbapi - A python DB API 2.0 interface to Microsoft ADO
  4. Copyright (C) 2002 Henrik Ekelund
  5. This library is free software; you can redistribute it and/or
  6. modify it under the terms of the GNU Lesser General Public
  7. License as published by the Free Software Foundation; either
  8. version 2.1 of the License, or (at your option) any later version.
  9. This library is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. Lesser General Public License for more details.
  13. You should have received a copy of the GNU Lesser General Public
  14. License along with this library; if not, write to the Free Software
  15. Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  16. Updates by Vernon Cole
  17. """
  18. import unittest
  19. import sys
  20. import datetime
  21. import decimal
  22. import copy
  23. import random
  24. import string
  25. try:
  26. import win32com.client
  27. win32 = True
  28. except ImportError:
  29. win32 = False
  30. # run the configuration module.
  31. import adodbapitestconfig as config # will set sys.path to find correct version of adodbapi
  32. # in our code below, all our switches are from config.whatever
  33. import tryconnection
  34. import adodbapi
  35. import adodbapi.apibase as api
  36. try:
  37. import adodbapi.ado_consts as ado_consts
  38. except ImportError: #we are doing a shortcut import as a module -- so
  39. try:
  40. import ado_consts
  41. except ImportError:
  42. from adodbapi import ado_consts
  def str2bytes(sval):
      """Encode the text ``sval`` to bytes using the latin-1 codec."""
      encoded = sval.encode("latin1")
      return encoded


  # Alias for ``int`` under the name ``long``.
  long = int
  def randomstring(length):
      """Return a random string made of ``length`` ASCII letters.

      ``length`` is the number of characters to generate; ``0`` gives an
      empty string.  (The argument used to be ignored in favour of a
      hard-coded 32 characters.)
      """
      letters = string.ascii_letters
      return ''.join(random.choice(letters) for _ in range(length))
  48. class CommonDBTests(unittest.TestCase):
  49. "Self contained super-simple tests in easy syntax, should work on everything between mySQL and Oracle"
  def setUp(self):
      """Initialise ``self.engine`` to the placeholder value 'unknown'."""
      placeholder = 'unknown'
      self.engine = placeholder
  def getEngine(self):
      """Return the engine name stored in ``self.engine``."""
      engine_name = self.engine
      return engine_name
  def getConnection(self):
      """Placeholder: this method must be overridden by a subclass."""
      raise NotImplementedError()
  def getCursor(self):
      """Return a cursor created on the connection given by ``getConnection()``."""
      connection = self.getConnection()
      return connection.cursor()
  58. def testConnection(self):
  59. crsr=self.getCursor()
  60. assert crsr.__class__.__name__ == 'Cursor'
  61. def testErrorHandlerInherits(self):
  62. if not self.remote:
  63. conn=self.getConnection()
  64. mycallable=lambda connection,cursor,errorclass,errorvalue: 1
  65. conn.errorhandler=mycallable
  66. crsr=conn.cursor()
  67. assert crsr.errorhandler==mycallable,"Error handler on crsr should be same as on connection"
  68. def testDefaultErrorHandlerConnection(self):
  69. if not self.remote:
  70. conn=self.getConnection()
  71. del conn.messages[:]
  72. try:
  73. conn.close()
  74. conn.commit() #Should not be able to use connection after it is closed
  75. except:
  76. assert len(conn.messages)==1
  77. assert len(conn.messages[0])==2
  78. assert conn.messages[0][0]==api.ProgrammingError
  79. def testOwnErrorHandlerConnection(self):
  80. if self.remote: # ToDo: use "skip"
  81. return
  82. mycallable=lambda connection,cursor,errorclass,errorvalue: 1 #does not raise anything
  83. conn=self.getConnection()
  84. conn.errorhandler=mycallable
  85. conn.close()
  86. conn.commit() #Should not be able to use connection after it is closed
  87. assert len(conn.messages)==0
  88. conn.errorhandler=None #This should bring back the standard error handler
  89. try:
  90. conn.close()
  91. conn.commit() #Should not be able to use connection after it is closed
  92. except:
  93. pass
  94. #The Standard errorhandler appends error to messages attribute
  95. assert len(conn.messages)>0,"Setting errorhandler to none should bring back the standard error handler"
  96. def testDefaultErrorHandlerCursor(self):
  97. crsr=self.getConnection().cursor()
  98. if not self.remote:
  99. del crsr.messages[:]
  100. try:
  101. crsr.execute("SELECT abbtytddrf FROM dasdasd")
  102. except:
  103. assert len(crsr.messages)==1
  104. assert len(crsr.messages[0])==2
  105. assert crsr.messages[0][0]==api.DatabaseError
  106. def testOwnErrorHandlerCursor(self):
  107. if self.remote: # ToDo: should be a "skip"
  108. return
  109. mycallable=lambda connection,cursor,errorclass,errorvalue: 1 #does not raise anything
  110. crsr=self.getConnection().cursor()
  111. crsr.errorhandler=mycallable
  112. crsr.execute("SELECT abbtytddrf FROM dasdasd")
  113. assert len(crsr.messages)==0
  114. crsr.errorhandler=None #This should bring back the standard error handler
  115. try:
  116. crsr.execute("SELECT abbtytddrf FROM dasdasd")
  117. except:
  118. pass
  119. #The Standard errorhandler appends error to messages attribute
  120. assert len(crsr.messages)>0,"Setting errorhandler to none should bring back the standard error handler"
  121. def testUserDefinedConversions(self):
  122. if self.remote: ## Todo: should be a "skip"
  123. return
  124. try:
  125. duplicatingConverter=lambda aStringField: aStringField*2
  126. assert duplicatingConverter('gabba') == 'gabbagabba'
  127. self.helpForceDropOnTblTemp()
  128. conn=self.getConnection()
  129. # the variantConversions attribute should not exist on a normal connection object
  130. self.assertRaises(AttributeError, lambda x:conn.variantConversions[x],[2])
  131. if not self.remote:
  132. # create a variantConversions attribute on the connection
  133. conn.variantConversions = copy.copy(api.variantConversions)
  134. crsr=conn.cursor()
  135. tabdef = "CREATE TABLE xx_%s (fldData VARCHAR(100) NOT NULL, fld2 VARCHAR(20))" % config.tmp
  136. crsr.execute(tabdef)
  137. crsr.execute("INSERT INTO xx_%s(fldData,fld2) VALUES('gabba','booga')" % config.tmp)
  138. crsr.execute("INSERT INTO xx_%s(fldData,fld2) VALUES('hey','yo')" % config.tmp)
  139. # change converter for ALL adoStringTypes columns
  140. conn.variantConversions[api.adoStringTypes]=duplicatingConverter
  141. crsr.execute("SELECT fldData,fld2 FROM xx_%s ORDER BY fldData" % config.tmp)
  142. rows=crsr.fetchall()
  143. row = rows[0]
  144. self.assertEqual(row[0],'gabbagabba')
  145. row = rows[1]
  146. self.assertEqual(row[0],'heyhey')
  147. self.assertEqual(row[1],'yoyo')
  148. upcaseConverter=lambda aStringField: aStringField.upper()
  149. assert upcaseConverter('upThis') == 'UPTHIS'
  150. # now use a single column converter
  151. rows.converters[1] = upcaseConverter # convert second column
  152. self.assertEqual(row[0],'heyhey') # first will be unchanged
  153. self.assertEqual(row[1],'YO') # second will convert to upper case
  154. finally:
  155. try:
  156. del conn.variantConversions #Restore the default
  157. except: pass
  158. self.helpRollbackTblTemp()
  159. def testUserDefinedConversionForExactNumericTypes(self):
  160. # variantConversions is a dictionary of conversion functions
  161. # held internally in adodbapi.apibase
  162. #
  163. # !!! this test intentionally alters the value of what should be constant in the module
  164. # !!! no new code should use this example, to is only a test to see that the
  165. # !!! deprecated way of doing this still works. (use connection.variantConversions)
  166. #
  167. if not self.remote and sys.version_info < (3,0): ### Py3 need different test
  168. oldconverter = adodbapi.variantConversions[ado_consts.adNumeric] #keep old function to restore later
  169. # By default decimal and "numbers" are returned as decimals.
  170. # Instead, make numbers return as floats
  171. try:
  172. adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtFloat
  173. self.helpTestDataType("decimal(18,2)",'NUMBER',3.45,compareAlmostEqual=1)
  174. self.helpTestDataType("numeric(18,2)",'NUMBER',3.45,compareAlmostEqual=1)
  175. # now return strings
  176. adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtString
  177. self.helpTestDataType("numeric(18,2)",'NUMBER','3.45')
  178. # now a completly weird user defined convertion
  179. adodbapi.variantConversions[ado_consts.adNumeric] = lambda x: '!!This function returns a funny unicode string %s!!'%x
  180. self.helpTestDataType("numeric(18,2)",'NUMBER','3.45',
  181. allowedReturnValues=['!!This function returns a funny unicode string 3.45!!'])
  182. finally:
  183. # now reset the converter to its original function
  184. adodbapi.variantConversions[ado_consts.adNumeric]=oldconverter #Restore the original convertion function
  def helpTestDataType(self, sqlDataTypeString,
                       DBAPIDataTypeString,
                       pyData,
                       pyDataInputAlternatives=None,
                       compareAlmostEqual=None,
                       allowedReturnValues=None):
      """Round-trip ``pyData`` through a temporary-table column of one SQL type.

      Steps:
        1. drop and re-create table ``xx_<config.tmp>`` with an integer
           ``fldId`` and a ``fldData`` column of type ``sqlDataTypeString``;
        2. insert a row without ``fldData`` and check it reads back as None;
        3. check the name and type code reported by ``cursor.description``;
        4. insert every input value with ``?`` parameters, read it back and
           check its type and value.

      Parameters:
        sqlDataTypeString -- SQL type text used in the CREATE TABLE statement.
        DBAPIDataTypeString -- one of 'STRING', 'NUMBER', 'BINARY',
            'DATETIME', 'ROWID' or 'UUID' ('UUID' is checked against
            ``api.OTHER``).
        pyData -- the value to store; also the expected result.
        pyDataInputAlternatives -- optional extra input values; duplicates of
            ``pyData`` are removed because the inputs are collected in a set.
        compareAlmostEqual -- when true, 'DATETIME' values are compared by
            their ISO strings and all other values as floats within a
            relative tolerance of 0.00001.
        allowedReturnValues -- optional list; when given, the result must have
            the type of one of its members and (unless compareAlmostEqual is
            set) be one of them.

      Raises NotImplementedError when DBAPIDataTypeString is not recognised.
      """
      self.helpForceDropOnTblTemp()
      conn = self.getConnection()
      crsr = conn.cursor()
      tabdef = ("\nCREATE TABLE xx_%s (\n"
                "fldId integer NOT NULL,\n"
                "fldData ") % config.tmp + sqlDataTypeString + ")\n"
      crsr.execute(tabdef)

      # Test Null values mapped to None
      crsr.execute("INSERT INTO xx_%s (fldId) VALUES (1)" % config.tmp)
      crsr.execute("SELECT fldId,fldData FROM xx_%s" % config.tmp)
      rs = crsr.fetchone()
      self.assertEqual(rs[1], None)  # Null should be mapped to None
      assert rs[0] == 1

      # Test description related
      descTuple = crsr.description[1]
      assert descTuple[0] in ['fldData', 'flddata'], 'was "%s" expected "%s"' % (descTuple[0], 'fldData')

      # name accepted by this helper -> attribute of ``api`` holding the type object
      typeObjectNames = {
          'STRING': 'STRING',
          'NUMBER': 'NUMBER',
          'BINARY': 'BINARY',
          'DATETIME': 'DATETIME',
          'ROWID': 'ROWID',
          'UUID': 'OTHER',
      }
      if DBAPIDataTypeString not in typeObjectNames:
          raise NotImplementedError("DBAPIDataTypeString not provided")
      expectedType = getattr(api, typeObjectNames[DBAPIDataTypeString])
      assert descTuple[1] == expectedType, 'was "%s" expected "%s"' % (descTuple[1], expectedType.values)

      # Test data binding
      inputs = [pyData]
      if pyDataInputAlternatives:
          inputs.extend(pyDataInputAlternatives)
      inputs = set(inputs)  # removes redundant string==unicode tests
      # fldId 1 is the NULL row inserted above, so data rows start at 2
      for fldId, inParam in enumerate(inputs, start=2):
          try:
              crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (?,?)" % config.tmp, (fldId, inParam))
          except:
              # show whatever diagnostics are available, then let the error propagate
              if self.remote:
                  for message in crsr.messages:
                      print(message)
              else:
                  conn.printADOerrors()
              raise
          crsr.execute("SELECT fldData FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId])
          rs = crsr.fetchone()
          if allowedReturnValues:
              allowedTypes = tuple(type(aRV) for aRV in allowedReturnValues)
              assert isinstance(rs[0], allowedTypes), \
                  'result type "%s" must be one of %s' % (type(rs[0]), allowedTypes)
          else:
              assert isinstance(rs[0], type(pyData)), \
                  'result type "%s" must be instance of %s' % (type(rs[0]), type(pyData))

          if compareAlmostEqual and DBAPIDataTypeString == 'DATETIME':
              iso1 = adodbapi.dateconverter.DateObjectToIsoFormatString(rs[0])
              iso2 = adodbapi.dateconverter.DateObjectToIsoFormatString(pyData)
              self.assertEqual(iso1, iso2)
          elif compareAlmostEqual:
              s = float(pyData)
              v = float(rs[0])
              # Relative tolerance against the magnitude of the expected value.
              # Dividing by ``s`` itself made the check pass for any negative
              # expected value and fail with ZeroDivisionError for zero.
              assert abs(v - s) <= 0.00001 * abs(s), \
                  "Values not almost equal recvd=%s, expected=%f" % (rs[0], s)
          elif allowedReturnValues:
              self.assertTrue(rs[0] in allowedReturnValues,
                              'Value "%s" not in %s' % (repr(rs[0]), allowedReturnValues))
          else:
              self.assertEqual(rs[0], pyData,
                               'Values are not equal recvd="%s", expected="%s"' % (rs[0], pyData))
  265. def testDataTypeFloat(self):
  266. self.helpTestDataType("real",'NUMBER',3.45,compareAlmostEqual=True)
  267. self.helpTestDataType("float",'NUMBER',1.79e37,compareAlmostEqual=True)
  268. def testDataTypeDecmal(self):
  269. self.helpTestDataType("decimal(18,2)",'NUMBER',3.45,
  270. allowedReturnValues=['3.45','3,45',decimal.Decimal('3.45')])
  271. self.helpTestDataType("numeric(18,2)",'NUMBER',3.45,
  272. allowedReturnValues=['3.45','3,45',decimal.Decimal('3.45')])
  273. self.helpTestDataType("decimal(20,2)",'NUMBER',444444444444444444,
  274. allowedReturnValues=['444444444444444444.00', '444444444444444444,00',
  275. decimal.Decimal('444444444444444444')])
  276. if self.getEngine() == 'MSSQL':
  277. self.helpTestDataType("uniqueidentifier",'UUID','{71A4F49E-39F3-42B1-A41E-48FF154996E6}',
  278. allowedReturnValues=['{71A4F49E-39F3-42B1-A41E-48FF154996E6}'])
  279. def testDataTypeMoney(self): #v2.1 Cole -- use decimal for money
  280. if self.getEngine() == 'MySQL':
  281. self.helpTestDataType("DECIMAL(20,4)",'NUMBER',decimal.Decimal('-922337203685477.5808'))
  282. elif self.getEngine() == 'PostgreSQL':
  283. self.helpTestDataType("money",'NUMBER',decimal.Decimal('-922337203685477.5808'),
  284. compareAlmostEqual=True,
  285. allowedReturnValues=[-922337203685477.5808,
  286. decimal.Decimal('-922337203685477.5808')])
  287. else:
  288. self.helpTestDataType("smallmoney",'NUMBER',decimal.Decimal('214748.02'))
  289. self.helpTestDataType("money",'NUMBER',decimal.Decimal('-922337203685477.5808'))
  290. def testDataTypeInt(self):
  291. if self.getEngine() != 'PostgreSQL':
  292. self.helpTestDataType("tinyint",'NUMBER',115)
  293. self.helpTestDataType("smallint",'NUMBER',-32768)
  294. if self.getEngine() not in ['ACCESS','PostgreSQL']:
  295. self.helpTestDataType("bit",'NUMBER',1) #Does not work correctly with access
  296. if self.getEngine() in ['MSSQL','PostgreSQL']:
  297. self.helpTestDataType("bigint",'NUMBER',3000000000,
  298. allowedReturnValues=[3000000000, int(3000000000)])
  299. self.helpTestDataType("int",'NUMBER',2147483647)
  300. def testDataTypeChar(self):
  301. for sqlDataType in ("char(6)","nchar(6)"):
  302. self.helpTestDataType(sqlDataType,'STRING','spam ',allowedReturnValues=['spam','spam','spam ','spam '])
  303. def testDataTypeVarChar(self):
  304. if self.getEngine() == 'MySQL':
  305. stringKinds = ["varchar(10)","text"]
  306. elif self.getEngine() == 'PostgreSQL':
  307. stringKinds = ["varchar(10)","text","character varying"]
  308. else:
  309. stringKinds = ["varchar(10)","nvarchar(10)","text","ntext"] #,"varchar(max)"]
  310. for sqlDataType in stringKinds:
  311. self.helpTestDataType(sqlDataType,'STRING','spam',['spam'])
  def testDataTypeDate(self):
      """Round-trip date and timestamp values through the engine's date/time column types.

      NOTE(review): ``tag`` is not defined inside this method; presumably it
      is a module-level name set elsewhere in the file -- verify.
      """
      engine = self.getEngine()
      dt = "timestamp" if engine == 'PostgreSQL' else "datetime"
      self.helpTestDataType(dt, 'DATETIME', adodbapi.Date(2002, 10, 28),
                            compareAlmostEqual=True)
      other_engine = engine not in ['MySQL', 'PostgreSQL']
      if other_engine:
          self.helpTestDataType("smalldatetime", 'DATETIME', adodbapi.Date(2002, 10, 28),
                                compareAlmostEqual=True)
      # fails when using pythonTime
      if tag != 'pythontime' and other_engine:
          self.helpTestDataType(dt, 'DATETIME', adodbapi.Timestamp(2002, 10, 28, 12, 15, 1),
                                compareAlmostEqual=True)
  def testDataTypeBinary(self):
      """Round-trip a five-byte value through the binary column types of the engine."""
      binfld = str2bytes('\x07\x00\xE2\x40*')
      arv = [binfld, adodbapi.Binary(binfld), bytes(binfld)]
      engine = self.getEngine()
      if engine == 'PostgreSQL':
          column_types = ["bytea"]
      else:
          column_types = ["binary(5)", "varbinary(100)"]
          if engine != 'MySQL':
              column_types.append("image")
      for sql_type in column_types:
          self.helpTestDataType(sql_type, 'BINARY', adodbapi.Binary(binfld),
                                allowedReturnValues=arv)
  def helpRollbackTblTemp(self):
      """Clean up after a test by delegating to ``helpForceDropOnTblTemp()``."""
      drop_temp_table = self.helpForceDropOnTblTemp
      drop_temp_table()
  def helpForceDropOnTblTemp(self):
      """Try to drop the temporary table ``xx_<config.tmp>``, ignoring failures.

      A cursor is opened on ``getConnection()`` as a context manager, the
      DROP TABLE is executed, and the connection is committed when it is not
      in autocommit mode.  Any ordinary error from those steps is discarded,
      so the helper can be called whether or not the table exists.
      """
      conn = self.getConnection()
      with conn.cursor() as crsr:
          try:
              crsr.execute("DROP TABLE xx_%s" % config.tmp)
              if not conn.autocommit:
                  conn.commit()
          except Exception:
              # Best effort by design.  Catching Exception instead of using a
              # bare except lets KeyboardInterrupt/SystemExit propagate.
              pass
  def helpCreateAndPopulateTableTemp(self, crsr):
      """Create table ``xx_<config.tmp>`` through ``crsr`` and fill it with the integers 0..8.

      If the CREATE raises ``api.DatabaseError`` the table is dropped via
      ``helpForceDropOnTblTemp()`` and the CREATE is attempted once more.
      """
      tabdef = ("\nCREATE TABLE xx_%s (\n"
                "fldData INTEGER\n"
                ")\n") % config.tmp
      try:  # EAFP
          crsr.execute(tabdef)
      except api.DatabaseError:
          # the table was not dropped before, so drop it now and retry
          self.helpForceDropOnTblTemp()
          crsr.execute(tabdef)
      # NOTE: the test table is built without using parameter substitution;
      # this is poor SQL code, but a valid test
      for value in range(9):
          crsr.execute("INSERT INTO xx_%s (fldData) VALUES (%i)" % (config.tmp, value))
  def testFetchAll(self):
      """``fetchall()`` must return all nine rows and the result must support slicing."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
      rows = crsr.fetchall()
      assert len(rows) == 9
      # test slice of rows: should have rowid 3..6
      for expected, row in enumerate(rows[3:-2], start=3):
          assert row[0] == expected
      self.helpRollbackTblTemp()
  def testPreparedStatement(self):
      """Executing ``crsr.command`` after ``prepare()`` must run the prepared query."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.prepare("SELECT fldData FROM xx_%s" % config.tmp)
      # crsr.command remembers the statement that was prepared
      crsr.execute(crsr.command)
      rows = crsr.fetchall()
      assert len(rows) == 9
      assert rows[2][0] == 2
      self.helpRollbackTblTemp()
  def testWrongPreparedStatement(self):
      """Executing a different statement after ``prepare()`` must run that statement instead."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.prepare("SELECT * FROM nowhere")
      # should execute this one, not the prepared one
      crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
      rows = crsr.fetchall()
      assert len(rows) == 9
      assert rows[2][0] == 2
      self.helpRollbackTblTemp()
  def testIterator(self):
      """Iterating the cursor itself must yield the rows, holding 0, 1, 2, ... in that order."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
      expected = 0
      # using the cursor as an iterator, rather than fetchxxx
      for row in crsr:
          assert row[0] == expected
          expected += 1
      self.helpRollbackTblTemp()
  def testExecuteMany(self):
      """``executemany()`` must insert every parameter set and report the rowcount if supported."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      seq_of_values = [(111,), (222,)]
      crsr.executemany("INSERT INTO xx_%s (fldData) VALUES (?)" % config.tmp, seq_of_values)
      if crsr.rowcount != -1:
          self.assertEqual(crsr.rowcount, 2)
      else:
          print(self.getEngine()+" Provider does not support rowcount (on .executemany())")
      crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
      rows = crsr.fetchall()
      # nine rows from the populate helper plus the two inserted above
      assert len(rows) == 11
      self.helpRollbackTblTemp()
  def testRowCount(self):
      """After a SELECT, ``rowcount`` must be 9 unless it is reported as -1."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
      # -1: provider does not support rowcount on select -- nothing to check
      if crsr.rowcount != -1:
          self.assertEqual(crsr.rowcount, 9)
      self.helpRollbackTblTemp()
  def testRowCountNoRecordset(self):
      """After a DELETE of the rows 5..8, ``rowcount`` must be 4 unless it is reported as -1."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.execute("DELETE FROM xx_%s WHERE fldData >= 5" % config.tmp)
      if crsr.rowcount != -1:
          self.assertEqual(crsr.rowcount, 4)
      else:
          print(self.getEngine()+" Provider does not support rowcount (on DELETE)")
      self.helpRollbackTblTemp()
  def testFetchMany(self):
      """``fetchmany(n)`` must return n rows, or fewer once the result set runs out."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
      # (rows requested, rows expected); the last request asks for five
      # but there is only one left
      for requested, expected in ((3, 3), (5, 5), (5, 1)):
          batch = crsr.fetchmany(requested)
          assert len(batch) == expected
      self.helpRollbackTblTemp()
  def testFetchManyWithArraySize(self):
      """``fetchmany()`` without an argument must fetch ``arraysize`` rows at a time."""
      crsr = self.getCursor()
      self.helpCreateAndPopulateTableTemp(crsr)
      crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
      first = crsr.fetchmany()
      assert len(first) == 1  # arraysize defaults to one
      crsr.arraysize = 4
      # eight rows remain: two full batches, then nothing
      for expected in (4, 4, 0):
          batch = crsr.fetchmany()
          assert len(batch) == expected
      self.helpRollbackTblTemp()
  def testErrorConnect(self):
      """Connecting with an invalid connection string must raise ``api.DatabaseError``.

      A ``proxy_host`` entry found in the existing connection's ``kwargs`` is
      passed along to the connect call.
      """
      conn = self.getConnection()
      kw = {key: conn.kwargs[key] for key in ('proxy_host',) if key in conn.kwargs}
      conn.close()
      self.assertRaises(api.DatabaseError, self.db, 'not a valid connect string', kw)
  def testRowIterator(self):
      """Fetched rows and row sets must support indexing, slicing and access by column name."""
      self.helpForceDropOnTblTemp()
      conn = self.getConnection()
      crsr = conn.cursor()
      tabdef = ("\nCREATE TABLE xx_%s (\n"
                "fldId integer NOT NULL,\n"
                "fldTwo integer,\n"
                "fldThree integer,\n"
                "fldFour integer)\n") % config.tmp
      crsr.execute(tabdef)

      inputs = [(2, 3, 4), (102, 103, 104)]
      for fldId, inParam in enumerate(inputs, start=2):
          try:
              crsr.execute("INSERT INTO xx_%s (fldId,fldTwo,fldThree,fldFour) VALUES (?,?,?,?)" % config.tmp,
                           (fldId,) + inParam)
          except:
              # show whatever diagnostics are available, then let the error propagate
              if not self.remote:
                  conn.printADOerrors()
              else:
                  for message in crsr.messages:
                      print(message)
              raise
          crsr.execute("SELECT fldTwo,fldThree,fldFour FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId])
          rec = crsr.fetchone()
          # check that stepping through an emulated row works
          for j, expected in enumerate(inParam):
              assert rec[j] == expected, 'returned value:"%s" != test value:"%s"' % (rec[j], expected)
          # check that we can get a complete tuple from a row
          assert tuple(rec) == inParam, 'returned value:"%s" != test value:"%s"' % (repr(rec), repr(inParam))
          # test that slices of rows work
          slice1 = tuple(rec[:-1])
          slice2 = tuple(inParam[0:2])
          assert slice1 == slice2, 'returned value:"%s" != test value:"%s"' % (repr(slice1), repr(slice2))
          # now test named column retrieval
          assert rec['fldTwo'] == inParam[0]
          assert rec.fldThree == inParam[1]
          assert rec.fldFour == inParam[2]

      # test array operation
      # note that the fields vvvvvvvv vvvvvvv vvvvvv are out of order
      crsr.execute("select fldThree,fldFour,fldTwo from xx_%s" % config.tmp)
      recs = crsr.fetchall()
      assert recs[1][0] == 103
      assert recs[0][1] == 4
      assert recs[1]['fldFour'] == 104
      assert recs[0,0] == 3
      assert recs[0,'fldTwo'] == 2
      assert recs[1,2] == 102
      # chained and tuple indexing must agree (only row 0, columns 0 and 1 are compared)
      for j in (0, 1):
          assert recs[0][j] == recs[0,j]
  def testFormatParamstyle(self):
      """With ``conn.paramstyle = 'format'``, ``%s`` markers are parameters but ``%s`` inside a quoted literal is data."""
      self.helpForceDropOnTblTemp()
      conn = self.getConnection()
      conn.paramstyle = 'format'  # test nonstandard use of paramstyle
      crsr = conn.cursor()
      tabdef = ("\nCREATE TABLE xx_%s (\n"
                "fldId integer NOT NULL,\n"
                "fldData varchar(10),\n"
                "fldConst varchar(30))\n") % config.tmp
      crsr.execute(tabdef)

      inputs = ['one', 'two', 'three']
      for fldId, inParam in enumerate(inputs, start=3):
          sql = "INSERT INTO xx_" + \
                config.tmp + \
                " (fldId,fldConst,fldData) VALUES (%s,'thi%s :may cause? trouble', %s)"
          try:
              crsr.execute(sql, (fldId, inParam))
          except:
              # show whatever diagnostics are available, then let the error propagate
              if not self.remote:
                  conn.printADOerrors()
              else:
                  for message in crsr.messages:
                      print(message)
              raise
          crsr.execute("SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE %s=fldID", [fldId])
          rec = crsr.fetchone()
          self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"' % (rec[0], inParam))
          self.assertEqual(rec[1], "thi%s :may cause? trouble")

      # now try an operation with a "%s" as part of a literal
      sel = "insert into xx_" + config.tmp + " (fldId,fldData) VALUES (%s,'four%sfive')"
      params = (20,)
      crsr.execute(sel, params)

      # test the .query implementation
      assert '(?,' in crsr.query, 'expected:"%s" in "%s"' % ('(?,', crsr.query)
      # test the .command attribute
      assert crsr.command == sel, 'expected:"%s" but found "%s"' % (sel, crsr.command)
      # test the .parameters attribute (the parameter list will be altered in transit when remote)
      if not self.remote:
          self.assertEqual(crsr.parameters, params)
      # now make sure the data made it
      crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=20" % config.tmp)
      rec = crsr.fetchone()
      self.assertEqual(rec[0], 'four%sfive')
  def testNamedParamstyle(self):
      """With ``crsr.paramstyle = 'named'``, ``:name`` markers are parameters but ``:`` inside a quoted literal is data."""
      self.helpForceDropOnTblTemp()
      conn = self.getConnection()
      crsr = conn.cursor()
      crsr.paramstyle = 'named'  # test nonstandard use of paramstyle
      tabdef = ("\nCREATE TABLE xx_%s (\n"
                "fldId integer NOT NULL,\n"
                "fldData varchar(10))\n") % config.tmp
      crsr.execute(tabdef)

      inputs = ['four', 'five', 'six']
      for fldId, inParam in enumerate(inputs, start=11):
          try:
              crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" % config.tmp,
                           {"f_Val": inParam, 'Id': fldId})
          except:
              # show whatever diagnostics are available, then let the error propagate
              if not self.remote:
                  conn.printADOerrors()
              else:
                  for message in crsr.messages:
                      print(message)
              raise
          crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=:Id" % config.tmp, {'Id': fldId})
          rec = crsr.fetchone()
          self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"' % (rec[0], inParam))

      # now a test with a ":" as part of a literal
      crsr.execute("insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp, {'xyz': 30})
      crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
      rec = crsr.fetchone()
      self.assertEqual(rec[0], 'six:five')
  def testPyformatParamstyle(self):
      """With ``crsr.paramstyle = 'pyformat'``, ``%(name)s`` markers are parameters and ``%%`` in the SQL comes back as ``%``."""
      self.helpForceDropOnTblTemp()
      conn = self.getConnection()
      crsr = conn.cursor()
      crsr.paramstyle = 'pyformat'  # test nonstandard use of paramstyle
      tabdef = ("\nCREATE TABLE xx_%s (\n"
                "fldId integer NOT NULL,\n"
                "fldData varchar(10))\n") % config.tmp
      crsr.execute(tabdef)

      inputs = ['four', 'five', 'six']
      for fldId, inParam in enumerate(inputs, start=11):
          try:
              crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (%%(Id)s,%%(f_Val)s)" % config.tmp,
                           {"f_Val": inParam, 'Id': fldId})
          except:
              # show whatever diagnostics are available, then let the error propagate
              if not self.remote:
                  conn.printADOerrors()
              else:
                  for message in crsr.messages:
                      print(message)
              raise
          crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=%%(Id)s" % config.tmp, {'Id': fldId})
          rec = crsr.fetchone()
          self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"' % (rec[0], inParam))

      # now a test with a "%" as part of a literal
      crsr.execute("insert into xx_%s (fldId,fldData) VALUES (%%(xyz)s,'six%%five')" % config.tmp, {'xyz': 30})
      crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
      rec = crsr.fetchone()
      self.assertEqual(rec[0], 'six%five')
  632. def testAutomaticParamstyle(self):
  633. self.helpForceDropOnTblTemp()
  634. conn=self.getConnection()
  635. conn.paramstyle = 'dynamic' #test nonstandard use of paramstyle
  636. crsr=conn.cursor()
  637. tabdef= """
  638. CREATE TABLE xx_%s (
  639. fldId integer NOT NULL,
  640. fldData varchar(10),
  641. fldConst varchar(30))
  642. """ % config.tmp
  643. crsr.execute(tabdef)
  644. inputs = ['one', 'two', 'three']
  645. fldId=2
  646. for inParam in inputs:
  647. fldId+=1
  648. try:
  649. crsr.execute("INSERT INTO xx_" + config.tmp + \
  650. " (fldId,fldConst,fldData) VALUES (?,'thi%s :may cause? troub:1e', ?)", (fldId,inParam))
  651. except:
  652. if self.remote:
  653. for message in crsr.messages:
  654. print(message)
  655. else:
  656. conn.printADOerrors()
  657. raise
  658. trouble = 'thi%s :may cause? troub:1e'
  659. crsr.execute("SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE ?=fldID", [fldId])
  660. rec = crsr.fetchone()
  661. self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
  662. self.assertEqual(rec[1], trouble)
  663. # inputs = [u'four',u'five',u'six']
  664. fldId=10
  665. for inParam in inputs:
  666. fldId+=1
  667. try:
  668. crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" % config.tmp,
  669. {"f_Val":inParam,'Id':fldId})
  670. except:
  671. if self.remote:
  672. for message in crsr.messages:
  673. print(message)
  674. else:
  675. conn.printADOerrors()
  676. raise
  677. crsr.execute("SELECT fldData FROM xx_%s WHERE :Id=fldID" % config.tmp, {'Id':fldId})
  678. rec = crsr.fetchone()
  679. self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
  680. # now a test with a ":" as part of a literal -- and use a prepared query
  681. ppdcmd = "insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp
  682. crsr.prepare(ppdcmd)
  683. crsr.execute(ppdcmd, {'xyz':30})
  684. crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
  685. rec = crsr.fetchone()
  686. self.assertEqual(rec[0], 'six:five')
  687. def testRollBack(self):
  688. conn = self.getConnection()
  689. crsr = conn.cursor()
  690. assert not crsr.connection.autocommit, 'Unexpected beginning condition'
  691. self.helpCreateAndPopulateTableTemp(crsr)
  692. crsr.connection.commit() # commit the first bunch
  693. crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
  694. selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
  695. crsr.execute(selectSql)
  696. rs = crsr.fetchall()
  697. assert len(rs) == 1
  698. self.conn.rollback()
  699. crsr.execute(selectSql)
  700. assert crsr.fetchone() == None, 'cursor.fetchone should return None if a query retrieves no rows'
  701. crsr.execute('SELECT fldData from xx_%s' % config.tmp)
  702. rs = crsr.fetchall()
  703. assert len(rs) == 9, 'the original records should still be present'
  704. self.helpRollbackTblTemp()
  705. def testCommit(self):
  706. try:
  707. con2 = self.getAnotherConnection()
  708. except NotImplementedError:
  709. return # should be "SKIP" for ACCESS
  710. assert not con2.autocommit, 'default should be manual commit'
  711. crsr = con2.cursor()
  712. self.helpCreateAndPopulateTableTemp(crsr)
  713. crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
  714. con2.commit()
  715. selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
  716. crsr.execute(selectSql)
  717. rs = crsr.fetchall()
  718. assert len(rs) == 1
  719. crsr.close()
  720. con2.close()
  721. conn = self.getConnection()
  722. crsr = self.getCursor()
  723. with conn.cursor() as crsr:
  724. crsr.execute(selectSql)
  725. rs = crsr.fetchall()
  726. assert len(rs) == 1
  727. assert rs[0][0] == 100
  728. self.helpRollbackTblTemp()
  729. def testAutoRollback(self):
  730. try:
  731. con2 = self.getAnotherConnection()
  732. except NotImplementedError:
  733. return # should be "SKIP" for ACCESS
  734. assert not con2.autocommit, 'unexpected beginning condition'
  735. crsr = con2.cursor()
  736. self.helpCreateAndPopulateTableTemp(crsr)
  737. crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
  738. selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
  739. crsr.execute(selectSql)
  740. rs = crsr.fetchall()
  741. assert len(rs) == 1
  742. crsr.close()
  743. con2.close()
  744. crsr = self.getCursor()
  745. try:
  746. crsr.execute(selectSql) # closing the connection should have forced rollback
  747. row = crsr.fetchone()
  748. except api.DatabaseError:
  749. row = None # if the entire table disappeared the rollback was perfect and the test passed
  750. assert row == None, 'cursor.fetchone should return None if a query retrieves no rows. Got %s' % repr(row)
  751. self.helpRollbackTblTemp()
  752. def testAutoCommit(self):
  753. try:
  754. ac_conn = self.getAnotherConnection({'autocommit': True})
  755. except NotImplementedError:
  756. return # should be "SKIP" for ACCESS
  757. crsr = ac_conn.cursor()
  758. self.helpCreateAndPopulateTableTemp(crsr)
  759. crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
  760. crsr.close()
  761. with self.getCursor() as crsr:
  762. selectSql = 'SELECT fldData from xx_%s' % config.tmp
  763. crsr.execute(selectSql) # closing the connection should _not_ have forced rollback
  764. rs = crsr.fetchall()
  765. assert len(rs) == 10, 'all records should still be present'
  766. ac_conn.close()
  767. self.helpRollbackTblTemp()
  768. def testSwitchedAutoCommit(self):
  769. try:
  770. ac_conn = self.getAnotherConnection()
  771. except NotImplementedError:
  772. return # should be "SKIP" for ACCESS
  773. ac_conn.autocommit = True
  774. crsr = ac_conn.cursor()
  775. self.helpCreateAndPopulateTableTemp(crsr)
  776. crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
  777. crsr.close()
  778. conn = self.getConnection()
  779. ac_conn.close()
  780. with self.getCursor() as crsr:
  781. selectSql = 'SELECT fldData from xx_%s' % config.tmp
  782. crsr.execute(selectSql) # closing the connection should _not_ have forced rollback
  783. rs = crsr.fetchall()
  784. assert len(rs) == 10, 'all records should still be present'
  785. self.helpRollbackTblTemp()
  786. def testExtendedTypeHandling(self):
  787. class XtendString(str):
  788. pass
  789. class XtendInt(int):
  790. pass
  791. class XtendFloat(float):
  792. pass
  793. xs = XtendString(randomstring(30))
  794. xi = XtendInt(random.randint(-100, 500))
  795. xf = XtendFloat(random.random())
  796. self.helpForceDropOnTblTemp()
  797. conn = self.getConnection()
  798. crsr = conn.cursor()
  799. tabdef = """
  800. CREATE TABLE xx_%s (
  801. s VARCHAR(40) NOT NULL,
  802. i INTEGER NOT NULL,
  803. f REAL NOT NULL)""" % config.tmp
  804. crsr.execute(tabdef)
  805. crsr.execute("INSERT INTO xx_%s (s, i, f) VALUES (?, ?, ?)" % config.tmp, (xs, xi, xf))
  806. crsr.close()
  807. conn = self.getConnection()
  808. with self.getCursor() as crsr:
  809. selectSql = 'SELECT s, i, f from xx_%s' % config.tmp
  810. crsr.execute(selectSql) # closing the connection should _not_ have forced rollback
  811. row = crsr.fetchone()
  812. self.assertEqual(row.s, xs)
  813. self.assertEqual(row.i, xi)
  814. self.assertAlmostEqual(row.f, xf)
  815. self.helpRollbackTblTemp()
  816. class TestADOwithSQLServer(CommonDBTests):
  817. def setUp(self):
  818. self.conn = config.dbSqlServerconnect(*config.connStrSQLServer[0], **config.connStrSQLServer[1])
  819. self.conn.timeout = 30 # turn timeout back up
  820. self.engine = 'MSSQL'
  821. self.db = config.dbSqlServerconnect
  822. self.remote = config.connStrSQLServer[2]
  823. def tearDown(self):
  824. try:
  825. self.conn.rollback()
  826. except:
  827. pass
  828. try:
  829. self.conn.close()
  830. except:
  831. pass
  832. self.conn=None
  833. def getConnection(self):
  834. return self.conn
  835. def getAnotherConnection(self, addkeys=None):
  836. keys = dict(config.connStrSQLServer[1])
  837. if addkeys:
  838. keys.update(addkeys)
  839. return config.dbSqlServerconnect(*config.connStrSQLServer[0], **keys)
  840. def testVariableReturningStoredProcedure(self):
  841. crsr=self.conn.cursor()
  842. spdef= """
  843. CREATE PROCEDURE sp_DeleteMeOnlyForTesting
  844. @theInput varchar(50),
  845. @theOtherInput varchar(50),
  846. @theOutput varchar(100) OUTPUT
  847. AS
  848. SET @theOutput=@theInput+@theOtherInput
  849. """
  850. try:
  851. crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting")
  852. self.conn.commit()
  853. except: #Make sure it is empty
  854. pass
  855. crsr.execute(spdef)
  856. retvalues=crsr.callproc('sp_DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
  857. assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
  858. assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
  859. assert retvalues[2]=='DodsworthAnne','%s is not "DodsworthAnne"'%repr(retvalues[2])
  860. self.conn.rollback()
  861. def testMultipleSetReturn(self):
  862. crsr=self.getCursor()
  863. self.helpCreateAndPopulateTableTemp(crsr)
  864. spdef= """
  865. CREATE PROCEDURE sp_DeleteMe_OnlyForTesting
  866. AS
  867. SELECT fldData FROM xx_%s ORDER BY fldData ASC
  868. SELECT fldData From xx_%s where fldData = -9999
  869. SELECT fldData FROM xx_%s ORDER BY fldData DESC
  870. """ % (config.tmp, config.tmp, config.tmp)
  871. try:
  872. crsr.execute("DROP PROCEDURE sp_DeleteMe_OnlyForTesting")
  873. self.conn.commit()
  874. except: #Make sure it is empty
  875. pass
  876. crsr.execute(spdef)
  877. retvalues=crsr.callproc('sp_DeleteMe_OnlyForTesting')
  878. row=crsr.fetchone()
  879. self.assertEqual(row[0], 0)
  880. assert crsr.nextset() == True, 'Operation should succeed'
  881. assert not crsr.fetchall(), 'Should be an empty second set'
  882. assert crsr.nextset() == True, 'third set should be present'
  883. rowdesc=crsr.fetchall()
  884. self.assertEqual(rowdesc[0][0],8)
  885. assert crsr.nextset() == None,'No more return sets, should return None'
  886. self.helpRollbackTblTemp()
  887. def testDatetimeProcedureParameter(self):
  888. crsr=self.conn.cursor()
  889. spdef= """
  890. CREATE PROCEDURE sp_DeleteMeOnlyForTesting
  891. @theInput DATETIME,
  892. @theOtherInput varchar(50),
  893. @theOutput varchar(100) OUTPUT
  894. AS
  895. SET @theOutput = CONVERT(CHARACTER(20), @theInput, 0) + @theOtherInput
  896. """
  897. try:
  898. crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting")
  899. self.conn.commit()
  900. except: #Make sure it is empty
  901. pass
  902. crsr.execute(spdef)
  903. result = crsr.callproc('sp_DeleteMeOnlyForTesting', [adodbapi.Timestamp(2014,12,25,0,1,0), 'Beep', ' ' * 30])
  904. assert result[2] == 'Dec 25 2014 12:01AM Beep', 'value was="%s"' % result[2]
  905. self.conn.rollback()
  906. def testIncorrectStoredProcedureParameter(self):
  907. crsr=self.conn.cursor()
  908. spdef= """
  909. CREATE PROCEDURE sp_DeleteMeOnlyForTesting
  910. @theInput DATETIME,
  911. @theOtherInput varchar(50),
  912. @theOutput varchar(100) OUTPUT
  913. AS
  914. SET @theOutput = CONVERT(CHARACTER(20), @theInput) + @theOtherInput
  915. """
  916. try:
  917. crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting")
  918. self.conn.commit()
  919. except: #Make sure it is empty
  920. pass
  921. crsr.execute(spdef)
  922. # calling the sproc with a string for the first parameter where a DateTime is expected
  923. result = tryconnection.try_operation_with_expected_exception(
  924. (api.DataError,api.DatabaseError),
  925. crsr.callproc,
  926. ['sp_DeleteMeOnlyForTesting'],
  927. {'parameters': ['this is wrong', 'Anne', 'not Alice']}
  928. )
  929. if result[0]: # the expected exception was raised
  930. assert '@theInput' in str(result[1]) or 'DatabaseError' in str(result), \
  931. 'Identifies the wrong erroneous parameter'
  932. else:
  933. assert result[0], result[1] # incorrect or no exception
  934. self.conn.rollback()
  935. class TestADOwithAccessDB(CommonDBTests):
  936. def setUp(self):
  937. self.conn = config.dbAccessconnect(*config.connStrAccess[0], **config.connStrAccess[1])
  938. self.conn.timeout = 30 # turn timeout back up
  939. self.engine = 'ACCESS'
  940. self.db = config.dbAccessconnect
  941. self.remote = config.connStrAccess[2]
  942. def tearDown(self):
  943. try:
  944. self.conn.rollback()
  945. except:
  946. pass
  947. try:
  948. self.conn.close()
  949. except:
  950. pass
  951. self.conn=None
  952. def getConnection(self):
  953. return self.conn
  954. def getAnotherConnection(self, addkeys=None):
  955. raise NotImplementedError('Jet cannot use a second connection to the database')
  956. def testOkConnect(self):
  957. c = self.db(*config.connStrAccess[0], **config.connStrAccess[1])
  958. assert c != None
  959. c.close()
  960. class TestADOwithMySql(CommonDBTests):
  961. def setUp(self):
  962. self.conn = config.dbMySqlconnect(*config.connStrMySql[0], **config.connStrMySql[1])
  963. self.conn.timeout = 30 # turn timeout back up
  964. self.engine = 'MySQL'
  965. self.db = config.dbMySqlconnect
  966. self.remote = config.connStrMySql[2]
  967. def tearDown(self):
  968. try:
  969. self.conn.rollback()
  970. except:
  971. pass
  972. try:
  973. self.conn.close()
  974. except:
  975. pass
  976. self.conn=None
  977. def getConnection(self):
  978. return self.conn
  979. def getAnotherConnection(self, addkeys=None):
  980. keys = dict(config.connStrMySql[1])
  981. if addkeys:
  982. keys.update(addkeys)
  983. return config.dbMySqlconnect(*config.connStrMySql[0], **keys)
  984. def testOkConnect(self):
  985. c = self.db(*config.connStrMySql[0], **config.connStrMySql[1])
  986. assert c != None
  987. # def testStoredProcedure(self):
  988. # crsr=self.conn.cursor()
  989. # try:
  990. # crsr.execute("DROP PROCEDURE DeleteMeOnlyForTesting")
  991. # self.conn.commit()
  992. # except: #Make sure it is empty
  993. # pass
  994. # spdef= """
  995. # DELIMITER $$
  996. # CREATE PROCEDURE DeleteMeOnlyForTesting (onein CHAR(10), twoin CHAR(10), OUT theout CHAR(20))
  997. # DETERMINISTIC
  998. # BEGIN
  999. # SET theout = onein //|| twoin;
  1000. # /* (SELECT 'a small string' as result; */
  1001. # END $$
  1002. # """
  1003. #
  1004. # crsr.execute(spdef)
  1005. #
  1006. # retvalues=crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
  1007. # print 'return value (mysql)=',repr(crsr.returnValue) ###
  1008. # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
  1009. # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
  1010. # assert retvalues[2]=='DodsworthAnne','%s is not "DodsworthAnne"'%repr(retvalues[2])
  1011. #
  1012. # try:
  1013. # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting")
  1014. # self.conn.commit()
  1015. # except: #Make sure it is empty
  1016. # pass
  1017. class TestADOwithPostgres(CommonDBTests):
  1018. def setUp(self):
  1019. self.conn = config.dbPostgresConnect(*config.connStrPostgres[0], **config.connStrPostgres[1])
  1020. self.conn.timeout = 30 # turn timeout back up
  1021. self.engine = 'PostgreSQL'
  1022. self.db = config.dbPostgresConnect
  1023. self.remote = config.connStrPostgres[2]
  1024. def tearDown(self):
  1025. try:
  1026. self.conn.rollback()
  1027. except:
  1028. pass
  1029. try:
  1030. self.conn.close()
  1031. except:
  1032. pass
  1033. self.conn=None
  1034. def getConnection(self):
  1035. return self.conn
  1036. def getAnotherConnection(self, addkeys=None):
  1037. keys = dict(config.connStrPostgres[1])
  1038. if addkeys:
  1039. keys.update(addkeys)
  1040. return config.dbPostgresConnect(*config.connStrPostgres[0], **keys)
  1041. def testOkConnect(self):
  1042. c = self.db(*config.connStrPostgres[0], **config.connStrPostgres[1])
  1043. assert c != None
  1044. # def testStoredProcedure(self):
  1045. # crsr=self.conn.cursor()
  1046. # spdef= """
  1047. # CREATE OR REPLACE FUNCTION DeleteMeOnlyForTesting (text, text)
  1048. # RETURNS text AS $funk$
  1049. # BEGIN
  1050. # RETURN $1 || $2;
  1051. # END;
  1052. # $funk$
  1053. # LANGUAGE SQL;
  1054. # """
  1055. #
  1056. # crsr.execute(spdef)
  1057. # retvalues = crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
  1058. # ### print 'return value (pg)=',repr(crsr.returnValue) ###
  1059. # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
  1060. # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
  1061. # assert retvalues[2]=='Dodsworth Anne','%s is not "Dodsworth Anne"'%repr(retvalues[2])
  1062. # self.conn.rollback()
  1063. # try:
  1064. # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting")
  1065. # self.conn.commit()
  1066. # except: #Make sure it is empty
  1067. # pass
  1068. class TimeConverterInterfaceTest(unittest.TestCase):
  1069. def testIDate(self):
  1070. assert self.tc.Date(1990,2,2)
  1071. def testITime(self):
  1072. assert self.tc.Time(13,2,2)
  1073. def testITimestamp(self):
  1074. assert self.tc.Timestamp(1990,2,2,13,2,1)
  1075. def testIDateObjectFromCOMDate(self):
  1076. assert self.tc.DateObjectFromCOMDate(37435.7604282)
  1077. def testICOMDate(self):
  1078. assert hasattr(self.tc,'COMDate')
  1079. def testExactDate(self):
  1080. d=self.tc.Date(1994,11,15)
  1081. comDate=self.tc.COMDate(d)
  1082. correct=34653.0
  1083. assert comDate == correct,comDate
  1084. def testExactTimestamp(self):
  1085. d=self.tc.Timestamp(1994,11,15,12,0,0)
  1086. comDate=self.tc.COMDate(d)
  1087. correct=34653.5
  1088. self.assertEqual( comDate ,correct)
  1089. d=self.tc.Timestamp(2003,5,6,14,15,17)
  1090. comDate=self.tc.COMDate(d)
  1091. correct=37747.593946759262
  1092. self.assertEqual( comDate ,correct)
  1093. def testIsoFormat(self):
  1094. d=self.tc.Timestamp(1994,11,15,12,3,10)
  1095. iso=self.tc.DateObjectToIsoFormatString(d)
  1096. self.assertEqual(str(iso[:19]) , '1994-11-15 12:03:10')
  1097. dt=self.tc.Date(2003,5,2)
  1098. iso=self.tc.DateObjectToIsoFormatString(dt)
  1099. self.assertEqual(str(iso[:10]), '2003-05-02')
  1100. if config.doMxDateTimeTest:
  1101. import mx.DateTime
  1102. class TestMXDateTimeConverter(TimeConverterInterfaceTest):
  1103. def setUp(self):
  1104. self.tc = api.mxDateTimeConverter()
  1105. def testCOMDate(self):
  1106. t=mx.DateTime.DateTime(2002,6,28,18,15,2)
  1107. cmd=self.tc.COMDate(t)
  1108. assert cmd == t.COMDate()
  1109. def testDateObjectFromCOMDate(self):
  1110. cmd=self.tc.DateObjectFromCOMDate(37435.7604282)
  1111. t=mx.DateTime.DateTime(2002,6,28,18,15,0)
  1112. t2=mx.DateTime.DateTime(2002,6,28,18,15,2)
  1113. assert t2>cmd>t
  1114. def testDate(self):
  1115. assert mx.DateTime.Date(1980,11,4)==self.tc.Date(1980,11,4)
  1116. def testTime(self):
  1117. assert mx.DateTime.Time(13,11,4)==self.tc.Time(13,11,4)
  1118. def testTimestamp(self):
  1119. t=mx.DateTime.DateTime(2002,6,28,18,15,1)
  1120. obj=self.tc.Timestamp(2002,6,28,18,15,1)
  1121. assert t == obj
  1122. import time
  1123. class TestPythonTimeConverter(TimeConverterInterfaceTest):
  1124. def setUp(self):
  1125. self.tc=api.pythonTimeConverter()
  1126. def testCOMDate(self):
  1127. mk = time.mktime((2002,6,28,18,15,1, 4,31+28+31+30+31+28,-1))
  1128. t=time.localtime(mk)
  1129. # Fri, 28 Jun 2002 18:15:01 +0000
  1130. cmd=self.tc.COMDate(t)
  1131. assert abs(cmd - 37435.7604282) < 1.0/24,"%f more than an hour wrong" % cmd
  1132. def testDateObjectFromCOMDate(self):
  1133. cmd=self.tc.DateObjectFromCOMDate(37435.7604282)
  1134. t1=time.gmtime(time.mktime((2002,6,28,0,14,1, 4,31+28+31+30+31+28,-1)))
  1135. #there are errors in the implementation of gmtime which we ignore
  1136. t2=time.gmtime(time.mktime((2002,6,29,12,14,2, 4,31+28+31+30+31+28,-1)))
  1137. assert t1<cmd<t2, '"%s" should be about 2002-6-28 12:15:01'%repr(cmd)
  1138. def testDate(self):
  1139. t1=time.mktime((2002,6,28,18,15,1, 4,31+28+31+30+31+30,0))
  1140. t2=time.mktime((2002,6,30,18,15,1, 4,31+28+31+30+31+28,0))
  1141. obj=self.tc.Date(2002,6,29)
  1142. assert t1< time.mktime(obj)<t2,obj
  1143. def testTime(self):
  1144. self.assertEqual( self.tc.Time(18,15,2),time.gmtime(18*60*60+15*60+2))
  1145. def testTimestamp(self):
  1146. t1=time.localtime(time.mktime((2002,6,28,18,14,1, 4,31+28+31+30+31+28,-1)))
  1147. t2=time.localtime(time.mktime((2002,6,28,18,16,1, 4,31+28+31+30+31+28,-1)))
  1148. obj=self.tc.Timestamp(2002,6,28,18,15,2)
  1149. assert t1< obj <t2,obj
  1150. class TestPythonDateTimeConverter(TimeConverterInterfaceTest):
  1151. def setUp(self):
  1152. self.tc = api.pythonDateTimeConverter()
  1153. def testCOMDate(self):
  1154. t=datetime.datetime( 2002,6,28,18,15,1)
  1155. # Fri, 28 Jun 2002 18:15:01 +0000
  1156. cmd=self.tc.COMDate(t)
  1157. assert abs(cmd - 37435.7604282) < 1.0/24,"more than an hour wrong"
  1158. def testDateObjectFromCOMDate(self):
  1159. cmd = self.tc.DateObjectFromCOMDate(37435.7604282)
  1160. t1 = datetime.datetime(2002,6,28,18,14,1)
  1161. t2 = datetime.datetime(2002,6,28,18,16,1)
  1162. assert t1 < cmd < t2, cmd
  1163. tx = datetime.datetime(2002,6,28,18,14,1,900000) # testing that microseconds don't become milliseconds
  1164. c1 = self.tc.DateObjectFromCOMDate(self.tc.COMDate(tx))
  1165. assert t1 < c1 < t2, c1
  1166. def testDate(self):
  1167. t1=datetime.date(2002,6,28)
  1168. t2=datetime.date(2002,6,30)
  1169. obj=self.tc.Date(2002,6,29)
  1170. assert t1< obj <t2,obj
  1171. def testTime(self):
  1172. self.assertEqual( self.tc.Time(18,15,2).isoformat()[:8],'18:15:02')
  1173. def testTimestamp(self):
  1174. t1=datetime.datetime(2002,6,28,18,14,1)
  1175. t2=datetime.datetime(2002,6,28,18,16,1)
  1176. obj=self.tc.Timestamp(2002,6,28,18,15,2)
  1177. assert t1< obj <t2,obj
  1178. suites=[]
  1179. suites.append( unittest.makeSuite(TestPythonDateTimeConverter,'test'))
  1180. if config.doMxDateTimeTest:
  1181. suites.append( unittest.makeSuite(TestMXDateTimeConverter,'test'))
  1182. if config.doTimeTest:
  1183. suites.append( unittest.makeSuite(TestPythonTimeConverter,'test'))
  1184. if config.doAccessTest:
  1185. suites.append( unittest.makeSuite(TestADOwithAccessDB,'test'))
  1186. if config.doSqlServerTest:
  1187. suites.append( unittest.makeSuite(TestADOwithSQLServer,'test'))
  1188. if config.doMySqlTest:
  1189. suites.append( unittest.makeSuite(TestADOwithMySql,'test'))
  1190. if config.doPostgresTest:
  1191. suites.append( unittest.makeSuite(TestADOwithPostgres,'test'))
  1192. class cleanup_manager(object):
  1193. def __enter__(self):
  1194. pass
  1195. def __exit__(self, exc_type, exc_val, exc_tb):
  1196. config.cleanup(config.testfolder, config.mdb_name)
  1197. suite=unittest.TestSuite(suites)
  1198. if __name__ == '__main__':
  1199. mysuite = copy.deepcopy(suite)
  1200. with cleanup_manager():
  1201. defaultDateConverter = adodbapi.dateconverter
  1202. print(__doc__)
  1203. print("Default Date Converter is %s" %(defaultDateConverter,))
  1204. dateconverter = defaultDateConverter
  1205. tag = 'datetime'
  1206. unittest.TextTestRunner().run(mysuite)
  1207. if config.iterateOverTimeTests:
  1208. for test, dateconverter, tag in (
  1209. (config.doTimeTest,api.pythonTimeConverter, 'pythontime'),
  1210. (config.doMxDateTimeTest, api.mxDateTimeConverter, 'mx')):
  1211. if test:
  1212. mysuite = copy.deepcopy(suite) # work around a side effect of unittest.TextTestRunner
  1213. adodbapi.adodbapi.dateconverter = dateconverter()
  1214. print("Changed dateconverter to ")
  1215. print(adodbapi.adodbapi.dateconverter)
  1216. unittest.TextTestRunner().run(mysuite)