spatiotemporalparallelism.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import sys
  2. import os
  3. import paraview
  4. import paraview.simple as pvsimple
  5. can_savecinema = True
  6. try:
  7. from vtkmodules.numpy_interface.algorithms import *
  8. from mpi4py import MPI
  9. except:
  10. can_savecinema = False
  11. paraview.options.batch = True # this may not be necessary
  12. paraview.simple._DisableFirstRenderCameraReset()
  13. def CreateTimeCompartments(globalController, timeCompartmentSize):
  14. if globalController.GetNumberOfProcesses() == 1:
  15. print ('single process')
  16. return
  17. elif globalController.GetNumberOfProcesses() % timeCompartmentSize != 0:
  18. print ('number of processes must be an integer multiple of time compartment size')
  19. return
  20. elif timeCompartmentSize == globalController.GetNumberOfProcesses():
  21. return globalController
  22. gid = globalController.GetLocalProcessId()
  23. timeCompartmentGroupId = int (gid / timeCompartmentSize )
  24. newController = globalController.PartitionController(timeCompartmentGroupId, gid % timeCompartmentSize)
  25. # must unregister if the reference count is greater than 1
  26. if newController.GetReferenceCount() > 1:
  27. newController.UnRegister(None)
  28. #print (gid, ' of global comm is ', newController.GetLocalProcessId())
  29. globalController.SetGlobalController(newController)
  30. return newController
  31. def CheckReader(reader):
  32. if hasattr(reader, "FileName") == False:
  33. print ("ERROR: Don't know how to set file name for ", reader.SMProxy.GetXMLName())
  34. sys.exit(-1)
  35. if hasattr(reader, "TimestepValues") == False:
  36. print ("ERROR: ", reader.SMProxy.GetXMLName(), " doesn't have time information")
  37. sys.exit(-1)
  38. def CreateControllers(timeCompartmentSize):
  39. pm = paraview.servermanager.vtkProcessModule.GetProcessModule()
  40. globalController = pm.GetGlobalController()
  41. if timeCompartmentSize > globalController.GetNumberOfProcesses():
  42. timeCompartmentSize = globalController.GetNumberOfProcesses()
  43. temporalController = CreateTimeCompartments(globalController, timeCompartmentSize)
  44. return globalController, temporalController, timeCompartmentSize
  45. def WriteImages(currentTimeStep, currentTime, views):
  46. cinemaLines = []
  47. cnt = 0
  48. for view in views:
  49. filename = view.tpFileName.replace("%t", str(currentTimeStep))
  50. view.ViewTime = currentTime
  51. cinemaLines.append(str(currentTime) + "," + view.GetXMLName()+"_"+str(cnt) + "," + filename + "\n")
  52. pvsimple.WriteImage(filename, view, Magnification=view.tpMagnification)
  53. cnt = cnt + 1
  54. return cinemaLines
  55. def WriteFiles(currentTimeStep, currentTime, writers):
  56. cinemaLines = []
  57. cnt = 0
  58. for writer in writers:
  59. originalfilename = writer.FileName
  60. fname = originalfilename.replace("%t", str(currentTimeStep))
  61. writer.FileName = fname
  62. cinemaLines.append(str(currentTime) + "," + writer.GetXMLName()+"_"+str(cnt) + "," + fname + "\n")
  63. writer.UpdatePipeline(currentTime)
  64. writer.FileName = originalfilename
  65. cnt = cnt + 1
  66. return cinemaLines
  67. def IterateOverTimeSteps(globalController, timeCompartmentSize, timeSteps, writers, views, make_cinema_table=False):
  68. if make_cinema_table and not can_savecinema:
  69. print ("WARNING: Can not save cinema table because MPI4PY is not available.")
  70. make_cinema_table = False
  71. numProcs = globalController.GetNumberOfProcesses()
  72. numTimeCompartments = numProcs//timeCompartmentSize
  73. tpp = len(timeSteps)/numTimeCompartments
  74. remainder = len(timeSteps)%numTimeCompartments
  75. timeCompartmentIndex = int(globalController.GetLocalProcessId()/timeCompartmentSize)
  76. myStartTimeStep = tpp*timeCompartmentIndex
  77. myEndTimeStep = myStartTimeStep+tpp
  78. if timeCompartmentIndex < remainder:
  79. myStartTimeStep = myStartTimeStep+timeCompartmentIndex
  80. myEndTimeStep = myStartTimeStep+tpp+1
  81. else:
  82. myStartTimeStep = myStartTimeStep+remainder
  83. myEndTimeStep = myStartTimeStep+tpp
  84. myStartTimeStep = int(myStartTimeStep)
  85. myEndTimeStep = int(myEndTimeStep)
  86. cinemaLines = []
  87. for currentTimeStep in range(myStartTimeStep,myEndTimeStep):
  88. #print (globalController.GetLocalProcessId(), " is working on ", currentTimeStep)
  89. ret = WriteImages(currentTimeStep, timeSteps[currentTimeStep], views)
  90. if ret:
  91. cinemaLines.extend(ret)
  92. ret = WriteFiles(currentTimeStep, timeSteps[currentTimeStep], writers)
  93. if ret:
  94. cinemaLines.extend(ret)
  95. if make_cinema_table:
  96. # gather the file list from each time compartment to the root and save it
  97. myGID = globalController.GetLocalProcessId()
  98. myLID = int(myGID) % int(timeCompartmentSize)
  99. mystring = ''
  100. # only one node per time compartment should participate
  101. if myLID == 0:
  102. mystring = ''.join(x for x in cinemaLines)
  103. mylen = len(mystring)
  104. result = ''
  105. comm = vtkMPI4PyCommunicator.ConvertToPython(globalController.GetCommunicator())
  106. gathered_lines = comm.gather(mystring, root=0)
  107. if myGID == 0:
  108. # root writes the file, prepending header
  109. f = open("data.csv", "w")
  110. f.write("timestep,producer,FILE\n")
  111. for x in gathered_lines:
  112. if x:
  113. f.write(x)
  114. f.close()
  115. def CreateReader(ctor, fileInfo, **kwargs):
  116. "Creates a reader, checks if it can be used, and sets the filenames"
  117. import glob
  118. files = glob.glob(fileInfo)
  119. files.sort() # assume there is a logical ordering of the filenames that corresponds to time ordering
  120. reader = paraview.simple.OpenDataFile(files)
  121. CheckReader(reader)
  122. if kwargs:
  123. pvsimple.SetProperties(reader, **kwargs)
  124. return reader
  125. def CreateWriter(ctor, filename, tp_writers):
  126. writer = ctor()
  127. return RegisterWriter(writer, filename, tp_writers)
  128. def RegisterWriter(writer, filename, tp_writers):
  129. writer.FileName = filename
  130. tp_writers.append(writer)
  131. return writer
  132. def CreateView(proxy_ctor, filename, magnification, width, height, tp_views):
  133. view = proxy_ctor()
  134. return RegisterView(view, filename, magnification, width, height, tp_views)
  135. def RegisterView(view, filename='filename_%t.vti', magnification=1.0, width=1024, height=1024, tp_views=[]):
  136. view.add_attribute("tpFileName", filename)
  137. view.add_attribute("tpMagnification", magnification)
  138. tp_views.append(view)
  139. view.ViewSize = [width, height]
  140. return view