# sas.pyx
  1. # cython: profile=False
  2. # cython: boundscheck=False, initializedcheck=False
  3. from cython import Py_ssize_t
  4. import numpy as np
  5. import pandas.io.sas.sas_constants as const
  6. ctypedef signed long long int64_t
  7. ctypedef unsigned char uint8_t
  8. ctypedef unsigned short uint16_t
# rle_decompress decompresses data using a Run Length Encoding
# algorithm. It is partially documented here:
#
# https://cran.r-project.org/package=sas7bdat/vignettes/sas7bdat.pdf
cdef const uint8_t[:] rle_decompress(int result_length, const uint8_t[:] inbuff):
    # Decompress ``inbuff`` into a freshly allocated uint8 buffer of
    # exactly ``result_length`` bytes and return it (a memoryview over a
    # numpy array).
    #
    # Each command starts with one byte: the high nibble selects the
    # operation, the low nibble (``end_of_first_byte``) is a small length
    # operand; some commands read further operand bytes from the stream.
    #
    # Raises ValueError on an unrecognized control byte, or if the output
    # buffer length does not match ``result_length``.
    cdef:
        uint8_t control_byte, x
        uint8_t[:] result = np.zeros(result_length, np.uint8)
        int rpos = 0                      # write cursor into ``result``
        int i, nbytes, end_of_first_byte
        Py_ssize_t ipos = 0, length = len(inbuff)  # read cursor / input size

    while ipos < length:
        control_byte = inbuff[ipos] & 0xF0
        end_of_first_byte = <int>(inbuff[ipos] & 0x0F)
        ipos += 1

        if control_byte == 0x00:
            # Long literal copy: length is the next input byte + 64.
            if end_of_first_byte != 0:
                raise ValueError("Unexpected non-zero end_of_first_byte")
            nbytes = <int>(inbuff[ipos]) + 64
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = inbuff[ipos]
                rpos += 1
                ipos += 1
        elif control_byte == 0x40:
            # not documented
            # NOTE(review): reconstructed as repeating a single source
            # byte (end_of_first_byte * 16 + operand) times, with the
            # input cursor advanced once after the loop — matches
            # upstream pandas; confirm against original indentation.
            nbytes = end_of_first_byte * 16
            nbytes += <int>(inbuff[ipos])
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = inbuff[ipos]
                rpos += 1
            ipos += 1
        elif control_byte == 0x60:
            # Run of blanks (0x20); 17..4112 long from nibble + next byte.
            nbytes = end_of_first_byte * 256 + <int>(inbuff[ipos]) + 17
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = 0x20
                rpos += 1
        elif control_byte == 0x70:
            # Run of zero bytes; same length encoding as 0x60.
            nbytes = end_of_first_byte * 256 + <int>(inbuff[ipos]) + 17
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = 0x00
                rpos += 1
        elif control_byte == 0x80:
            # Short literal copy, 1..16 bytes.
            nbytes = end_of_first_byte + 1
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0x90:
            # Literal copy, 17..32 bytes.
            nbytes = end_of_first_byte + 17
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xA0:
            # Literal copy, 33..48 bytes.
            nbytes = end_of_first_byte + 33
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xB0:
            # Literal copy, 49..64 bytes.
            nbytes = end_of_first_byte + 49
            for i in range(nbytes):
                result[rpos] = inbuff[ipos + i]
                rpos += 1
            ipos += nbytes
        elif control_byte == 0xC0:
            # Repeat the next input byte (end_of_first_byte + 3) times.
            nbytes = end_of_first_byte + 3
            x = inbuff[ipos]
            ipos += 1
            for _ in range(nbytes):
                result[rpos] = x
                rpos += 1
        elif control_byte == 0xD0:
            # Run of 0x40 ('@') bytes, (end_of_first_byte + 2) long.
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x40
                rpos += 1
        elif control_byte == 0xE0:
            # Run of blanks (0x20), (end_of_first_byte + 2) long.
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x20
                rpos += 1
        elif control_byte == 0xF0:
            # Run of zero bytes, (end_of_first_byte + 2) long.
            nbytes = end_of_first_byte + 2
            for _ in range(nbytes):
                result[rpos] = 0x00
                rpos += 1
        else:
            raise ValueError(f"unknown control byte: {control_byte}")

    # In py37 cython/clang sees `len(outbuff)` as size_t and not Py_ssize_t
    if <Py_ssize_t>len(result) != <Py_ssize_t>result_length:
        raise ValueError(f"RLE: {len(result)} != {result_length}")

    return np.asarray(result)
# rdc_decompress decompresses data using the Ross Data Compression algorithm:
#
# http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
cdef const uint8_t[:] rdc_decompress(int result_length, const uint8_t[:] inbuff):
    # Decompress ``inbuff`` into a new uint8 buffer of ``result_length``
    # bytes and return it (a memoryview over a numpy array).
    #
    # A 16-bit control word governs the next 16 items: a clear bit means
    # the next input byte is copied through literally, a set bit means
    # the next byte(s) encode an RLE run or a back-reference into the
    # already-written output.
    #
    # Raises ValueError on an unknown command or on a length mismatch.
    cdef:
        uint8_t cmd
        uint16_t ctrl_bits = 0, ctrl_mask = 0, ofs, cnt
        int rpos = 0, k                   # write cursor into ``outbuff``
        uint8_t[:] outbuff = np.zeros(result_length, dtype=np.uint8)
        Py_ssize_t ipos = 0, length = len(inbuff)

    ii = -1  # item counter; bookkeeping only, value never read

    while ipos < length:
        ii += 1
        # Advance to the next control bit; when the mask runs out, load
        # a fresh big-endian 16-bit control word from the stream.
        ctrl_mask = ctrl_mask >> 1
        if ctrl_mask == 0:
            ctrl_bits = ((<uint16_t>inbuff[ipos] << 8) +
                         <uint16_t>inbuff[ipos + 1])
            ipos += 2
            ctrl_mask = 0x8000

        if ctrl_bits & ctrl_mask == 0:
            # Clear bit: literal pass-through of one byte.
            outbuff[rpos] = inbuff[ipos]
            ipos += 1
            rpos += 1
            continue

        # Set bit: high nibble is the command, low nibble a count/offset
        # operand.
        cmd = (inbuff[ipos] >> 4) & 0x0F
        cnt = <uint16_t>(inbuff[ipos] & 0x0F)
        ipos += 1

        # short RLE: repeat next byte (cnt + 3) times
        if cmd == 0:
            cnt += 3
            for k in range(cnt):
                outbuff[rpos + k] = inbuff[ipos]
            rpos += cnt
            ipos += 1
        # long RLE: repeat next byte (cnt + operand*16 + 19) times
        elif cmd == 1:
            cnt += <uint16_t>inbuff[ipos] << 4
            cnt += 19
            ipos += 1
            for k in range(cnt):
                outbuff[rpos + k] = inbuff[ipos]
            rpos += cnt
            ipos += 1
        # long pattern: copy (cnt2 + 16) bytes from ``ofs`` back in output
        elif cmd == 2:
            ofs = cnt + 3
            ofs += <uint16_t>inbuff[ipos] << 4
            ipos += 1
            cnt = <uint16_t>inbuff[ipos]
            ipos += 1
            cnt += 16
            for k in range(cnt):
                outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
            rpos += cnt
        # short pattern: the command nibble itself is the copy length (3..15)
        elif (cmd >= 3) & (cmd <= 15):
            ofs = cnt + 3
            ofs += <uint16_t>inbuff[ipos] << 4
            ipos += 1
            for k in range(cmd):
                outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
            rpos += cmd
        else:
            raise ValueError("unknown RDC command")

    # In py37 cython/clang sees `len(outbuff)` as size_t and not Py_ssize_t
    if <Py_ssize_t>len(outbuff) != <Py_ssize_t>result_length:
        raise ValueError(f"RDC: {len(outbuff)} != {result_length}\n")

    return np.asarray(outbuff)
# C-level codes for the two SAS column kinds; Parser.__init__ maps the
# python-level b'd'/b's' type characters onto these.
cdef enum ColumnTypes:
    column_type_decimal = 1
    column_type_string = 2
# type the page_data types
# Cache the python-level constants from pandas.io.sas.sas_constants as
# C ints so the page-type comparisons in Parser.readline stay in C.
cdef:
    int page_meta_type = const.page_meta_type
    int page_mix_types_0 = const.page_mix_types[0]
    int page_mix_types_1 = const.page_mix_types[1]
    int page_data_type = const.page_data_type
    int subheader_pointers_offset = const.subheader_pointers_offset
cdef class Parser:
    # Fast row reader for SAS7BDAT files.  Wraps the python-level parser
    # object (presumably pandas' SAS7BDATReader — confirm against the
    # caller), pulling rows out of its cached page buffer and writing
    # them into the parser's pre-allocated ``_byte_chunk`` (numeric
    # columns) and ``_string_chunk`` (string columns).

    cdef:
        int column_count
        int64_t[:] lengths                 # per-column data lengths in bytes
        int64_t[:] offsets                 # per-column byte offsets within a row
        int64_t[:] column_types            # per-column ColumnTypes code
        uint8_t[:, :] byte_chunk           # output for decimal columns (8 bytes/value)
        object[:, :] string_chunk          # output for string columns
        char *cached_page                  # raw bytes of the current page (NULL = none)
        int current_row_on_page_index
        int current_page_block_count
        int current_page_data_subheader_pointers_len
        int current_page_subheaders_count
        int current_row_in_chunk_index
        int current_row_in_file_index
        int header_length
        int row_length
        int bit_offset
        int subheader_pointer_length
        int current_page_type
        bint is_little_endian
        # Decompressor chosen from the file's compression flag; NULL when
        # the file is uncompressed.
        const uint8_t[:] (*decompress)(int result_length, const uint8_t[:] inbuff)
        object parser                      # the python-level SAS reader

    def __init__(self, object parser):
        # Snapshot layout/metadata from the python-level parser and pick
        # up its current read position so reading can resume mid-file.
        cdef:
            int j
            char[:] column_types

        self.parser = parser
        self.header_length = self.parser.header_length
        self.column_count = parser.column_count
        self.lengths = parser.column_data_lengths()
        self.offsets = parser.column_data_offsets()
        self.byte_chunk = parser._byte_chunk
        self.string_chunk = parser._string_chunk
        self.row_length = parser.row_length
        self.bit_offset = self.parser._page_bit_offset
        self.subheader_pointer_length = self.parser._subheader_pointer_length
        self.is_little_endian = parser.byte_order == "<"
        self.column_types = np.empty(self.column_count, dtype='int64')

        # page indicators
        self.update_next_page()

        column_types = parser.column_types()

        # map column types: b'd' -> decimal, b's' -> string
        for j in range(self.column_count):
            if column_types[j] == b'd':
                self.column_types[j] = column_type_decimal
            elif column_types[j] == b's':
                self.column_types[j] = column_type_string
            else:
                raise ValueError(f"unknown column type: {self.parser.columns[j].ctype}")

        # compression
        if parser.compression == const.rle_compression:
            self.decompress = rle_decompress
        elif parser.compression == const.rdc_compression:
            self.decompress = rdc_decompress
        else:
            self.decompress = NULL

        # update to current state of the parser
        self.current_row_in_chunk_index = parser._current_row_in_chunk_index
        self.current_row_in_file_index = parser._current_row_in_file_index
        self.current_row_on_page_index = parser._current_row_on_page_index

    def read(self, int nrows):
        # Read up to ``nrows`` rows into the chunk buffers, stopping
        # early at end of file, then push the row counters back into the
        # python-level parser.
        cdef:
            bint done
            int i

        for _ in range(nrows):
            done = self.readline()
            if done:
                break

        # update the parser
        self.parser._current_row_on_page_index = self.current_row_on_page_index
        self.parser._current_row_in_chunk_index = self.current_row_in_chunk_index
        self.parser._current_row_in_file_index = self.current_row_in_file_index

    cdef bint read_next_page(self):
        # Ask the python-level parser for the next page.  Returns True at
        # end of file (and clears the cached page pointer), else refreshes
        # the per-page state.
        cdef done

        done = self.parser._read_next_page()
        if done:
            self.cached_page = NULL
        else:
            self.update_next_page()
        return done

    cdef update_next_page(self):
        # update data for the current page
        self.cached_page = <char *>self.parser._cached_page
        self.current_row_on_page_index = 0
        self.current_page_type = self.parser._current_page_type
        self.current_page_block_count = self.parser._current_page_block_count
        self.current_page_data_subheader_pointers_len = len(
            self.parser._current_page_data_subheader_pointers
        )
        self.current_page_subheaders_count = self.parser._current_page_subheaders_count

    cdef readline(self):
        # Read one data row from the current page, advancing through
        # pages as needed.  Returns True at end of file, False after a
        # row has been processed into the chunk buffers.
        cdef:
            int offset, bit_offset, align_correction
            int subheader_pointer_length, mn
            bint done, flag

        bit_offset = self.bit_offset
        subheader_pointer_length = self.subheader_pointer_length

        # If there is no page, go to the end of the header and read a page.
        if self.cached_page == NULL:
            self.parser._path_or_buf.seek(self.header_length)
            done = self.read_next_page()
            if done:
                return True

        # Loop until a data row is read
        while True:
            if self.current_page_type == page_meta_type:
                # Meta page: each row sits behind a data subheader
                # pointer (offset/length pair) held by the parser.
                flag = self.current_row_on_page_index >=\
                    self.current_page_data_subheader_pointers_len
                if flag:
                    # Page exhausted; move on.
                    done = self.read_next_page()
                    if done:
                        return True
                    continue
                current_subheader_pointer = (
                    self.parser._current_page_data_subheader_pointers[
                        self.current_row_on_page_index])
                self.process_byte_array_with_data(
                    current_subheader_pointer.offset,
                    current_subheader_pointer.length)
                return False
            elif (self.current_page_type == page_mix_types_0 or
                    self.current_page_type == page_mix_types_1):
                # Mix page: rows start after the subheader pointer array,
                # rounded to 8-byte alignment.
                align_correction = (
                    bit_offset
                    + subheader_pointers_offset
                    + self.current_page_subheaders_count * subheader_pointer_length
                )
                align_correction = align_correction % 8
                offset = bit_offset + align_correction
                offset += subheader_pointers_offset
                offset += self.current_page_subheaders_count * subheader_pointer_length
                offset += self.current_row_on_page_index * self.row_length
                self.process_byte_array_with_data(offset, self.row_length)
                mn = min(self.parser.row_count, self.parser._mix_page_row_count)
                if self.current_row_on_page_index == mn:
                    done = self.read_next_page()
                    if done:
                        return True
                return False
            elif self.current_page_type & page_data_type == page_data_type:
                # Data page: rows are packed contiguously after the
                # page header.
                self.process_byte_array_with_data(
                    bit_offset
                    + subheader_pointers_offset
                    + self.current_row_on_page_index * self.row_length,
                    self.row_length,
                )
                flag = self.current_row_on_page_index == self.current_page_block_count
                if flag:
                    done = self.read_next_page()
                    if done:
                        return True
                return False
            else:
                raise ValueError(f"unknown page type: {self.current_page_type}")

    cdef void process_byte_array_with_data(self, int offset, int length):
        # Decode the row stored at ``offset``/``length`` within the
        # cached page into the chunk buffers, decompressing first when
        # the stored row is shorter than the logical row length, then
        # advance all three row counters.
        cdef:
            Py_ssize_t j
            int s, k, m, jb, js, current_row
            int64_t lngt, start, ct
            const uint8_t[:] source
            int64_t[:] column_types
            int64_t[:] lengths
            int64_t[:] offsets
            uint8_t[:, :] byte_chunk
            object[:, :] string_chunk

        source = np.frombuffer(
            self.cached_page[offset:offset + length], dtype=np.uint8)

        # A stored length below row_length marks a compressed row.
        if self.decompress != NULL and (length < self.row_length):
            source = self.decompress(self.row_length, source)

        current_row = self.current_row_in_chunk_index
        column_types = self.column_types
        lengths = self.lengths
        offsets = self.offsets
        byte_chunk = self.byte_chunk
        string_chunk = self.string_chunk
        s = 8 * self.current_row_in_chunk_index  # byte offset of this row in byte_chunk
        js = 0  # string-column cursor
        jb = 0  # decimal-column cursor
        for j in range(self.column_count):
            lngt = lengths[j]
            if lngt == 0:
                break
            start = offsets[j]
            ct = column_types[j]
            if ct == column_type_decimal:
                # decimal: right-align values narrower than 8 bytes on
                # little-endian machines so the 8-byte slot decodes
                # correctly downstream.
                if self.is_little_endian:
                    m = s + 8 - lngt
                else:
                    m = s
                for k in range(lngt):
                    byte_chunk[jb, m + k] = source[start + k]
                jb += 1
            elif column_types[j] == column_type_string:
                # string: copy the field and strip trailing NULs/spaces
                string_chunk[js, current_row] = np.array(source[start:(
                    start + lngt)]).tobytes().rstrip(b"\x00 ")
                js += 1

        self.current_row_on_page_index += 1
        self.current_row_in_chunk_index += 1
        self.current_row_in_file_index += 1