# coding=utf-8
"""Module for running geoJSON and OpenStudio files through URBANopt."""
from __future__ import division
import os
import json
import shutil
import subprocess
from ladybug.futil import preparedir, write_to_file
from ladybug.epw import EPW
from ladybug.config import folders as lb_folders
from honeybee.config import folders as hb_folders
from honeybee_energy.config import folders as hb_energy_folders
from honeybee_energy.result.emissions import emissions_region
from .config import folders
from .measure import MapperMeasure
from .reopt import REoptParameter
# Custom environment used to run Python packages without conflicts
PYTHON_ENV = os.environ.copy()
PYTHON_ENV['PYTHONHOME'] = ''
# Maximum number of boreholes, used to prevent GHE Designer simulations that would max out the memory
MAX_BOREHOLES = 10000
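# Typical end-to-end workflow (a minimal sketch; the folder and file paths
# below are hypothetical and the folder must already contain a feature geoJSON):
#     project = 'C:/sim/district_project'
#     geojson = os.path.join(project, 'district.geojson')
#     base_honeybee_osw(project, epw_file='C:/epw/boston_logan.epw')
#     scenario = prepare_urbanopt_folder(geojson)
#     osm, idf, sql, zsz, rdd, html, err = run_urbanopt(geojson, scenario)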
def base_honeybee_osw(
project_directory, sim_par_json=None, additional_measures=None,
additional_mapper_measures=None, base_osw=None, epw_file=None,
skip_report=True, emissions_year=None):
"""Create a honeybee_workflow.osw to be used as a base in URBANopt simulations.
This method will also copy the Honeybee.rb mapper to this folder if it is
available in the config of this library.
Args:
project_directory: Full path to a folder out of which the URBANopt simulation
will be run. This is the folder that contains the feature geoJSON.
        sim_par_json: Optional file path to the SimulationParameter JSON.
            If None, the OpenStudio models generated in the URBANopt run will
            not have everything they need to be simulated unless such
            parameters are supplied by one of the additional_measures or the
            base_osw. (Default: None).
additional_measures: An optional array of honeybee-energy Measure objects
to be included in the output osw. These Measure objects must have
values for all required input arguments or an exception will be
raised while running this function. (Default: None).
additional_mapper_measures: An optional array of dragonfly-energy MapperMeasure
objects to be included in the output osw. These MapperMeasure objects
must have values for all required input arguments or an exception will
be raised while running this function. (Default: None).
        base_osw: Optional file path to an existing OSW JSON to be used as the
            base for the honeybee_workflow.osw. This is another way that outside
            measures can be incorporated into the workflow. (Default: None).
epw_file: Optional file path to an EPW that should be associated with the
output energy model. (Default: None).
skip_report: Set to True to have the URBANopt default feature reporting
measure skipped as part of the workflow. If False, the measure will
be run after all simulations are complete. Note that this input
has no effect if the default_feature_reports measure is already
in the base_osw or additional_measures. (Default: True).
        emissions_year: An optional integer to set the year for which carbon
            emissions will be computed. If not a historical year, the value must
            be an even number between 2020 and 2050. If None, no carbon emission
            calculations will be included in the simulation. (Default: None).
Returns:
The file path to the honeybee_workflow.osw written out by this method.
This is used as the base for translating all features in the geoJSON.
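
    Usage:
        # a minimal sketch; the project folder and EPW path are hypothetical
        # and the folder must already contain the feature geoJSON
        osw = base_honeybee_osw(
            'C:/sim/district_project', epw_file='C:/epw/boston_logan.epw',
            emissions_year=2030)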
"""
# create a dictionary representation of the .osw with steps to run
# the model measure and the simulation parameter measure
if base_osw is None:
osw_dict = {'steps': [], 'name': None, 'description': None}
else:
assert os.path.isfile(base_osw), 'No base OSW file found at {}.'.format(base_osw)
with open(base_osw, 'r') as base_file:
osw_dict = json.load(base_file)
# add a simulation parameter step if it is specified
if sim_par_json is not None:
sim_par_dict = {
'arguments': {
'simulation_parameter_json': sim_par_json
},
'measure_dir_name': 'from_honeybee_simulation_parameter'
}
osw_dict['steps'].insert(0, sim_par_dict)
# add the model json serialization into the steps
model_measure_dict = {
'arguments': {
'model_json': 'model_json_to_be_mapped.json'
},
'measure_dir_name': 'from_honeybee_model'
}
osw_dict['steps'].insert(0, model_measure_dict)
# assign the measure_paths to the osw_dict
if 'measure_paths' not in osw_dict:
osw_dict['measure_paths'] = []
if hb_energy_folders.honeybee_openstudio_gem_path: # add honeybee-openstudio measure
m_dir = os.path.join(hb_energy_folders.honeybee_openstudio_gem_path, 'measures')
osw_dict['measure_paths'].append(m_dir)
# add the emissions reporting if a year has been selected
if emissions_year is not None and epw_file is not None:
epw_obj = EPW(epw_file)
ems_region = emissions_region(epw_obj.location)
if ems_region is not None:
if emissions_year < 2020:
hist_yr, fut_yr = emissions_year, 2030
else:
hist_yr, fut_yr = 2019, emissions_year
emissions_measure_dict = {
'arguments': {
'future_subregion': ems_region[0],
'hourly_historical_subregion': ems_region[2],
'annual_historical_subregion': ems_region[1],
'future_year': str(fut_yr),
'hourly_historical_year': '2019',
'annual_historical_year': str(hist_yr)
},
'measure_dir_name': 'add_ems_emissions_reporting'
}
osw_dict['steps'].append(emissions_measure_dict)
# add any additional measures to the osw_dict
if additional_measures or additional_mapper_measures:
measures = []
if additional_measures is not None:
measures.extend(additional_measures)
if additional_mapper_measures is not None:
measures.extend(additional_mapper_measures)
measure_paths = set() # set of all unique measure paths
# ensure measures are correctly ordered
m_dict = {'ModelMeasure': [], 'EnergyPlusMeasure': [], 'ReportingMeasure': []}
for measure in measures:
m_dict[measure.type].append(measure)
sorted_measures = m_dict['ModelMeasure'] + m_dict['EnergyPlusMeasure'] + \
m_dict['ReportingMeasure']
for measure in sorted_measures:
measure.validate() # ensure that all required arguments have values
measure_paths.add(os.path.dirname(measure.folder))
osw_dict['steps'].append(measure.to_osw_dict()) # add measure to workflow
if isinstance(measure, MapperMeasure):
_add_mapper_measure(project_directory, measure)
for m_path in measure_paths: # add outside measure paths
osw_dict['measure_paths'].append(m_path)
# add default feature reports if they aren't in the steps
all_measures = [step['measure_dir_name'] for step in osw_dict['steps']]
if 'default_feature_reports' not in all_measures:
report_measure_dict = {
'arguments': {
'feature_id': None,
'feature_name': None,
'feature_type': None,
'feature_location': None
},
'measure_dir_name': 'default_feature_reports'
}
if skip_report:
report_measure_dict['arguments']['__SKIP__'] = True
osw_dict['steps'].append(report_measure_dict)
    # reference the EPW and, if there is a DES system parameter JSON, set up its inputs
if epw_file is not None:
osw_dict['weather_file'] = epw_file
sys_param_file = os.path.join(project_directory, 'system_params.json')
if os.path.isfile(sys_param_file):
# make sure the Modelica measure runs as part of the simulation
modelica_measures = [
{
'measure_dir_name': 'export_time_series_modelica',
'arguments': {'__SKIP__': False}
},
{
'measure_dir_name': 'export_modelica_loads',
'arguments': {'__SKIP__': False}
},
]
osw_dict['steps'].extend(modelica_measures)
# copy the EPW to the project directory
epw_f_name = os.path.split(epw_file)[-1]
target_epw = os.path.join(project_directory, epw_f_name)
shutil.copy(epw_file, target_epw)
# create a MOS file from the EPW
epw_obj = EPW(target_epw)
mos_file = os.path.join(
project_directory, epw_f_name.replace('.epw', '.mos'))
epw_obj.to_mos(mos_file)
# find the path to the feature GeoJSON
feature_geojson = None
for fp in os.listdir(project_directory):
if fp.endswith('.geojson'):
feature_geojson = os.path.join(project_directory, fp)
break
if not feature_geojson:
raise ValueError(
'No feature geojson file was found in: {}'.format(project_directory))
# write the EPW path into the GeoJSON
with open(feature_geojson, 'r') as gjf:
geo_dict = json.load(gjf)
if 'project' in geo_dict:
if 'weather_filename' not in geo_dict['project']:
geo_dict['project']['weather_filename'] = epw_f_name
with open(feature_geojson, 'w') as fp:
json.dump(geo_dict, fp, indent=4)
# if the DES system is GSHP, specify any autocalculated ground temperatures
with open(sys_param_file, 'r') as spf:
sys_dict = json.load(spf)
if 'district_system' in sys_dict:
if 'fifth_generation' in sys_dict['district_system']:
g5_par = sys_dict['district_system']['fifth_generation']
if 'soil' in g5_par and 'undisturbed_temp' in g5_par['soil']:
soil_par = g5_par['soil']
if soil_par['undisturbed_temp'] == 'Autocalculate':
epw_obj = EPW(epw_file)
soil_par['undisturbed_temp'] = \
epw_obj.dry_bulb_temperature.average
with open(sys_param_file, 'w') as fp:
json.dump(sys_dict, fp, indent=4)
# write the dictionary to a honeybee_workflow.osw
mappers_dir = os.path.join(project_directory, 'mappers')
if not os.path.isdir(mappers_dir):
preparedir(mappers_dir)
osw_json = os.path.join(mappers_dir, 'honeybee_workflow.osw')
with open(osw_json, 'w') as fp:
json.dump(osw_dict, fp, indent=4)
# copy the Honeybee.rb mapper if it exists in the config
if folders.mapper_path:
shutil.copy(folders.mapper_path, os.path.join(mappers_dir, 'Honeybee.rb'))
return os.path.abspath(osw_json)
def prepare_urbanopt_folder(feature_geojson, cpu_count=None, verbose=False):
"""Prepare a directory with a feature geoJSON for URBANopt simulation.
This includes copying the Gemfile to the folder and generating the runner.conf
    to specify the number of CPUs to be used in the simulation. Lastly, the
    scenario .csv file will be generated from the feature_geojson.
Args:
feature_geojson: An URBANopt feature geoJSON to be prepared for URBANopt
simulation.
cpu_count: A positive integer for the number of CPUs to use in the
simulation. This number should not exceed the number of CPUs on the
machine running the simulation and should be lower if other tasks
are running while the simulation is running. If set to None, it
should automatically default to one less than the number of CPUs
currently available on the machine (or 1 if the machine has only
one processor). (Default: None).
verbose: Boolean to note if the simulation should be run with verbose
reporting of progress. (Default: False).
Returns:
Path to the .csv file for the URBANopt scenario.
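
    Usage:
        # a minimal sketch; the geoJSON path is hypothetical
        scenario_csv = prepare_urbanopt_folder(
            'C:/sim/district_project/district.geojson', cpu_count=4)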
"""
# copy the Gemfile into the folder containing the feature_geojson
assert folders.urbanopt_gemfile_path, \
'No URBANopt Gemfile was found in dragonfly_energy.config.folders.\n' \
'This file must exist to run URBANopt.'
folders.check_urbanopt_version()
uo_folder = os.path.dirname(feature_geojson)
shutil.copy(folders.urbanopt_gemfile_path, os.path.join(uo_folder, 'Gemfile'))
# auto-assign the number of processors if None
cpu_count = _recommended_processor_count() if cpu_count is None else cpu_count
# generate the runner.conf to set the number of CPUs based on the input
runner_dict = {
'file_version': '0.1.0',
'max_datapoints': 1000000000,
'num_parallel': cpu_count,
'run_simulations': True,
'verbose': verbose
}
runner_conf = os.path.join(uo_folder, 'runner.conf')
with open(runner_conf, 'w') as fp:
json.dump(runner_dict, fp, indent=2)
# generate the scenario csv file
return _make_scenario(feature_geojson)
def run_urbanopt(feature_geojson, scenario_csv, cpu_count=None):
"""Run a feature and scenario file through URBANopt on any operating system.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
cpu_count: This input is deprecated and currently not used given that
the runner.conf generated in the prepare_urbanopt_folder step
correctly identifies the number of CPUs to be used.
Returns:
A series of file paths to the simulation output files
- osm -- Array of paths to .osm files for all generated OpenStudio models.
- idf -- Array of paths to .idf files containing the input for the
EnergyPlus simulation.
- sql -- Array of paths to .sqlite files containing all simulation results.
- zsz -- Array of paths to .csv files containing detailed zone load
information recorded over the course of the design days.
- rdd -- Array of paths to .rdd files containing all possible outputs
that can be requested from the simulation.
        - html -- Array of paths to .htm files containing all summary reports.
- err -- Array of paths to .err files containing all errors and warnings
from the simulation.
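
    Usage:
        # a minimal sketch; the paths are hypothetical and assume that
        # prepare_urbanopt_folder has already generated the scenario CSV
        osm, idf, sql, zsz, rdd, html, err = run_urbanopt(
            'C:/sim/district_project/district.geojson',
            'C:/sim/district_project/honeybee_scenario.csv')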
"""
folders.check_urbanopt_version()
# run the simulation
if os.name == 'nt': # we are on Windows
directory, stderr = \
_run_urbanopt_windows(feature_geojson, scenario_csv)
else: # we are on Mac, Linux, or some other unix-based system
directory, stderr = \
_run_urbanopt_unix(feature_geojson, scenario_csv)
# output the simulation files
return _output_urbanopt_files(directory, stderr)
def run_default_report(feature_geojson, scenario_csv):
"""Generate default reports after an URBANopt simulation is run.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
A series of file paths to the report output files
- csv -- Path to a CSV file containing default scenario results.
- report_json -- Path to a JSON file containing default scenario results.
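
    Usage:
        # a minimal sketch; the paths are hypothetical and the URBANopt
        # simulation must already have been run
        csv, report_json = run_default_report(
            'C:/sim/district_project/district.geojson',
            'C:/sim/district_project/honeybee_scenario.csv')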
"""
folders.check_urbanopt_version()
    assert os.path.isfile(feature_geojson), \
        'No feature_geojson was found at the specified path: {}.'.format(feature_geojson)
    assert os.path.isfile(scenario_csv), \
        'No scenario_csv was found at the specified path: {}.'.format(scenario_csv)
# run the report command
if os.name == 'nt': # we are on Windows
return _run_default_report_windows(feature_geojson, scenario_csv)
else: # we are on Mac, Linux, or some other unix-based system
return _run_default_report_unix(feature_geojson, scenario_csv)
def run_reopt(feature_geojson, scenario_csv, urdb_label, reopt_parameters=None,
developer_key=None):
"""Run a feature and scenario file through REopt post processing.
    Note that the URBANopt simulation must already have been run with the input
    feature_geojson and scenario_csv in order for the post-processing to be successful.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
        urdb_label: Text string for the Utility Rate Database (URDB) label for the
            particular electrical utility rate used in the optimization. The label
            is the last term of the URL of a utility rate detail page (e.g. the
            label for the rate at
            https://openei.org/apps/IURDB/rate/view/5b0d83af5457a3f276733305
            is 5b0d83af5457a3f276733305). Utility rates for specific locations
            can be looked up in the REopt Lite tool (https://reopt.nrel.gov/tool)
            and the label can be obtained by clicking on the "Rate Details" link
            for a particular selected rate.
        reopt_parameters: A REoptParameter object to describe the major assumptions
            of the REopt analysis. If None, some default parameters will be
            generated for a typical analysis. (Default: None).
        developer_key: Text string for the NREL developer key. You can get a
            developer key at https://developer.nrel.gov/. (Default: None).
Returns:
A series of file paths to the simulation output files
- csv -- Path to a CSV file containing scenario optimization results.
- report_json -- Path to a JSON file containing scenario optimization results.
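
    Usage:
        # a minimal sketch; the paths are hypothetical and the URDB label
        # is the example from the urdb_label description above
        csv, report_json = run_reopt(
            'C:/sim/district_project/district.geojson',
            'C:/sim/district_project/honeybee_scenario.csv',
            '5b0d83af5457a3f276733305')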
"""
# run checks on the inputs
folders.check_urbanopt_version()
assert folders.reopt_assumptions_path, \
'No REopt assumptions were found in dragonfly_energy.config.folders.'
    assert os.path.isfile(feature_geojson), \
        'No feature_geojson was found at the specified path: {}.'.format(feature_geojson)
    assert os.path.isfile(scenario_csv), \
        'No scenario_csv was found at the specified path: {}.'.format(scenario_csv)
developer_key = developer_key if developer_key is not None \
else 'bo8jGuFfk7DBDpTlzShuxWuAGletBq1j5AhcUhCD'
project_folder = os.path.dirname(feature_geojson)
# write the parameter file
if reopt_parameters is None: # generate some defaults
reopt_parameters = REoptParameter()
reopt_parameters.pv_parameter.max_kw = 1000000000
reopt_parameters.storage_parameter.max_kw = 1000000
reopt_parameters.generator_parameter.max_kw = 1000000000
else:
assert isinstance(reopt_parameters, REoptParameter), \
'Expected REoptParameter. Got {}.'.format(type(reopt_parameters))
reopt_folder = os.path.join(project_folder, 'reopt')
if not os.path.isdir(reopt_folder):
os.mkdir(reopt_folder)
reopt_par_json = os.path.join(reopt_folder, 'reopt_assumptions.json')
reopt_dict = reopt_parameters.to_assumptions_dict(
folders.reopt_assumptions_path, urdb_label)
with open(reopt_par_json, 'w') as fp:
json.dump(reopt_dict, fp, indent=4)
# run the simulation
if os.name == 'nt': # we are on Windows
directory = _run_reopt_windows(feature_geojson, scenario_csv, developer_key)
else: # we are on Mac, Linux, or some other unix-based system
directory = _run_reopt_unix(feature_geojson, scenario_csv, developer_key)
# output the simulation files
return _output_reopt_files(directory)
def run_rnm(feature_geojson, scenario_csv, underground_ratio=0.9, lv_only=True,
nodes_per_building=1):
"""Run a feature and scenario file through RNM post processing.
    Note that the URBANopt simulation must already have been run with the input
    feature_geojson and scenario_csv in order for the RNM post-processing to be successful.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
underground_ratio: A number between 0 and 1 for the ratio of overall cables
that are underground vs. overhead in the analysis. (Default: 0.9).
lv_only: Boolean to note whether to consider only low voltage consumers
in the analysis. (Default: True).
nodes_per_building: Positive integer for the maximum number of low voltage
nodes to represent a single building. (Default: 1).
Returns:
Path to a folder that contains all of the RNM output files.
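
    Usage:
        # a minimal sketch; the paths are hypothetical and the URBANopt
        # simulation must already have been run
        results_folder = run_rnm(
            'C:/sim/district_project/district.geojson',
            'C:/sim/district_project/honeybee_scenario.csv',
            underground_ratio=0.75)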
"""
# load the information from the GeoJSON
    geo_dict, project_dict = None, None
    if os.path.isfile(feature_geojson):
        with open(feature_geojson, 'r') as fg:
            geo_dict = json.load(fg)
        project_dict = geo_dict.get('project')
# change the GeoJSON to have the RNM inputs
if geo_dict is not None:
proj = geo_dict['project']
if 'underground_cables_ratio' not in proj or \
proj['underground_cables_ratio'] != underground_ratio or \
'only_lv_consumers' not in proj or \
proj['only_lv_consumers'] != lv_only or \
'max_number_of_lv_nodes_per_building' not in proj or \
proj['max_number_of_lv_nodes_per_building'] != nodes_per_building:
geo_dict['project']['underground_cables_ratio'] = underground_ratio
geo_dict['project']['only_lv_consumers'] = lv_only
geo_dict['project']['max_number_of_lv_nodes_per_building'] = \
nodes_per_building
with open(feature_geojson, 'w') as fp:
json.dump(geo_dict, fp, indent=4)
# run the simulation
folders.check_urbanopt_version()
if os.name == 'nt': # we are on Windows
directory = _run_rnm_windows(feature_geojson, scenario_csv)
else: # we are on Mac, Linux, or some other unix-based system
directory = _run_rnm_unix(feature_geojson, scenario_csv)
# get the path to the results folder
scenario_name = os.path.basename(scenario_csv).replace('.csv', '')
rnm_path = os.path.join(directory, 'run', scenario_name, 'rnm-us', 'results')
# copy the project information into the RNM GeoJSON
if os.path.isdir(rnm_path):
rnm_geojson = os.path.join(rnm_path, 'GeoJSON', 'Distribution_system.json')
with open(rnm_geojson, 'r') as fg:
rnm_dict = json.load(fg)
rnm_dict['project'] = project_dict
with open(rnm_geojson, 'w') as fp:
json.dump(rnm_dict, fp, indent=4)
return rnm_path
def run_des_sys_param(feature_geojson, scenario_csv):
"""Run the GMT command to add the time series building loads to the sys param JSON.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
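
    Returns:
        The path to the system parameter JSON with the building loads added.

    Usage:
        # a minimal sketch; the paths are hypothetical and the URBANopt
        # simulation must already have been run
        sys_param = run_des_sys_param(
            'C:/sim/district_project/district.geojson',
            'C:/sim/district_project/honeybee_scenario.csv')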
"""
# get the directory and parse the system parameter file
directory = os.path.dirname(feature_geojson)
sys_param_file = os.path.join(directory, 'system_params.json')
assert os.path.isfile(sys_param_file), \
'No DES system parameter was found for this model.\n' \
'Make sure that the des_loop_ was assigned in the GeoJSON export\n' \
'before running the URBANopt simulation.'
# parse the system parameter file to understand the type of system
with open(sys_param_file, 'r') as spf:
sp_dict = json.load(spf)
des_dict = sp_dict['district_system']
    ghe_sys = 'fifth_generation' in des_dict and \
        'ghe_parameters' in des_dict['fifth_generation']
# run the command that adds the building loads to the system parameter
ext = '.exe' if os.name == 'nt' else ''
    shell = os.name == 'nt'
uo_des_exe = os.path.join(
hb_folders.python_scripts_path, 'uo_des{}'.format(ext))
build_cmd = '"{des_exe}" build-sys-param ' \
'"{sp_file}" "{scenario}" "{feature}" '.format(
des_exe=uo_des_exe, sp_file=sys_param_file,
scenario=scenario_csv, feature=feature_geojson)
if ghe_sys:
build_cmd = '{} 5G_ghe'.format(build_cmd)
else:
build_cmd = '{} 4G'.format(build_cmd)
build_cmd = '{} time_series -o'.format(build_cmd)
process = subprocess.Popen(
build_cmd, stderr=subprocess.PIPE, shell=shell, env=PYTHON_ENV
)
    stderr = process.communicate()[1]
    if not os.path.isfile(sys_param_file):
        msg = 'Failed to add building loads to the DES system parameter file.\n' \
            'No file found at:\n{}\n{}'.format(sys_param_file, stderr)
        print(msg)
        raise Exception(msg)
    else:
        print(stderr)
    # after the loads have been added, put back the properties of the DES
with open(sys_param_file, 'r') as spf:
sp_dict = json.load(spf)
if ghe_sys:
original_des_par = des_dict['fifth_generation']
original_ghe_par = original_des_par['ghe_parameters']
des_par = sp_dict['district_system']['fifth_generation']
des_par['soil'] = original_des_par['soil']
des_par['ghe_parameters']['fluid'] = original_ghe_par['fluid']
des_par['ghe_parameters']['grout'] = original_ghe_par['grout']
des_par['ghe_parameters']['pipe'] = original_ghe_par['pipe']
des_par['ghe_parameters']['geometric_constraints'] = \
original_ghe_par['geometric_constraints']
des_par['ghe_parameters']['ghe_specific_params'] = \
original_ghe_par['ghe_specific_params']
# remove geometric params so that ThermalNetwork uses GeoJSON polygon
rect_geo_par = []
for ghe_dict in des_par['ghe_parameters']['ghe_specific_params']:
rect_geo_par.append(ghe_dict.pop('ghe_geometric_params'))
else:
sp_dict['district_system'] = des_dict
with open(sys_param_file, 'w') as spf:
json.dump(sp_dict, spf, indent=2)
# if the DES system has a ground heat exchanger, run the thermal network package
if ghe_sys:
# run the GHE Designer to size the system
tn_exe = os.path.join(
hb_folders.python_scripts_path, 'thermalnetwork{}'.format(ext))
scn_name = os.path.basename(scenario_csv).replace('.csv', '')
scn_dir = os.path.join(directory, 'run', scn_name)
ghe_dir = os.path.join(scn_dir, 'ghe_dir')
build_cmd = \
'"{tn_exe}" -y "{sp_file}" -s "{scenario}" -f "{feature}" -o {out_p}'.format(
tn_exe=tn_exe, sp_file=sys_param_file,
scenario=scn_dir, feature=feature_geojson, out_p=ghe_dir)
process = subprocess.Popen(
build_cmd, stderr=subprocess.PIPE, shell=False, env=PYTHON_ENV
)
# if any errors were found in the sizing simulation, raise them to the user
stderr = process.communicate()[1]
stderr_str = str(stderr.strip())
print(stderr_str)
if 'ValueError' in stderr_str: # pass the exception onto the user
msg = stderr_str.split('ValueError: ')[-1].strip()
raise ValueError(msg)
# add the borehole length and count to the system parameter file
with open(sys_param_file, 'r') as spf:
sp_dict = json.load(spf)
ghe_par_dict = sp_dict['district_system']['fifth_generation']['ghe_parameters']
for ghe_s_par, rect_par in zip(ghe_par_dict['ghe_specific_params'], rect_geo_par):
r_dir = des_par['ghe_parameters']['ghe_dir']
res_file = os.path.join(r_dir, ghe_s_par['ghe_id'], 'SimulationSummary.json')
with open(res_file, 'r') as rf:
res_dict = json.load(rf)
ghe_s_par['borehole']['length_of_boreholes'] = \
res_dict['ghe_system']['active_borehole_length']['value']
ghe_s_par['borehole']['number_of_boreholes'] = \
res_dict['ghe_system']['number_of_boreholes']
ghe_s_par['ghe_geometric_params'] = rect_par
with open(sys_param_file, 'w') as spf:
json.dump(sp_dict, spf, indent=2)
return sys_param_file
def run_des_modelica(sys_param_json, feature_geojson, scenario_csv):
"""Run the GMT command to create the Modelica files from the system param JSON.
Args:
sys_param_json: The full path to a system parameter JSON from which the
Modelica files will be generated.
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
The path to the folder where the Modelica files have been written.
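
    Usage:
        # a minimal sketch; the paths are hypothetical and run_des_sys_param
        # must already have added the building loads to the system parameters
        modelica_dir = run_des_modelica(
            'C:/sim/district_project/system_params.json',
            'C:/sim/district_project/district.geojson',
            'C:/sim/district_project/honeybee_scenario.csv')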
"""
# run the simulation
if os.name == 'nt': # we are on Windows
modelica_dir, stderr = \
_generate_modelica_windows(sys_param_json, feature_geojson, scenario_csv)
else: # we are on Mac, Linux, or some other unix-based system
modelica_dir, stderr = \
_generate_modelica_unix(sys_param_json, feature_geojson, scenario_csv)
if not os.path.isdir(modelica_dir):
msg = 'Failed to translate DES to Modelica.\n' \
'No results were found at:\n{}\n{}'.format(modelica_dir, stderr)
print(msg)
raise Exception(msg)
else:
_add_water_heating_patch(modelica_dir)
return modelica_dir
def run_modelica_docker(modelica_project_dir):
"""Execute Modelica files of a DES.
Args:
modelica_project_dir: The full path to the folder in which the Modelica
files were written.
Returns:
The path to where the results have been written.
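
    Usage:
        # a minimal sketch; the path is hypothetical and Docker must be
        # installed for the simulation to run
        results = run_modelica_docker(
            'C:/sim/district_project/run/honeybee_scenario/des_modelica')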
"""
# run the simulation
if os.name == 'nt': # we are on Windows
modelica_dir, stderr = _run_modelica_windows(modelica_project_dir)
else: # we are on Mac, Linux, or some other unix-based system
modelica_dir, stderr = _run_modelica_unix(modelica_project_dir)
if not os.path.isdir(modelica_dir):
msg = 'Failed to execute Modelica simulation.\n' \
'No results were found at:\n{}\n{}'.format(modelica_dir, stderr)
print(msg)
raise Exception(msg)
return modelica_dir
def _add_mapper_measure(project_directory, mapper_measure):
"""Add mapper measure arguments to a geoJSON and the mapper_measures.json.
Args:
project_directory: Full path to a folder out of which the URBANopt simulation
will be run. This is the folder that contains the feature geoJSON.
mapper_measure: A MapperMeasure object to add.
"""
    # find the feature geoJSON and parse it into a dictionary
    geojson_file = None
    for proj_file in os.listdir(project_directory):
        if proj_file.endswith('geojson'):
            geojson_file = os.path.join(project_directory, proj_file)
            break
    assert geojson_file is not None, \
        'No feature geoJSON was found in: {}'.format(project_directory)
with open(geojson_file, 'r') as base_file:
geojson_dict = json.load(base_file)
# find or start the mapper_measures.json
mapper_dir = os.path.join(project_directory, 'mappers')
map_meas_file = os.path.join(mapper_dir, 'mapper_measures.json')
if os.path.isfile(map_meas_file):
with open(map_meas_file, 'r') as base_file:
map_meas_list = json.load(base_file)
else:
map_meas_list = []
    # loop through the mapper measure arguments and assign any mapped values
    for m_arg in mapper_measure.arguments:
        if isinstance(m_arg.value, tuple):  # argument to map to buildings
            bldg_i = 0  # counter to align argument values with building features
            for feat in geojson_dict['features']:
                try:
                    if feat['properties']['type'] == 'Building':
                        feat['properties'][m_arg.identifier] = m_arg.value[bldg_i]
                        bldg_i += 1
                except IndexError:
                    raise ValueError(
                        'Number of MapperMeasure arguments ({}) is less than the '
                        'number of buildings in the model.'.format(len(m_arg.value)))
                except KeyError:  # definitely not a building
                    pass
m_arg_info = [
os.path.basename(mapper_measure.folder),
m_arg.identifier, m_arg.identifier]
map_meas_list.append(m_arg_info)
# write the geoJSON and the mapper_measures.json
if not os.path.isdir(mapper_dir):
os.mkdir(mapper_dir)
with open(geojson_file, 'w') as fp:
json.dump(geojson_dict, fp, indent=4)
with open(map_meas_file, 'w') as fp:
json.dump(map_meas_list, fp, indent=4)
def _make_scenario(feature_geojson):
"""Generate a scenario CSV file for URBANopt simulation.
Args:
feature_geojson: The full path to a .geojson file.
"""
# load the geoJSON to a dictionary
with open(feature_geojson, 'r') as base_file:
geo_dict = json.load(base_file)
# loop through the building features and add them to the CSV
scenario_matrix = [['Feature Id', 'Feature Name', 'Mapper Class']]
hb_mapper = 'URBANopt::Scenario::HoneybeeMapper'
for feature in geo_dict['features']:
try:
if feature['properties']['type'] == 'Building':
props = feature['properties']
f_row = [props['id'], props['name'], hb_mapper]
scenario_matrix.append(f_row)
except KeyError: # definitely not a building
pass
# write the scenario CSV file
uo_folder = os.path.dirname(feature_geojson)
scenario = os.path.join(uo_folder, 'honeybee_scenario.csv')
with open(scenario, 'w') as fp:
for row in scenario_matrix:
fp.write('{}\n'.format(','.join(row)))
return scenario
def _recommended_processor_count():
"""Get an integer for one less than the number of processors on this machine.
This method should work on all of the major operating systems and in
    both IronPython and CPython. If, for whatever reason, the number of
    processors could not be sensed, a value of 1 will be returned.
    """
    try:  # assume that we are in CPython
cpu_count = os.cpu_count()
except AttributeError: # we are probably in IronPython
try:
from System.Environment import ProcessorCount
cpu_count = ProcessorCount
except ImportError: # no idea what Python this is; let's play it safe
cpu_count = 1
return 1 if cpu_count is None or cpu_count <= 1 else cpu_count - 1
def _run_urbanopt_windows(feature_geojson, scenario_csv):
"""Run a feature and scenario file through URBANopt on a Windows-based os.
A batch file will be used to run the simulation.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
A tuple with two values.
- directory -- Path to the folder out of which the simulation was run.
- stderr -- The standard error message, which should get to the user
in the event of simulation failure.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# Write the batch file to call URBANopt CLI
working_drive = directory[:2]
batch = '{}\ncd {}\ncall "{}"\nuo run -f "{}" -s "{}"'.format(
working_drive, working_drive, folders.urbanopt_env_path,
feature_geojson, scenario_csv)
batch_file = os.path.join(directory, 'run_simulation.bat')
write_to_file(batch_file, batch, True)
# run the batch file
process = subprocess.Popen(
'"{}"'.format(batch_file), stderr=subprocess.PIPE, env=PYTHON_ENV
)
result = process.communicate()
stderr = result[1]
return directory, stderr
def _run_urbanopt_unix(feature_geojson, scenario_csv):
"""Run a feature and scenario file through URBANopt on a Unix-based os.
This includes both Mac OS and Linux since a shell will be used to run
the simulation.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
A tuple with two values.
- directory -- Path to the folder out of which the simulation was run.
- stderr -- The standard error message, which should get to the user
in the event of simulation failure.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# Write the shell script to call URBANopt CLI
shell = '#!/usr/bin/env bash\nsource "{}"\nuo run -f "{}" -s "{}"'.format(
folders.urbanopt_env_path, feature_geojson, scenario_csv)
shell_file = os.path.join(directory, 'run_simulation.sh')
write_to_file(shell_file, shell, True)
# make the shell script executable using subprocess.check_call
# this is more reliable than native Python chmod on Mac
subprocess.check_call(['chmod', 'u+x', shell_file])
# run the shell script
process = subprocess.Popen(
'"{}"'.format(shell_file), stderr=subprocess.PIPE, env=PYTHON_ENV, shell=True
)
result = process.communicate()
stderr = result[1]
return directory, stderr
def _check_urbanopt_file(feature_geojson, scenario_csv):
"""Prepare an OSW file to be run through URBANopt CLI.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
        The folder in which the feature geoJSON exists and out of which the
        URBANopt CLI will operate.
"""
# check the input files
assert os.path.isfile(feature_geojson), \
'No feature file found at {}.'.format(feature_geojson)
assert os.path.isfile(scenario_csv), \
'No scenario file found at {}.'.format(scenario_csv)
return os.path.split(feature_geojson)[0]
def _output_urbanopt_files(directory, stderr=''):
"""Get the paths to the simulation output files given the urbanopt directory.
Args:
directory: The path to where the URBANopt feature and scenario files
were simulated.
stderr: The URBANopt standard error message, which will be returned to
the user in the event that no simulation folder was found.
Returns:
A series of file paths to the simulation output files
- osm -- Array of paths to .osm files for all generated OpenStudio models.
- idf -- Array of paths to .idf files containing the input for the
EnergyPlus simulation.
- sql -- Array of paths to .sqlite files containing all simulation results.
- zsz -- Array of paths to .csv files containing detailed zone load
information recorded over the course of the design days.
- rdd -- Array of paths to .rdd files containing all possible outputs
that can be requested from the simulation.
- html -- Array of paths to .htm files containing all summary reports.
- err -- Array of paths to .err files containing all errors and
warnings from the simulation.
"""
    # empty lists that will be filled with the simulation output files
osm = []
idf = []
sql = []
zsz = []
rdd = []
html = []
err = []
# parse the GeoJSON so that we can get the correct order of result files
sim_dir = os.path.join(directory, 'run', 'honeybee_scenario')
if not os.path.isdir(sim_dir):
msg = 'The URBANopt simulation failed to run.\n' \
'No results were found at:\n{}\n{}'.format(sim_dir, stderr)
print(msg)
raise Exception(msg)
geojson = [f for f in os.listdir(directory) if f.endswith('.geojson')]
if len(geojson) == 1:
geo_file = os.path.join(directory, geojson[0])
with open(geo_file, 'r') as base_file:
geo_dict = json.load(base_file)
bldg_names = []
for ft in geo_dict['features']:
if 'properties' in ft and 'type' in ft['properties']:
if ft['properties']['type'] == 'Building' and 'id' in ft['properties']:
bldg_names.append(ft['properties']['id'])
else:
bldg_names = os.listdir(sim_dir)
# generate paths to the simulation files and check their existence
for bldg_name in bldg_names:
bldg_dir = os.path.join(sim_dir, bldg_name)
osm_file = os.path.join(bldg_dir, 'in.osm')
if os.path.isfile(osm_file):
osm.append(osm_file)
idf_file = os.path.join(bldg_dir, 'in.idf')
if os.path.isfile(idf_file):
idf.append(idf_file)
sql_file = os.path.join(bldg_dir, 'eplusout.sql')
if os.path.isfile(sql_file):
sql.append(sql_file)
zsz_file = os.path.join(bldg_dir, 'epluszsz.csv')
if os.path.isfile(zsz_file):
zsz.append(zsz_file)
rdd_file = os.path.join(bldg_dir, 'eplusout.rdd')
if os.path.isfile(rdd_file):
rdd.append(rdd_file)
html_file = os.path.join(bldg_dir, 'eplustbl.htm')
if os.path.isfile(html_file):
html.append(html_file)
err_file = os.path.join(bldg_dir, 'eplusout.err')
if os.path.isfile(err_file):
err.append(err_file)
return osm, idf, sql, zsz, rdd, html, err
def _run_default_report_windows(feature_geojson, scenario_csv):
"""Generate default reports on a Windows-based os.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
Paths to the scenario CSV and JSON reports.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# Write the batch file to call URBANopt CLI
working_drive = directory[:2]
batch = '{}\ncd {}\ncall "{}"\nuo process --default -f "{}" -s "{}"'.format(
working_drive, working_drive, folders.urbanopt_env_path,
feature_geojson, scenario_csv)
batch_file = os.path.join(directory, 'run_default_report.bat')
write_to_file(batch_file, batch, True)
# run the batch file and return output files
os.system('"{}"'.format(batch_file))
result_folder = os.path.basename(scenario_csv).lower().replace('.csv', '')
run_folder = os.path.join(directory, 'run', result_folder)
return os.path.join(run_folder, 'default_scenario_report.csv'), \
os.path.join(run_folder, 'default_scenario_report.json')
def _run_default_report_unix(feature_geojson, scenario_csv):
"""Generate default reports on a Unix-based os.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
Paths to the scenario CSV and JSON reports.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
    # Write the shell script to call URBANopt CLI
shell = '#!/usr/bin/env bash\nsource "{}"\n' \
'uo process --default -f "{}" -s "{}"'.format(
folders.urbanopt_env_path, feature_geojson, scenario_csv)
shell_file = os.path.join(directory, 'run_default_report.sh')
write_to_file(shell_file, shell, True)
# make the shell script executable using subprocess.check_call
# this is more reliable than native Python chmod on Mac
subprocess.check_call(['chmod', 'u+x', shell_file])
# run the shell script
subprocess.call(shell_file)
result_folder = os.path.basename(scenario_csv).lower().replace('.csv', '')
run_folder = os.path.join(directory, 'run', result_folder)
return os.path.join(run_folder, 'default_scenario_report.csv'), \
os.path.join(run_folder, 'default_scenario_report.json')
def _run_reopt_windows(feature_geojson, scenario_csv, developer_key):
"""Run a feature and scenario file through REopt on a Windows-based os.
A batch file will be used to run the simulation.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
developer_key: Text string for the NREL developer key.
Returns:
Path to the folder in which results should be contained.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# Write the batch file to call URBANopt CLI
working_drive = directory[:2]
batch = '{}\ncd {}\ncall "{}"\nSET GEM_DEVELOPER_KEY={}\n' \
'uo process --reopt-scenario -f "{}" -s "{}"'.format(
working_drive, working_drive, folders.urbanopt_env_path, developer_key,
feature_geojson, scenario_csv)
batch_file = os.path.join(directory, 'run_reopt.bat')
write_to_file(batch_file, batch, True)
# run the batch file
os.system('"{}"'.format(batch_file))
result_folder = os.path.basename(scenario_csv).lower().replace('.csv', '')
return os.path.join(directory, 'run', result_folder)
def _run_reopt_unix(feature_geojson, scenario_csv, developer_key):
"""Run a feature and scenario file through REopt on a Unix-based os.
This includes both Mac OS and Linux since a shell will be used to run
the simulation.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
developer_key: Text string for the NREL developer key.
Returns:
Path to the folder in which results should be contained.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
    # Write the shell script to call URBANopt CLI
    shell = '#!/usr/bin/env bash\nsource "{}"\nexport GEM_DEVELOPER_KEY={}\n' \
        'uo process --reopt-scenario -f "{}" -s "{}"'.format(
            folders.urbanopt_env_path, developer_key, feature_geojson, scenario_csv)
shell_file = os.path.join(directory, 'run_reopt.sh')
write_to_file(shell_file, shell, True)
# make the shell script executable using subprocess.check_call
# this is more reliable than native Python chmod on Mac
subprocess.check_call(['chmod', 'u+x', shell_file])
# run the shell script
subprocess.call(shell_file)
result_folder = os.path.basename(scenario_csv).lower().replace('.csv', '')
return os.path.join(directory, 'run', result_folder)
def _output_reopt_files(directory):
"""Get the paths to the simulation output files given the reopt directory.
Args:
directory: The path to the folder in which results should be contained.
Returns:
A series of file paths to the simulation output files
- csv -- Path to a CSV file containing scenario optimization results.
- report_json -- Path to a JSON file containing scenario optimization results.
"""
# generate paths to the simulation files
csv_file = os.path.join(directory, 'scenario_optimization.csv')
report_json_file = os.path.join(directory, 'scenario_optimization.json')
# check that the simulation files exist
csv = csv_file if os.path.isfile(csv_file) else None
report_json = report_json_file if os.path.isfile(report_json_file) else None
return csv, report_json
def _run_rnm_windows(feature_geojson, scenario_csv):
"""Run a feature and scenario file through RNM on a Windows-based os.
A batch file will be used to run the simulation.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
Path to the project folder.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# Write the batch file to call URBANopt CLI
working_drive = directory[:2]
batch = '{}\ncd {}\ncall "{}"\nuo rnm --feature "{}" --scenario "{}"'.format(
working_drive, working_drive, folders.urbanopt_env_path,
feature_geojson, scenario_csv)
batch_file = os.path.join(directory, 'run_rnm.bat')
write_to_file(batch_file, batch, True)
# run the batch file
os.system('"{}"'.format(batch_file))
return directory
def _run_rnm_unix(feature_geojson, scenario_csv):
"""Run a feature and scenario file through RNM on a Unix-based os.
This includes both Mac OS and Linux since a shell script will be used to run
the simulation.
Args:
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
Path to the project folder.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# Write the shell script to call URBANopt CLI
    shell = '#!/usr/bin/env bash\nsource "{}"\nuo rnm --feature "{}" --scenario ' \
        '"{}"'.format(folders.urbanopt_env_path, feature_geojson, scenario_csv)
shell_file = os.path.join(directory, 'run_rnm.sh')
write_to_file(shell_file, shell, True)
# make the shell script executable using subprocess.check_call
# this is more reliable than native Python chmod on Mac
subprocess.check_call(['chmod', 'u+x', shell_file])
# run the shell script
subprocess.call(shell_file)
return directory
def _generate_modelica_windows(sys_param_json, feature_geojson, scenario_csv):
"""Generate Modelica files for a DES on a Windows-based os.
A batch file will be used to run the simulation.
Args:
sys_param_json: The full path to a system parameter JSON from which the
Modelica files will be generated.
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
A tuple with two values.
- modelica_dir -- Path to the folder in which the Modelica files were written.
- stderr -- The standard error message, which should get to the user
in the event of simulation failure.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# get the path to the MBL installation
install_directory = os.path.join(lb_folders.ladybug_tools_folder, 'resources')
mbl_dir = os.path.join(install_directory, 'mbl')
assert os.path.isdir(mbl_dir), \
'No Modelica Buildings Library installation was found on this machine.'
# get the paths to the output files
scn_name = os.path.basename(scenario_csv).replace('.csv', '')
modelica_dir = os.path.join(directory, 'run', scn_name, 'des_modelica')
uo_des_exe = os.path.join(hb_folders.python_scripts_path, 'uo_des.exe')
# Write the batch file to call the GMT
working_drive = directory[:2]
batch = '{}\ncd {}\ncall "{}"\nSET "MODELICAPATH={}"\n"{}" create-model ' \
'"{}" "{}" "{}" --overwrite'.format(
working_drive, working_drive, folders.urbanopt_env_path, mbl_dir,
uo_des_exe, sys_param_json, feature_geojson, modelica_dir)
batch_file = os.path.join(directory, 'generate_modelica.bat')
write_to_file(batch_file, batch, True)
# run the batch file
process = subprocess.Popen(
'"{}"'.format(batch_file), stderr=subprocess.PIPE, env=PYTHON_ENV
)
result = process.communicate()
stderr = result[1]
return modelica_dir, stderr
def _generate_modelica_unix(sys_param_json, feature_geojson, scenario_csv):
"""Generate Modelica files for a DES on a Unix-based os.
This includes both Mac OS and Linux since a shell will be used to run
the simulation.
Args:
sys_param_json: The full path to a system parameter JSON from which the
Modelica files will be generated.
feature_geojson: The full path to a .geojson file containing the
footprints of buildings to be simulated.
scenario_csv: The full path to a .csv file for the URBANopt scenario.
Returns:
A tuple with two values.
        - modelica_dir -- Path to the folder in which the Modelica files were written.
- stderr -- The standard error message, which should get to the user
in the event of simulation failure.
"""
# check the input file
directory = _check_urbanopt_file(feature_geojson, scenario_csv)
# get the path to the MBL installation
install_directory = os.path.join(lb_folders.ladybug_tools_folder, 'resources')
mbl_dir = os.path.join(install_directory, 'mbl')
assert os.path.isdir(mbl_dir), \
'No Modelica Buildings Library installation was found on this machine.'
# get the paths to the output files
scn_name = os.path.basename(scenario_csv).replace('.csv', '')
modelica_dir = os.path.join(directory, 'run', scn_name, 'des_modelica')
uo_des_exe = os.path.join(hb_folders.python_scripts_path, 'uo_des')
# write the shell script to call the GMT
shell = '#!/usr/bin/env bash\nsource "{}"\nexport MODELICAPATH="{}"\n' \
'"{}" create-model "{}" "{}" "{}" --overwrite'.format(
folders.urbanopt_env_path, mbl_dir,
uo_des_exe, sys_param_json, feature_geojson, modelica_dir)
shell_file = os.path.join(directory, 'generate_modelica.sh')
write_to_file(shell_file, shell, True)
# make the shell script executable using subprocess.check_call
# this is more reliable than native Python chmod on Mac
subprocess.check_call(['chmod', 'u+x', shell_file])
# run the shell script
process = subprocess.Popen(
'"{}"'.format(shell_file), stderr=subprocess.PIPE, env=PYTHON_ENV, shell=True
)
result = process.communicate()
stderr = result[1]
return modelica_dir, stderr
def _add_water_heating_patch(modelica_dir):
"""Add a dummy value for water heating for MBL 10 limitation."""
data_dir = os.path.join(modelica_dir, 'Loads', 'Resources', 'Data')
if os.path.isdir(data_dir):
for bldg_dir in os.listdir(data_dir):
mo_load_file = os.path.join(data_dir, bldg_dir, 'modelica.mos')
if os.path.isfile(mo_load_file):
fixed_lines, fl_found = [], False
with open(mo_load_file, 'r') as mlf:
for line in mlf:
if line == '#Peak water heating load = 0 Watts\n':
nl = '#Peak water heating load = 1 Watts\n'
fixed_lines.append(nl)
elif not fl_found and ';' in line:
split_vals = line.split(';')
split_vals[-1] = '1.0\n'
fixed_lines.append(';'.join(split_vals))
fl_found = True
else:
fixed_lines.append(line)
with open(mo_load_file, 'w') as mlf:
mlf.write(''.join(fixed_lines))
def _run_modelica_windows(modelica_project_dir):
"""Execute Modelica files of a DES on a Windows-based OS.
A batch file will be used to run the simulation.
Args:
modelica_project_dir: The full path to the folder in which the Modelica
files were written.
Returns:
A tuple with two values.
- results -- Path to where the results were written.
- stderr -- The standard error message, which should get to the user
in the event of simulation failure.
"""
# make sure that docker is installed
assert folders.docker_version_str is not None, \
'No Docker installation was found on this machine.\n' \
'This is needed to execute Modelica simulations.'
# get the paths to the output files
directory = os.path.dirname(modelica_project_dir)
project_name = os.path.basename(modelica_project_dir)
results = os.path.join(
modelica_project_dir,
'{}.Districts.DistrictEnergySystem_results'.format(project_name))
uo_des_exe = os.path.join(hb_folders.python_scripts_path, 'uo_des.exe')
# Write the batch file to call the GMT
working_drive = modelica_project_dir[:2]
batch = '{}\ncd {}\n"{}" run-model "{}"'.format(
working_drive, working_drive, uo_des_exe, modelica_project_dir)
batch_file = os.path.join(directory, 'run_modelica.bat')
write_to_file(batch_file, batch, True)
# run the batch file
process = subprocess.Popen(
'"{}"'.format(batch_file), stderr=subprocess.PIPE, env=PYTHON_ENV
)
result = process.communicate()
stderr = result[1]
return results, stderr
def _run_modelica_unix(modelica_project_dir):
"""Execute Modelica files of a DES on a Unix-based OS.
This includes both Mac OS and Linux since a shell will be used to run
the simulation.
Args:
modelica_project_dir: The full path to the folder in which the Modelica
files were written.
Returns:
A tuple with two values.
- results -- Path to where the results were written.
- stderr -- The standard error message, which should get to the user
in the event of simulation failure.
"""
# make sure that docker is installed
assert folders.docker_version_str is not None, \
'No Docker installation was found on this machine.\n' \
'This is needed to execute Modelica simulations.'
# get the paths to the output files
directory = os.path.dirname(modelica_project_dir)
project_name = os.path.basename(modelica_project_dir)
results = os.path.join(
modelica_project_dir,
'{}.Districts.DistrictEnergySystem_results'.format(project_name))
uo_des_exe = os.path.join(hb_folders.python_scripts_path, 'uo_des')
# write the shell script to call the GMT
shell = '#!/usr/bin/env bash\n"{}" run-model "{}"'.format(
uo_des_exe, modelica_project_dir)
shell_file = os.path.join(directory, 'run_modelica.sh')
write_to_file(shell_file, shell, True)
# make the shell script executable using subprocess.check_call
# this is more reliable than native Python chmod on Mac
subprocess.check_call(['chmod', 'u+x', shell_file])
# run the shell script
process = subprocess.Popen(
'"{}"'.format(shell_file), stderr=subprocess.PIPE, env=PYTHON_ENV, shell=True
)
result = process.communicate()
stderr = result[1]
return results, stderr