|
- r"""paraviewweb_protocols is a module that contains a set of ParaViewWeb related
- protocols that can be combined together to provide a flexible way to define
- very specific web application.
- """
- import base64
- import fnmatch
- import json
- import os
- import re
- import sys
- import time
- # import paraview modules.
- import paraview
- from paraview import servermanager, simple
- from paraview.servermanager import (
- InputProperty,
- ProxyProperty,
- vtkPVRenderView,
- vtkSMPVRepresentationProxy,
- vtkSMTransferFunctionManager,
- vtkSMTransferFunctionProxy,
- )
- from paraview.web import helper
- from paraview.web.decorators import (
- arrayListDomainDecorator,
- arraySelectionDomainDecorator,
- booleanDomainDecorator,
- clipScalarDecorator,
- enumerationDomainDecorator,
- genericDecorator,
- multiLineDecorator,
- numberRangeDomainDecorator,
- proxyEditorPropertyWidgetDecorator,
- proxyListDomainDecorator,
- stringListDomainDecorator,
- treeDomainDecorator,
- )
- from vtkmodules.vtkCommonCore import vtkCollection, vtkUnsignedCharArray
- from vtkmodules.vtkCommonDataModel import vtkDataObject, vtkImageData
- from vtkmodules.vtkWebCore import vtkDataEncoder, vtkWebInteractionEvent
- from vtkmodules.web import iteritems
- from vtkmodules.web import protocols as vtk_protocols
- from vtkmodules.web.render_window_serializer import (
- SynchronizationContext,
- getReferenceId,
- initializeSerializers,
- serializeInstance,
- )
- from wslink import schedule_callback
- # import RPC annotation
- from wslink import register as exportRpc
- if sys.version_info >= (3,):
- xrange = range
- # =============================================================================
- # Helper methods
- # =============================================================================
- def tryint(s):
- try:
- return int(s)
- except:
- return s
- def alphanum_key(s):
- """Turn a string into a list of string and number chunks.
- "z23a" -> ["z", 23, "a"]
- """
- return [tryint(c) for c in re.split("([0-9]+)", s)]
- def sanitizeKeys(mapObj):
- output = {}
- for key in mapObj:
- sanitizeKey = servermanager._make_name_valid(key)
- output[sanitizeKey] = mapObj[key]
- return output
- # =============================================================================
- #
- # Base class for any ParaView based protocol
- #
- # =============================================================================
- class ParaViewWebProtocol(vtk_protocols.vtkWebProtocol):
- def __init__(self):
- # self.Application = None
- self.coreServer = None
- self.multiRoot = False
- self.baseDirectory = ""
- self.baseDirectoryMap = {}
- def mapIdToProxy(self, id):
- """
- Maps global-id for a proxy to the proxy instance. May return None if the
- id is not valid.
- """
- try:
- id = int(id)
- except:
- return None
- if id <= 0:
- return None
- return simple.servermanager._getPyProxy(
- simple.servermanager.ActiveConnection.Session.GetRemoteObject(id)
- )
- def getView(self, vid):
- """
- Returns the view for a given view ID, if vid is None then return the
- current active view.
- :param vid: The view ID
- :type vid: str
- """
- view = self.mapIdToProxy(vid)
- if not view:
- # Use active view is none provided.
- view = simple.GetActiveView()
- if not view:
- raise Exception("no view provided: " + str(vid))
- return view
- def debug(self, msg):
- if self.debugMode == True:
- print(msg)
- def setBaseDirectory(self, basePath):
- self.overrideDataDirKey = None
- self.baseDirectory = ""
- self.baseDirectoryMap = {}
- self.multiRoot = False
- if basePath.find("|") < 0:
- if basePath.find("=") >= 0:
- basePair = basePath.split("=")
- if os.path.exists(basePair[1]):
- self.baseDirectory = basePair[1]
- self.overrideDataDirKey = basePair[0]
- else:
- self.baseDirectory = basePath
- self.baseDirectory = os.path.normpath(self.baseDirectory)
- else:
- baseDirs = basePath.split("|")
- for baseDir in baseDirs:
- basePair = baseDir.split("=")
- if os.path.exists(basePair[1]):
- self.baseDirectoryMap[basePair[0]] = os.path.normpath(basePair[1])
- # Check if we ended up with just a single directory
- bdKeys = list(self.baseDirectoryMap)
- if len(bdKeys) == 1:
- self.baseDirectory = os.path.normpath(self.baseDirectoryMap[bdKeys[0]])
- self.overrideDataDirKey = bdKeys[0]
- self.baseDirectoryMap = {}
- elif len(bdKeys) > 1:
- self.multiRoot = True
- def getAbsolutePath(self, relativePath):
- absolutePath = None
- if self.multiRoot == True:
- relPathParts = relativePath.replace("\\", "/").split("/")
- realBasePath = self.baseDirectoryMap[relPathParts[0]]
- absolutePath = os.path.join(realBasePath, *relPathParts[1:])
- else:
- absolutePath = os.path.join(self.baseDirectory, relativePath)
- cleanedPath = os.path.normpath(absolutePath)
- # Make sure the cleanedPath is part of the allowed ones
- if self.multiRoot:
- for key, value in iteritems(self.baseDirectoryMap):
- if cleanedPath.startswith(value):
- return cleanedPath
- elif cleanedPath.startswith(self.baseDirectory):
- return cleanedPath
- return None
- def updateScalarBars(self, view=None, mode=1):
- """
- Manage scalarbar state
- view:
- A view proxy or the current active view will be used.
- mode:
- HIDE_UNUSED_SCALAR_BARS = 0x01,
- SHOW_USED_SCALAR_BARS = 0x02
- """
- v = view or self.getView(-1)
- lutMgr = vtkSMTransferFunctionManager()
- lutMgr.UpdateScalarBars(v.SMProxy, mode)
- def publish(self, topic, event):
- if self.coreServer:
- self.coreServer.publish(topic, event)
- # =============================================================================
- #
- # Handle Mouse interaction on any type of view
- #
- # =============================================================================
- class ParaViewWebMouseHandler(ParaViewWebProtocol):
- def __init__(self, **kwargs):
- super(ParaViewWebMouseHandler, self).__init__()
- self.lastAction = "up"
- # RpcName: mouseInteraction => viewport.mouse.interaction
- @exportRpc("viewport.mouse.interaction")
- def mouseInteraction(self, event):
- """
- RPC Callback for mouse interactions.
- """
- view = self.getView(event["view"])
- realViewId = view.GetGlobalIDAsString()
- if hasattr(view, "UseInteractiveRenderingForScreenshots"):
- if event["action"] == "down":
- view.UseInteractiveRenderingForScreenshots = 1
- elif event["action"] == "up":
- view.UseInteractiveRenderingForScreenshots = 0
- buttons = 0
- if event["buttonLeft"]:
- buttons |= vtkWebInteractionEvent.LEFT_BUTTON
- if event["buttonMiddle"]:
- buttons |= vtkWebInteractionEvent.MIDDLE_BUTTON
- if event["buttonRight"]:
- buttons |= vtkWebInteractionEvent.RIGHT_BUTTON
- modifiers = 0
- if event["shiftKey"]:
- modifiers |= vtkWebInteractionEvent.SHIFT_KEY
- if event["ctrlKey"]:
- modifiers |= vtkWebInteractionEvent.CTRL_KEY
- if event["altKey"]:
- modifiers |= vtkWebInteractionEvent.ALT_KEY
- if event["metaKey"]:
- modifiers |= vtkWebInteractionEvent.META_KEY
- pvevent = vtkWebInteractionEvent()
- pvevent.SetButtons(buttons)
- pvevent.SetModifiers(modifiers)
- pvevent.SetX(event["x"])
- pvevent.SetY(event["y"])
- # pvevent.SetKeyCode(event["charCode"])
- retVal = self.getApplication().HandleInteractionEvent(view.SMProxy, pvevent)
- del pvevent
- if event["action"] == "down" and self.lastAction != event["action"]:
- self.getApplication().InvokeEvent("StartInteractionEvent")
- if event["action"] == "up" and self.lastAction != event["action"]:
- self.getApplication().InvokeEvent("EndInteractionEvent")
- if retVal:
- self.getApplication().InvokeEvent("UpdateEvent")
- self.lastAction = event["action"]
- return retVal
- # =============================================================================
- #
- # Basic 3D Viewport API (Camera + Orientation + CenterOfRotation
- #
- # =============================================================================
- class ParaViewWebViewPort(ParaViewWebProtocol):
- def __init__(self, scale=1.0, maxWidth=2560, maxHeight=1440, **kwargs):
- super(ParaViewWebViewPort, self).__init__()
- self.scale = scale
- self.maxWidth = maxWidth
- self.maxHeight = maxHeight
- # RpcName: resetCamera => viewport.camera.reset
- @exportRpc("viewport.camera.reset")
- def resetCamera(self, viewId):
- """
- RPC callback to reset camera.
- """
- view = self.getView(viewId)
- simple.Render(view)
- simple.ResetCamera(view)
- try:
- view.CenterOfRotation = view.CameraFocalPoint
- except:
- pass
- self.getApplication().InvalidateCache(view.SMProxy)
- self.getApplication().InvokeEvent("UpdateEvent")
- return view.GetGlobalIDAsString()
- # RpcName: updateOrientationAxesVisibility => viewport.axes.orientation.visibility.update
- @exportRpc("viewport.axes.orientation.visibility.update")
- def updateOrientationAxesVisibility(self, viewId, showAxis):
- """
- RPC callback to show/hide OrientationAxis.
- """
- view = self.getView(viewId)
- view.OrientationAxesVisibility = showAxis if 1 else 0
- self.getApplication().InvalidateCache(view.SMProxy)
- self.getApplication().InvokeEvent("UpdateEvent")
- return view.GetGlobalIDAsString()
- # RpcName: updateCenterAxesVisibility => viewport.axes.center.visibility.update
- @exportRpc("viewport.axes.center.visibility.update")
- def updateCenterAxesVisibility(self, viewId, showAxis):
- """
- RPC callback to show/hide CenterAxesVisibility.
- """
- view = self.getView(viewId)
- view.CenterAxesVisibility = showAxis if 1 else 0
- self.getApplication().InvalidateCache(view.SMProxy)
- self.getApplication().InvokeEvent("UpdateEvent")
- return view.GetGlobalIDAsString()
- # RpcName: updateCamera => viewport.camera.update
- @exportRpc("viewport.camera.update")
- def updateCamera(self, view_id, focal_point, view_up, position, forceUpdate=True):
- view = self.getView(view_id)
- view.CameraFocalPoint = focal_point
- view.CameraViewUp = view_up
- view.CameraPosition = position
- if forceUpdate:
- self.getApplication().InvalidateCache(view.SMProxy)
- self.getApplication().InvokeEvent("UpdateEvent")
- @exportRpc("viewport.camera.get")
- def getCamera(self, view_id):
- view = self.getView(view_id)
- bounds = [-1, 1, -1, 1, -1, 1]
- if view and view.GetClientSideView().GetClassName() == "vtkPVRenderView":
- rr = view.GetClientSideView().GetRenderer()
- bounds = rr.ComputeVisiblePropBounds()
- return {
- "bounds": bounds,
- "center": list(view.CenterOfRotation),
- "focal": list(view.CameraFocalPoint),
- "up": list(view.CameraViewUp),
- "position": list(view.CameraPosition),
- }
- @exportRpc("viewport.size.update")
- def updateSize(self, view_id, width, height):
- view = self.getView(view_id)
- w = width * self.scale
- h = height * self.scale
- if w > self.maxWidth:
- s = float(self.maxWidth) / float(w)
- w *= s
- h *= s
- elif h > self.maxHeight:
- s = float(self.maxHeight) / float(h)
- w *= s
- h *= s
- view.ViewSize = [int(w), int(h)]
- self.getApplication().InvokeEvent("UpdateEvent")
- # =============================================================================
- #
- # Provide Image delivery mechanism
- #
- # =============================================================================
- class ParaViewWebViewPortImageDelivery(ParaViewWebProtocol):
- # RpcName: stillRender => viewport.image.render
- @exportRpc("viewport.image.render")
- def stillRender(self, options):
- """
- RPC Callback to render a view and obtain the rendered image.
- """
- beginTime = int(round(time.time() * 1000))
- view = self.getView(options["view"])
- size = view.ViewSize[0:2]
- resize = size != options.get("size", size)
- if resize:
- size = options["size"]
- view.ViewSize = size
- t = 0
- if options and "mtime" in options:
- t = options["mtime"]
- quality = 100
- if options and "quality" in options:
- quality = options["quality"]
- localTime = 0
- if options and "localTime" in options:
- localTime = options["localTime"]
- reply = {}
- app = self.getApplication()
- reply["image"] = app.StillRenderToString(view.SMProxy, t, quality)
- # Check that we are getting image size we have set if not wait until we
- # do.
- tries = 10
- while (
- resize
- and list(app.GetLastStillRenderImageSize()) != size
- and size != [0, 0]
- and tries > 0
- ):
- app.InvalidateCache(view.SMProxy)
- reply["image"] = app.StillRenderToString(view.SMProxy, t, quality)
- tries -= 1
- if (
- not resize
- and options
- and ("clearCache" in options)
- and options["clearCache"]
- ):
- app.InvalidateCache(view.SMProxy)
- reply["image"] = app.StillRenderToString(view.SMProxy, t, quality)
- reply["stale"] = app.GetHasImagesBeingProcessed(view.SMProxy)
- reply["mtime"] = app.GetLastStillRenderToMTime()
- reply["size"] = view.ViewSize[0:2]
- reply["format"] = "jpeg;base64"
- reply["global_id"] = view.GetGlobalIDAsString()
- reply["localTime"] = localTime
- endTime = int(round(time.time() * 1000))
- reply["workTime"] = endTime - beginTime
- return reply
- # =============================================================================
- #
- # Provide Image publish-based delivery mechanism
- #
- # =============================================================================
- CAMERA_PROP_NAMES = [
- "CameraFocalPoint",
- "CameraParallelProjection",
- "CameraParallelScale",
- "CameraPosition",
- "CameraViewAngle",
- "CameraViewUp",
- ]
- def _pushCameraLink(viewSrc, viewDstList):
- props = {}
- for name in CAMERA_PROP_NAMES:
- props[name] = getattr(viewSrc, name)
- for v in viewDstList:
- for name in CAMERA_PROP_NAMES:
- v.__setattr__(name, props[name])
- return props
- class ParaViewWebPublishImageDelivery(ParaViewWebProtocol):
- def __init__(self, decode=True, **kwargs):
- ParaViewWebProtocol.__init__(self)
- self.trackingViews = {}
- self.lastStaleTime = {}
- self.staleHandlerCount = {}
- self.deltaStaleTimeBeforeRender = 0.1 # 0.1s
- self.staleCountLimit = 10
- self.decode = decode
- self.viewsInAnimations = []
- self.targetFrameRate = 30.0
- self.minFrameRate = 12.0
- self.maxFrameRate = 30.0
- # Camera link handling
- self.linkedViews = []
- self.linkNames = []
- self.onLinkChange = None
- # Mouse handling
- self.lastAction = "up"
- self.activeViewId = None
- # In case some external protocol wants to monitor when link views change
- def setLinkChangeCallback(self, fn):
- self.onLinkChange = fn
- def pushRender(self, vId, ignoreAnimation=False, staleCount=0):
- if vId not in self.trackingViews:
- return
- if not self.trackingViews[vId]["enabled"]:
- return
- if not ignoreAnimation and len(self.viewsInAnimations) > 0:
- return
- if "originalSize" not in self.trackingViews[vId]:
- view = self.getView(vId)
- self.trackingViews[vId]["originalSize"] = (
- int(view.ViewSize[0]),
- int(view.ViewSize[1]),
- )
- if "ratio" not in self.trackingViews[vId]:
- self.trackingViews[vId]["ratio"] = 1
- ratio = self.trackingViews[vId]["ratio"]
- mtime = self.trackingViews[vId]["mtime"]
- quality = self.trackingViews[vId]["quality"]
- size = [int(s * ratio) for s in self.trackingViews[vId]["originalSize"]]
- reply = self.stillRender(
- {"view": vId, "mtime": mtime, "quality": quality, "size": size}
- )
- # View might have been deleted
- if not reply:
- return
- stale = reply["stale"]
- if reply["image"]:
- # depending on whether the app has encoding enabled:
- if self.decode:
- reply["image"] = base64.standard_b64decode(reply["image"])
- reply["image"] = self.addAttachment(reply["image"])
- reply["format"] = "jpeg"
- # save mtime for next call.
- self.trackingViews[vId]["mtime"] = reply["mtime"]
- # echo back real ID, instead of -1 for 'active'
- reply["id"] = vId
- self.publish("viewport.image.push.subscription", reply)
- if stale:
- self.lastStaleTime[vId] = time.time()
- if self.staleHandlerCount[vId] == 0:
- self.staleHandlerCount[vId] += 1
- schedule_callback(
- self.deltaStaleTimeBeforeRender,
- lambda: self.renderStaleImage(vId, staleCount),
- )
- else:
- self.lastStaleTime[vId] = 0
- def renderStaleImage(self, vId, staleCount=0):
- if vId in self.staleHandlerCount and self.staleHandlerCount[vId] > 0:
- self.staleHandlerCount[vId] -= 1
- if self.lastStaleTime[vId] != 0:
- delta = time.time() - self.lastStaleTime[vId]
- # Break on staleCount otherwise linked view will always report to be stale
- # And loop forever
- if (
- delta >= (self.deltaStaleTimeBeforeRender * (staleCount + 1))
- and staleCount < self.staleCountLimit
- ):
- self.pushRender(vId, False, staleCount + 1)
- elif delta < self.deltaStaleTimeBeforeRender:
- self.staleHandlerCount[vId] += 1
- schedule_callback(
- self.deltaStaleTimeBeforeRender - delta + 0.001,
- lambda: self.renderStaleImage(vId, staleCount),
- )
- def animate(self, renderAllViews=True):
- if len(self.viewsInAnimations) == 0:
- return
- nextAnimateTime = time.time() + 1.0 / self.targetFrameRate
- # Handle the rendering of the views
- if self.activeViewId:
- self.pushRender(self.activeViewId, True)
- if renderAllViews:
- for vId in set(self.viewsInAnimations):
- if vId != self.activeViewId:
- self.pushRender(vId, True)
- nextAnimateTime -= time.time()
- if self.targetFrameRate > self.maxFrameRate:
- self.targetFrameRate = self.maxFrameRate
- if nextAnimateTime < 0:
- if nextAnimateTime < -1.0:
- self.targetFrameRate = 1
- if self.targetFrameRate > self.minFrameRate:
- self.targetFrameRate -= 1.0
- if self.activeViewId:
- # If active view, prioritize that one over the others
- # -> Divide by 2 the refresh rate of the other views
- schedule_callback(0.001, lambda: self.animate(not renderAllViews))
- else:
- # Keep animating at the best rate we can
- schedule_callback(0.001, lambda: self.animate())
- else:
- # We have time so let's render all
- if self.targetFrameRate < self.maxFrameRate and nextAnimateTime > 0.005:
- self.targetFrameRate += 1.0
- schedule_callback(nextAnimateTime, lambda: self.animate())
- @exportRpc("viewport.image.animation.fps.max")
- def setMaxFrameRate(self, fps=30):
- self.maxFrameRate = fps
- @exportRpc("viewport.image.animation.fps.get")
- def getCurrentFrameRate(self):
- return self.targetFrameRate
- @exportRpc("viewport.image.animation.start")
- def startViewAnimation(self, viewId="-1"):
- sView = self.getView(viewId)
- realViewId = sView.GetGlobalIDAsString()
- self.viewsInAnimations.append(realViewId)
- if len(self.viewsInAnimations) == 1:
- self.animate()
- @exportRpc("viewport.image.animation.stop")
- def stopViewAnimation(self, viewId="-1"):
- sView = self.getView(viewId)
- realViewId = sView.GetGlobalIDAsString()
- if realViewId in self.viewsInAnimations and realViewId in self.trackingViews:
- progressRendering = self.trackingViews[realViewId]["streaming"]
- self.viewsInAnimations.remove(realViewId)
- if progressRendering:
- self.progressiveRender(realViewId)
- def progressiveRender(self, viewId="-1"):
- sView = self.getView(viewId)
- realViewId = sView.GetGlobalIDAsString()
- if realViewId in self.viewsInAnimations:
- return
- if sView.GetSession().GetPendingProgress():
- schedule_callback(
- self.deltaStaleTimeBeforeRender, lambda: self.progressiveRender(viewId)
- )
- else:
- again = sView.StreamingUpdate(True)
- self.pushRender(realViewId, True)
- if again:
- schedule_callback(0.001, lambda: self.progressiveRender(viewId))
- @exportRpc("viewport.image.push")
- def imagePush(self, options):
- view = self.getView(options["view"])
- viewId = view.GetGlobalIDAsString()
- # Make sure an image is pushed
- self.getApplication().InvalidateCache(view.SMProxy)
- self.pushRender(viewId)
- # Internal function since the reply[image] is not
- # JSON(serializable) it can not be an RPC one
- def stillRender(self, options):
- """
- RPC Callback to render a view and obtain the rendered image.
- """
- beginTime = int(round(time.time() * 1000))
- viewId = str(options["view"])
- view = self.getView(viewId)
- # If no view id provided, skip rendering
- if not viewId:
- print("No view")
- print(options)
- return None
- # Make sure request match our selected view
- if viewId != "-1" and view.GetGlobalIDAsString() != viewId:
- # We got active view rather than our request
- view = None
- # No view to render => need some cleanup
- if not view:
- # The view has been deleted, we can not render it...
- # Clean up old view state
- if viewId in self.viewsInAnimations:
- self.viewsInAnimations.remove(viewId)
- if viewId in self.trackingViews:
- del self.trackingViews[viewId]
- if viewId in self.staleHandlerCount:
- del self.staleHandlerCount[viewId]
- # the view does not exist anymore, skip rendering
- return None
- # We are in business to render our view...
- # Make sure our view size match our request
- size = view.ViewSize[0:2]
- resize = size != options.get("size", size)
- if resize:
- size = options["size"]
- if size[0] > 10 and size[1] > 10:
- view.ViewSize = size
- # Rendering options
- t = 0
- if options and "mtime" in options:
- t = options["mtime"]
- quality = 100
- if options and "quality" in options:
- quality = options["quality"]
- localTime = 0
- if options and "localTime" in options:
- localTime = options["localTime"]
- reply = {}
- app = self.getApplication()
- if t == 0:
- app.InvalidateCache(view.SMProxy)
- if self.decode:
- stillRender = app.StillRenderToString
- else:
- stillRender = app.StillRenderToBuffer
- reply_image = stillRender(view.SMProxy, t, quality)
- # Check that we are getting image size we have set if not wait until we
- # do. The render call will set the actual window size.
- tries = 10
- while (
- resize
- and list(app.GetLastStillRenderImageSize()) != size
- and size != [0, 0]
- and tries > 0
- ):
- app.InvalidateCache(view.SMProxy)
- reply_image = stillRender(view.SMProxy, t, quality)
- tries -= 1
- if (
- not resize
- and options
- and ("clearCache" in options)
- and options["clearCache"]
- ):
- app.InvalidateCache(view.SMProxy)
- reply_image = stillRender(view.SMProxy, t, quality)
- # Pack the result
- reply["stale"] = app.GetHasImagesBeingProcessed(view.SMProxy)
- reply["mtime"] = app.GetLastStillRenderToMTime()
- reply["size"] = view.ViewSize[0:2]
- reply["memsize"] = reply_image.GetDataSize() if reply_image else 0
- reply["format"] = "jpeg;base64" if self.decode else "jpeg"
- reply["global_id"] = view.GetGlobalIDAsString()
- reply["localTime"] = localTime
- if self.decode:
- reply["image"] = reply_image
- else:
- # Convert the vtkUnsignedCharArray into a bytes object, required by Autobahn websockets
- reply["image"] = memoryview(reply_image).tobytes() if reply_image else None
- endTime = int(round(time.time() * 1000))
- reply["workTime"] = endTime - beginTime
- return reply
- @exportRpc("viewport.image.push.observer.add")
- def addRenderObserver(self, viewId):
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- realViewId = sView.GetGlobalIDAsString()
- if not realViewId in self.trackingViews:
- observerCallback = lambda *args, **kwargs: self.pushRender(realViewId)
- startCallback = lambda *args, **kwargs: self.startViewAnimation(realViewId)
- stopCallback = lambda *args, **kwargs: self.stopViewAnimation(realViewId)
- tag = self.getApplication().AddObserver("UpdateEvent", observerCallback)
- tagStart = self.getApplication().AddObserver(
- "StartInteractionEvent", startCallback
- )
- tagStop = self.getApplication().AddObserver(
- "EndInteractionEvent", stopCallback
- )
- # TODO do we need self.getApplication().AddObserver('ResetActiveView', resetActiveView())
- self.trackingViews[realViewId] = {
- "tags": [tag, tagStart, tagStop],
- "observerCount": 1,
- "mtime": 0,
- "enabled": True,
- "quality": 100,
- "streaming": sView.GetClientSideObject().GetEnableStreaming(),
- }
- self.staleHandlerCount[realViewId] = 0
- else:
- # There is an observer on this view already
- self.trackingViews[realViewId]["observerCount"] += 1
- self.pushRender(realViewId)
- return {"success": True, "viewId": realViewId}
- @exportRpc("viewport.image.push.observer.remove")
- def removeRenderObserver(self, viewId):
- sView = None
- try:
- sView = self.getView(viewId)
- except:
- print("no view with ID %s available in removeRenderObserver" % viewId)
- realViewId = sView.GetGlobalIDAsString() if sView else viewId
- observerInfo = None
- if realViewId in self.trackingViews:
- observerInfo = self.trackingViews[realViewId]
- if not observerInfo:
- return {"error": "Unable to find subscription for view %s" % realViewId}
- observerInfo["observerCount"] -= 1
- if observerInfo["observerCount"] <= 0:
- for tag in observerInfo["tags"]:
- self.getApplication().RemoveObserver(tag)
- del self.trackingViews[realViewId]
- del self.staleHandlerCount[realViewId]
- return {"result": "success"}
- @exportRpc("viewport.image.push.quality")
- def setViewQuality(self, viewId, quality, ratio=1, updateLinkedView=True):
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- realViewId = sView.GetGlobalIDAsString()
- observerInfo = None
- if realViewId in self.trackingViews:
- observerInfo = self.trackingViews[realViewId]
- if not observerInfo:
- return {"error": "Unable to find subscription for view %s" % realViewId}
- observerInfo["quality"] = quality
- observerInfo["ratio"] = ratio
- # Handle linked view quality/ratio synch
- if updateLinkedView and realViewId in self.linkedViews:
- for vid in self.linkedViews:
- self.setViewQuality(vid, quality, ratio, False)
- # Update image size right now!
- if "originalSize" in self.trackingViews[realViewId]:
- size = [
- int(s * ratio) for s in self.trackingViews[realViewId]["originalSize"]
- ]
- if "SetSize" in sView:
- sView.SetSize(size)
- else:
- sView.ViewSize = size
- return {"result": "success"}
- @exportRpc("viewport.image.push.original.size")
- def setViewSize(self, viewId, width, height):
- if width < 10 or height < 10:
- return {"result": "size skip"}
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- realViewId = sView.GetGlobalIDAsString()
- observerInfo = None
- if realViewId in self.trackingViews:
- observerInfo = self.trackingViews[realViewId]
- if not observerInfo:
- return {"error": "Unable to find subscription for view %s" % realViewId}
- observerInfo["originalSize"] = (int(width), int(height))
- return {"result": "success"}
- @exportRpc("viewport.image.push.enabled")
- def enableView(self, viewId, enabled):
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- realViewId = sView.GetGlobalIDAsString()
- observerInfo = None
- if realViewId in self.trackingViews:
- observerInfo = self.trackingViews[realViewId]
- if not observerInfo:
- return {"error": "Unable to find subscription for view %s" % realViewId}
- observerInfo["enabled"] = enabled
- return {"result": "success"}
- @exportRpc("viewport.image.push.invalidate.cache")
- def invalidateCache(self, viewId):
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- self.getApplication().InvalidateCache(sView.SMProxy)
- self.getApplication().InvokeEvent("UpdateEvent")
- return {"result": "success"}
- # -------------------------------------------------------------------------
- # View linked
- # -------------------------------------------------------------------------
- def validateViewLinks(self):
- for linkName in self.linkNames:
- simple.RemoveCameraLink(linkName)
- self.linkNames = []
- if len(self.linkedViews) > 1:
- viewList = [self.getView(vid) for vid in self.linkedViews]
- refView = viewList.pop(0)
- for view in viewList:
- linkName = "%s_%s" % (
- refView.GetGlobalIDAsString(),
- view.GetGlobalIDAsString(),
- )
- simple.AddCameraLink(refView, view, linkName)
- self.linkNames.append(linkName)
- # Synch camera state
- srcView = viewList[0]
- dstViews = viewList[1:]
- _pushCameraLink(srcView, dstViews)
- @exportRpc("viewport.view.link")
- def updateViewLink(self, viewId=None, linkState=False):
- if viewId:
- if linkState:
- self.linkedViews.append(viewId)
- else:
- try:
- self.linkedViews.remove(viewId)
- except:
- pass
- # self.validateViewLinks()
- if len(self.linkedViews) > 1:
- allViews = [self.getView(vid) for vid in self.linkedViews]
- _pushCameraLink(allViews[0], allViews[1:])
- if self.onLinkChange:
- self.onLinkChange(self.linkedViews)
- if linkState:
- self.getApplication().InvokeEvent("UpdateEvent")
- return self.linkedViews
- # -------------------------------------------------------------------------
- # Mouse handling
- # -------------------------------------------------------------------------
- @exportRpc("viewport.mouse.interaction")
- def mouseInteraction(self, event):
- """
- RPC Callback for mouse interactions.
- """
- if "x" not in event or "y" not in event:
- return 0
- view = self.getView(event["view"])
- if hasattr(view, "UseInteractiveRenderingForScreenshots"):
- if event["action"] == "down":
- view.UseInteractiveRenderingForScreenshots = 1
- elif event["action"] == "up":
- view.UseInteractiveRenderingForScreenshots = 0
- buttons = 0
- if event["buttonLeft"]:
- buttons |= vtkWebInteractionEvent.LEFT_BUTTON
- if event["buttonMiddle"]:
- buttons |= vtkWebInteractionEvent.MIDDLE_BUTTON
- if event["buttonRight"]:
- buttons |= vtkWebInteractionEvent.RIGHT_BUTTON
- modifiers = 0
- if event["shiftKey"]:
- modifiers |= vtkWebInteractionEvent.SHIFT_KEY
- if event["ctrlKey"]:
- modifiers |= vtkWebInteractionEvent.CTRL_KEY
- if event["altKey"]:
- modifiers |= vtkWebInteractionEvent.ALT_KEY
- if event["metaKey"]:
- modifiers |= vtkWebInteractionEvent.META_KEY
- pvevent = vtkWebInteractionEvent()
- pvevent.SetButtons(buttons)
- pvevent.SetModifiers(modifiers)
- pvevent.SetX(event["x"])
- pvevent.SetY(event["y"])
- # pvevent.SetKeyCode(event["charCode"])
- retVal = self.getApplication().HandleInteractionEvent(view.SMProxy, pvevent)
- del pvevent
- self.activeViewId = view.GetGlobalIDAsString()
- if event["action"] == "down" and self.lastAction != event["action"]:
- self.getApplication().InvokeEvent("StartInteractionEvent")
- if event["action"] == "up" and self.lastAction != event["action"]:
- self.getApplication().InvokeEvent("EndInteractionEvent")
- # if retVal :
- # self.getApplication().InvokeEvent('UpdateEvent')
- if self.activeViewId in self.linkedViews:
- dstViews = [self.getView(vid) for vid in self.linkedViews]
- _pushCameraLink(view, dstViews)
- self.lastAction = event["action"]
- return retVal
- @exportRpc("viewport.mouse.zoom.wheel")
- def updateZoomFromWheel(self, event):
- if "Start" in event["type"]:
- self.getApplication().InvokeEvent("StartInteractionEvent")
- viewProxy = self.getView(event["view"])
- if viewProxy and "spinY" in event:
- rootId = viewProxy.GetGlobalIDAsString()
- zoomFactor = 1.0 - event["spinY"] / 10.0
- if rootId in self.linkedViews:
- fp = viewProxy.CameraFocalPoint
- pos = viewProxy.CameraPosition
- delta = [fp[i] - pos[i] for i in range(3)]
- viewProxy.GetActiveCamera().Zoom(zoomFactor)
- viewProxy.UpdatePropertyInformation()
- pos2 = viewProxy.CameraPosition
- viewProxy.CameraFocalPoint = [pos2[i] + delta[i] for i in range(3)]
- dstViews = [self.getView(vid) for vid in self.linkedViews]
- _pushCameraLink(viewProxy, dstViews)
- else:
- fp = viewProxy.CameraFocalPoint
- pos = viewProxy.CameraPosition
- delta = [fp[i] - pos[i] for i in range(3)]
- viewProxy.GetActiveCamera().Zoom(zoomFactor)
- viewProxy.UpdatePropertyInformation()
- pos2 = viewProxy.CameraPosition
- viewProxy.CameraFocalPoint = [pos2[i] + delta[i] for i in range(3)]
- if "End" in event["type"]:
- self.getApplication().InvokeEvent("EndInteractionEvent")
- # =============================================================================
- #
- # Provide Progress support
- #
- # =============================================================================
- class ParaViewWebProgressUpdate(ParaViewWebProtocol):
- progressObserverTag = None
- def __init__(self, **kwargs):
- super(ParaViewWebProgressUpdate, self).__init__()
- self.listenToProgress()
- def listenToProgress(self):
- progressHandler = (
- simple.servermanager.ActiveConnection.Session.GetProgressHandler()
- )
- if not ParaViewWebProgressUpdate.progressObserverTag:
- ParaViewWebProgressUpdate.progressObserverTag = progressHandler.AddObserver(
- "ProgressEvent", lambda handler, event: self.updateProgress(handler)
- )
- def updateProgress(self, caller):
- txt = caller.GetLastProgressText()
- progress = caller.GetLastProgress()
- self.publish("paraview.progress", {"text": txt, "progress": progress})
- # =============================================================================
- #
- # Provide Geometry delivery mechanism (WebGL)
- #
- # =============================================================================
- class ParaViewWebViewPortGeometryDelivery(ParaViewWebProtocol):
- def __init__(self, **kwargs):
- super(ParaViewWebViewPortGeometryDelivery, self).__init__()
- self.dataCache = {}
- # RpcName: getSceneMetaData => viewport.webgl.metadata
- @exportRpc("viewport.webgl.metadata")
- def getSceneMetaData(self, view_id):
- view = self.getView(view_id)
- data = self.getApplication().GetWebGLSceneMetaData(view.SMProxy)
- return data
- # RpcName: getWebGLData => viewport.webgl.data
- @exportRpc("viewport.webgl.data")
- def getWebGLData(self, view_id, object_id, part):
- view = self.getView(view_id)
- data = self.getApplication().GetWebGLBinaryData(
- view.SMProxy, str(object_id), part - 1
- )
- return data
- # RpcName: getCachedWebGLData => viewport.webgl.cached.data
- @exportRpc("viewport.webgl.cached.data")
- def getCachedWebGLData(self, sha):
- if sha not in self.dataCache:
- return {"success": False, "reason": "Key %s not in data cache" % sha}
- return {"success": True, "data": self.dataCache[sha]}
- # RpcName: getSceneMetaDataAllTimesteps => viewport.webgl.metadata.alltimesteps
- @exportRpc("viewport.webgl.metadata.alltimesteps")
- def getSceneMetaDataAllTimesteps(self, view_id=-1):
- animationScene = simple.GetAnimationScene()
- timeKeeper = animationScene.TimeKeeper
- tsVals = timeKeeper.TimestepValues.GetData()
- currentTime = timeKeeper.Time
- oldCache = self.dataCache
- self.dataCache = {}
- returnToClient = {}
- view = self.getView(view_id)
- animationScene.GoToFirst()
- # Iterate over all the timesteps, building up a list of unique shas
- for i in xrange(len(tsVals)):
- simple.Render()
- mdString = self.getApplication().GetWebGLSceneMetaData(view.SMProxy)
- timestepMetaData = json.loads(mdString)
- objects = timestepMetaData["Objects"]
- # Iterate over the objects in the scene
- for obj in objects:
- sha = obj["md5"]
- objId = obj["id"]
- numParts = obj["parts"]
- if sha not in self.dataCache:
- if sha in oldCache:
- self.dataCache[sha] = oldCache[sha]
- else:
- transparency = obj["transparency"]
- layer = obj["layer"]
- partData = []
- # Ask for the binary data for each part of this object
- for part in xrange(numParts):
- partNumber = part
- data = self.getApplication().GetWebGLBinaryData(
- view.SMProxy, str(objId), partNumber
- )
- partData.append(data)
- # Now add object, with all its binary parts, to the cache
- self.dataCache[sha] = {
- "md5": sha,
- "id": objId,
- "numParts": numParts,
- "transparency": transparency,
- "layer": layer,
- "partsList": partData,
- }
- returnToClient[sha] = {"id": objId, "numParts": numParts}
- # Now move time forward
- animationScene.GoToNext()
- # Set the time back to where it was when all timesteps were requested
- timeKeeper.Time = currentTime
- animationScene.AnimationTime = currentTime
- simple.Render()
- return {"success": True, "metaDataList": returnToClient}
- # =============================================================================
- #
- # Provide an updated geometry delivery mechanism which better matches the
- # client-side rendering capability we have in vtk.js
- #
- # =============================================================================
- class ParaViewWebLocalRendering(ParaViewWebProtocol):
- def __init__(self, **kwargs):
- super(ParaViewWebLocalRendering, self).__init__()
- initializeSerializers()
- self.context = SynchronizationContext()
- self.trackingViews = {}
- self.mtime = 0
- # RpcName: getArray => viewport.geometry.array.get
- @exportRpc("viewport.geometry.array.get")
- def getArray(self, dataHash, binary=False):
- if binary:
- return self.addAttachment(self.context.getCachedDataArray(dataHash, binary))
- return self.context.getCachedDataArray(dataHash, binary)
- # RpcName: addViewObserver => viewport.geometry.view.observer.add
- @exportRpc("viewport.geometry.view.observer.add")
- def addViewObserver(self, viewId):
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- realViewId = sView.GetGlobalIDAsString()
- def pushGeometry(newSubscription=False):
- simple.Render(sView)
- stateToReturn = self.getViewState(realViewId, newSubscription)
- stateToReturn["mtime"] = 0 if newSubscription else self.mtime
- self.mtime += 1
- return stateToReturn
- if not realViewId in self.trackingViews:
- observerCallback = lambda *args, **kwargs: self.publish(
- "viewport.geometry.view.subscription", pushGeometry()
- )
- tag = self.getApplication().AddObserver("UpdateEvent", observerCallback)
- self.trackingViews[realViewId] = {"tags": [tag], "observerCount": 1}
- else:
- # There is an observer on this view already
- self.trackingViews[realViewId]["observerCount"] += 1
- self.publish("viewport.geometry.view.subscription", pushGeometry(True))
- return {"success": True, "viewId": realViewId}
- # RpcName: removeViewObserver => viewport.geometry.view.observer.remove
- @exportRpc("viewport.geometry.view.observer.remove")
- def removeViewObserver(self, viewId):
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- realViewId = sView.GetGlobalIDAsString()
- observerInfo = None
- if realViewId in self.trackingViews:
- observerInfo = self.trackingViews[realViewId]
- if not observerInfo:
- return {"error": "Unable to find subscription for view %s" % realViewId}
- observerInfo["observerCount"] -= 1
- if observerInfo["observerCount"] <= 0:
- for tag in observerInfo["tags"]:
- self.getApplication().RemoveObserver(tag)
- del self.trackingViews[realViewId]
- return {"result": "success"}
- # RpcName: getViewState => viewport.geometry.view.get.state
- @exportRpc("viewport.geometry.view.get.state")
- def getViewState(self, viewId, newSubscription=False):
- sView = self.getView(viewId)
- if not sView:
- return {"error": "Unable to get view with id %s" % viewId}
- self.context.setIgnoreLastDependencies(newSubscription)
- # Get the active view and render window, use it to iterate over renderers
- renderWindow = sView.GetRenderWindow()
- renderWindowId = sView.GetGlobalIDAsString()
- viewInstance = serializeInstance(
- None, renderWindow, renderWindowId, self.context, 1
- )
- viewInstance["extra"] = {
- "vtkRefId": getReferenceId(renderWindow),
- "centerOfRotation": sView.CenterOfRotation.GetData(),
- "camera": getReferenceId(sView.GetActiveCamera()),
- }
- self.context.setIgnoreLastDependencies(False)
- self.context.checkForArraysToRelease()
- if viewInstance:
- return viewInstance
- return None
- # =============================================================================
- #
- # Time management
- #
- # =============================================================================
- class ParaViewWebTimeHandler(ParaViewWebProtocol):
- def __init__(self, **kwargs):
- super(ParaViewWebTimeHandler, self).__init__()
- # setup animation scene
- self.scene = simple.GetAnimationScene()
- simple.GetTimeTrack()
- self.scene.PlayMode = "Snap To TimeSteps"
- self.playing = False
- self.playTime = 0.1 # Time in second
- def nextPlay(self):
- self.updateTime("next")
- if self.playing:
- schedule_callback(self.playTime, self.nextPlay)
- # RpcName: updateTime => pv.vcr.action
- @exportRpc("pv.vcr.action")
- def updateTime(self, action):
- view = simple.GetRenderView()
- animationScene = simple.GetAnimationScene()
- currentTime = view.ViewTime
- if action == "next":
- animationScene.GoToNext()
- if currentTime == view.ViewTime:
- animationScene.GoToFirst()
- if action == "prev":
- animationScene.GoToPrevious()
- if currentTime == view.ViewTime:
- animationScene.GoToLast()
- if action == "first":
- animationScene.GoToFirst()
- if action == "last":
- animationScene.GoToLast()
- timestep = list(animationScene.TimeKeeper.TimestepValues).index(
- animationScene.TimeKeeper.Time
- )
- self.publish(
- "pv.time.change", {"time": float(view.ViewTime), "timeStep": timestep}
- )
- self.getApplication().InvokeEvent("UpdateEvent")
- return view.ViewTime
- @exportRpc("pv.time.index.set")
- def setTimeStep(self, timeIdx):
- anim = simple.GetAnimationScene()
- anim.TimeKeeper.Time = anim.TimeKeeper.TimestepValues[timeIdx]
- self.getApplication().InvokeEvent("UpdateEvent")
- return anim.TimeKeeper.Time
- @exportRpc("pv.time.index.get")
- def getTimeStep(self):
- anim = simple.GetAnimationScene()
- try:
- return list(anim.TimeKeeper.TimestepValues).index(anim.TimeKeeper.Time)
- except:
- return 0
- @exportRpc("pv.time.value.set")
- def setTimeValue(self, t):
- anim = simple.GetAnimationScene()
- try:
- step = list(anim.TimeKeeper.TimestepValues).index(t)
- anim.TimeKeeper.Time = anim.TimeKeeper.TimestepValues[step]
- self.getApplication().InvokeEvent("UpdateEvent")
- except:
- print("Try to update time with", t, "but value not found in the list")
- return anim.TimeKeeper.Time
- @exportRpc("pv.time.value.get")
- def getTimeValue(self):
- anim = simple.GetAnimationScene()
- return anim.TimeKeeper.Time
- @exportRpc("pv.time.values")
- def getTimeValues(self):
- return list(simple.GetAnimationScene().TimeKeeper.TimestepValues)
- @exportRpc("pv.time.play")
- def play(self, deltaT=0.1):
- if not self.playing:
- self.playTime = deltaT
- self.playing = True
- self.getApplication().InvokeEvent("StartInteractionEvent")
- self.nextPlay()
- @exportRpc("pv.time.stop")
- def stop(self):
- self.getApplication().InvokeEvent("EndInteractionEvent")
- self.playing = False
- # =============================================================================
- #
- # Color management
- #
- # =============================================================================
- class ParaViewWebColorManager(ParaViewWebProtocol):
- def __init__(self, pathToColorMaps=None, showBuiltin=True, **kwargs):
- super(ParaViewWebColorManager, self).__init__()
- if pathToColorMaps:
- simple.ImportPresets(filename=pathToColorMaps)
- self.presets = servermanager.vtkSMTransferFunctionPresets.GetInstance()
- self.colorMapNames = []
- for i in range(self.presets.GetNumberOfPresets()):
- if showBuiltin or not self.presets.IsPresetBuiltin(i):
- self.colorMapNames.append(self.presets.GetPresetName(i))
- # RpcName: getScalarBarVisibilities => pv.color.manager.scalarbar.visibility.get
- @exportRpc("pv.color.manager.scalarbar.visibility.get")
- def getScalarBarVisibilities(self, proxyIdList):
- """
- Returns whether or not each specified scalar bar is visible.
- """
- visibilities = {}
- for proxyId in proxyIdList:
- proxy = self.mapIdToProxy(proxyId)
- if proxy is not None:
- rep = simple.GetRepresentation(proxy)
- view = self.getView(-1)
- visibilities[proxyId] = vtkSMPVRepresentationProxy.IsScalarBarVisible(
- rep.SMProxy, view.SMProxy
- )
- return visibilities
- # RpcName: setScalarBarVisibilities => pv.color.manager.scalarbar.visibility.set
- @exportRpc("pv.color.manager.scalarbar.visibility.set")
- def setScalarBarVisibilities(self, proxyIdMap):
- """
- Sets the visibility of the scalar bar corresponding to each specified
- proxy. The representation for each proxy is found using the
- filter/source proxy id and the current view.
- """
- visibilities = {}
- for proxyId in proxyIdMap:
- proxy = self.mapIdToProxy(proxyId)
- if proxy is not None:
- rep = simple.GetDisplayProperties(proxy)
- view = self.getView(-1)
- vtkSMPVRepresentationProxy.SetScalarBarVisibility(
- rep.SMProxy, view.SMProxy, proxyIdMap[proxyId]
- )
- visibilities[proxyId] = vtkSMPVRepresentationProxy.IsScalarBarVisible(
- rep.SMProxy, view.SMProxy
- )
- # Render to get scalar bars in correct position when doing local rendering (webgl)
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- return visibilities
- # RpcName: rescaleTransferFunction => pv.color.manager.rescale.transfer.function
- @exportRpc("pv.color.manager.rescale.transfer.function")
- def rescaleTransferFunction(self, options):
- """
- Rescale the color transfer function to fit either the data range,
- the data range over time, or to a custom range, for the array by
- which the representation is currently being colored.
- """
- type = options["type"]
- proxyId = options["proxyId"]
- proxy = self.mapIdToProxy(proxyId)
- rep = simple.GetRepresentation(proxy)
- status = {"success": False}
- if type == "time":
- status[
- "success"
- ] = vtkSMPVRepresentationProxy.RescaleTransferFunctionToDataRangeOverTime(
- rep.SMProxy
- )
- elif type == "data":
- extend = False
- if "extend" in options:
- extend = options["extend"]
- status[
- "success"
- ] = vtkSMPVRepresentationProxy.RescaleTransferFunctionToDataRange(
- rep.SMProxy, extend
- )
- elif type == "custom":
- rangemin = float(options["min"])
- rangemax = float(options["max"])
- extend = False
- if "extend" in options:
- extend = options["extend"]
- lookupTable = rep.LookupTable
- if lookupTable is not None:
- status["success"] = vtkSMTransferFunctionProxy.RescaleTransferFunction(
- lookupTable.SMProxy, rangemin, rangemax, extend
- )
- if status["success"]:
- currentRange = self.getCurrentScalarRange(proxyId)
- status["range"] = currentRange
- self.getApplication().InvokeEvent("UpdateEvent")
- return status
- # RpcName: getCurrentScalarRange => pv.color.manager.scalar.range.get
- @exportRpc("pv.color.manager.scalar.range.get")
- def getCurrentScalarRange(self, proxyId):
- proxy = self.mapIdToProxy(proxyId)
- rep = simple.GetRepresentation(proxy)
- lookupTable = rep.LookupTable
- cMin = lookupTable.RGBPoints[0]
- cMax = lookupTable.RGBPoints[-4]
- return {"min": cMin, "max": cMax}
- # RpcName: colorBy => pv.color.manager.color.by
- @exportRpc("pv.color.manager.color.by")
- def colorBy(
- self,
- representation,
- colorMode,
- arrayLocation="POINTS",
- arrayName="",
- vectorMode="Magnitude",
- vectorComponent=0,
- rescale=False,
- ):
- """
- Choose the array to color by, and optionally specify magnitude or a
- vector component in the case of vector array.
- """
- locationMap = {"POINTS": vtkDataObject.POINT, "CELLS": vtkDataObject.CELL}
- repProxy = self.mapIdToProxy(representation)
- lutProxy = repProxy.LookupTable
- if colorMode == "SOLID":
- # No array just diffuse color
- repProxy.ColorArrayName = ""
- else:
- simple.ColorBy(repProxy, (arrayLocation, arrayName))
- if repProxy.LookupTable:
- lut = repProxy.LookupTable
- lut.VectorMode = str(vectorMode)
- lut.VectorComponent = int(vectorComponent)
- if rescale:
- repProxy.RescaleTransferFunctionToDataRange(rescale, False)
- self.updateScalarBars()
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- # RpcName: setOpacityFunctionPoints => pv.color.manager.opacity.points.set
- @exportRpc("pv.color.manager.opacity.points.set")
- def setOpacityFunctionPoints(
- self, arrayName, pointArray, enableOpacityMapping=False
- ):
- lutProxy = simple.GetColorTransferFunction(arrayName)
- pwfProxy = simple.GetOpacityTransferFunction(arrayName)
- # Use whatever the current scalar range is for this array
- cMin = lutProxy.RGBPoints[0]
- cMax = lutProxy.RGBPoints[-4]
- # Scale and bias the x values, which come in between 0.0 and 1.0, to the
- # current scalar range
- for i in range(len(pointArray) // 4):
- idx = i * 4
- x = pointArray[idx]
- pointArray[idx] = (x * (cMax - cMin)) + cMin
- # Set the Points property to scaled and biased points array
- pwfProxy.Points = pointArray
- lutProxy.EnableOpacityMapping = enableOpacityMapping
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- # RpcName: getOpacityFunctionPoints => pv.color.manager.opacity.points.get
- @exportRpc("pv.color.manager.opacity.points.get")
- def getOpacityFunctionPoints(self, arrayName):
- result = []
- lutProxy = simple.GetColorTransferFunction(arrayName)
- pwfProxy = simple.GetOpacityTransferFunction(arrayName)
- # Use whatever the current scalar range is for this array
- cMin = lutProxy.RGBPoints[0]
- cMax = lutProxy.RGBPoints[-4]
- pointArray = pwfProxy.Points
- # Scale and bias the x values, which come in between 0.0 and 1.0, to the
- # current scalar range
- for i in range(len(pointArray) // 4):
- idx = i * 4
- result.append(
- {
- "x": (pointArray[idx] - cMin) / (cMax - cMin),
- "y": pointArray[idx + 1],
- "x2": pointArray[idx + 2],
- "y2": pointArray[idx + 3],
- }
- )
- return result
- # RpcName: getRgbPoints => pv.color.manager.rgb.points.get
- @exportRpc("pv.color.manager.rgb.points.get")
- def getRgbPoints(self, arrayName):
- lutProxy = simple.GetColorTransferFunction(arrayName)
- # First, set the current coloring mode
- colorInfo = {}
- colorInfo["mode"] = (
- "categorical" if lutProxy.InterpretValuesAsCategories else "continuous"
- )
- # Now build up the continuous coloring information
- continuousInfo = {}
- l = lutProxy.RGBPoints.GetData()
- scalars = l[0 : len(l) : 4]
- continuousInfo["scalars"] = [float(s) for s in scalars]
- reds = l[1 : len(l) : 4]
- greens = l[2 : len(l) : 4]
- blues = l[3 : len(l) : 4]
- continuousInfo["colors"] = [list(a) for a in zip(reds, greens, blues)]
- colorInfo["continuous"] = continuousInfo
- # Finally, build up the categorical coloring information
- categoricalInfo = {}
- rgbs = lutProxy.IndexedColors
- reds = rgbs[0 : len(rgbs) : 3]
- greens = rgbs[1 : len(rgbs) : 3]
- blues = rgbs[2 : len(rgbs) : 3]
- categoricalInfo["colors"] = [list(a) for a in zip(reds, greens, blues)]
- l = lutProxy.Annotations
- scalars = l[0 : len(l) : 2]
- categoricalInfo["scalars"] = [float(s) for s in scalars]
- categoricalInfo["annotations"] = l[1 : len(l) : 2]
- ### If the numbers of categorical colors and scalars do not match,
- ### then this is probably because you already had a categorical color
- ### scheme set up when you applied a new indexed preset.
- numColors = len(categoricalInfo["colors"])
- numScalars = len(categoricalInfo["scalars"])
- if numColors != numScalars:
- # More colors than scalars means we applied a preset colormap with
- # more colors than what we had set up already, so I want to add
- # scalars and annotations.
- if numColors > numScalars:
- nextValue = 0
- # Another special case here is that there were no categorical
- # scalars/colors before the preset was applied, and we will need
- # to insert a single space character (' ') annotation at the start
- # of the list to make the scalarbar appear.
- if numScalars == 0:
- categoricalInfo["scalars"].append(nextValue)
- categoricalInfo["annotations"].append(" ")
- nextValue = categoricalInfo["scalars"][-1] + 1
- for i in xrange((numColors - numScalars) - 1):
- categoricalInfo["scalars"].append(nextValue)
- categoricalInfo["annotations"].append("")
- nextValue += 1
- # More scalars than colors means we applied a preset colormap with
- # *fewer* colors than what had already, so I want to add fake colors
- # for any non-empty annotations I have after the end of the color list
- elif numScalars > numColors:
- newScalars = categoricalInfo["scalars"][0:numColors]
- newAnnotations = categoricalInfo["annotations"][0:numColors]
- for i in xrange(numColors, numScalars):
- if categoricalInfo["annotations"][i] != "":
- newScalars.append(categoricalInfo["scalars"][i])
- newAnnotations.append(categoricalInfo["annotations"][i])
- categoricalInfo["colors"].append([0.5, 0.5, 0.5])
- categoricalInfo["scalars"] = newScalars
- categoricalInfo["annotations"] = newAnnotations
- # Now that we have made the number of indexed colors and associated
- # scalars match, the final step in this special case is to apply the
- # matched up props we just computed so that server and client ui are
- # in sync.
- idxColorsProperty = []
- annotationsProperty = []
- for aIdx in xrange(len(categoricalInfo["scalars"])):
- annotationsProperty.append(str(categoricalInfo["scalars"][aIdx]))
- annotationsProperty.append(str(categoricalInfo["annotations"][aIdx]))
- idxColorsProperty.extend(categoricalInfo["colors"][aIdx])
- lutProxy.Annotations = annotationsProperty
- lutProxy.IndexedColors = idxColorsProperty
- simple.Render
- colorInfo["categorical"] = categoricalInfo
- return colorInfo
- # RpcName: setRgbPoints => pv.color.manager.rgb.points.set
- @exportRpc("pv.color.manager.rgb.points.set")
- def setRgbPoints(self, arrayName, rgbInfo):
- lutProxy = simple.GetColorTransferFunction(arrayName)
- colorMode = rgbInfo["mode"]
- continuousInfo = rgbInfo["continuous"]
- categoricalInfo = rgbInfo["categorical"]
- # First make sure the continuous mode properties are set
- continuousScalars = continuousInfo["scalars"]
- continuousColors = continuousInfo["colors"]
- rgbPoints = []
- for idx in xrange(len(continuousScalars)):
- scalar = continuousScalars[idx]
- rgb = continuousColors[idx]
- rgbPoints.append(float(scalar))
- rgbPoints.extend(rgb)
- lutProxy.RGBPoints = rgbPoints
- # Now make sure the categorical mode properties are set
- annotations = categoricalInfo["annotations"]
- categoricalScalars = categoricalInfo["scalars"]
- categoricalColors = categoricalInfo["colors"]
- annotationsProperty = []
- idxColorsProperty = []
- for aIdx in xrange(len(annotations)):
- annotationsProperty.append(str(categoricalScalars[aIdx]))
- annotationsProperty.append(str(annotations[aIdx]))
- idxColorsProperty.extend(categoricalColors[aIdx])
- lutProxy.Annotations = annotationsProperty
- lutProxy.IndexedColors = idxColorsProperty
- # Finally, set the coloring mode property
- if colorMode == "continuous": # continuous coloring
- lutProxy.InterpretValuesAsCategories = 0
- else: # categorical coloring
- lutProxy.InterpretValuesAsCategories = 1
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- # RpcName: getLutImage => pv.color.manager.lut.image.get
- @exportRpc("pv.color.manager.lut.image.get")
- def getLutImage(self, representation, numSamples, customRange=None):
- repProxy = self.mapIdToProxy(representation)
- lut = repProxy.LookupTable.GetClientSideObject()
- dataRange = customRange
- if not dataRange:
- dataRange = lut.GetRange()
- delta = (dataRange[1] - dataRange[0]) / float(numSamples)
- colorArray = vtkUnsignedCharArray()
- colorArray.SetNumberOfComponents(3)
- colorArray.SetNumberOfTuples(numSamples)
- rgb = [0, 0, 0]
- for i in range(numSamples):
- lut.GetColor(dataRange[0] + float(i) * delta, rgb)
- r = int(round(rgb[0] * 255))
- g = int(round(rgb[1] * 255))
- b = int(round(rgb[2] * 255))
- colorArray.SetTuple3(i, r, g, b)
- # Add the color array to an image data
- imgData = vtkImageData()
- imgData.SetDimensions(numSamples, 1, 1)
- aIdx = imgData.GetPointData().SetScalars(colorArray)
- # Use the vtk data encoder to base-64 encode the image as png, using no compression
- encoder = vtkDataEncoder()
- # two calls in a row crash on Windows - bald timing hack to avoid the crash.
- time.sleep(0.01)
- b64Str = encoder.EncodeAsBase64Png(imgData, 0)
- return {"range": dataRange, "image": b64Str}
- @exportRpc("pv.color.manager.lut.image.all")
- def getLutImages(self, numSamples):
- colorArray = vtkUnsignedCharArray()
- colorArray.SetNumberOfComponents(3)
- colorArray.SetNumberOfTuples(numSamples)
- pxm = simple.servermanager.ProxyManager()
- lutProxy = pxm.NewProxy("lookup_tables", "PVLookupTable")
- lut = lutProxy.GetClientSideObject()
- dataRange = lut.GetRange()
- delta = (dataRange[1] - dataRange[0]) / float(numSamples)
- # Add the color array to an image data
- imgData = vtkImageData()
- imgData.SetDimensions(numSamples, 1, 1)
- imgData.GetPointData().SetScalars(colorArray)
- # Use the vtk data encoder to base-64 encode the image as png, using no compression
- encoder = vtkDataEncoder()
- # Result container
- result = {}
- # Loop over all presets
- for name in self.colorMapNames:
- lutProxy.ApplyPreset(name, True)
- rgb = [0, 0, 0]
- for i in range(numSamples):
- lut.GetColor(dataRange[0] + float(i) * delta, rgb)
- r = int(round(rgb[0] * 255))
- g = int(round(rgb[1] * 255))
- b = int(round(rgb[2] * 255))
- colorArray.SetTuple3(i, r, g, b)
- result[name] = encoder.EncodeAsBase64Png(imgData, 0)
- simple.Delete(lutProxy)
- return result
- # RpcName: setSurfaceOpacity => pv.color.manager.surface.opacity.set
- @exportRpc("pv.color.manager.surface.opacity.set")
- def setSurfaceOpacity(self, representation, enabled):
- repProxy = self.mapIdToProxy(representation)
- lutProxy = repProxy.LookupTable
- lutProxy.EnableOpacityMapping = enabled
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- # RpcName: getSurfaceOpacity => pv.color.manager.surface.opacity.get
- @exportRpc("pv.color.manager.surface.opacity.get")
- def getSurfaceOpacity(self, representation):
- repProxy = self.mapIdToProxy(representation)
- lutProxy = repProxy.LookupTable
- return lutProxy.EnableOpacityMapping
- # RpcName: setSurfaceOpacityByArray => pv.color.manager.surface.opacity.by.array.set
- @exportRpc("pv.color.manager.surface.opacity.by.array.set")
- def setSurfaceOpacityByArray(self, arrayName, enabled):
- lutProxy = simple.GetColorTransferFunction(arrayName)
- lutProxy.EnableOpacityMapping = enabled
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- # RpcName: getSurfaceOpacityByArray => pv.color.manager.surface.opacity.by.array.get
- @exportRpc("pv.color.manager.surface.opacity.by.array.get")
- def getSurfaceOpacityByArray(self, arrayName):
- lutProxy = simple.GetColorTransferFunction(arrayName)
- return lutProxy.EnableOpacityMapping
- # RpcName: selectColorMap => pv.color.manager.select.preset
- @exportRpc("pv.color.manager.select.preset")
- def selectColorMap(self, representation, paletteName):
- """
- Choose the color map preset to use when coloring by an array.
- """
- repProxy = self.mapIdToProxy(representation)
- lutProxy = repProxy.LookupTable
- if lutProxy is not None:
- lutProxy.ApplyPreset(paletteName, True)
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- return {"result": "success"}
- else:
- return {
- "result": "Representation proxy "
- + representation
- + " is missing lookup table"
- }
- return {"result": "preset " + paletteName + " not found"}
- # RpcName: listColorMapNames => pv.color.manager.list.preset
- @exportRpc("pv.color.manager.list.preset")
- def listColorMapNames(self):
- """
- List the names of all color map presets available on the server. This
- list will contain the names of any presets you provided in the file you
- supplied to the constructor of this protocol.
- """
- return self.colorMapNames
- # =============================================================================
- #
- # Proxy management protocol
- #
- # =============================================================================
- class ParaViewWebProxyManager(ParaViewWebProtocol):
- VTK_DATA_TYPES = [
- "void", # 0
- "bit", # 1
- "char", # 2
- "unsigned_char", # 3
- "short", # 4
- "unsigned_short", # 5
- "int", # 6
- "unsigned_int", # 7
- "long", # 8
- "unsigned_long", # 9
- "float", # 10
- "double", # 11
- "id_type", # 12
- "unspecified", # 13
- "unspecified", # 14
- "signed_char", # 15
- "long_long", # 16
- "unsigned_long_long", # 17
- ]
- def __init__(
- self,
- allowedProxiesFile=None,
- baseDir=None,
- fileToLoad=None,
- allowUnconfiguredReaders=True,
- groupProxyEditorWidgets=True,
- respectPropertyGroups=True,
- **kwargs
- ):
- """
- basePath: specify the base directory (or directories) that we should start with, if this
- parameter takes the form: "name1=path1|name2=path2|...", then we will treat this as the
- case where multiple data directories are required. In this case, each top-level directory
- will be given the name associated with the directory in the argument.
- """
- super(ParaViewWebProxyManager, self).__init__()
- self.debugMode = False
- self.groupProxyEditorPropertyWidget = groupProxyEditorWidgets
- self.respectPropertyGroups = respectPropertyGroups
- self.proxyDefinitionCache = {}
- self.domainFunctionMap = {
- "vtkSMBooleanDomain": booleanDomainDecorator,
- "vtkSMProxyListDomain": proxyListDomainDecorator,
- "vtkSMIntRangeDomain": numberRangeDomainDecorator,
- "vtkSMDoubleRangeDomain": numberRangeDomainDecorator,
- "vtkSMArrayRangeDomain": numberRangeDomainDecorator,
- "vtkSMRepresentationTypeDomain": stringListDomainDecorator,
- "vtkSMStringListDomain": stringListDomainDecorator,
- "vtkSMArrayListDomain": arrayListDomainDecorator,
- "vtkSMArraySelectionDomain": arraySelectionDomainDecorator,
- "vtkSMEnumerationDomain": enumerationDomainDecorator,
- "vtkSMCompositeTreeDomain": treeDomainDecorator,
- }
- self.alwaysIncludeProperties = ["GridAxesVisibility"]
- self.alwaysExcludeProperties = [
- "LookupTable",
- "Texture",
- "ColorArrayName",
- "Representations",
- "HiddenRepresentations",
- "HiddenProps",
- "UseTexturedBackground",
- "BackgroundTexture",
- "KeyLightWarmth",
- "KeyLightIntensity",
- "KeyLightElevation",
- "KeyLightAzimuth",
- "FileName",
- ]
- self.propertyTypeMap = {
- "vtkSMIntVectorProperty": "int",
- "vtkSMDoubleVectorProperty": "float",
- "vtkSMStringVectorProperty": "str",
- "vtkSMProxyProperty": "proxy",
- "vtkSMInputProperty": "proxy",
- "vtkSMDoubleMapProperty": "map",
- }
- self.allowedProxies = {}
- self.hintsMap = {
- "PropertyWidgetDecorator": {
- "ClipScalarsDecorator": clipScalarDecorator,
- "GenericDecorator": genericDecorator,
- },
- "Widget": {"multi_line": multiLineDecorator},
- "ProxyEditorPropertyWidget": {
- "default": proxyEditorPropertyWidgetDecorator
- },
- }
- self.setBaseDirectory(baseDir)
- self.allowUnconfiguredReaders = allowUnconfiguredReaders
- # If there was a proxy list file, use it, otherwise use the default
- if allowedProxiesFile:
- self.readAllowedProxies(allowedProxiesFile)
- else:
- from ._default_proxies import getDefaultProxies
- self.readAllowedProxies(getDefaultProxies())
- if fileToLoad:
- if "*" in fileToLoad:
- fullBasePathForGroup = os.path.dirname(self.getAbsolutePath(fileToLoad))
- fileNamePattern = os.path.basename(fileToLoad)
- groupToLoad = []
- for fileName in os.listdir(fullBasePathForGroup):
- if fnmatch.fnmatch(fileName, fileNamePattern):
- groupToLoad.append(os.path.join(fullBasePathForGroup, fileName))
- groupToLoad.sort(key=alphanum_key)
- self.open(groupToLoad)
- else:
- self.open(fileToLoad)
- self.simpleTypes = [int, float, list, str]
- self.view = simple.GetRenderView()
- simple.SetActiveView(self.view)
- simple.Render()
- # --------------------------------------------------------------------------
- # Read the configured proxies json file into a dictionary and process.
- # --------------------------------------------------------------------------
- def readAllowedProxies(self, filepathOrConfig):
- self.availableList = {}
- self.allowedProxies = {}
- configurationData = {}
- if type(filepathOrConfig) == dict:
- configurationData = filepathOrConfig
- else:
- with open(filepathOrConfig, "r") as fd:
- configurationData = json.load(fd)
- self.configureFiltersOrSources("sources", configurationData["sources"])
- self.configureFiltersOrSources("filters", configurationData["filters"])
- self.configureReaders(configurationData["readers"])
- # --------------------------------------------------------------------------
- # Handle filters and sources sections of the proxies config file.
- # --------------------------------------------------------------------------
- def configureFiltersOrSources(self, key, flist):
- list = []
- self.availableList[key] = list
- for item in flist:
- if "label" in item:
- self.allowedProxies[item["label"]] = item["name"]
- list.append(item["label"])
- else:
- self.allowedProxies[item["name"]] = item["name"]
- list.append(item["name"])
- # --------------------------------------------------------------------------
- # Handle the readers section of the proxies config file.
- # --------------------------------------------------------------------------
- def configureReaders(self, rlist):
- self.readerFactoryMap = {}
- for config in rlist:
- readerName = config["name"]
- readerMethod = "FileName"
- useDirectory = False
- if "method" in config:
- readerMethod = config["method"]
- if "useDirectory" in config:
- useDirectory = config["useDirectory"]
- for ext in config["extensions"]:
- self.readerFactoryMap[ext] = [readerName, readerMethod, useDirectory]
- # --------------------------------------------------------------------------
- # Convenience method to get proxy defs, cached if available
- # --------------------------------------------------------------------------
- def getProxyDefinition(self, group, name):
- cacheKey = "%s:%s" % (group, name)
- if cacheKey in self.proxyDefinitionCache:
- return self.proxyDefinitionCache[cacheKey]
- xmlElement = servermanager.ActiveConnection.Session.GetProxyDefinitionManager().GetCollapsedProxyDefinition(
- group, name, None
- )
- # print('\n\n\n (%s, %s): \n\n' % (group, name))
- # xmlElement.PrintXML()
- self.proxyDefinitionCache[cacheKey] = xmlElement
- return xmlElement
- # --------------------------------------------------------------------------
- # Look higher up in XML hierarchy for attributes on a property (useful if
- # a property is an exposed property). From the documentation of vtkSMProperty,
- # the GetParent method will access the sub-proxy to which the property
- # actually belongs, if that is the case.
- # --------------------------------------------------------------------------
- def extractParentAttributes(self, property, attributes):
- proxy = None
- name = ""
- try:
- proxy = property.SMProperty.GetParent()
- name = proxy.GetPropertyName(property.SMProperty)
- except:
- try:
- proxy = property.GetParent()
- name = proxy.GetPropertyName(property)
- except:
- print(
- "ERROR: unable to get property parent for property " + property.Name
- )
- return {}
- xmlElement = self.getProxyDefinition(proxy.GetXMLGroup(), proxy.GetXMLName())
- attrMap = {}
- nbChildren = xmlElement.GetNumberOfNestedElements()
- for i in range(nbChildren):
- xmlChild = xmlElement.GetNestedElement(i)
- elementName = xmlChild.GetName()
- if elementName.endswith("Property"):
- propName = xmlChild.GetAttribute("name")
- if propName == name:
- for attrName in attributes:
- if xmlChild.GetAttribute(attrName):
- attrMap[attrName] = xmlChild.GetAttribute(attrName)
- return attrMap
- # --------------------------------------------------------------------------
- # If we have a proxy property, gather up the xml information from the
- # possible proxy values it can take on.
- # --------------------------------------------------------------------------
- def getProxyListFromProperty(
- self, proxy, proxyPropElement, inPropGroup, group, parentGroup, detailsKey
- ):
- propPropName = proxyPropElement.GetAttribute("name")
- propVisibility = proxyPropElement.GetAttribute("panel_visibility")
- propInstance = proxy.GetProperty(propPropName)
- nbChildren = proxyPropElement.GetNumberOfNestedElements()
- foundPLDChild = False
- subProxiesToProcess = []
- domain = propInstance.FindDomain("vtkSMProxyListDomain")
- if domain:
- foundPLDChild = True
- for j in range(domain.GetNumberOfProxies()):
- subProxy = domain.GetProxy(j)
- pelt = self.getProxyDefinition(
- subProxy.GetXMLGroup(), subProxy.GetXMLName()
- )
- subProxiesToProcess.append([subProxy, pelt])
- if len(subProxiesToProcess) > 0:
- newGroup = group
- newParentGroup = parentGroup
- if self.groupProxyEditorPropertyWidget:
- hintChild = proxyPropElement.FindNestedElementByName("Hints")
- if hintChild:
- pepwChild = hintChild.FindNestedElementByName(
- "ProxyEditorPropertyWidget"
- )
- if pepwChild:
- newGroup = "%s:%s" % (proxy.GetGlobalIDAsString(), propPropName)
- self.groupDetailsMap[newGroup] = {
- "groupName": propPropName,
- "groupWidget": "ProxyEditorPropertyWidget",
- "groupVisibilityProperty": pepwChild.GetAttribute(
- "property"
- ),
- "groupVisibility": propVisibility
- if propVisibility is not None
- else "default",
- "groupType": "ProxyEditorPropertyWidget",
- }
- newParentGroup = group
- self.propertyDetailsMap[detailsKey]["group"] = newGroup
- self.propertyDetailsMap[detailsKey][
- "parentGroup"
- ] = newParentGroup
- for sub in subProxiesToProcess:
- self.processXmlElement(
- sub[0], sub[1], inPropGroup, newGroup, newParentGroup, detailsKey
- )
- return foundPLDChild
- # --------------------------------------------------------------------------
- # Decide whether to update our internal map of property details, or just the
- # ordered list of properties, or both.
- #
- # detailsKey => uniquely identifies a property via concatenation of proxy
- # id and property name, e.g. '476:Opacity'
- # name => property name
- # panelVis => value of 'panel_visibilty' attribute on element
- # size => number of elements to set for this property
- # inPropGroup => whether or not this property is inside a 'PropertyGroup' element
- # group => name of group this property belongs to (could be other than 'root',
- # even if 'inPropGroup' is false, due to the grouping we impose
- # when we encounter a ProxyProperty with Hints: 'ProxyEditorPropertyWidget')
- # parentGroup => name of parent group (could be None for root level property)
- # --------------------------------------------------------------------------
- def trackProperty(
- self,
- detailsKey,
- name,
- panelVis,
- panelVisQualifier,
- size,
- inPropGroup,
- group,
- parentGroup,
- belongsToProxyProperty=None,
- ):
- if detailsKey not in self.propertyDetailsMap:
- self.propertyDetailsMap[detailsKey] = {
- "type": name,
- "panelVis": panelVis,
- "panelVisQualifier": panelVisQualifier,
- "size": size,
- "group": group,
- "parentGroup": parentGroup,
- }
- if belongsToProxyProperty:
- if belongsToProxyProperty not in self.proxyPropertyDependents:
- self.proxyPropertyDependents[belongsToProxyProperty] = []
- self.proxyPropertyDependents[belongsToProxyProperty].append(detailsKey)
- # If this property belongs to a proxy property, and if that property is marked
- # as advanced, then this property should be marked advanced too.
- if (
- self.propertyDetailsMap[belongsToProxyProperty]["panelVis"]
- == "advanced"
- ):
- self.propertyDetailsMap[detailsKey]["panelVis"] = "advanced"
- if inPropGroup:
- self.propertyDetailsMap[detailsKey]["group"] = group
- self.propertyDetailsMap[detailsKey]["parentGroup"] = parentGroup
- if panelVis is not None:
- self.propertyDetailsMap[detailsKey]["panelVis"] = panelVis
- if panelVisQualifier is not None:
- self.propertyDetailsMap[detailsKey][
- "panelVisQualifier"
- ] = panelVisQualifier
- else:
- if (
- panelVis is not None
- and self.propertyDetailsMap[detailsKey]["panelVis"] is None
- ):
- self.propertyDetailsMap[detailsKey]["panelVis"] = panelVis
- if (
- panelVisQualifier is not None
- and self.propertyDetailsMap[detailsKey]["panelVisQualifier"] is None
- ):
- self.propertyDetailsMap[detailsKey][
- "panelVisQualifier"
- ] = panelVisQualifier
- if detailsKey not in self.orderedNameList:
- self.orderedNameList.append(detailsKey)
- else:
- # Do already have property in ordered list, but because we're in a
- # PropertyGroup, we want to override the previous order
- if inPropGroup:
- idx = self.orderedNameList.index(detailsKey)
- self.orderedNameList.pop(idx)
- self.orderedNameList.append(detailsKey)
- # Additionally, if the property we are now re-ordering is a ProxyProperty, then
- # we also need to grab any properties of the sub proxies of it's proxy list domain
- # and move all of those down as well
- if detailsKey in self.proxyPropertyDependents:
- subProxiesProps = self.proxyPropertyDependents[detailsKey]
- for subProxyPropKey in subProxiesProps:
- idx = self.orderedNameList.index(subProxyPropKey)
- self.orderedNameList.pop(idx)
- self.orderedNameList.append(subProxyPropKey)
- # --------------------------------------------------------------------------
- # Gather information from the xml associated with a proxy and properties.
- # --------------------------------------------------------------------------
- def processXmlElement(
- self,
- proxy,
- xmlElement,
- inPropGroup,
- group,
- parentGroup,
- belongsToProxyProperty=None,
- ):
- proxyId = proxy.GetGlobalIDAsString()
- nbChildren = xmlElement.GetNumberOfNestedElements()
- for i in range(nbChildren):
- xmlChild = xmlElement.GetNestedElement(i)
- name = xmlChild.GetName()
- nameAttr = xmlChild.GetAttribute("name")
- detailsKey = proxyId + ":" + str(nameAttr)
- infoOnly = xmlChild.GetAttribute("information_only")
- isInternal = xmlChild.GetAttribute("is_internal")
- panelVis = xmlChild.GetAttribute("panel_visibility")
- numElts = xmlChild.GetAttribute("number_of_elements")
- panelVisQualifier = None
- if self.proxyIsRepresentation:
- panelVisQualifier = xmlChild.GetAttribute(
- "panel_visibility_default_for_representation"
- )
- size = -1
- informationOnly = 0
- internal = 0
- # Check for attributes that might only exist on parent xml
- parentQueryList = []
- if numElts is None:
- parentQueryList.append("number_of_elements")
- else:
- size = int(numElts)
- if infoOnly is None:
- parentQueryList.append("information_only")
- else:
- informationOnly = int(infoOnly)
- if isInternal is None:
- parentQueryList.append("is_internal")
- else:
- internal = int(isInternal)
- if panelVis is None:
- parentQueryList.append("panel_visibility")
- if self.proxyIsRepresentation:
- if panelVisQualifier is None:
- parentQueryList.append(
- "panel_visibility_default_for_representation"
- )
- # Try to retrieve those attributes from parent xml (implicit assumption is this is a property
- # which actually belongs to a subproxy)
- parentAttrs = {}
- if len(parentQueryList) > 0:
- propInstance = proxy.GetProperty(nameAttr)
- if propInstance:
- parentAttrs = self.extractParentAttributes(
- propInstance, parentQueryList
- )
- if (
- "number_of_elements" in parentAttrs
- and parentAttrs["number_of_elements"]
- ):
- size = int(parentAttrs["number_of_elements"])
- if "information_only" in parentAttrs and parentAttrs["information_only"]:
- informationOnly = int(parentAttrs["information_only"])
- if "is_internal" in parentAttrs and parentAttrs["is_internal"]:
- internal = int(parentAttrs["is_internal"])
- if "panel_visibility" in parentAttrs:
- panelVis = parentAttrs["panel_visibility"]
- if "panel_visibility_default_for_representation" in parentAttrs:
- panelVisQualifier = parentAttrs[
- "panel_visibility_default_for_representation"
- ]
- # Now decide whether we should filter out this property
- if (
- (
- (name.endswith("Property") and panelVis == "never")
- or informationOnly == 1
- or internal == 1
- )
- and nameAttr not in self.alwaysIncludeProperties
- ) or nameAttr in self.alwaysExcludeProperties:
- self.debug(
- "Filtering out property "
- + str(nameAttr)
- + " because panelVis is never, infoOnly is 1, isInternal is 1, or "
- + str(nameAttr)
- + " is an ALWAYS EXCLUDE property"
- )
- continue
- if name == "Hints":
- self.debug(" ((((((((((Got the hints))))))))) ")
- for j in range(xmlChild.GetNumberOfNestedElements()):
- hintChild = xmlChild.GetNestedElement(j)
- if hintChild.GetName() == "Visibility":
- replaceInputAttr = hintChild.GetAttribute("replace_input")
- if replaceInputAttr:
- self.propertyDetailsMap["specialHints"] = {
- "replaceInput": int(replaceInputAttr)
- }
- self.debug(
- " Replace input value: "
- + str(replaceInputAttr)
- )
- elif name == "ProxyProperty" or name == "InputProperty":
- self.debug(str(nameAttr) + " is a proxy property")
- self.trackProperty(
- detailsKey,
- name,
- panelVis,
- panelVisQualifier,
- size,
- inPropGroup,
- group,
- parentGroup,
- belongsToProxyProperty,
- )
- foundProxyListDomain = self.getProxyListFromProperty(
- proxy, xmlChild, inPropGroup, group, parentGroup, detailsKey
- )
- if foundProxyListDomain == True:
- self.propertyDetailsMap[detailsKey]["size"] = 1
- elif name.endswith("Property"):
- self.trackProperty(
- detailsKey,
- name,
- panelVis,
- panelVisQualifier,
- size,
- inPropGroup,
- group,
- parentGroup,
- belongsToProxyProperty,
- )
- else:
- # Anything else, we recursively process the element, with special handling
- # in the case that the element is a PropertyGroup
- if name == "PropertyGroup":
- newGroup = group
- newParentGroup = parentGroup
- if self.respectPropertyGroups:
- typeAttr = xmlChild.GetAttribute("type")
- labelAttr = xmlChild.GetAttribute("label")
- panelWidgetAttr = None
- if not labelAttr:
- panelWidgetAttr = xmlChild.GetAttribute("panel_widget")
- labelAttr = panelWidgetAttr
- if not labelAttr:
- labelAttr = "Unlabeled Property Group (%s)" % proxyId
- newGroup = "%s:%s" % (proxyId, labelAttr)
- self.groupDetailsMap[newGroup] = {
- "groupName": labelAttr,
- "groupWidget": "PropertyGroup",
- "groupVisibility": panelVis,
- "groupType": typeAttr,
- }
- newParentGroup = group
- self.processXmlElement(
- proxy,
- xmlChild,
- True,
- newGroup,
- newParentGroup,
- belongsToProxyProperty,
- )
- else:
- self.processXmlElement(
- proxy,
- xmlChild,
- inPropGroup,
- group,
- parentGroup,
- belongsToProxyProperty,
- )
- # --------------------------------------------------------------------------
- # Entry point for the xml processing methods.
- # --------------------------------------------------------------------------
- def getProxyXmlDefinitions(self, proxy):
- self.orderedNameList = []
- self.propertyDetailsMap = {}
- self.groupDetailsMap = {}
- self.proxyPropertyDependents = {}
- self.proxyIsRepresentation = proxy.GetXMLGroup() == "representations"
- self.groupDetailsMap["root"] = {"groupName": "root"}
- xmlElement = self.getProxyDefinition(proxy.GetXMLGroup(), proxy.GetXMLName())
- self.processXmlElement(proxy, xmlElement, False, "root", None)
- self.debug(
- "Length of final ordered property list: " + str(len(self.orderedNameList))
- )
- for propName in self.orderedNameList:
- propRec = self.propertyDetailsMap[propName]
- self.debug(
- "Property "
- + str(propName)
- + ", type = "
- + str(propRec["type"])
- + ", pvis = "
- + str(propRec["panelVis"])
- + ", size = "
- + str(propRec["size"])
- )
- # --------------------------------------------------------------------------
- # Helper function to fill in all the properties of a proxy. Delegates
- # properties with specific domain types to other helpers.
- # --------------------------------------------------------------------------
- def fillPropertyList(self, proxy_id, propertyList, dependency=None):
- proxy = self.mapIdToProxy(proxy_id)
- if proxy:
- for property in proxy.ListProperties():
- prop = proxy.GetProperty(property)
- propertyName = prop.Name
- displayName = prop.GetXMLLabel()
- if propertyName in ["Refresh", "Input"] or propertyName.__contains__(
- "Info"
- ):
- continue
- pythonProp = servermanager._wrap_property(proxy, prop)
- propJson = {"name": propertyName, "id": proxy_id, "label": displayName}
- if dependency is not None:
- propJson["dependency"] = dependency
- if type(prop) == ProxyProperty or type(prop) == InputProperty:
- proxyListDomain = prop.FindDomain("vtkSMProxyListDomain")
- if proxyListDomain:
- # Set the value of this ProxyProperty
- propJson["value"] = prop.GetProxy(0).GetXMLLabel()
- # Now recursively fill in properties of this proxy property
- for i in range(proxyListDomain.GetNumberOfProxies()):
- subProxy = proxyListDomain.GetProxy(i)
- subProxyId = subProxy.GetGlobalIDAsString()
- depStr = (
- proxy_id
- + ":"
- + propertyName
- + ":"
- + subProxy.GetXMLLabel()
- + ":1"
- )
- self.fillPropertyList(subProxyId, propertyList, depStr)
- else:
- # The value of this ProxyProperty is the list of proxy ids
- value = []
- for i in range(prop.GetNumberOfProxies()):
- p = prop.GetProxy(i)
- value.append(p.GetGlobalIDAsString())
- propJson["value"] = value
- else:
- # For everything but ProxyProperty, we should just be able to call GetData()
- try:
- propJson["value"] = prop.GetData()
- except AttributeError as attrErr:
- self.debug(
- "Property "
- + propertyName
- + " has no GetData() method, skipping"
- )
- continue
- # One exception is properties which have enumeration domain, in which case we substitute
- # the numeric value for the enum text value.
- enumDomain = prop.FindDomain("vtkSMEnumerationDomain")
- if enumDomain:
- for entryNum in range(enumDomain.GetNumberOfEntries()):
- if enumDomain.GetEntryText(entryNum) == propJson["value"]:
- propJson["value"] = enumDomain.GetEntryValue(entryNum)
- break
- self.debug("Adding a property to the pre-sorted list: " + str(propJson))
- propertyList.append(propJson)
- # --------------------------------------------------------------------------
- # Make a first pass over the properties, building up a couple of data
- # structures we can use to reorder the properties to match the xml order.
- # --------------------------------------------------------------------------
- def reorderProperties(self, rootProxyId, proxyProperties):
- self.getProxyXmlDefinitions(self.mapIdToProxy(rootProxyId))
- propMap = {}
- for prop in proxyProperties:
- propMap[prop["id"] + ":" + prop["name"]] = prop
- orderedList = []
- groupsInfo = []
- for name in self.orderedNameList:
- if name in propMap:
- orderedList.append(propMap[name])
- xmlDetails = self.propertyDetailsMap[name]
- groupsInfo.append(
- {
- "group": xmlDetails["group"],
- "parentGroup": xmlDetails["parentGroup"],
- }
- )
- return orderedList, groupsInfo
- # --------------------------------------------------------------------------
- # Convenience function to set a property value. Can be extended with other
- # special cases as the need arises.
- # --------------------------------------------------------------------------
- def setProperty(self, property, propertyValue):
- if propertyValue == "vtkProcessId":
- property.SetElement(0, propertyValue)
- else:
- property.SetData(propertyValue)
- # --------------------------------------------------------------------------
- # Taken directly from helper.py, applies domains so we can see real values.
- # --------------------------------------------------------------------------
- def applyDomains(self, parentProxy, proxy_id):
- """
- This function is used to apply the domains so that queries for
- specific values like range min and max retrieve something useful.
- """
- proxy = self.mapIdToProxy(proxy_id)
- # Call recursively on each sub-proxy if any
- for property_name in proxy.ListProperties():
- prop = proxy.GetProperty(property_name)
- if prop.IsA("vtkSMProxyProperty"):
- try:
- if len(prop.Available) and prop.GetNumberOfProxies() == 1:
- listdomain = prop.FindDomain("vtkSMProxyListDomain")
- if listdomain:
- for i in xrange(listdomain.GetNumberOfProxies()):
- internal_proxy = listdomain.GetProxy(i)
- self.applyDomains(
- parentProxy, internal_proxy.GetGlobalIDAsString()
- )
- except:
- exc_type, exc_obj, exc_tb = sys.exc_info()
- print("Unexpected error:", exc_type, " line: ", exc_tb.tb_lineno)
- # Reset all properties to leverage domain capabilities
- for prop_name in proxy.ListProperties():
- if prop_name == "Input":
- continue
- try:
- skipReset = False
- prop = proxy.GetProperty(prop_name)
- iter = prop.NewDomainIterator()
- iter.Begin()
- while not iter.IsAtEnd():
- domain = iter.GetDomain()
- iter.Next()
- try:
- if domain.IsA("vtkSMBoundsDomain"):
- domain.SetDomainValues(
- parentProxy.GetDataInformation().GetBounds()
- )
- except AttributeError as attrErr:
- skipReset = True
- print(
- "Caught exception setting domain values in apply_domains:"
- )
- print(attrErr)
- if not skipReset:
- prop.ResetToDefault()
- # Need to UnRegister to handle the ref count from the NewDomainIterator
- iter.UnRegister(None)
- except:
- exc_type, exc_obj, exc_tb = sys.exc_info()
- print("Unexpected error:", exc_type, " line: ", exc_tb.tb_lineno)
- proxy.UpdateVTKObjects()
- # --------------------------------------------------------------------------
- # Helper function to gather information about the bounds, point, and cell
- # data.
- # --------------------------------------------------------------------------
- def getDataInformation(self, proxy):
- # The stuff in here works, but only after you have created the
- # representation.
- dataInfo = proxy.GetDataInformation()
- # Get some basic statistics about the data
- info = {
- "bounds": dataInfo.DataInformation.GetBounds(),
- "points": dataInfo.DataInformation.GetNumberOfPoints(),
- "cells": dataInfo.DataInformation.GetNumberOfCells(),
- "type": dataInfo.DataInformation.GetPrettyDataTypeString(),
- "memory": dataInfo.DataInformation.GetMemorySize(),
- }
- arrayData = []
- # Get information about point data arrays
- pdInfo = proxy.GetPointDataInformation()
- numberOfPointArrays = pdInfo.GetNumberOfArrays()
- for idx in xrange(numberOfPointArrays):
- array = pdInfo.GetArray(idx)
- numComps = array.GetNumberOfComponents()
- typeStr = ParaViewWebProxyManager.VTK_DATA_TYPES[array.GetDataType()]
- data = {
- "name": array.Name,
- "size": numComps,
- "location": "POINTS",
- "type": typeStr,
- }
- magRange = array.GetRange(-1)
- rangeList = []
- if numComps > 1:
- rangeList.append(
- {"name": "Magnitude", "min": magRange[0], "max": magRange[1]}
- )
- for i in range(numComps):
- ithrange = array.GetRange(i)
- rangeList.append(
- {
- "name": array.GetComponentName(i),
- "min": ithrange[0],
- "max": ithrange[1],
- }
- )
- else:
- rangeList.append({"name": "", "min": magRange[0], "max": magRange[1]})
- data["range"] = rangeList
- arrayData.append(data)
- # Get information about cell data arrays
- cdInfo = proxy.GetCellDataInformation()
- numberOfCellArrays = cdInfo.GetNumberOfArrays()
- for idx in xrange(numberOfCellArrays):
- array = cdInfo.GetArray(idx)
- numComps = array.GetNumberOfComponents()
- typeStr = ParaViewWebProxyManager.VTK_DATA_TYPES[array.GetDataType()]
- data = {
- "name": array.Name,
- "size": numComps,
- "location": "CELLS",
- "type": typeStr,
- }
- magRange = array.GetRange(-1)
- rangeList = []
- if numComps > 1:
- rangeList.append(
- {"name": "Magnitude", "min": magRange[0], "max": magRange[1]}
- )
- for i in range(numComps):
- ithrange = array.GetRange(i)
- rangeList.append(
- {
- "name": array.GetComponentName(i),
- "min": ithrange[0],
- "max": ithrange[1],
- }
- )
- else:
- rangeList.append({"name": "", "min": magRange[0], "max": magRange[1]})
- data["range"] = rangeList
- arrayData.append(data)
- # Get field data information
- fdInfo = dataInfo.DataInformation.GetFieldDataInformation()
- numFieldDataArrays = fdInfo.GetNumberOfArrays()
- for idx in xrange(numFieldDataArrays):
- array = fdInfo.GetArrayInformation(idx)
- numComps = array.GetNumberOfComponents()
- typeStr = ParaViewWebProxyManager.VTK_DATA_TYPES[array.GetDataType()]
- data = {
- "name": array.GetName(),
- "size": numComps,
- "location": "FIELDS",
- "type": typeStr,
- }
- rangeList = []
- for i in range(numComps):
- ithrange = array.GetComponentRange(i)
- rangeList.append(
- {
- "name": array.GetComponentName(i),
- "min": ithrange[0],
- "max": ithrange[1],
- }
- )
- data["range"] = rangeList
- arrayData.append(data)
- # Time data
- timeKeeper = next(
- iter(servermanager.ProxyManager().GetProxiesInGroup("timekeeper").values())
- )
- tsVals = timeKeeper.TimestepValues
- if tsVals:
- if isinstance(tsVals, float):
- tsVals = [tsVals]
- tValStrings = []
- for tsVal in tsVals:
- tValStrings.append(str(tsVal))
- info["time"] = tValStrings
- else:
- info["time"] = []
- info["arrays"] = arrayData
- return info
- # --------------------------------------------------------------------------
- # Helper function to gather information about the coloring used by this
- # representation proxy.
- # --------------------------------------------------------------------------
- def getColorInformation(self, proxy):
- """
- Generates a block of color information, given a representation proxy.
- colorBy: {
- 'representation': '652',
- 'scalarBar': 0,
- 'mode': 'color' # 'array'
- 'color': [ 0.5, 1.0, 1.0 ],
- 'array': [ 'POINTS', 'RTData', -1 ],
- 'colorMap': 'Blue to Red'
- }
- """
- colorInfo = {"representation": proxy.GetGlobalIDAsString()}
- if proxy.GetProperty("DiffuseColor"):
- colorInfo["color"] = proxy.GetProperty("DiffuseColor").GetData()
- if proxy.GetProperty("ColorArrayName"):
- colorInfo["mode"] = "array"
- view = self.getView(-1)
- sbVisible = vtkSMPVRepresentationProxy.IsScalarBarVisible(
- proxy.SMProxy, view.SMProxy
- )
- colorInfo["scalarBar"] = 1 if sbVisible else 0
- lut = proxy.GetProperty("LookupTable").GetData()
- component = -1
- if lut and lut.GetProperty("VectorMode").GetData() == "Component":
- component = lut.GetProperty("VectorComponent").GetData()
- colorArrayProp = proxy.GetProperty("ColorArrayName")
- colorInfo["array"] = [
- colorArrayProp.GetAssociation(),
- colorArrayProp.GetArrayName(),
- component,
- ]
- else:
- colorInfo["mode"] = "color"
- colorInfo["scalarBar"] = 0
- return colorInfo
- # --------------------------------------------------------------------------
- # Helper function populate the ui structures list, which corresponds with
- # with the properties list, element by element, and provides information
- # about the ui elements needed to represent each property.
- # --------------------------------------------------------------------------
- def getUiProperties(self, proxyId, proxyProperties):
- """
- Generates an array of objects, parallel to the properties, containing
- the information needed to render ui for each property. Not all of the
- following attributes will always be present.
- ui : [ {
- 'name': 'Clip Type',
- 'advanced': 1, # 0
- 'doc': 'Documentation string for the property',
- 'dependency': '498:ClipFunction:Sphere:1',
- 'values': { 'Plane': '456', 'Box': '457', 'Scalar': '458', 'Sphere': '459' },
- 'type': 'int', # 'float', 'int', 'str', 'proxy', 'input', ...
- 'widget': 'textfield', # 'checkbox', 'textarea', 'list-1', 'list-n'
- 'size': -1, # -1, 0, 2, 3, 6
- 'range': [ { 'min': 0, 'max': 1 }, { 'min': 4, 'max': 7 }, ... ]
- }, ...
- ]
- """
- if self.proxyIsRepresentation:
- reprProxy = self.mapIdToProxy(proxyId)
- reprProp = reprProxy.GetProperty("Representation")
- reprValue = reprProp.GetData()
- uiProps = []
- for proxyProp in proxyProperties:
- uiElt = {}
- pid = proxyProp["id"]
- propertyName = proxyProp["name"]
- proxy = self.mapIdToProxy(pid)
- prop = proxy.GetProperty(propertyName)
- # Get the xml details we already parsed out for this property
- xmlProps = self.propertyDetailsMap[pid + ":" + propertyName]
- # Set a few defaults for the ui elements, which may be overridden
- uiElt["size"] = xmlProps["size"]
- uiElt["widget"] = "textfield"
- uiElt["type"] = self.propertyTypeMap[prop.GetClassName()]
- doc = prop.GetDocumentation()
- if doc:
- uiElt["doc"] = doc.GetDescription()
- uiElt["name"] = proxyProp["label"]
- proxyProp.pop("label", None)
- if "dependency" in proxyProp:
- uiElt["depends"] = proxyProp["dependency"]
- proxyProp.pop("dependency", None)
- if xmlProps["panelVis"] == "advanced":
- uiElt["advanced"] = 1
- if (
- self.proxyIsRepresentation
- and "panelVisQualifier" in xmlProps
- and xmlProps["panelVisQualifier"]
- and xmlProps["panelVisQualifier"].lower() == reprValue.lower()
- ):
- uiElt["advanced"] = 0
- else:
- uiElt["advanced"] = 0
- # Iterate over the property domains until we find the interesting one
- domainIter = prop.NewDomainIterator()
- domainIter.Begin()
- foundDomain = False
- while domainIter.IsAtEnd() == 0 and foundDomain == False:
- nextDomain = domainIter.GetDomain()
- domainName = nextDomain.GetClassName()
- if domainName in self.domainFunctionMap:
- domainFunction = self.domainFunctionMap[domainName]
- foundDomain = domainFunction(proxyProp, xmlProps, uiElt, nextDomain)
- self.debug(
- "Processed "
- + str(propertyName)
- + " as domain "
- + str(domainName)
- )
- domainIter.Next()
- if foundDomain == False:
- self.debug(
- " ~~~|~~~ " + str(propertyName) + " did not have recognized domain"
- )
- domainIter.FastDelete()
- # Now get hints for the property and provide an opportunity to decorate
- hints = prop.GetHints()
- if hints:
- numHints = hints.GetNumberOfNestedElements()
- for idx in range(numHints):
- hint = hints.GetNestedElement(idx)
- if hint.GetName() in self.hintsMap:
- hmap = self.hintsMap[hint.GetName()]
- hintType = hint.GetAttribute("type")
- hmapKey = None
- if hintType and hintType in hmap:
- hmapKey = hintType
- elif "default" in hmap:
- hmapKey = "default"
- if hmapKey:
- # We're interested in decorating based on this hint
- hintFunction = hmap[hmapKey]
- hintFunction(prop, uiElt, hint)
- uiProps.append(uiElt)
- return uiProps
- # --------------------------------------------------------------------------
- # Helper function to restructure the properties so they reflect the grouping
- # encountered when processing the xml.
- # --------------------------------------------------------------------------
- def restructureProperties(self, groupList, propList, uiList):
- props = []
- uis = [] if uiList else None
- groupMap = {"root": {"proxy": props, "ui": uis}}
- for idx in range(len(groupList)):
- groupInfo = groupList[idx]
- group = groupInfo["group"]
- parentGroup = groupInfo["parentGroup"]
- if (
- group != "root"
- and self.groupDetailsMap[group]["groupVisibility"] == "never"
- ):
- self.debug(
- "Culling property (%s) in group (%s) because group has visibility never"
- % (propList[idx]["name"], group)
- )
- continue
- if not group in groupMap:
- groupMap[group] = {"proxy": [], "ui": [] if uiList else None}
- groupMap[parentGroup]["proxy"].append(
- {
- "id": group,
- "name": self.groupDetailsMap[group]["groupName"],
- "value": False,
- "children": groupMap[group]["proxy"],
- }
- )
- if uiList:
- groupMap[parentGroup]["ui"].append(
- {
- "widget": self.groupDetailsMap[group]["groupWidget"],
- "name": self.groupDetailsMap[group]["groupName"],
- "type": self.groupDetailsMap[group]["groupType"],
- "advanced": 1
- if self.groupDetailsMap[group]["groupVisibility"]
- == "advanced"
- else 0,
- "children": groupMap[group]["ui"],
- }
- )
- # Make sure we can find the group ui item we just appended
- groupMap[group]["groupItem"] = groupMap[parentGroup]["ui"][-1]
- pushToFront = False
- if uiList:
- if (
- "groupVisibilityProperty" in self.groupDetailsMap[group]
- and propList[idx]["name"]
- == self.groupDetailsMap[group]["groupVisibilityProperty"]
- ):
- # If a proxy property claimed this property as its visibilty property, we should not
- # allow it to remain marked as 'advanced'.
- uiList[idx]["advanced"] = 0
- pushToFront = True
- if pushToFront:
- groupMap[group]["ui"].insert(0, uiList[idx])
- else:
- groupMap[group]["ui"].append(uiList[idx])
- if pushToFront:
- groupMap[group]["proxy"].insert(0, propList[idx])
- else:
- groupMap[group]["proxy"].append(propList[idx])
- # Before we're done we need to check for groups of properties where every
- # property in the group has a panel_visibilty of 'advanced', in which
- # case, the group should be marked the same.
- if uiList:
- for groupName in groupMap:
- if groupName != "root" and "ui" in groupMap[groupName]:
- groupUiList = groupMap[groupName]["ui"]
- firstUiElt = groupUiList[0]
- groupDependency = False
- if "depends" in firstUiElt and firstUiElt["depends"]:
- groupDependency = True
- advancedGroup = True
- for uiProp in groupUiList:
- if uiProp["advanced"] == 0:
- advancedGroup = False
- if groupDependency and (
- "depends" not in uiProp
- or uiProp["depends"] != firstUiElt["depends"]
- ):
- groupDependency = False
- if advancedGroup:
- groupMap[groupName]["groupItem"]["advanced"] = 1
- if groupDependency:
- groupMap[groupName]["groupItem"]["depends"] = firstUiElt[
- "depends"
- ]
- return {"proxy": props, "ui": uis}
- # --------------------------------------------------------------------------
- # Helper function validates the string we get from the client to make sure
- # it is one of the allowed proxies that has been set up on the server side.
- # --------------------------------------------------------------------------
- def validate(self, functionName):
- if functionName not in self.allowedProxies:
- return None
- else:
- return functionName.strip()
- @exportRpc("pv.proxy.manager.create")
- def create(
- self,
- functionName,
- parentId,
- initialValues={},
- skipDomain=False,
- subProxyValues={},
- ):
- """
- Creates a new filter/source proxy as a child of the specified
- parent proxy. Returns the proxy state for the newly created
- proxy as a JSON object.
- """
- name = self.validate(functionName)
- if not name:
- return {
- "success": False,
- "reason": '"'
- + functionName
- + '" was not valid and could not be evaluated',
- }
- pid = parentId
- parentProxy = self.mapIdToProxy(parentId)
- if parentProxy:
- simple.SetActiveSource(parentProxy)
- else:
- pid = "0"
- # Create new source/filter
- sanitizedInitialValues = sanitizeKeys(initialValues)
- allowed = self.allowedProxies[name]
- newProxy = paraview.simple.__dict__[allowed](**sanitizedInitialValues)
- # Update subproxy values
- if newProxy:
- for subProxyName in subProxyValues:
- subProxy = newProxy.GetProperty(subProxyName).GetData()
- if subProxy:
- for propName in subProxyValues[subProxyName]:
- prop = subProxy.SMProxy.GetProperty(propName)
- value = subProxyValues[subProxyName][propName]
- if isinstance(value, list):
- prop.SetElements(value)
- else:
- prop.SetElements([value])
- subProxy.UpdateVTKObjects()
- # To make WebGL export work
- simple.Show()
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- try:
- if not skipDomain:
- self.applyDomains(parentProxy, newProxy.GetGlobalIDAsString())
- except Exception as inst:
- print("Caught exception applying domains:")
- print(inst)
- return self.get(newProxy.GetGlobalIDAsString())
- @exportRpc("pv.proxy.manager.create.reader")
- def open(self, relativePath):
- """
- Open relative file paths, attempting to use the file extension to select
- from the configured readers.
- """
- fileToLoad = []
- if type(relativePath) == list:
- for file in relativePath:
- validPath = self.getAbsolutePath(file)
- if validPath:
- fileToLoad.append(validPath)
- else:
- validPath = self.getAbsolutePath(relativePath)
- if validPath:
- fileToLoad.append(validPath)
- if len(fileToLoad) == 0:
- return {
- "success": False,
- "reason": "No valid path name " + str(relativePath),
- }
- # Get file extension and look for configured reader
- idx = fileToLoad[0].rfind(".")
- extension = fileToLoad[0][idx + 1 :]
- # Check if we were asked to load a state file
- if extension == "pvsm":
- simple.LoadState(fileToLoad[0])
- newView = simple.Render()
- simple.SetActiveView(newView)
- if self.getApplication():
- self.getApplication().InvokeEvent("ResetActiveView")
- self.getApplication().InvokeEvent("UpdateEvent")
- return {"success": True, "view": newView.GetGlobalIDAsString()}
- readerName = None
- if extension in self.readerFactoryMap:
- readerName = self.readerFactoryMap[extension][0]
- customMethod = self.readerFactoryMap[extension][1]
- useDirectory = self.readerFactoryMap[extension][2]
- # Open the file(s)
- reader = None
- if readerName:
- if useDirectory:
- kw = {customMethod: os.path.dirname(fileToLoad[0])}
- else:
- kw = {customMethod: fileToLoad}
- reader = paraview.simple.__dict__[readerName](**kw)
- else:
- if self.allowUnconfiguredReaders:
- reader = simple.OpenDataFile(fileToLoad)
- else:
- return {
- "success": False,
- "reason": "No configured reader found for "
- + extension
- + " files, and unconfigured readers are not enabled.",
- }
- # Rename the reader proxy
- name = fileToLoad[0].split(os.path.sep)[-1]
- if len(name) > 15:
- name = name[:15] + "*"
- simple.RenameSource(name, reader)
- # Representation, view, and camera setup
- simple.Show()
- simple.Render()
- simple.ResetCamera()
- if self.getApplication():
- self.getApplication().InvokeEvent("UpdateEvent")
- return {"success": True, "id": reader.GetGlobalIDAsString()}
- @exportRpc("pv.proxy.manager.get")
- def get(self, proxyId, ui=True):
- """
- Returns the proxy state for the given proxyId as a JSON object.
- """
- proxyProperties = []
- proxyId = str(proxyId)
- self.fillPropertyList(proxyId, proxyProperties)
- proxyProperties, groupsInfo = self.reorderProperties(proxyId, proxyProperties)
- proxyJson = {"id": proxyId}
- # Perform costly request only when needed
- uiProperties = None
- if ui:
- uiProperties = self.getUiProperties(proxyId, proxyProperties)
- # Now restructure the flat, ordered lists to reflect grouping
- restructuredProperties = self.restructureProperties(
- groupsInfo, proxyProperties, uiProperties
- )
- proxyJson["properties"] = restructuredProperties["proxy"]
- if ui:
- proxyJson["ui"] = restructuredProperties["ui"]
- if "specialHints" in self.propertyDetailsMap:
- proxyJson["hints"] = self.propertyDetailsMap["specialHints"]
- proxy = self.mapIdToProxy(proxyId)
- if proxy.SMProxy.IsA("vtkSMRepresentationProxy") == 1:
- colorInfo = self.getColorInformation(proxy)
- proxyJson["colorBy"] = colorInfo
- elif proxy.SMProxy.IsA("vtkSMSourceProxy") == 1:
- dataInfo = self.getDataInformation(proxy)
- proxyJson["data"] = dataInfo
- return proxyJson
- @exportRpc("pv.proxy.manager.find.id")
- def findProxyId(self, groupName, proxyName):
- proxyMgr = servermanager.ProxyManager()
- proxy = proxyMgr.GetProxy(groupName, proxyName)
- proxyId = proxy.SMProxy.GetGlobalIDAsString()
- return proxyId
- @exportRpc("pv.proxy.manager.update")
- def update(self, propertiesList):
- """
- Takes a list of properties and updates the corresponding proxies.
- """
- failureList = []
- for newPropObj in propertiesList:
- try:
- proxyId = str(newPropObj["id"])
- proxy = self.mapIdToProxy(proxyId)
- propertyName = servermanager._make_name_valid(newPropObj["name"])
- propertyValue = helper.removeUnicode(newPropObj["value"])
- proxyProperty = servermanager._wrap_property(
- proxy, proxy.GetProperty(propertyName)
- )
- self.setProperty(proxyProperty, propertyValue)
- except:
- failureList.append(
- "Unable to set property for proxy "
- + proxyId
- + ": "
- + str(newPropObj)
- )
- if len(failureList) > 0:
- return {"success": False, "errorList": failureList}
- self.getApplication().InvokeEvent("UpdateEvent")
- return {"success": True}
- @exportRpc("pv.proxy.manager.delete")
- def delete(self, proxyId):
- """
- Checks if the proxy can be deleted (it is not the input to another
- proxy), and then deletes it if so. Returns True if the proxy was
- deleted, False otherwise.
- """
- proxy = self.mapIdToProxy(proxyId)
- canDelete = True
- pid = "0"
- for proxyJson in self.list()["sources"]:
- if proxyJson["parent"] == proxyId:
- canDelete = False
- elif proxyJson["id"] == proxyId:
- pid = proxyJson["parent"]
- if proxy is not None and canDelete is True:
- simple.Delete(proxy)
- self.updateScalarBars()
- self.getApplication().InvokeEvent("UpdateEvent")
- return {"success": 1, "id": pid}
- self.getApplication().InvokeEvent("UpdateEvent")
- return {"success": 0, "id": "0"}
- @exportRpc("pv.proxy.manager.list")
- def list(self, viewId=None):
- """Returns the current proxy list, specifying for each proxy it's
- name, id, and parent (input) proxy id. A 'parent' of '0' means
- the proxy has no input.
- """
- proxies = servermanager.ProxyManager().GetProxiesInGroup("sources")
- viewProxy = self.getView(viewId)
- proxyObj = {"view": viewProxy.GetGlobalIDAsString()}
- proxyList = []
- for key in proxies:
- listElt = {}
- listElt["name"] = key[0]
- listElt["id"] = key[1]
- proxy = proxies[key]
- rep = simple.GetRepresentation(proxy=proxy, view=viewProxy)
- listElt["rep"] = rep.GetGlobalIDAsString()
- listElt["visible"] = rep.Visibility
- listElt["parent"] = "0"
- if hasattr(proxy, "Input") and proxy.Input:
- inputProp = proxy.Input
- if hasattr(inputProp, "GetNumberOfProxies"):
- numProxies = inputProp.GetNumberOfProxies()
- if numProxies > 1:
- listElt["multiparent"] = numProxies
- for inputIdx in range(numProxies):
- proxyId = inputProp.GetProxy(inputIdx).GetGlobalIDAsString()
- if inputIdx == 0:
- listElt["parent"] = proxyId
- else:
- listElt["parent_%d" % inputIdx] = proxyId
- elif numProxies == 1:
- listElt["parent"] = inputProp.GetProxy(0).GetGlobalIDAsString()
- else:
- listElt["parent"] = inputProp.GetGlobalIDAsString()
- proxyList.append(listElt)
- proxyObj["sources"] = proxyList
- return proxyObj
- @exportRpc("pv.proxy.manager.available")
- def available(self, typeOfProxy):
- """Returns a list of the available sources or filters, depending on the
- argument typeOfProxy, which can be either 'filters' or 'sources'. If
- typeOfProxy is anything other than 'filter' or 'source', the empty
- list will be returned.
- Any attempt to create a source or filter not returned by this method
- will result in an error response. The returned list has the following
- form:
- [ 'Wavelet', 'Cone', 'Sphere', ... ]
- or
- [ 'Contour', 'Clip', 'Slice', ... ]
- """
- return self.availableList[typeOfProxy]
- # =============================================================================
- #
- # Key/Value Store Protocol
- #
- # =============================================================================
- class ParaViewWebKeyValuePairStore(ParaViewWebProtocol):
- def __init__(self, **kwargs):
- super(ParaViewWebKeyValuePairStore, self).__init__()
- self.keyValStore = {}
- # RpcName: storeKeyPair => 'pv.keyvaluepair.store'
- @exportRpc("pv.keyvaluepair.store")
- def storeKeyPair(self, key, value):
- self.keyValStore[key] = value
- # RpcName: retrieveKeyPair => 'pv.keyvaluepair.retrieve'
- @exportRpc("pv.keyvaluepair.retrieve")
- def retrieveKeyPair(self, key):
- if key in self.keyValStore:
- return self.keyValStore[key]
- else:
- return None
- # =============================================================================
- #
- # Save Data Protocol
- #
- # =============================================================================
- class ParaViewWebSaveData(ParaViewWebProtocol):
- def __init__(self, baseSavePath="", **kwargs):
- super(ParaViewWebSaveData, self).__init__()
- savePath = baseSavePath
- if baseSavePath.find("=") >= 0:
- parts = baseSavePath.split("=")
- savePath = parts[-1]
- self.baseSavePath = savePath
- self.imageExtensions = ["jpg", "png"]
- self.dataExtensions = [
- "vtk",
- "ex2",
- "vtp",
- "vti",
- "vtu",
- "vtm",
- "vts",
- "vtr",
- "csv",
- ]
- self.stateExtensions = ["pvsm"]
- # RpcName: saveData => 'pv.data.save'
- @exportRpc("pv.data.save")
- def saveData(self, filePath, options=None):
- """
- Save some data on the server side. Can save data from a proxy, a
- screenshot, or else the current state. Options may be different
- depending on the type of data to be save. Saving a screenshot
- can take an optional size array indicating the desired width and
- height of the saved image. Saving data can take an optional proxy
- id specifying which filter or source to save output from.
- options = {
- 'proxyId': '426',
- 'size': [ <imgWidth>, <imgHeight> ]
- }
- """
- # make sure file path exists
- fullPath = os.path.join(self.baseSavePath, filePath)
- if not os.path.exists(os.path.dirname(fullPath)):
- os.makedirs(os.path.dirname(fullPath))
- # Get file extension and look for configured reader
- idx = filePath.rfind(".")
- extension = filePath[idx + 1 :]
- # Now find out what kind of save it is, and do it
- if extension in self.imageExtensions:
- if options and "size" in options:
- # Get the active view
- v = self.getView(-1)
- # Keep track of current size
- cw = v.ViewSize[0]
- ch = v.ViewSize[1]
- # Resize to desired screenshot size
- v.ViewSize = [
- int(str(options["size"][0])),
- int(str(options["size"][1])),
- ]
- simple.Render()
- # Save actual screenshot
- simple.SaveScreenshot(fullPath)
- # Put the size back the way we found it
- v.ViewSize = [cw, ch]
- simple.Render()
- else:
- simple.SaveScreenshot(fullPath)
- elif extension in self.dataExtensions:
- proxy = None
- if options and "proxyId" in options:
- proxyId = str(options["proxyId"])
- proxy = self.mapIdToProxy(proxyId)
- simple.SaveData(fullPath, proxy)
- elif extension in self.stateExtensions:
- # simple.SaveState(fullPath) # FIXME: Fixed after 4.3.1
- servermanager.SaveState(fullPath)
- else:
- msg = "ERROR: Unrecognized extension (%s) in relative path: %s" % (
- extension,
- filePath,
- )
- print(msg)
- return {"success": False, "message": msg}
- return {"success": True}
- # =============================================================================
- #
- # Handle remote Connection
- #
- # =============================================================================
- class ParaViewWebRemoteConnection(ParaViewWebProtocol):
- # RpcName: connect => pv.remote.connect
- @exportRpc("pv.remote.connect")
- def connect(self, options):
- """
- Creates a connection to a remote pvserver.
- Expect an option argument which should override any of
- those default properties::
- {
- 'host': 'localhost',
- 'port': 11111,
- 'rs_host': None,
- 'rs_port': 11111
- }
- """
- ds_host = "localhost"
- ds_port = 11111
- rs_host = None
- rs_port = 11111
- if options:
- if "host" in options:
- ds_host = options["host"]
- if "port" in options:
- ds_port = options["port"]
- if "rs_host" in options:
- rs_host = options["rs_host"]
- if "rs_port" in options:
- rs_host = options["rs_port"]
- simple.Connect(ds_host, ds_port, rs_host, rs_port)
- # RpcName: reverseConnect => pv.remote.reverse.connect
- @exportRpc("pv.remote.reverse.connect")
- def reverseConnect(self, port=11111):
- """
- Create a reverse connection to a server. Listens on port and waits for
- an incoming connection from the server.
- """
- simple.ReverseConnect(port)
- # RpcName: pvDisconnect => pv.remote.disconnect
- @exportRpc("pv.remote.disconnect")
- def pvDisconnect(self, message):
- """Free the current active session"""
- simple.Disconnect()
- # =============================================================================
- #
- # Handle remote Connection at startup
- #
- # =============================================================================
- class ParaViewWebStartupRemoteConnection(ParaViewWebProtocol):
- connected = False
- def __init__(
- self, dsHost=None, dsPort=11111, rsHost=None, rsPort=22221, rcPort=-1, **kwargs
- ):
- super(ParaViewWebStartupRemoteConnection, self).__init__()
- if not ParaViewWebStartupRemoteConnection.connected and dsHost:
- ParaViewWebStartupRemoteConnection.connected = True
- simple.Connect(dsHost, dsPort, rsHost, rsPort)
- elif not ParaViewWebStartupRemoteConnection.connected and rcPort >= 0:
- ParaViewWebStartupRemoteConnection.connected = True
- simple.ReverseConnect(str(rcPort))
- # =============================================================================
- #
- # Handle plugin loading at startup
- #
- # =============================================================================
- class ParaViewWebStartupPluginLoader(ParaViewWebProtocol):
- loaded = False
- def __init__(self, plugins=None, pathSeparator=":", **kwargs):
- super(ParaViewWebStartupPluginLoader, self).__init__()
- if not ParaViewWebStartupPluginLoader.loaded and plugins:
- ParaViewWebStartupPluginLoader.loaded = True
- for path in plugins.split(pathSeparator):
- simple.LoadPlugin(path)
- # =============================================================================
- #
- # Handle State Loading
- #
- # =============================================================================
- class ParaViewWebStateLoader(ParaViewWebProtocol):
- def __init__(self, state_path=None, **kwargs):
- super(ParaViewWebStateLoader, self).__init__()
- if state_path and state_path[-5:] == ".pvsm":
- self.loadState(state_path)
- # RpcName: loadState => pv.loader.state
- @exportRpc("pv.loader.state")
- def loadState(self, state_file):
- """
- Load a state file and return the list of view ids
- """
- simple.LoadState(state_file)
- ids = []
- for view in simple.GetRenderViews():
- ids.append(view.GetGlobalIDAsString())
- self.getApplication().InvokeEvent("UpdateEvent")
- return ids
- # =============================================================================
- #
- # Handle Server File Listing
- #
- # =============================================================================
- class ParaViewWebFileListing(ParaViewWebProtocol):
- def __init__(
- self,
- basePath,
- name,
- excludeRegex=r"^\.|~$|^\$",
- groupRegex=r"[0-9]+\.",
- **kwargs
- ):
- """
- Configure the way the WebFile browser will expose the server content.
- - basePath: specify the base directory (or directories) that we should start with, if this
- parameter takes the form: "name1=path1|name2=path2|...", then we will treat this as the
- case where multiple data directories are required. In this case, each top-level directory
- will be given the name associated with the directory in the argument.
- - name: Name of that base directory that will show up on the web
- - excludeRegex: Regular expression of what should be excluded from the list of files/directories
- """
- self.setBaseDirectory(basePath)
- self.rootName = self.overrideDataDirKey or name
- self.pattern = re.compile(excludeRegex)
- self.gPattern = re.compile(groupRegex)
- pxm = simple.servermanager.ProxyManager()
- self.directory_proxy = pxm.NewProxy("misc", "ListDirectory")
- self.fileList = simple.servermanager.VectorProperty(
- self.directory_proxy, self.directory_proxy.GetProperty("FileList")
- )
- self.directoryList = simple.servermanager.VectorProperty(
- self.directory_proxy, self.directory_proxy.GetProperty("DirectoryList")
- )
- def handleSingleRoot(self, baseDirectory, relativeDir, startPath=None):
- path = startPath or [self.rootName]
- if len(relativeDir) > len(self.rootName):
- relativeDir = relativeDir[len(self.rootName) + 1 :]
- path += relativeDir.replace("\\", "/").split("/")
- currentPath = os.path.normpath(os.path.join(baseDirectory, relativeDir))
- normBase = os.path.normpath(baseDirectory)
- if not currentPath.startswith(normBase):
- print("### CAUTION ==========================================")
- print(" Attempt to get to another root path ###")
- print(" => Requested:", relativeDir)
- print(" => BaseDir:", normBase)
- print(" => Computed path:", currentPath)
- print("### CAUTION ==========================================")
- currentPath = normBase
- self.directory_proxy.List(currentPath)
- self.directory_proxy.UpdatePropertyInformation()
- # build file/dir lists
- files = []
- if len(self.fileList) > 1:
- for f in self.fileList.GetData():
- if not re.search(self.pattern, f):
- files.append({"label": f})
- elif len(self.fileList) == 1 and not re.search(
- self.pattern, self.fileList.GetData()
- ):
- files.append({"label": self.fileList.GetData()})
- dirs = []
- if len(self.directoryList) > 1:
- for d in self.directoryList.GetData():
- if not re.search(self.pattern, d):
- dirs.append(d)
- elif len(self.directoryList) == 1 and not re.search(
- self.pattern, self.directoryList.GetData()
- ):
- dirs.append(self.directoryList.GetData())
- result = {
- "label": relativeDir,
- "files": files,
- "dirs": dirs,
- "groups": [],
- "path": path,
- }
- if relativeDir == ".":
- result["label"] = self.rootName
- # Filter files to create groups. Dicts are not orderable in Py3 - supply a key function.
- files.sort(key=lambda x: x["label"])
- groups = result["groups"]
- groupIdx = {}
- filesToRemove = []
- for file in files:
- fileSplit = re.split(self.gPattern, file["label"])
- if len(fileSplit) == 2:
- filesToRemove.append(file)
- gName = "*.".join(fileSplit)
- if gName in groupIdx:
- groupIdx[gName]["files"].append(file["label"])
- else:
- groupIdx[gName] = {"files": [file["label"]], "label": gName}
- groups.append(groupIdx[gName])
- for file in filesToRemove:
- gName = "*.".join(re.split(self.gPattern, file["label"]))
- if len(groupIdx[gName]["files"]) > 1:
- files.remove(file)
- else:
- groups.remove(groupIdx[gName])
- return result
- def handleMultiRoot(self, relativeDir):
- if relativeDir == ".":
- return {
- "label": self.rootName,
- "files": [],
- "dirs": list(self.baseDirectoryMap),
- "groups": [],
- "path": [self.rootName],
- }
- pathList = relativeDir.replace("\\", "/").split("/")
- currentBaseDir = self.baseDirectoryMap[pathList[1]]
- if len(pathList) == 2:
- return self.handleSingleRoot(currentBaseDir, ".", pathList)
- else: # must be greater than 2
- return self.handleSingleRoot(
- currentBaseDir, "/".join([pathList[0]] + pathList[2:]), pathList[0:2]
- )
- # RpcName: listServerDirectory => file.server.directory.list
- @exportRpc("file.server.directory.list")
- def listServerDirectory(self, relativeDir="."):
- """
- RPC Callback to list a server directory relative to the basePath
- provided at start-up.
- """
- if self.multiRoot == True:
- return self.handleMultiRoot(relativeDir)
- else:
- return self.handleSingleRoot(self.baseDirectory, relativeDir)
- # =============================================================================
- #
- # Handle Data Selection
- #
- # =============================================================================
- class ParaViewWebSelectionHandler(ParaViewWebProtocol):
- def __init__(self, **kwargs):
- self.active_view = None
- self.previous_interaction = -1
- self.selection_type = -1
- # RpcName: startSelection => pv.selection.start
- @exportRpc("pv.selection.start")
- def startSelection(self, viewId, selectionType):
- """
- Method used to initialize an interactive selection
- """
- self.active_view = self.getView(viewId)
- if self.active_view.IsSelectionAvailable():
- self.previous_interaction = self.active_view.InteractionMode
- self.active_view.InteractionMode = (
- vtkPVRenderView.INTERACTION_MODE_SELECTION
- )
- else:
- self.active_view = None
- # RpcName: endSelection => pv.selection.end
- @exportRpc("pv.selection.end")
- def endSelection(self, area, extract):
- """
- Method used to finalize an interactive selection by providing
- the [ startPointX, startPointY, endPointX, endPointY ] area
- where (0,0) match the lower left corner of the pixel screen.
- """
- if self.active_view:
- self.active_view.InteractionMode = self.previous_interaction
- representations = vtkCollection()
- sources = vtkCollection()
- if self.selection_type == 0:
- self.active_view.SelectSurfacePoints(
- area, representations, sources, False
- )
- elif self.selection_type == 1:
- self.active_view.SelectSurfaceCells(
- area, representations, sources, False
- )
- elif self.selection_type == 2:
- self.active_view.SelectFrustumPoints(
- area, representations, sources, False
- )
- elif self.selection_type == 3:
- self.active_view.SelectFrustumCells(
- area, representations, sources, False
- )
- else:
- self.active_view.SelectSurfacePoints(
- area, representations, sources, False
- )
- # Don't know what to do if more than one representation/source
- if (
- representations.GetNumberOfItems() == sources.GetNumberOfItems()
- and sources.GetNumberOfItems() == 1
- ):
- # We are good for selection
- rep = servermanager._getPyProxy(representations.GetItemAsObject(0))
- selection = servermanager._getPyProxy(sources.GetItemAsObject(0))
- if extract:
- extract = simple.ExtractSelection(
- Input=rep.Input, Selection=selection
- )
- simple.Show(extract)
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- else:
- rep.Input.SMProxy.SetSelectionInput(0, selection.SMProxy, 0)
- # =============================================================================
- #
- # Handle Data Export
- #
- # =============================================================================
- class ParaViewWebExportData(ParaViewWebProtocol):
- def __init__(self, basePath, **kwargs):
- self.base_export_path = basePath
- # RpcName: exportData => pv.export.data
- @exportRpc("pv.export.data")
- def exportData(self, proxy_id, path):
- proxy = self.mapIdToProxy(proxy_id)
- fullpath = str(os.path.join(self.base_export_path, str(path)))
- if fullpath.index(".vtk") == -1:
- fullpath += ".vtk"
- parentDir = os.path.dirname(fullpath)
- if not os.path.exists(parentDir):
- os.makedirs(parentDir)
- if proxy:
- writer = simple.DataSetWriter(Input=proxy, FileName=fullpath)
- writer.UpdatePipeline()
- del writer
- # =============================================================================
- #
- # Protocols useful only for testing purposes
- #
- # =============================================================================
- class ParaViewWebTestProtocols(ParaViewWebProtocol):
- # RpcName: clearAll => pv.test.reset
- @exportRpc("pv.test.reset")
- def clearAll(self):
- simple.Disconnect()
- simple.Connect()
- view = simple.GetRenderView()
- simple.SetActiveView(view)
- simple.Render()
- self.getApplication().InvokeEvent("UpdateEvent")
- # RpcName: getColoringInfo => pv.test.color.info.get
- @exportRpc("pv.test.color.info.get")
- def getColoringInfo(self, proxyId):
- proxy = self.mapIdToProxy(proxyId)
- rep = simple.GetRepresentation(proxy)
- lut = rep.LookupTable
- arrayInfo = rep.GetArrayInformationForColorArray()
- arrayName = arrayInfo.GetName()
- return {
- "arrayName": arrayName,
- "vectorMode": lut.VectorMode,
- "vectorComponent": lut.VectorComponent,
- }
- @exportRpc("pv.test.repr.get")
- def getReprFromSource(self, srcProxyId):
- proxy = self.mapIdToProxy(srcProxyId)
- rep = simple.GetRepresentation(proxy)
- return {"reprProxyId": rep.GetGlobalIDAsString()}
- # =============================================================================
- #
- # Handle Widget Representation
- #
- # =============================================================================
- def _line_update_widget(self, widget):
- widget.Point1WorldPosition = self.Point1
- widget.Point2WorldPosition = self.Point2
- def _line_widget_update(self, obj, event):
- self.GetProperty("Point1").Copy(obj.GetProperty("Point1WorldPositionInfo"))
- self.GetProperty("Point2").Copy(obj.GetProperty("Point2WorldPositionInfo"))
- self.UpdateVTKObjects()
- def _plane_update_widget(self, widget):
- widget.GetProperty("OriginInfo").SetData(self.Origin)
- widget.Origin = self.Origin
- widget.Normal = self.Normal
- widget.UpdateVTKObjects()
- def _plane_widget_update(self, obj, event):
- self.GetProperty("Origin").Copy(obj.GetProperty("OriginInfo"))
- self.GetProperty("Normal").Copy(obj.GetProperty("NormalInfo"))
- self.UpdateVTKObjects()
- _hide_plane(obj)
- def _draw_plane(obj, event):
- obj.GetProperty("DrawPlane").SetElement(0, 1)
- obj.UpdateVTKObjects()
- def _hide_plane(obj):
- obj.GetProperty("DrawPlane").SetElement(0, 0)
- obj.UpdateVTKObjects()
- class ParaViewWebWidgetManager(ParaViewWebProtocol):
- # RpcName: addRuler => pv.widgets.ruler.add
- @exportRpc("pv.widgets.ruler.add")
- def addRuler(self, view_id=-1):
- proxy = simple.Ruler(Point1=[-1.0, -1.0, -1.0], Point2=[1.0, 1.0, 1.0])
- self.createWidgetRepresentation(proxy.GetGlobalID(), view_id)
- return proxy.GetGlobalIDAsString()
- # RpcName: createWidgetRepresentation => pv.widgets.representation.create
- @exportRpc("pv.widgets.representation.create")
- def createWidgetRepresentation(self, proxy_id, view_id):
- proxy = self.mapIdToProxy(proxy_id)
- view = self.getView(view_id)
- widgetProxy = None
- # Find the corresponding widget representation
- if proxy.__class__.__name__ == "Plane":
- widgetProxy = self.CreateWidgetRepresentation(
- view, "ImplicitPlaneWidgetRepresentation"
- )
- setattr(proxy.__class__, "UpdateWidget", _plane_update_widget)
- setattr(proxy.__class__, "WidgetUpdate", _plane_widget_update)
- widgetProxy.GetProperty("DrawPlane").SetElement(0, 0)
- widgetProxy.GetProperty("PlaceFactor").SetElement(0, 1.0)
- proxy.UpdateWidget(widgetProxy)
- widgetProxy.AddObserver("StartInteractionEvent", _draw_plane)
- proxy.Observed = widgetProxy
- proxy.ObserverTag = widgetProxy.AddObserver(
- "EndInteractionEvent", proxy.WidgetUpdate
- )
- elif proxy.__class__.__name__ == "Box":
- widgetProxy = self.CreateWidgetRepresentation(
- view, "BoxWidgetRepresentation"
- )
- elif proxy.__class__.__name__ == "Handle":
- widgetProxy = self.CreateWidgetRepresentation(
- view, "HandleWidgetRepresentation"
- )
- elif proxy.__class__.__name__ == "PointSource":
- widgetProxy = self.CreateWidgetRepresentation(
- view, "PointSourceWidgetRepresentation"
- )
- elif (
- proxy.__class__.__name__ == "LineSource"
- or proxy.__class__.__name__ == "HighResolutionLineSource"
- ):
- widgetProxy = self.CreateWidgetRepresentation(
- view, "LineSourceWidgetRepresentation"
- )
- setattr(proxy.__class__, "UpdateWidget", _line_update_widget)
- setattr(proxy.__class__, "WidgetUpdate", _line_widget_update)
- proxy.UpdateWidget(widgetProxy)
- proxy.Observed = widgetProxy
- proxy.ObserverTag = widgetProxy.AddObserver(
- "EndInteractionEvent", proxy.WidgetUpdate
- )
- elif proxy.__class__.__name__ == "Line":
- widgetProxy = self.CreateWidgetRepresentation(
- view, "LineWidgetRepresentation"
- )
- setattr(proxy.__class__, "UpdateWidget", _line_update_widget)
- setattr(proxy.__class__, "WidgetUpdate", _line_widget_update)
- proxy.UpdateWidget(widgetProxy)
- proxy.Observed = widgetProxy
- proxy.ObserverTag = widgetProxy.AddObserver(
- "EndInteractionEvent", proxy.WidgetUpdate
- )
- elif proxy.__class__.__name__ in ["Distance", "Ruler"]:
- widgetProxy = self.CreateWidgetRepresentation(
- view, "DistanceWidgetRepresentation"
- )
- setattr(proxy.__class__, "UpdateWidget", _line_update_widget)
- setattr(proxy.__class__, "WidgetUpdate", _line_widget_update)
- proxy.UpdateWidget(widgetProxy)
- proxy.Observed = widgetProxy
- proxy.ObserverTag = widgetProxy.AddObserver(
- "EndInteractionEvent", proxy.WidgetUpdate
- )
- elif proxy.__class__.__name__ == "Sphere":
- widgetProxy = self.CreateWidgetRepresentation(
- view, "SphereWidgetRepresentation"
- )
- elif proxy.__class__.__name__ == "Spline":
- widgetProxy = self.CreateWidgetRepresentation(
- view, "SplineWidgetRepresentation"
- )
- else:
- print("No widget representation for %s" % proxy.__class__.__name__)
- return widgetProxy.GetGlobalIDAsString()
- def CreateWidgetRepresentation(self, view, name):
- proxy = simple.servermanager.CreateProxy("representations", name, None)
- pythonWrap = simple.servermanager.rendering.__dict__[proxy.GetXMLName()]()
- pythonWrap.UpdateVTKObjects()
- view.Representations.append(pythonWrap)
- pythonWrap.Visibility = 1
- pythonWrap.Enabled = 1
- return pythonWrap
|