Source code for ladybug_vtk.vtkjs.helper

"""This module helps in embedding vtkjs data in Paraview Glance HTML.

This module is originally developed by Kitware and can be found at the following link.
https://github.com/Kitware/VTK/blob/master/Web/Python/vtkmodules/web/vtkjs_helper.py

"""

import base64
import json
import re
import os
import shutil
import sys
import zipfile
import pathlib

try:
    import zlib
    compression = zipfile.ZIP_DEFLATED
except ModuleNotFoundError:
    compression = zipfile.ZIP_STORED


[docs] def convert_directory_to_zip_file( directory_path: str, remove: bool = True, extension='zip', move=True ) -> str: if os.path.isfile(directory_path): return zip_file_path = f'{directory_path}.{extension}' zf = zipfile.ZipFile(zip_file_path, mode='w') try: for dir_name, _, file_list in os.walk(directory_path): for fname in file_list: full_path = pathlib.Path(dir_name, fname) rel_path = full_path.relative_to(directory_path) zf.write( full_path.as_posix(), arcname=rel_path.as_posix(), compress_type=compression ) finally: zf.close() if remove: shutil.rmtree(directory_path) if move: return shutil.move(zip_file_path, directory_path) else: return zip_file_path
[docs] def add_data_to_viewer(data_path, src_html_path=None): template = pathlib.Path( pathlib.Path(__file__).parent, '../assets/PollinationViewer.html' ).resolve().as_posix() src_html_path = src_html_path or template if not os.path.isfile(data_path): raise FileNotFoundError(f'Failed to find {data_path}') if not os.path.exists(src_html_path): raise FileNotFoundError(f'Failed to find source html file: {src_html_path}') dstDir = os.path.dirname(data_path) dstHtmlPath = os.path.join(dstDir, "%s.html" % os.path.basename(data_path)[:-6]) # Extract data as base64 with open(data_path, 'rb') as data: data_content = data.read() base64Content = base64.b64encode(data_content) base64Content = base64Content.decode().replace('\n', '') # Create new output file with open(src_html_path, mode='r', encoding="utf-8") as srcHtml: with open(dstHtmlPath, mode='w', encoding="utf-8") as dstHtml: for line in srcHtml: if "<noscript>You need to enable JavaScript to run this app.</noscript>" in line: strings = line.split( "<noscript>You need to enable JavaScript to run this app.</noscript>", 1) dstHtml.write(strings[0]) dstHtml.write("<script>\n") dstHtml.write('var contentToLoad = "%s";\n\n' % base64Content) dstHtml.write("function _getContent() { return contentToLoad }\n") dstHtml.write('var contentToLoad = "%s";\n' % base64Content) dstHtml.write( "function _getContent() { return contentToLoad }</script>\n") dstHtml.write( "<noscript>You need to enable JavaScript to run this app.</noscript>") dstHtml.write(strings[1]) continue dstHtml.write(line) return dstHtmlPath
[docs] def zipAllTimeSteps(directory_path): if os.path.isfile(directory_path): return class UrlCounterDict(dict): Counter = 0 def GetUrlName(self, name): if name not in self.keys(): self[name] = str(objNameToUrls.Counter) self.Counter = self.Counter + 1 return self[name] def InitIndex(sourcePath, destObj): with open(sourcePath, 'r', encoding='utf-8') as sourceFile: sourceData = sourceFile.read() sourceObj = json.loads(sourceData) for key in sourceObj: destObj[key] = sourceObj[key] # remove vtkHttpDataSetReader information for obj in destObj["scene"]: obj.pop(obj["type"]) obj.pop("type") def getUrlToNameDictionary(indexObj): urls = {} for obj in indexObj["scene"]: urls[obj[obj["type"]]["url"]] = obj["name"] return urls def addDirectoryToZip( dir_name, zipobj, storedData, rootIdx, timeStep, objNameToUrls ): # Update root index.json file from index.json of this timestep index_file = os.path.join(dir_name, 'index.json') with open(index_file, 'r', encoding='utf-8') as currentIdxFile: currentIdx = json.loads(currentIdxFile.read()) urlToName = getUrlToNameDictionary(currentIdx) rootTimeStepSection = rootIdx["animation"]["timeSteps"][timeStep] for key in currentIdx: if key == "scene" or key == "version": continue rootTimeStepSection[key] = currentIdx[key] for obj in currentIdx["scene"]: objName = obj["name"] rootTimeStepSection[objName] = {} rootTimeStepSection[objName]["actor"] = obj["actor"] rootTimeStepSection[objName]["actorRotation"] = obj["actorRotation"] rootTimeStepSection[objName]["mapper"] = obj["mapper"] rootTimeStepSection[objName]["property"] = obj["property"] # For every object in the current timestep for folder in sorted(os.listdir(dir_name)): currentItem = os.path.join(dir_name, folder) if os.path.isdir(currentItem) is False: continue # Write all data array of the current timestep in the archive for filename in os.listdir(os.path.join(currentItem, "data")): full_path = os.path.join(currentItem, "data", filename) if os.path.isfile(full_path) and filename not in storedData: storedData.add(filename) rel_path = os.path.join("data", filename) zipobj.write(full_path, arcname=rel_path, compress_type=compression) # Write the index.json containing pointers to these data arrays # while replacing every basepath as '../../data' objIndexFilePath = os.path.join(dir_name, folder, "index.json") with open(objIndexFilePath, 'r', encoding='utf-8') as objIndexFile: objIndexObjData = json.loads(objIndexFile.read()) for elm in objIndexObjData.keys(): try: if "ref" in objIndexObjData[elm].keys(): objIndexObjData[elm]["ref"]["basepath"] = "../../data" if "arrays" in objIndexObjData[elm].keys(): for array in objIndexObjData[elm]["arrays"]: array["data"]["ref"]["basepath"] = "../../data" except AttributeError: continue currentObjName = urlToName[folder] objIndexrel_path = os.path.join( objNameToUrls.GetUrlName(currentObjName), str(timeStep), "index.json" ) zipobj.writestr( objIndexrel_path, json.dumps(objIndexObjData, indent=2), compress_type=compression, ) zip_file_path = "%s.zip" % directory_path currentDirectory = os.path.abspath(os.path.join(directory_path, os.pardir)) rootIndexPath = os.path.join(currentDirectory, "index.json") rootIndexFile = open(rootIndexPath, 'r', encoding='utf-8') rootIndexObj = json.loads(rootIndexFile.read()) zf = zipfile.ZipFile(zip_file_path, mode="w") try: # We copy the scene from an index of a specific timestep to the root index # Scenes should all have the same objects so only do it for the first one isSceneInitialized = False # currentlyAddedData set stores hashes of every data we already added to the # vtkjs archive to prevent data duplication currentlyAddedData = set() # Regex that folders storing timestep data from paraview should follow reg = re.compile(r"^" + os.path.basename(directory_path) + r"\.[0-9]+$") # We assume an object will not be deleted from a timestep to another so we # create a generic index.json for each object genericIndexObj = {} genericIndexObj["series"] = [] timeStep = 0 for item in rootIndexObj["animation"]["timeSteps"]: genericIndexObj["series"].append({}) genericIndexObj["series"][timeStep]["url"] = str(timeStep) genericIndexObj["series"][timeStep]["timeStep"] = float(item["time"]) timeStep = timeStep + 1 # Keep track of the url for every object objNameToUrls = UrlCounterDict() timeStep = 0 # zip all timestep directories for folder in sorted(os.listdir(currentDirectory)): full_path = os.path.join(currentDirectory, folder) if os.path.isdir(full_path) and reg.match(folder): if not isSceneInitialized: InitIndex(os.path.join(full_path, "index.json"), rootIndexObj) isSceneInitialized = True addDirectoryToZip( full_path, zf, currentlyAddedData, rootIndexObj, timeStep, objNameToUrls, ) shutil.rmtree(full_path) timeStep = timeStep + 1 # Write every index.json holding time information for each object for name in objNameToUrls: zf.writestr( os.path.join(objNameToUrls[name], "index.json"), json.dumps(genericIndexObj, indent=2), compress_type=compression, ) # Update root index.json urls and write it in the archive for obj in rootIndexObj["scene"]: obj["id"] = obj["name"] obj["type"] = "vtkHttpDataSetSeriesReader" obj["vtkHttpDataSetSeriesReader"] = {} obj["vtkHttpDataSetSeriesReader"]["url"] = objNameToUrls[obj["name"]] zf.writestr( "index.json", json.dumps(rootIndexObj, indent=2), compress_type=compression ) os.remove(rootIndexPath) finally: zf.close() shutil.move(zip_file_path, directory_path)
if __name__ == "__main__": if len(sys.argv) < 2: print( 'Usage: python helper.py /path/to/directory.vtkjs ' '[/path/to/ParaViewGlance.html]' ) else: fileName = sys.argv[1] convert_directory_to_zip_file(fileName) if len(sys.argv) == 3: add_data_to_viewer(fileName, sys.argv[2])