pywin32_testutil.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. # Utilities for the pywin32 tests
  2. import sys
  3. import unittest
  4. import gc
  5. import winerror
  6. ##
  7. ## General purpose utilities for the test suite.
  8. ##
  9. def int2long(val):
  10. """return a long on py2k"""
  11. return val + 0x100000000 - 0x100000000
  12. # The test suite has lots of string constants containing binary data, but
  13. # the strings are used in various "bytes" contexts.
  14. def str2bytes(sval):
  15. if sys.version_info < (3,0) and isinstance(sval, str):
  16. sval = sval.decode("latin1")
  17. return sval.encode("latin1")
  18. # Sometimes we want to pass a string that should explicitly be treated as
  19. # a memory blob.
  20. def str2memory(sval):
  21. if sys.version_info < (3,0):
  22. return buffer(sval)
  23. # py3k.
  24. return memoryview(sval.encode("latin1"))
  25. # Sometimes we want to pass an object that exposes its memory
  26. def ob2memory(ob):
  27. if sys.version_info < (3,0):
  28. return buffer(ob)
  29. # py3k.
  30. return memoryview(ob)
  31. # Note: no str2unicode: we use u'' literals or unicode() function, and 2to3
  32. #
  33. ##
  34. ## unittest related stuff
  35. ##
  36. # This is a specialized TestCase adaptor which wraps a real test.
  37. class LeakTestCase(unittest.TestCase):
  38. """An 'adaptor' which takes another test. In debug builds we execute the
  39. test once to remove one-off side-effects, then capture the total
  40. reference count, then execute the test a few times. If the total
  41. refcount at the end is greater than we first captured, we have a leak!
  42. In release builds the test is executed just once, as normal.
  43. Generally used automatically by the test runner - you can safely
  44. ignore this.
  45. """
  46. def __init__(self, real_test):
  47. unittest.TestCase.__init__(self)
  48. self.real_test = real_test
  49. self.num_test_cases = 1
  50. self.num_leak_iters = 2 # seems to be enough!
  51. if hasattr(sys, "gettotalrefcount"):
  52. self.num_test_cases = self.num_test_cases + self.num_leak_iters
  53. def countTestCases(self):
  54. return self.num_test_cases
  55. def __call__(self, result = None):
  56. # For the COM suite's sake, always ensure we don't leak
  57. # gateways/interfaces
  58. from pythoncom import _GetInterfaceCount, _GetGatewayCount
  59. gc.collect()
  60. ni = _GetInterfaceCount()
  61. ng = _GetGatewayCount()
  62. self.real_test(result)
  63. # Failed - no point checking anything else
  64. if result.shouldStop or not result.wasSuccessful():
  65. return
  66. self._do_leak_tests(result)
  67. gc.collect()
  68. lost_i = _GetInterfaceCount() - ni
  69. lost_g = _GetGatewayCount() - ng
  70. if lost_i or lost_g:
  71. msg = "%d interface objects and %d gateway objects leaked" \
  72. % (lost_i, lost_g)
  73. exc = AssertionError(msg)
  74. result.addFailure(self.real_test, (exc.__class__, exc, None))
  75. def runTest(self):
  76. assert 0, "not used"
  77. def _do_leak_tests(self, result = None):
  78. try:
  79. gtrc = sys.gettotalrefcount
  80. except AttributeError:
  81. return # can't do leak tests in this build
  82. # Assume already called once, to prime any caches etc
  83. gc.collect()
  84. trc = gtrc()
  85. for i in range(self.num_leak_iters):
  86. self.real_test(result)
  87. if result.shouldStop:
  88. break
  89. del i # created after we remembered the refcount!
  90. # int division here means one or 2 stray references won't force
  91. # failure, but one per loop
  92. gc.collect()
  93. lost = (gtrc() - trc) // self.num_leak_iters
  94. if lost < 0:
  95. msg = "LeakTest: %s appeared to gain %d references!!" % (self.real_test, -lost)
  96. result.addFailure(self.real_test, (AssertionError, msg, None))
  97. if lost > 0:
  98. msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
  99. exc = AssertionError(msg)
  100. result.addFailure(self.real_test, (exc.__class__, exc, None))
  101. class TestLoader(unittest.TestLoader):
  102. def loadTestsFromTestCase(self, testCaseClass):
  103. """Return a suite of all tests cases contained in testCaseClass"""
  104. leak_tests = []
  105. for name in self.getTestCaseNames(testCaseClass):
  106. real_test = testCaseClass(name)
  107. leak_test = self._getTestWrapper(real_test)
  108. leak_tests.append(leak_test)
  109. return self.suiteClass(leak_tests)
  110. def fixupTestsForLeakTests(self, test):
  111. if isinstance(test, unittest.TestSuite):
  112. test._tests = [self.fixupTestsForLeakTests(t) for t in test._tests]
  113. return test
  114. else:
  115. # just a normal test case.
  116. return self._getTestWrapper(test)
  117. def _getTestWrapper(self, test):
  118. # one or 2 tests in the COM test suite set this...
  119. no_leak_tests = getattr(test, "no_leak_tests", False)
  120. if no_leak_tests:
  121. print("Test says it doesn't want leak tests!")
  122. return test
  123. return LeakTestCase(test)
  124. def loadTestsFromModule(self, mod):
  125. if hasattr(mod, "suite"):
  126. tests = mod.suite()
  127. else:
  128. tests = unittest.TestLoader.loadTestsFromModule(self, mod)
  129. return self.fixupTestsForLeakTests(tests)
  130. def loadTestsFromName(self, name, module=None):
  131. test = unittest.TestLoader.loadTestsFromName(self, name, module)
  132. if isinstance(test, unittest.TestSuite):
  133. pass # hmmm? print "Don't wrap suites yet!", test._tests
  134. elif isinstance(test, unittest.TestCase):
  135. test = self._getTestWrapper(test)
  136. else:
  137. print("XXX - what is", test)
  138. return test
  139. # Lots of classes necessary to support one simple feature: we want a 3rd
  140. # test result state - "SKIPPED" - to indicate that the test wasn't able
  141. # to be executed for various reasons. Inspired by bzr's tests, but it
  142. # has other concepts, such as "Expected Failure", which we don't bother
  143. # with.
  144. # win32 error codes that probably mean we need to be elevated (ie, if we
  145. # aren't elevated, we treat these error codes as 'skipped')
  146. non_admin_error_codes = [winerror.ERROR_ACCESS_DENIED,
  147. winerror.ERROR_PRIVILEGE_NOT_HELD]
  148. _is_admin = None
  149. def check_is_admin():
  150. global _is_admin
  151. if _is_admin is None:
  152. from win32com.shell.shell import IsUserAnAdmin
  153. import pythoncom
  154. try:
  155. _is_admin = IsUserAnAdmin()
  156. except pythoncom.com_error as exc:
  157. if exc.hresult != winerror.E_NOTIMPL:
  158. raise
  159. # not impl on this platform - must be old - assume is admin
  160. _is_admin = True
  161. return _is_admin
  162. # If this exception is raised by a test, the test is reported as a 'skip'
  163. class TestSkipped(Exception):
  164. pass
  165. # The 'TestResult' subclass that records the failures and has the special
  166. # handling for the TestSkipped exception.
  167. class TestResult(unittest._TextTestResult):
  168. def __init__(self, *args, **kw):
  169. super(TestResult, self).__init__(*args, **kw)
  170. self.skips = {} # count of skips for each reason.
  171. def addError(self, test, err):
  172. """Called when an error has occurred. 'err' is a tuple of values as
  173. returned by sys.exc_info().
  174. """
  175. # translate a couple of 'well-known' exceptions into 'skipped'
  176. import pywintypes
  177. exc_val = err[1]
  178. # translate ERROR_ACCESS_DENIED for non-admin users to be skipped.
  179. # (access denied errors for an admin user aren't expected.)
  180. if isinstance(exc_val, pywintypes.error) \
  181. and exc_val.winerror in non_admin_error_codes \
  182. and not check_is_admin():
  183. exc_val = TestSkipped(exc_val)
  184. # and COM errors due to objects not being registered (the com test
  185. # suite will attempt to catch this and handle it itself if the user
  186. # is admin)
  187. elif isinstance(exc_val, pywintypes.com_error) and \
  188. exc_val.hresult in [winerror.CO_E_CLASSSTRING,
  189. winerror.REGDB_E_CLASSNOTREG,
  190. winerror.TYPE_E_LIBNOTREGISTERED]:
  191. exc_val = TestSkipped(exc_val)
  192. # NotImplemented generally means the platform doesn't support the
  193. # functionality.
  194. elif isinstance(exc_val, NotImplementedError):
  195. exc_val = TestSkipped(NotImplementedError)
  196. if isinstance(exc_val, TestSkipped):
  197. reason = exc_val.args[0]
  198. # if the reason itself is another exception, get its args.
  199. try:
  200. reason = tuple(reason.args)
  201. except (AttributeError, TypeError):
  202. pass
  203. self.skips.setdefault(reason, 0)
  204. self.skips[reason] += 1
  205. if self.showAll:
  206. self.stream.writeln("SKIP (%s)" % (reason,))
  207. elif self.dots:
  208. self.stream.write('S')
  209. self.stream.flush()
  210. return
  211. super(TestResult, self).addError(test, err)
  212. def printErrors(self):
  213. super(TestResult, self).printErrors()
  214. for reason, num_skipped in self.skips.items():
  215. self.stream.writeln("SKIPPED: %d tests - %s" % (num_skipped, reason))
  216. # TestRunner subclass necessary just to get our TestResult hooked up.
  217. class TestRunner(unittest.TextTestRunner):
  218. def _makeResult(self):
  219. return TestResult(self.stream, self.descriptions, self.verbosity)
  220. # TestProgream subclass necessary just to get our TestRunner hooked up,
  221. # which is necessary to get our TestResult hooked up *sob*
  222. class TestProgram(unittest.TestProgram):
  223. def runTests(self):
  224. # clobber existing runner - *sob* - it shouldn't be this hard
  225. self.testRunner = TestRunner(verbosity=self.verbosity)
  226. unittest.TestProgram.runTests(self)
  227. # A convenient entry-point - if used, 'SKIPPED' exceptions will be supressed.
  228. def testmain(*args, **kw):
  229. new_kw = kw.copy()
  230. if 'testLoader' not in new_kw:
  231. new_kw['testLoader'] = TestLoader()
  232. program_class = new_kw.get('testProgram', TestProgram)
  233. program_class(*args, **new_kw)