"""This module provides ParaView equivalents to several ParFlow pftools algorithms; these are implemented using VTKPythonAlgorithmBase subclasses and demonstrate how to access both surface and subsurface computational grids within a single filter.""" from paraview.util.vtkAlgorithm import * import numpy as np class pftools: """This class defines some utilities to perform computations similar to those implemented by ParFlow's pftools suite.""" @staticmethod def dataPointExtent(dataset): """Return the parflow extents in (k,j,i) order of the *point* extents of this dataset, which is assumed to be structured. Note that VTK normally assumes data is (i,j,k) ordered.""" ext = tuple(np.flip(dataset.GetDimensions())) return ext @staticmethod def dataCellExtent(dataset): """Return the parflow extents in (k,j,i) order of the *cell* extents of this dataset, which is assumed to be structured. Note that (1) VTK normally uses point counts along axes rather than cell counts and (2) assumes data is (i,j,k) ordered.""" # Subtracting 1 from GetDimensions accounts for point-counts # vs cell-counts, while np.flip(...) switches the axis order: ext = tuple(np.flip(dataset.GetDimensions()) - 1) return ext @staticmethod def computeTopSurface(ext, mask): # I. Reshape the mask and create zk, a column of k-axis indices: mr = np.reshape(mask > 0, ext) zk = np.reshape(np.arange(ext[0]), (ext[0],1,1)) # II. The top surface at each (i,j) is the location # of the highest zk-value that is not masked: top = np.argmax(mr * zk, axis=0) # III. Mark completely-masked columns of cells with an # invalid "top" index of -1: # Columns in mask that had no valid value for mr * zk result # in entries of "top" that are 0. But 0 is also a valid k index # so we mark those which should be marked out with -1: mr = (mr == False) # Invert the mask to catch bad cells top = top - np.all(mr, axis=0) # Subtract 1 from masked column entries. return top @staticmethod def computeTopSurfaceIndices(top): """Convert the "top" surface matrix into an array of indices that only include valid top-surface cells. Any negative entry of "top" does not have its index included. The result is a matrix; each row corresponds to a top-surface cell and each column holds a cell's k-, j-, and i-indices, respectively. """ itop = np.array([(top[i,j], j, i) \ for i in range(top.shape[0]) \ for j in range(top.shape[1]) \ if top[i,j] >= 0]) return itop @staticmethod def computeSurfaceStorage(ext, itop, xx, pressure): # I. Compute the dx * dy area term using point coordinates from the mesh: xt = xx[itop[:,0], itop[:,1], itop[:,2]] dx = xx[itop[:,0], itop[:,1], itop[:,2] + 1] - xt dy = xx[itop[:,0], itop[:,1] + 1, itop[:,2]] - xt # Now dx and dy are matrices whose rows are vectors. # Compute the norms of the vectors. # dx = np.sqrt(np.sum(dx * dx,axis=1)) # dy = np.sqrt(np.sum(dy * dy,axis=1)) # To more closely match parflow: dx = dx[:,0] dy = dy[:,1] pp = np.reshape(pressure, ext) pt = pp[itop[:,0], itop[:,1], itop[:,2]] sus = pt * (pt > 0.0) * dx * dy return sus @staticmethod def computeSubsurfaceStorage(saturation, pressure, volume, porosity, specific_storage): """Compute the subsurface storage for the entire domain.""" import numpy as np sbs = saturation * volume * (porosity + pressure * specific_storage) return sbs @staticmethod def computeGroundwaterStorage(saturation, pressure, volume, porosity, specific_storage): """Compute the groundwater storage. This is the same calculation as subsurface storage, but only sums values over cells with a saturation of 1.0. """ import numpy as np sbs = (saturation == 1.0) * volume * (porosity + pressure * specific_storage) return sbs @staticmethod def computeSurfaceRunoff(top, xx, pressure, slope, mannings): """Compute surface runoff (water leaving the domain boundary or flowing into a masked area)""" def addToRunoff(runoff, sro, top, slope, pressure, mannings, delta): """Given a truthy 2-d array (idx) of places where runoff occurs, compute the runoff and add it to the total (sro). """ # We would like to do this: # sro += \ # runoff * np.sqrt(np.abs(slope)) / mannings * \ # np.power(ptop, 5.0/3.0) * delta # ... but we can't since it might result in NaN values where # runoff is False. Instead, only compute the runoff exactly # at locations where runoff is true: ww = np.where(runoff) sro[ww] += \ np.sqrt(np.abs(slope[ww])) / mannings[ww] * \ np.power(ptop[ww], 5.0/3.0) * delta[ww] # Initialize runoff: sro = np.zeros(top.shape) # Prepare indices and tempororary variables: sz = np.prod(top.shape) ext = (int(np.prod(pressure.shape)/sz),) + top.shape tk = np.reshape(top, sz) tj = np.floor(np.arange(sz)/top.shape[1]).astype(int) ti = np.arange(sz) % top.shape[1] tt = np.zeros(top.shape) # temporary array # Subset of pressure at the top surface: ptop = np.reshape(np.reshape(pressure, ext)[tk, tj, ti], top.shape) # Determine size of top-surface cells along north-south direction: delta = np.reshape((xx[tk, tj + 1, ti] - xx[tk, tj, ti])[:,1], top.shape) # Fill the temporary array with data shifted south by 1: np.concatenate((top[1:,:], -np.ones((1,top.shape[1]))), axis=0, out=tt) # Compute conditions for flow exiting to the north: cnorth = (top >= 0) & (tt < 0) & (slope[:,:,1] < 0) & (ptop > 0) # Now add values to per-cell runoff sro: addToRunoff(cnorth, sro, top, slope[:,:,1], pressure, mannings, delta) # Fill the temporary array with data shifted north by 1: np.concatenate((-np.ones((1,top.shape[1])), top[0:-1,:]), axis=0, out=tt) # Compute conditions for flow exiting to the south: csouth = (top >= 0) & (tt < 0) & (slope[:,:,1] > 0) & (ptop > 0) # Now add values to per-cell runoff sro: addToRunoff(csouth, sro, top, slope[:,:,1], pressure, mannings, delta) # Determine size of top-surface cells along east-west direction: delta = np.reshape((xx[tk, tj, ti + 1] - xx[tk, tj, ti])[:,0], top.shape) # Fill the temporary array with data shifted east by 1: np.concatenate((top[:,1:], -np.ones((top.shape[0], 1))), axis=1, out=tt) # Compute conditions for flow exiting to the west: cwest = (top >= 0) & (tt < 0) & (slope[:,:,0] < 0) & (ptop > 0) # Now add values to per-cell runoff sro: addToRunoff(cwest, sro, top, slope[:,:,0], pressure, mannings, delta) # Fill the temporary array with data shifted north by 1: np.concatenate((-np.ones((top.shape[0], 1)), top[:,0:-1]), axis=1, out=tt) # Compute conditions for flow exiting to the south: ceast = (top >= 0) & (tt < 0) & (slope[:,:,0] > 0) & (ptop > 0) # Now add values to per-cell runoff sro: addToRunoff(ceast, sro, top, slope[:,:,0], pressure, mannings, delta) return sro @staticmethod def computeWaterTableDepth(top, saturation, xx): """Compute the depth to the water table. The returned array is the distance along the z axis between the water surface and ground level. It should always be negative (i.e., it is a displacement on the z axis starting at the surface). """ sz = np.prod(top.shape) ext = (int(np.prod(saturation.shape)/sz),) + top.shape sr = np.reshape(saturation >= 1.0, ext) zk = np.reshape(np.arange(ext[0]), (ext[0],1,1)) # The top surface at each (i,j) is the location # of the highest zk-value that is not masked: depthIdx = np.argmax(sr * zk, axis=0) tk = np.reshape(top, sz) dk = np.reshape(depthIdx, sz) dj = np.floor(np.arange(sz)/top.shape[1]).astype(int) di = np.arange(sz) % top.shape[1] depth = xx[dk, dj, di, 2] - xx[tk, dj, di, 2] # Mark locations where the domain is masked or # no water table appears as NaN (not-a-number): sr = (sr == False) # Invert the saturation mask ww = np.where(np.reshape(np.all(sr, axis=0), sz)) depth[ww] = np.nan return depth class ParFlowAlgorithm(VTKPythonAlgorithmBase): """A base class for algorithms that operate on ParFlow simulation data. This class provides method implementations that simplify working with data from the vtkParFlowMetaReader. Specifically, 1. RequestUpdateExtent will ensure valid extents are passed to each of the filter's inputs based on their available whole extents. Without this, passing both the surface and subsurface meshes to a python filter would cause warnings as the default implementation simply mirrors the downstream request to each of its upstream filters. 2. RequestDataObject will create the same type of output as the input data objects so that either image data or structured grids (both of which may be output by the reader) can be passed through the filter. """ def FillInputPortInformation(self, port, info): info.Set(self.INPUT_REQUIRED_DATA_TYPE(), "vtkDataSet") return 1 def RequestDataObject(self, request, inInfoVec, outInfoVec): """Create a data object of the same type as the input.""" from vtkmodules.vtkCommonDataModel import vtkTable, vtkDataSet, vtkPolyData from vtkmodules import vtkCommonDataModel as dm input0 = vtkDataSet.GetData(inInfoVec[0], 0) opt = dm.vtkDataObject.GetData(outInfoVec) if opt and opt.IsA(input0.GetClassName()): return 1 opt = input0.NewInstance() outInfoVec.GetInformationObject(0).Set(dm.vtkDataObject.DATA_OBJECT(), opt) return 1 def RequestUpdateExtent(self, request, inInfoVec, outInfoVec): """Override the default algorithm for updating extents to handle the surface and subsurface models, which have different dimensionality. Take the requested (downstream) extent and intersect it with the available (upstream) extents; use that as the request rather than blindly copying the request upstream. """ from vtkmodules.vtkCommonExecutionModel import vtkStreamingDemandDrivenPipeline as sddp # Determine the port requesting an update from us: reqPort = request.Get(sddp.FROM_OUTPUT_PORT()) # Get that port's information and the extents it is requesting: outInfo = self.GetOutputInformation(reqPort) esrc = outInfo.Get(sddp.UPDATE_EXTENT()) # Slice the extents into tuples holding the lo and hi indices: losrc = esrc[0:6:2] hisrc = esrc[1:6:2] np = self.GetNumberOfInputPorts() for port in range(np): # Loop over all the input connections and set their update extents: nc = self.GetNumberOfInputConnections(port) for connection in range(nc): inInfo = self.GetInputInformation(port, connection) # Set UPDATE_EXTENT to be the intersection of the input # port's WHOLE_EXTENT with the output port's UPDATE_EXTENT. # # NB/FIXME: Really this is incorrect. If reqPort is 1, then # someone is asking for an update of the 2-d surface grid # and they could conceivably need the entire 3-d subsurface # grid that overlaps the surface grid in x and y... etgt = inInfo.Get(sddp.WHOLE_EXTENT()) lotgt = etgt[0:6:2] hitgt = etgt[1:6:2] lodst = [int(max(x)) for x in zip(losrc, lotgt)] hidst = [int(min(x)) for x in zip(hisrc, hitgt)] edst = [val for tup in zip(lodst, hidst) for val in tup] inInfo.Set(sddp.UPDATE_EXTENT(), tuple(edst), 6) return 1 @smproxy.filter(label='Subsurface Storage') @smhint.xml(""" """) @smproperty.input(name="Subsurface", port_index=0) @smdomain.datatype(dataTypes=["vtkDataSet"], composite_data_supported=False) class SubsurfaceStorage(ParFlowAlgorithm): """This algorithm computes the subsurface storage across the entire domain. The result is added as field data and marked as time-varying so it can be plotted as a timeseries. """ def __init__(self): VTKPythonAlgorithmBase.__init__(self, nInputPorts=1, nOutputPorts=1, outputType="vtkDataSet") def RequestData(self, request, inInfoVec, outInfoVec): from vtkmodules.vtkCommonDataModel import vtkTable, vtkDataSet, vtkPolyData from vtkmodules.vtkIOExodus import vtkExodusIIReader as e2r from vtkmodules.vtkFiltersVerdict import vtkMeshQuality as mq from vtkmodules.util.numpy_support import vtk_to_numpy as vton from vtkmodules.util.numpy_support import numpy_to_vtk as ntov import numpy as np ## Fetch the filter inputs and outputs: input0 = vtkDataSet.GetData(inInfoVec[0], 0) output = vtkDataSet.GetData(outInfoVec, 0) output.ShallowCopy(input0) ## Fetch input arrays cd = output.GetCellData() vmask = cd.GetArray('mask') vsaturation = cd.GetArray('saturation') vporosity = cd.GetArray('porosity') vpressure = cd.GetArray('pressure') vspecstor = cd.GetArray('specific storage') if vmask == None or vsaturation == None or \ vporosity == None or vpressure == None or \ vspecstor == None: print('Error: A required array was not present.') return 0 ## Compute the volume of each cell: mqf = mq() mqf.SetHexQualityMeasureToVolume() mqf.SetInputDataObject(0, output) mqf.Update() volume = vton(mqf.GetOutputDataObject(0).GetCellData().GetArray('Quality')) ## Get NumPy versions of each array so we can do arithmetic: mask = vton(vmask) saturation = vton(vsaturation) porosity = vton(vporosity) pressure = vton(vpressure) specstor = vton(vspecstor) ## Compute the subsurface storage as sbs ## and store it as field data. sbs = np.sum(pftools.computeSubsurfaceStorage( \ saturation, pressure, volume, porosity, specstor)) arr = ntov(sbs) arr.SetName('subsurface storage') arr.GetInformation().Set(e2r.GLOBAL_TEMPORAL_VARIABLE(), 1) output.GetFieldData().AddArray(arr) return 1 @smproxy.filter(label='Water Table Depth') @smhint.xml(""" """) @smproperty.input(name="Surface", port_index=1) @smdomain.datatype(dataTypes=["vtkDataSet"], composite_data_supported=False) @smproperty.input(name="Subsurface", port_index=0) @smdomain.datatype(dataTypes=["vtkDataSet"], composite_data_supported=False) class WaterTableDepth(ParFlowAlgorithm): """This algorithm computes the depth to the water table the domain surface. """ def __init__(self): VTKPythonAlgorithmBase.__init__(self, nInputPorts=2, nOutputPorts=1, outputType="vtkDataSet") def RequestInformation(self, request, inInfoVec, outInfoVec): """Tell ParaView our output extents match the surface, not the subsurface.""" from vtkmodules.vtkCommonExecutionModel import vtkStreamingDemandDrivenPipeline as sddp iin = inInfoVec[1].GetInformationObject(0) outInfoVec.GetInformationObject(0).Set(sddp.WHOLE_EXTENT(), iin.Get(sddp.WHOLE_EXTENT()), 6); return 1 def RequestData(self, request, inInfoVec, outInfoVec): from vtkmodules.vtkCommonDataModel import vtkTable, vtkDataSet, vtkPolyData from vtkmodules.vtkIOExodus import vtkExodusIIReader as e2r from vtkmodules.vtkFiltersVerdict import vtkMeshQuality as mq from vtkmodules.util.numpy_support import vtk_to_numpy as vton from vtkmodules.util.numpy_support import numpy_to_vtk as ntov import numpy as np ## Fetch the filter inputs and outputs: input0 = vtkDataSet.GetData(inInfoVec[0], 0) input1 = vtkDataSet.GetData(inInfoVec[1], 0) output = vtkDataSet.GetData(outInfoVec, 0) output.ShallowCopy(input1) ## Fetch input arrays cd = output.GetCellData() scd = input0.GetCellData() vmask = scd.GetArray('mask') vsaturation = scd.GetArray('saturation') if vmask == None or vsaturation == None: print('Error: A required array was not present.') return 0 ## Get NumPy versions of each array so we can do arithmetic: mask = vton(vmask) saturation = vton(vsaturation) ## Find the top surface ext = pftools.dataCellExtent(input0) top = pftools.computeTopSurface(ext, mask) ## Compute the water table depth storage as wtd ## and store it as cell data. ps = pftools.dataPointExtent(input0) + (3,) xx = np.reshape(vton(input0.GetPoints().GetData()), ps) wtd = pftools.computeWaterTableDepth(top, saturation, xx) arr = ntov(wtd) arr.SetName('water table depth') output.GetCellData().AddArray(arr) return 1 @smproxy.filter(label='Water Balance') @smhint.xml(""" """) @smproperty.input(name="Surface", port_index=1) @smdomain.datatype(dataTypes=["vtkDataSet"], composite_data_supported=False) @smproperty.input(name="Subsurface", port_index=0) @smdomain.datatype(dataTypes=["vtkDataSet"], composite_data_supported=False) class WaterBalance(ParFlowAlgorithm): """This algorithm computes the subsurface storage across the entire domain. The result is added as field data and marked as time-varying so it can be plotted as a timeseries. """ def __init__(self): VTKPythonAlgorithmBase.__init__(self, nInputPorts=2, nOutputPorts=1, outputType="vtkDataSet") def RequestData(self, request, inInfoVec, outInfoVec): from vtkmodules.vtkCommonDataModel import vtkTable, vtkDataSet, vtkPolyData from vtkmodules.vtkIOExodus import vtkExodusIIReader as e2r from vtkmodules.vtkFiltersVerdict import vtkMeshQuality as mq from vtkmodules.util.numpy_support import vtk_to_numpy as vton from vtkmodules.util.numpy_support import numpy_to_vtk as ntov import numpy as np ## Fetch the filter inputs and outputs: input0 = vtkDataSet.GetData(inInfoVec[0], 0) input1 = vtkDataSet.GetData(inInfoVec[1], 0) output = vtkDataSet.GetData(outInfoVec, 0) output.ShallowCopy(input0) ## Fetch input arrays cd = output.GetCellData() scd = input1.GetCellData() vmask = cd.GetArray('mask') vsaturation = cd.GetArray('saturation') vporosity = cd.GetArray('porosity') vpressure = cd.GetArray('pressure') vspecstor = cd.GetArray('specific storage') vslope = scd.GetArray('slope') vmannings = scd.GetArray('mannings') if vmask == None or vsaturation == None or \ vporosity == None or vpressure == None or \ vspecstor == None or vslope == None or \ vmannings == None: print('Error: A required array was not present.') return 0 ## Compute the volume of each cell: mqf = mq() mqf.SetHexQualityMeasureToVolume() mqf.SetInputDataObject(0, output) mqf.Update() volume = vton(mqf.GetOutputDataObject(0).GetCellData().GetArray('Quality')) ## Get NumPy versions of each array so we can do arithmetic: mask = vton(vmask) saturation = vton(vsaturation) porosity = vton(vporosity) pressure = vton(vpressure) specstor = vton(vspecstor) slope = vton(vslope) mannings = vton(vmannings) ## Compute the subsurface storage as sbs ## and store it as field data. sbs = np.sum(pftools.computeSubsurfaceStorage( \ saturation, pressure, volume, porosity, specstor)) arr = ntov(sbs) arr.SetName('subsurface storage') arr.GetInformation().Set(e2r.GLOBAL_TEMPORAL_VARIABLE(), 1) output.GetFieldData().AddArray(arr) ## Find the top surface ext = pftools.dataCellExtent(input0) top = pftools.computeTopSurface(ext, mask) itop = pftools.computeTopSurfaceIndices(top) ## Compute the surface storage ps = pftools.dataPointExtent(input0) + (3,) xx = np.reshape(vton(output.GetPoints().GetData()), ps) sus = np.sum(pftools.computeSurfaceStorage(ext, itop, xx, pressure)) arr = ntov(sus) arr.SetName('surface storage') arr.GetInformation().Set(e2r.GLOBAL_TEMPORAL_VARIABLE(), 1) output.GetFieldData().AddArray(arr) ## Compute the surface runoff slope = np.reshape(slope, top.shape + (2,)) mannings = np.reshape(mannings, top.shape) sro = np.sum(pftools.computeSurfaceRunoff(top, xx, pressure, slope, mannings)) arr = ntov(sro) arr.SetName('surface runoff') arr.GetInformation().Set(e2r.GLOBAL_TEMPORAL_VARIABLE(), 1) output.GetFieldData().AddArray(arr) return 1 if __name__ == "__main__": from paraview.detail.pythonalgorithm import get_plugin_xmls from xml.dom.minidom import parseString for xml in get_plugin_xmls(globals()): dom = parseString(xml) print(dom.toprettyxml(" ","\n"))