chunk.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from __future__ import annotations
  2. import math
  3. def calc_chunk_sizes(
  4. chunk_size: int | tuple[int, int] | None,
  5. chunk_count: int | tuple[int, int] | None,
  6. total_chunk_count: int | None,
  7. ny: int,
  8. nx: int,
  9. ) -> tuple[int, int]:
  10. """Calculate chunk sizes.
  11. Args:
  12. chunk_size (int or tuple(int, int), optional): Chunk size in (y, x) directions, or the same
  13. size in both directions if only one is specified. Cannot be negative.
  14. chunk_count (int or tuple(int, int), optional): Chunk count in (y, x) directions, or the
  15. same count in both directions if only one is specified. If less than 1, set to 1.
  16. total_chunk_count (int, optional): Total number of chunks. If less than 1, set to 1.
  17. ny (int): Number of grid points in y-direction.
  18. nx (int): Number of grid points in x-direction.
  19. Return:
  20. tuple(int, int): Chunk sizes (y_chunk_size, x_chunk_size).
  21. Note:
  22. Zero or one of ``chunk_size``, ``chunk_count`` and ``total_chunk_count`` should be
  23. specified.
  24. """
  25. if sum([chunk_size is not None, chunk_count is not None, total_chunk_count is not None]) > 1:
  26. raise ValueError("Only one of chunk_size, chunk_count and total_chunk_count should be set")
  27. if nx < 2 or ny < 2:
  28. raise ValueError(f"(ny, nx) must be at least (2, 2), not ({ny}, {nx})")
  29. if total_chunk_count is not None:
  30. max_chunk_count = (nx-1)*(ny-1)
  31. total_chunk_count = min(max(total_chunk_count, 1), max_chunk_count)
  32. if total_chunk_count == 1:
  33. chunk_size = 0
  34. elif total_chunk_count == max_chunk_count:
  35. chunk_size = (1, 1)
  36. else:
  37. factors = two_factors(total_chunk_count)
  38. if ny > nx:
  39. chunk_count = factors
  40. else:
  41. chunk_count = (factors[1], factors[0])
  42. if chunk_count is not None:
  43. if isinstance(chunk_count, tuple):
  44. y_chunk_count, x_chunk_count = chunk_count
  45. else:
  46. y_chunk_count = x_chunk_count = chunk_count
  47. x_chunk_count = min(max(x_chunk_count, 1), nx-1)
  48. y_chunk_count = min(max(y_chunk_count, 1), ny-1)
  49. chunk_size = (math.ceil((ny-1) / y_chunk_count), math.ceil((nx-1) / x_chunk_count))
  50. if chunk_size is None:
  51. y_chunk_size = x_chunk_size = 0
  52. elif isinstance(chunk_size, tuple):
  53. y_chunk_size, x_chunk_size = chunk_size
  54. else:
  55. y_chunk_size = x_chunk_size = chunk_size
  56. if x_chunk_size < 0 or y_chunk_size < 0:
  57. raise ValueError("chunk_size cannot be negative")
  58. return y_chunk_size, x_chunk_size
  59. def two_factors(n: int) -> tuple[int, int]:
  60. """Split an integer into two integer factors.
  61. The two factors will be as close as possible to the sqrt of n, and are returned in decreasing
  62. order. Worst case returns (n, 1).
  63. Args:
  64. n (int): The integer to factorize, must be positive.
  65. Return:
  66. tuple(int, int): The two factors of n, in decreasing order.
  67. """
  68. if n < 0:
  69. raise ValueError(f"two_factors expects positive integer not {n}")
  70. i = math.ceil(math.sqrt(n))
  71. while n % i != 0:
  72. i -= 1
  73. j = n // i
  74. if i > j:
  75. return i, j
  76. else:
  77. return j, i