123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 |
- #-*- coding: iso-8859-1 -*-
- # pysqlite2/test/regression.py: pysqlite regression tests
- #
- # Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
- #
- # This file is part of pysqlite.
- #
- # This software is provided 'as-is', without any express or implied
- # warranty. In no event will the authors be held liable for any damages
- # arising from the use of this software.
- #
- # Permission is granted to anyone to use this software for any purpose,
- # including commercial applications, and to alter it and redistribute it
- # freely, subject to the following restrictions:
- #
- # 1. The origin of this software must not be misrepresented; you must not
- # claim that you wrote the original software. If you use this software
- # in a product, an acknowledgment in the product documentation would be
- # appreciated but is not required.
- # 2. Altered source versions must be plainly marked as such, and must not be
- # misrepresented as being the original software.
- # 3. This notice may not be removed or altered from any source distribution.
- import datetime
- import unittest
- import sqlite3 as sqlite
- import weakref
- import functools
- from test import support
- from unittest.mock import patch
- class RegressionTests(unittest.TestCase):
- def setUp(self):
- self.con = sqlite.connect(":memory:")
- def tearDown(self):
- self.con.close()
- def CheckPragmaUserVersion(self):
- # This used to crash pysqlite because this pragma command returns NULL for the column name
- cur = self.con.cursor()
- cur.execute("pragma user_version")
- def CheckPragmaSchemaVersion(self):
- # This still crashed pysqlite <= 2.2.1
- con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
- try:
- cur = self.con.cursor()
- cur.execute("pragma schema_version")
- finally:
- cur.close()
- con.close()
- def CheckStatementReset(self):
- # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
- # reset before a rollback, but only those that are still in the
- # statement cache. The others are not accessible from the connection object.
- con = sqlite.connect(":memory:", cached_statements=5)
- cursors = [con.cursor() for x in range(5)]
- cursors[0].execute("create table test(x)")
- for i in range(10):
- cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
- for i in range(5):
- cursors[i].execute(" " * i + "select x from test")
- con.rollback()
- def CheckColumnNameWithSpaces(self):
- cur = self.con.cursor()
- cur.execute('select 1 as "foo bar [datetime]"')
- self.assertEqual(cur.description[0][0], "foo bar [datetime]")
- cur.execute('select 1 as "foo baz"')
- self.assertEqual(cur.description[0][0], "foo baz")
- def CheckStatementFinalizationOnCloseDb(self):
- # pysqlite versions <= 2.3.3 only finalized statements in the statement
- # cache when closing the database. statements that were still
- # referenced in cursors weren't closed and could provoke "
- # "OperationalError: Unable to close due to unfinalised statements".
- con = sqlite.connect(":memory:")
- cursors = []
- # default statement cache size is 100
- for i in range(105):
- cur = con.cursor()
- cursors.append(cur)
- cur.execute("select 1 x union select " + str(i))
- con.close()
- @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
- def CheckOnConflictRollback(self):
- con = sqlite.connect(":memory:")
- con.execute("create table foo(x, unique(x) on conflict rollback)")
- con.execute("insert into foo(x) values (1)")
- try:
- con.execute("insert into foo(x) values (1)")
- except sqlite.DatabaseError:
- pass
- con.execute("insert into foo(x) values (2)")
- try:
- con.commit()
- except sqlite.OperationalError:
- self.fail("pysqlite knew nothing about the implicit ROLLBACK")
- def CheckWorkaroundForBuggySqliteTransferBindings(self):
- """
- pysqlite would crash with older SQLite versions unless
- a workaround is implemented.
- """
- self.con.execute("create table foo(bar)")
- self.con.execute("drop table foo")
- self.con.execute("create table foo(bar)")
- def CheckEmptyStatement(self):
- """
- pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
- for "no-operation" statements
- """
- self.con.execute("")
- def CheckTypeMapUsage(self):
- """
- pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
- a statement. This test exhibits the problem.
- """
- SELECT = "select * from foo"
- con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
- con.execute("create table foo(bar timestamp)")
- con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
- con.execute(SELECT).close()
- con.execute("drop table foo")
- con.execute("create table foo(bar integer)")
- con.execute("insert into foo(bar) values (5)")
- con.execute(SELECT).close()
- def CheckBindMutatingList(self):
- # Issue41662: Crash when mutate a list of parameters during iteration.
- class X:
- def __conform__(self, protocol):
- parameters.clear()
- return "..."
- parameters = [X(), 0]
- con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
- con.execute("create table foo(bar X, baz integer)")
- # Should not crash
- with self.assertRaises(IndexError):
- con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
- def CheckErrorMsgDecodeError(self):
- # When porting the module to Python 3.0, the error message about
- # decoding errors disappeared. This verifies they're back again.
- with self.assertRaises(sqlite.OperationalError) as cm:
- self.con.execute("select 'xxx' || ? || 'yyy' colname",
- (bytes(bytearray([250])),)).fetchone()
- msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
- self.assertIn(msg, str(cm.exception))
- def CheckRegisterAdapter(self):
- """
- See issue 3312.
- """
- self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
- def CheckSetIsolationLevel(self):
- # See issue 27881.
- class CustomStr(str):
- def upper(self):
- return None
- def __del__(self):
- con.isolation_level = ""
- con = sqlite.connect(":memory:")
- con.isolation_level = None
- for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
- with self.subTest(level=level):
- con.isolation_level = level
- con.isolation_level = level.lower()
- con.isolation_level = level.capitalize()
- con.isolation_level = CustomStr(level)
- # setting isolation_level failure should not alter previous state
- con.isolation_level = None
- con.isolation_level = "DEFERRED"
- pairs = [
- (1, TypeError), (b'', TypeError), ("abc", ValueError),
- ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
- ]
- for value, exc in pairs:
- with self.subTest(level=value):
- with self.assertRaises(exc):
- con.isolation_level = value
- self.assertEqual(con.isolation_level, "DEFERRED")
- def CheckCursorConstructorCallCheck(self):
- """
- Verifies that cursor methods check whether base class __init__ was
- called.
- """
- class Cursor(sqlite.Cursor):
- def __init__(self, con):
- pass
- con = sqlite.connect(":memory:")
- cur = Cursor(con)
- with self.assertRaises(sqlite.ProgrammingError):
- cur.execute("select 4+5").fetchall()
- with self.assertRaisesRegex(sqlite.ProgrammingError,
- r'^Base Cursor\.__init__ not called\.$'):
- cur.close()
- def CheckStrSubclass(self):
- """
- The Python 3.0 port of the module didn't cope with values of subclasses of str.
- """
- class MyStr(str): pass
- self.con.execute("select ?", (MyStr("abc"),))
- def CheckConnectionConstructorCallCheck(self):
- """
- Verifies that connection methods check whether base class __init__ was
- called.
- """
- class Connection(sqlite.Connection):
- def __init__(self, name):
- pass
- con = Connection(":memory:")
- with self.assertRaises(sqlite.ProgrammingError):
- cur = con.cursor()
- def CheckCursorRegistration(self):
- """
- Verifies that subclassed cursor classes are correctly registered with
- the connection object, too. (fetch-across-rollback problem)
- """
- class Connection(sqlite.Connection):
- def cursor(self):
- return Cursor(self)
- class Cursor(sqlite.Cursor):
- def __init__(self, con):
- sqlite.Cursor.__init__(self, con)
- con = Connection(":memory:")
- cur = con.cursor()
- cur.execute("create table foo(x)")
- cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
- cur.execute("select x from foo")
- con.rollback()
- with self.assertRaises(sqlite.InterfaceError):
- cur.fetchall()
- def CheckAutoCommit(self):
- """
- Verifies that creating a connection in autocommit mode works.
- 2.5.3 introduced a regression so that these could no longer
- be created.
- """
- con = sqlite.connect(":memory:", isolation_level=None)
- def CheckPragmaAutocommit(self):
- """
- Verifies that running a PRAGMA statement that does an autocommit does
- work. This did not work in 2.5.3/2.5.4.
- """
- cur = self.con.cursor()
- cur.execute("create table foo(bar)")
- cur.execute("insert into foo(bar) values (5)")
- cur.execute("pragma page_size")
- row = cur.fetchone()
- def CheckConnectionCall(self):
- """
- Call a connection with a non-string SQL request: check error handling
- of the statement constructor.
- """
- self.assertRaises(TypeError, self.con, 1)
- def CheckCollation(self):
- def collation_cb(a, b):
- return 1
- self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
- # Lone surrogate cannot be encoded to the default encoding (utf8)
- "\uDC80", collation_cb)
- def CheckRecursiveCursorUse(self):
- """
- http://bugs.python.org/issue10811
- Recursively using a cursor, such as when reusing it from a generator led to segfaults.
- Now we catch recursive cursor usage and raise a ProgrammingError.
- """
- con = sqlite.connect(":memory:")
- cur = con.cursor()
- cur.execute("create table a (bar)")
- cur.execute("create table b (baz)")
- def foo():
- cur.execute("insert into a (bar) values (?)", (1,))
- yield 1
- with self.assertRaises(sqlite.ProgrammingError):
- cur.executemany("insert into b (baz) values (?)",
- ((i,) for i in foo()))
- def CheckConvertTimestampMicrosecondPadding(self):
- """
- http://bugs.python.org/issue14720
- The microsecond parsing of convert_timestamp() should pad with zeros,
- since the microsecond string "456" actually represents "456000".
- """
- con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
- cur = con.cursor()
- cur.execute("CREATE TABLE t (x TIMESTAMP)")
- # Microseconds should be 456000
- cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
- # Microseconds should be truncated to 123456
- cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
- cur.execute("SELECT * FROM t")
- values = [x[0] for x in cur.fetchall()]
- self.assertEqual(values, [
- datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
- datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
- ])
- def CheckInvalidIsolationLevelType(self):
- # isolation level is a string, not an integer
- self.assertRaises(TypeError,
- sqlite.connect, ":memory:", isolation_level=123)
- def CheckNullCharacter(self):
- # Issue #21147
- con = sqlite.connect(":memory:")
- self.assertRaises(ValueError, con, "\0select 1")
- self.assertRaises(ValueError, con, "select 1\0")
- cur = con.cursor()
- self.assertRaises(ValueError, cur.execute, " \0select 2")
- self.assertRaises(ValueError, cur.execute, "select 2\0")
- def CheckCommitCursorReset(self):
- """
- Connection.commit() did reset cursors, which made sqlite3
- to return rows multiple times when fetched from cursors
- after commit. See issues 10513 and 23129 for details.
- """
- con = sqlite.connect(":memory:")
- con.executescript("""
- create table t(c);
- create table t2(c);
- insert into t values(0);
- insert into t values(1);
- insert into t values(2);
- """)
- self.assertEqual(con.isolation_level, "")
- counter = 0
- for i, row in enumerate(con.execute("select c from t")):
- with self.subTest(i=i, row=row):
- con.execute("insert into t2(c) values (?)", (i,))
- con.commit()
- if counter == 0:
- self.assertEqual(row[0], 0)
- elif counter == 1:
- self.assertEqual(row[0], 1)
- elif counter == 2:
- self.assertEqual(row[0], 2)
- counter += 1
- self.assertEqual(counter, 3, "should have returned exactly three rows")
- def CheckBpo31770(self):
- """
- The interpreter shouldn't crash in case Cursor.__init__() is called
- more than once.
- """
- def callback(*args):
- pass
- con = sqlite.connect(":memory:")
- cur = sqlite.Cursor(con)
- ref = weakref.ref(cur, callback)
- cur.__init__(con)
- del cur
- # The interpreter shouldn't crash when ref is collected.
- del ref
- support.gc_collect()
- def CheckDelIsolation_levelSegfault(self):
- with self.assertRaises(AttributeError):
- del self.con.isolation_level
- def CheckBpo37347(self):
- class Printer:
- def log(self, *args):
- return sqlite.SQLITE_OK
- for method in [self.con.set_trace_callback,
- functools.partial(self.con.set_progress_handler, n=1),
- self.con.set_authorizer]:
- printer_instance = Printer()
- method(printer_instance.log)
- method(printer_instance.log)
- self.con.execute("select 1") # trigger seg fault
- method(None)
- class RecursiveUseOfCursors(unittest.TestCase):
- # GH-80254: sqlite3 should not segfault for recursive use of cursors.
- msg = "Recursive use of cursors not allowed"
- def setUp(self):
- self.con = sqlite.connect(":memory:",
- detect_types=sqlite.PARSE_COLNAMES)
- self.cur = self.con.cursor()
- self.cur.execute("create table test(x foo)")
- self.cur.executemany("insert into test(x) values (?)",
- [("foo",), ("bar",)])
- def tearDown(self):
- self.cur.close()
- self.con.close()
- del self.cur
- del self.con
- def test_recursive_cursor_init(self):
- conv = lambda x: self.cur.__init__(self.con)
- with patch.dict(sqlite.converters, {"INIT": conv}):
- with self.assertRaisesRegex(sqlite.ProgrammingError, self.msg):
- self.cur.execute(f'select x as "x [INIT]", x from test')
- def test_recursive_cursor_close(self):
- conv = lambda x: self.cur.close()
- with patch.dict(sqlite.converters, {"CLOSE": conv}):
- with self.assertRaisesRegex(sqlite.ProgrammingError, self.msg):
- self.cur.execute(f'select x as "x [CLOSE]", x from test')
- def test_recursive_cursor_fetch(self):
- conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
- with patch.dict(sqlite.converters, {"ITER": conv}):
- self.cur.execute(f'select x as "x [ITER]", x from test')
- with self.assertRaisesRegex(sqlite.ProgrammingError, self.msg):
- self.cur.fetchall()
- def suite():
- regression_suite = unittest.makeSuite(RegressionTests, "Check")
- recursive_cursor = unittest.makeSuite(RecursiveUseOfCursors)
- return unittest.TestSuite((
- regression_suite,
- recursive_cursor,
- ))
- def test():
- runner = unittest.TextTestRunner()
- runner.run(suite())
- if __name__ == "__main__":
- test()
|