test_file2.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
# This file is part of h5py, a Python interface to the HDF5 library.
#
# http://www.h5py.org
#
# Copyright 2008-2013 Andrew Collette and contributors
#
# License:  Standard 3-clause BSD; see "license.txt" for full license terms
#           and contributor agreement.

"""
    Tests the h5py.File object.
"""
import io
import os
import tempfile

import h5py
from h5py._hl.files import _drivers

from .common import ut, TestCase
  18. def nfiles():
  19. return h5py.h5f.get_obj_count(h5py.h5f.OBJ_ALL, h5py.h5f.OBJ_FILE)
  20. def ngroups():
  21. return h5py.h5f.get_obj_count(h5py.h5f.OBJ_ALL, h5py.h5f.OBJ_GROUP)
  22. class TestDealloc(TestCase):
  23. """
  24. Behavior on object deallocation. Note most of this behavior is
  25. delegated to FileID.
  26. """
  27. def test_autoclose(self):
  28. """ File objects close automatically when out of scope, but
  29. other objects remain open. """
  30. start_nfiles = nfiles()
  31. start_ngroups = ngroups()
  32. fname = self.mktemp()
  33. f = h5py.File(fname, 'w')
  34. g = f['/']
  35. self.assertEqual(nfiles(), start_nfiles+1)
  36. self.assertEqual(ngroups(), start_ngroups+1)
  37. del f
  38. self.assertTrue(g)
  39. self.assertEqual(nfiles(), start_nfiles)
  40. self.assertEqual(ngroups(), start_ngroups+1)
  41. f = g.file
  42. self.assertTrue(f)
  43. self.assertEqual(nfiles(), start_nfiles+1)
  44. self.assertEqual(ngroups(), start_ngroups+1)
  45. del g
  46. self.assertEqual(nfiles(), start_nfiles+1)
  47. self.assertEqual(ngroups(), start_ngroups)
  48. del f
  49. self.assertEqual(nfiles(), start_nfiles)
  50. self.assertEqual(ngroups(), start_ngroups)
  51. class TestDriverRegistration(TestCase):
  52. def test_register_driver(self):
  53. called_with = [None]
  54. def set_fapl(plist, *args, **kwargs):
  55. called_with[0] = args, kwargs
  56. return _drivers['sec2'](plist)
  57. h5py.register_driver('new-driver', set_fapl)
  58. self.assertIn('new-driver', h5py.registered_drivers())
  59. fname = self.mktemp()
  60. h5py.File(fname, driver='new-driver', driver_arg_0=0, driver_arg_1=1,
  61. mode='w')
  62. self.assertEqual(
  63. called_with,
  64. [((), {'driver_arg_0': 0, 'driver_arg_1': 1})],
  65. )
  66. def test_unregister_driver(self):
  67. h5py.register_driver('new-driver', lambda plist: None)
  68. self.assertIn('new-driver', h5py.registered_drivers())
  69. h5py.unregister_driver('new-driver')
  70. self.assertNotIn('new-driver', h5py.registered_drivers())
  71. with self.assertRaises(ValueError) as e:
  72. fname = self.mktemp()
  73. h5py.File(fname, driver='new-driver', mode='w')
  74. self.assertEqual(str(e.exception), 'Unknown driver type "new-driver"')
  75. class TestCache(TestCase):
  76. def test_defaults(self):
  77. fname = self.mktemp()
  78. f = h5py.File(fname, 'w')
  79. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  80. [0, 521, 1048576, 0.75])
  81. def test_nbytes(self):
  82. fname = self.mktemp()
  83. f = h5py.File(fname, 'w', rdcc_nbytes=1024)
  84. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  85. [0, 521, 1024, 0.75])
  86. def test_nslots(self):
  87. fname = self.mktemp()
  88. f = h5py.File(fname, 'w', rdcc_nslots=125)
  89. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  90. [0, 125, 1048576, 0.75])
  91. def test_w0(self):
  92. fname = self.mktemp()
  93. f = h5py.File(fname, 'w', rdcc_w0=0.25)
  94. self.assertEqual(list(f.id.get_access_plist().get_cache()),
  95. [0, 521, 1048576, 0.25])
  96. class TestFileObj(TestCase):
  97. def check_write(self, fileobj):
  98. f = h5py.File(fileobj, 'w')
  99. self.assertEqual(f.driver, 'fileobj')
  100. self.assertEqual(f.filename, repr(fileobj))
  101. f.create_dataset('test', data=list(range(12)))
  102. self.assertEqual(list(f), ['test'])
  103. self.assertEqual(list(f['test'][:]), list(range(12)))
  104. f.close()
  105. def check_read(self, fileobj):
  106. f = h5py.File(fileobj, 'r')
  107. self.assertEqual(list(f), ['test'])
  108. self.assertEqual(list(f['test'][:]), list(range(12)))
  109. self.assertRaises(Exception, f.create_dataset, 'another.test', data=list(range(3)))
  110. f.close()
  111. def test_BytesIO(self):
  112. with io.BytesIO() as fileobj:
  113. self.assertEqual(len(fileobj.getvalue()), 0)
  114. self.check_write(fileobj)
  115. self.assertGreater(len(fileobj.getvalue()), 0)
  116. self.check_read(fileobj)
  117. def test_file(self):
  118. fname = self.mktemp()
  119. try:
  120. with open(fname, 'wb+') as fileobj:
  121. self.assertEqual(os.path.getsize(fname), 0)
  122. self.check_write(fileobj)
  123. self.assertGreater(os.path.getsize(fname), 0)
  124. self.check_read(fileobj)
  125. with open(fname, 'rb') as fileobj:
  126. self.check_read(fileobj)
  127. finally:
  128. os.remove(fname)
  129. def test_TemporaryFile(self):
  130. # in this test, we check explicitly that temp file gets
  131. # automatically deleted upon h5py.File.close()...
  132. fileobj = tempfile.NamedTemporaryFile()
  133. fname = fileobj.name
  134. f = h5py.File(fileobj, 'w')
  135. del fileobj
  136. # ... but in your code feel free to simply
  137. # f = h5py.File(tempfile.TemporaryFile())
  138. f.create_dataset('test', data=list(range(12)))
  139. self.assertEqual(list(f), ['test'])
  140. self.assertEqual(list(f['test'][:]), list(range(12)))
  141. self.assertTrue(os.path.isfile(fname))
  142. f.close()
  143. self.assertFalse(os.path.isfile(fname))
  144. def test_exception_open(self):
  145. self.assertRaises(Exception, h5py.File, None,
  146. driver='fileobj', mode='x')
  147. self.assertRaises(Exception, h5py.File, 'rogue',
  148. driver='fileobj', mode='x')
  149. self.assertRaises(Exception, h5py.File, self,
  150. driver='fileobj', mode='x')
  151. def test_exception_read(self):
  152. class BrokenBytesIO(io.BytesIO):
  153. def readinto(self, b):
  154. raise Exception('I am broken')
  155. f = h5py.File(BrokenBytesIO(), 'w')
  156. f.create_dataset('test', data=list(range(12)))
  157. self.assertRaises(Exception, list, f['test'])
  158. def test_exception_write(self):
  159. class BrokenBytesIO(io.BytesIO):
  160. allow_write = False
  161. def write(self, b):
  162. if self.allow_write:
  163. return super().write(b)
  164. else:
  165. raise Exception('I am broken')
  166. bio = BrokenBytesIO()
  167. f = h5py.File(bio, 'w')
  168. try:
  169. self.assertRaises(Exception, f.create_dataset, 'test',
  170. data=list(range(12)))
  171. finally:
  172. # Un-break writing so we can close: errors while closing get messy.
  173. bio.allow_write = True
  174. f.close()
  175. @ut.skip("Incompletely closed files can cause segfaults")
  176. def test_exception_close(self):
  177. fileobj = io.BytesIO()
  178. f = h5py.File(fileobj, 'w')
  179. fileobj.close()
  180. self.assertRaises(Exception, f.close)
  181. def test_exception_writeonly(self):
  182. # HDF5 expects read & write access to a file it's writing;
  183. # check that we get the correct exception on a write-only file object.
  184. fileobj = open(os.path.join(self.tempdir, 'a.h5'), 'wb')
  185. with self.assertRaises(io.UnsupportedOperation):
  186. f = h5py.File(fileobj, 'w')
  187. group = f.create_group("group")
  188. group.create_dataset("data", data='foo', dtype=h5py.string_dtype())
  189. def test_method_vanish(self):
  190. fileobj = io.BytesIO()
  191. f = h5py.File(fileobj, 'w')
  192. f.create_dataset('test', data=list(range(12)))
  193. self.assertEqual(list(f['test'][:]), list(range(12)))
  194. fileobj.readinto = None
  195. self.assertRaises(Exception, list, f['test'])
  196. class TestTrackOrder(TestCase):
  197. def populate(self, f):
  198. for i in range(100):
  199. # Mix group and dataset creation.
  200. if i % 10 == 0:
  201. f.create_group(str(i))
  202. else:
  203. f[str(i)] = [i]
  204. def test_track_order(self):
  205. fname = self.mktemp()
  206. f = h5py.File(fname, 'w', track_order=True) # creation order
  207. self.populate(f)
  208. self.assertEqual(list(f),
  209. [str(i) for i in range(100)])
  210. def test_no_track_order(self):
  211. fname = self.mktemp()
  212. f = h5py.File(fname, 'w', track_order=False) # name alphanumeric
  213. self.populate(f)
  214. self.assertEqual(list(f),
  215. sorted([str(i) for i in range(100)]))