# Source code for dragonfly_energy.opendss.result
"""Module for parsing OpenDSS results to data collections."""
import os
import datetime
from ladybug.datacollection import HourlyContinuousCollection
from ladybug.header import Header
from ladybug.analysisperiod import AnalysisPeriod
from ladybug.datatype.generic import GenericType
from ladybug.datatype.fraction import Fraction
from ladybug.futil import csv_to_matrix
class OpenDSSResult(object):
    """Object for parsing OpenDSS CSV result files into Ladybug DataCollections.

    Args:
        file_paths: A list of file paths to CSV files that were generated by OpenDSS.

    Properties:
        * file_paths
        * factor_data
        * condition_data
        * peak_factors
        * average_factors
    """

    # data types for the various outputs from OpenDSS
    IS_OVERLOADED = GenericType(
        'Is Overloaded', 'condition',
        unit_descr={1: 'Overloaded', 0: 'Normal'})
    VOLTAGE_CONDITION = GenericType(
        'Voltage Condition', 'condition',
        unit_descr={-1: 'Undervoltage', 0: 'Normal', 1: 'Overvoltage'})

    def __init__(self, file_paths):
        """Initialize OpenDSSResult.

        Every file is parsed eagerly: one loading-factor collection and one
        condition collection are built per input file, in the input order.

        Raises:
            AssertionError: If a path does not exist or does not end in .csv.
        """
        # check the file paths
        for fp in file_paths:
            assert os.path.isfile(fp), 'No file was found at {}'.format(fp)
            assert fp.endswith('.csv'), \
                '{} is not a CSV file ending in .csv.'.format(fp)
        self._file_paths = tuple(file_paths)

        # parse the csv data
        factors, condition = [], []
        for result_file in self._file_paths:
            # parse the data and figure out the time series properties
            data = csv_to_matrix(result_file)
            data.pop(0)  # remove the header row of the CSV
            a_period = self._extract_analysis_period(data)

            # figure out the type of object to write into the metadata;
            # only the leading prefix is removed so that a name which happens
            # to contain 'Line.' or 'Transformer.' further along stays intact
            obj_name = os.path.splitext(os.path.basename(result_file))[0]
            if obj_name.startswith('Line.'):
                obj_name = obj_name[len('Line.'):]
                obj_type = 'Electrical Connector Loading'
            elif obj_name.startswith('Transformer.'):
                obj_name = obj_name[len('Transformer.'):]
                obj_type = 'Transformer Loading'
            else:
                obj_type = 'Building Voltage'
            metadata = {'type': obj_type, 'name': obj_name}

            # output the data collection of factors
            result_vals = [float(row[1]) for row in data]
            header = Header(Fraction('Loading Factor'), 'fraction', a_period, metadata)
            factors.append(HourlyContinuousCollection(header, result_vals))

            # output the data collection of conditions
            # The last field of a row can still carry its line ending, so the
            # boolean text is stripped before comparing. This keeps the result
            # correct for a final row without a trailing newline and for CRLF.
            if len(data[0]) == 4:  # building voltage results
                cond_vals = []
                for row in data:
                    # third column flags overvoltage, fourth flags undervoltage
                    cond = 0 if row[2].strip() == 'False' else 1
                    if cond != 1 and row[3].strip() == 'True':
                        cond = -1
                    cond_vals.append(cond)
                header = Header(
                    self.VOLTAGE_CONDITION, self.VOLTAGE_CONDITION.units[0],
                    a_period, metadata)
            else:  # transformer or connector load
                cond_vals = [0 if row[2].strip() == 'False' else 1 for row in data]
                header = Header(
                    self.IS_OVERLOADED, self.IS_OVERLOADED.units[0],
                    a_period, metadata)
            condition.append(HourlyContinuousCollection(header, cond_vals))

        self._factor_data = tuple(factors)
        self._condition_data = tuple(condition)

    @property
    def file_paths(self):
        """Get a tuple of file paths to CSV files."""
        return self._file_paths

    @property
    def factor_data(self):
        """Get a tuple of loading factors associated with the input file paths."""
        return self._factor_data

    @property
    def condition_data(self):
        """Get a tuple of conditions associated with the input file paths."""
        return self._condition_data

    @property
    def peak_factors(self):
        """Get a list of numbers for the peak factors across the data."""
        return [dat.max for dat in self._factor_data]

    @property
    def average_factors(self):
        """Get a list of numbers for the average factors across the data."""
        return [dat.average for dat in self._factor_data]

    @staticmethod
    def _date_str_to_datetime(date_str):
        """Get a datetime object from a string formatted as YYYY/MM/DD HH:MM:SS."""
        return datetime.datetime.strptime(date_str, '%Y/%m/%d %H:%M:%S')

    @staticmethod
    def _extract_analysis_period(data):
        """Extract an AnalysisPeriod from CSV data.

        Args:
            data: A matrix of CSV rows (header removed) in which the first
                item of each row is a date-time string.

        Returns:
            An AnalysisPeriod running from hour 0 of the first row's day to
            hour 23 of the second-to-last row's day.
        """
        # NOTE(review): the end date is read from the second-to-last row,
        # presumably because the final timestamp rolls over to the next day;
        # confirm against the OpenDSS output format.
        dts = [OpenDSSResult._date_str_to_datetime(data[i][0]) for i in (0, 1, -2)]
        # steps per hour, derived from the spacing of the first two rows
        timestep = int(3600 / (dts[1] - dts[0]).total_seconds())
        # full Gregorian rule: century years are leap only if divisible by 400
        year = dts[0].year
        leap_year = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
        a_period = AnalysisPeriod(
            dts[0].month, dts[0].day, 0, dts[-1].month, dts[-1].day, 23,
            timestep=timestep, is_leap_year=leap_year)
        return a_period

    def ToString(self):
        """Overwrite .NET ToString."""
        return self.__repr__()

    def __repr__(self):
        """Get a short text representation reporting the number of files."""
        return 'OpenDSS Result [{} files]'.format(len(self._file_paths))