# coding=utf-8
"""Module for parsing EnergyPlus SQLite result files into Ladybug DataCollections"""
from __future__ import division
import os
import sqlite3
from collections import OrderedDict
import ladybug.datatype
from ladybug.datatype.generic import GenericType
from .dt import DateTime, datetime
from .location import Location
from .analysisperiod import AnalysisPeriod
from .header import Header
from .datacollection import HourlyContinuousCollection, DailyCollection, \
MonthlyCollection
[docs]
class SQLiteResult(object):
"""Object for parsing EnergyPlus SQLite result files into Ladybug DataCollections.
Args:
file_path: Full path to an SQLite file that was generated by EnergyPlus.
Properties:
* file_path
* location
* reporting_frequency
* run_periods
* run_period_names
* run_period_indices
* available_outputs
* available_outputs_info
* zone_cooling_sizes
* zone_heating_sizes
* component_sizes
* component_types
"""
_interval_codes = ('Timestep', 'Hourly', 'Daily', 'Monthly', 'Annual', 'Annual')
def __init__(self, file_path):
"""Initialize SQLiteResult"""
assert os.path.isfile(file_path), 'No file was found at {}'.format(file_path)
assert file_path.endswith(('.sql', '.db', '.sqlite')), \
'{} is not an SQL file ending in .sql or .db.'.format(file_path)
self._file_path = file_path
# values to be computed as soon as they are requested
self._location = None
self._reporting_frequency = None
self._run_periods = None
self._run_period_names = None
self._run_period_indices = None
self._available_outputs = None
self._available_outputs_info = None
self._zone_cooling_sizes = None
self._zone_heating_sizes = None
self._component_sizes = None
@property
def file_path(self):
"""Get the path to the .sql file."""
return self._file_path
@property
def location(self):
"""Get a Ladybug Location object derived from the SQL data.
This will be None if there was no AllSummary report in the SQL file.
"""
if not self._location:
self._extract_location()
return self._location
@property
def reporting_frequency(self):
"""Get text for the output reporting frequency. Will be one of the following.
* Annual
* Monthly
* Daily
* Hourly
* An integer for the number of steps per hour
"""
if not self._reporting_frequency:
self._extract_available_outputs()
if 'Timestep' in self._reporting_frequency: # get timesteps per hour
self._reporting_frequency = self._extract_timestep()
return self._reporting_frequency
@property
def run_periods(self):
"""Get an array of Ladybug AnalysisPeriod objects for the simulation run periods.
This will be None if there was no AllSummary report in the SQL file.
"""
if not self._run_periods:
self._extract_full_run_periods()
return tuple(self._run_periods)
@property
def run_period_names(self):
"""Get an array of text for the names of the run periods.
These will align with the run_periods property. This will be None if
there was no AllSummary report in the SQL file.
"""
if not self._run_period_names:
self._extract_full_run_periods()
return tuple(self._run_period_names)
@property
def run_period_indices(self):
"""Get an array integers used to identify the run periods in the time table.
These will align with the run_periods property.
"""
if not self._run_period_indices:
self._extract_full_run_period_indices()
return self._run_period_indices
@property
def available_outputs(self):
"""Get a list of strings for available timeseries outputs that can be requested.
Any of these outputs when input to data_collections_by_output_name will
yield a result with data collections.
"""
if not self._available_outputs:
self._extract_available_outputs()
return self._available_outputs
@property
def available_outputs_info(self):
"""Get a list of dictionaries with outputs within the SQL and their metadata.
Each dictionary is formatted with the keys below.
.. code-block:: python
{
"output_name": "Zone Ideal Loads Supply Air Total Cooling Energy",
"object_type": "Zone",
"units": "kWh",
"data_type ": Energy # this is a ladybug DataType object
}
"""
if not self._available_outputs_info:
self._extract_available_outputs()
return self._available_outputs_info
@property
def zone_cooling_sizes(self):
"""Get a list of ZoneSize objects for all conditioned zones in the model.
Each ZoneSize object contains several properties regarding the outcome
of the cooling sizing calculation.
"""
if not self._zone_cooling_sizes:
self._zone_cooling_sizes = self._extract_zone_sizes('Cooling')
return self._zone_cooling_sizes
@property
def zone_heating_sizes(self):
"""Get a list of ZoneSize objects for all conditioned zones in the model.
Each ZoneSize object contains several properties regarding the outcome
of the heating sizing calculation.
"""
if not self._zone_heating_sizes:
self._zone_heating_sizes = self._extract_zone_sizes('Heating')
return self._zone_heating_sizes
@property
def component_sizes(self):
"""Get a list of ComponentSize objects for all HVAC components in the results.
Each ComponentSize object contains several properties regarding the outcome
of the sizing calculation.
"""
if not self._component_sizes:
self._component_sizes = self._extract_component_sizes()
return self._component_sizes
@property
def component_types(self):
"""Get a list of text for all component types contained in the results."""
if not self._component_sizes:
self._component_sizes = self._extract_component_sizes()
_comp_types = set()
for comp in self._component_sizes:
_comp_types.add(comp.component_type)
return list(_comp_types)
[docs]
def component_sizes_by_type(self, component_type):
"""Get a list of ComponentSize objects for a specific type of HVAC component.
This can be much faster than using the component_sizes property when
there are a lot of components in the model and only one type of
component is needed.
Args:
component_type: Text for the type of component to be retrieved.
(eg. 'ZoneHVAC:IdealLoadsAirSystem')
"""
return self._extract_component_sizes(component_type)
[docs]
def values_by_output_name(self, output_name):
"""Get a list of values for a specified output.
This list will be flat and will not be organized into different run periods
or different zones.
Args:
output_name: The name of an EnergyPlus output to be retrieved from
the SQLite result file. This can also be an array of output names
for which all data collections should be retrieved.
Returns:
An array of values for the requested output type. This will be an empty list
if no output of the requested name was found in the file.
"""
conn = sqlite3.connect(self.file_path)
try:
# extract all indices in the ReportDataDictionary with the output_name
c = conn.cursor()
cols = 'ReportDataDictionaryIndex, IndexGroup, KeyValue, Name, ' \
'ReportingFrequency, Units'
if isinstance(output_name, str): # assume it's a single output
query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols)
c.execute(query, (output_name,))
elif len(output_name) == 1: # assume it's a list
query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols)
c.execute(query, (output_name[0],))
else: # assume it is a list of outputs
c.execute('SELECT {} FROM ReportDataDictionary WHERE Name IN {}'.format(
cols, tuple(output_name)))
header_rows = c.fetchall()
# if nothing was found, return an empty list
if len(header_rows) == 0:
conn.close() # ensure connection is always closed
return []
# remove any data not of the same frequency
freq = header_rows[0][4]
header_rows = [row for row in header_rows if row[4] == freq]
# extract all data of the relevant type from ReportData
rel_indices = tuple(row[0] for row in header_rows)
if len(rel_indices) == 1:
c.execute('SELECT Value, TimeIndex FROM ReportData WHERE '
'ReportDataDictionaryIndex=? ORDER BY '
'TimeIndex', rel_indices)
else:
c.execute('SELECT Value, TimeIndex FROM ReportData WHERE '
'ReportDataDictionaryIndex IN {} ORDER BY '
'TimeIndex'.format(rel_indices))
data = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
# return all of the values that were found
return [val[0] for val in data]
[docs]
def data_collections_by_output_name(self, output_name):
"""Get an array of Ladybug DataCollections for a specified output.
Args:
output_name: The name of an EnergyPlus output to be retrieved from
the SQLite result file. This can also be an array of output names
for which all data collections should be retrieved.
Returns:
An array of data collections of the requested output type. This will
be an empty list if no output of the requested name was found in the
file.
"""
conn = sqlite3.connect(self.file_path)
try:
# extract all indices in the ReportDataDictionary with the output_name
c = conn.cursor()
cols = 'ReportDataDictionaryIndex, IndexGroup, KeyValue, Name, ' \
'ReportingFrequency, Units'
if isinstance(output_name, str): # assume it's a single output
query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols)
c.execute(query, (output_name,))
elif len(output_name) == 1: # assume it's a list
query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols)
c.execute(query, (output_name[0],))
else: # assume it is a list of outputs
c.execute('SELECT {} FROM ReportDataDictionary WHERE Name IN {}'.format(
cols, tuple(output_name)))
header_rows = c.fetchall()
# if nothing was found, return an empty list
if len(header_rows) == 0:
conn.close() # ensure connection is always closed
return []
# remove any data not of the same frequency
freq = header_rows[0][4]
header_rows = [row for row in header_rows if row[4] == freq]
# extract all data of the relevant type from ReportData
rel_indices = tuple(row[0] for row in header_rows)
if len(rel_indices) == 1:
c.execute('SELECT Value, TimeIndex FROM ReportData WHERE '
'ReportDataDictionaryIndex=? ORDER BY '
'TimeIndex', rel_indices)
else:
c.execute('SELECT Value, TimeIndex FROM ReportData WHERE '
'ReportDataDictionaryIndex IN {} ORDER BY '
'TimeIndex'.format(rel_indices))
data = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
# get the analysis period and the reporting frequency from the time table
st_time, end_time = data[0][1], data[-1][1]
run_period, report_frequency, mult = self._extract_run_period(st_time, end_time)
if mult: # there are multiple analysis periods; get them all
run_period = self._extract_all_run_period(
report_frequency, run_period.timestep, run_period.is_leap_year)
# create the header objects to be used for the resulting data collections
units = header_rows[0][-1] if header_rows[0][-1] != 'J' else 'kWh'
data_type, units = self._data_type_from_unit(units, header_rows[0][3])
meta_datas = []
for row in header_rows:
obj_type = row[1] if 'Surface' not in output_name else 'Surface'
meta_datas.append({'type': row[3], obj_type: row[2]})
headers = []
if report_frequency == 'Annual':
pass
elif isinstance(run_period, list): # multiple run periods
for runper in run_period:
for m_data in meta_datas:
headers.append(Header(data_type, units, runper, m_data))
else: # just one run period
for m_data in meta_datas:
headers.append(Header(data_type, units, run_period, m_data))
# format the data such that we have one list for each of the header rows
if isinstance(run_period, list): # multiple run periods
if report_frequency == 'Monthly':
chunks = [len(runper.months_int) for runper in run_period]
elif report_frequency == 'Daily':
chunks = [len(runper.doys_int) for runper in run_period]
else:
chunks = [len(runper) for runper in run_period]
if units == 'kWh':
all_values = self._partition_and_convert_timeseries_chunks(data, chunks)
else:
all_values = self._partition_timeseries_chunks(data, chunks)
else: # just one run period
n_lists = len(header_rows)
if units == 'kWh':
all_values = self._partition_and_convert_timeseries(data, n_lists)
else:
all_values = self._partition_timeseries(data, n_lists)
# create the final data collections
data_colls = []
if report_frequency == 'Hourly' or isinstance(report_frequency, int):
for head, values in zip(headers, all_values):
data_colls.append(HourlyContinuousCollection(head, values))
elif report_frequency == 'Daily':
for head, values in zip(headers, all_values):
data_colls.append(DailyCollection(
head, values, head.analysis_period.doys_int))
elif report_frequency == 'Monthly':
for head, values in zip(headers, all_values):
data_colls.append(MonthlyCollection(
head, values, head.analysis_period.months_int))
else: # Annual data; just return the values as they are
return [val[0] for val in all_values]
# ensure all imported data gets marked as valid; this increases speed elsewhere
for data in data_colls:
data._validated_a_period = True
return data_colls
[docs]
def data_collections_by_output_name_run_period(self, output_name, run_period_index):
"""Get an array of Ladybug DataCollections for an output and a run period index.
Args:
output_name: The name of an EnergyPlus output to be retrieved from
the SQLite result file as a string.
run_period_index: An integer taken from the run_period_indices property
of this object, which will be used to select out data collections
for just one run period in the SQL file.
Returns:
An array of data collections of the requested output type. This will
be an empty list if no output of the requested name was found in the
file.
"""
conn = sqlite3.connect(self.file_path)
try:
# extract all indices in the ReportDataDictionary with the output_name
c = conn.cursor()
cols = 'ReportDataDictionaryIndex, IndexGroup, KeyValue, Name, ' \
'ReportingFrequency, Units'
query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols)
c.execute(query, (output_name,))
header_rows = c.fetchall()
# if nothing was found, return an empty list
if len(header_rows) == 0:
conn.close() # ensure connection is always closed
return []
# remove any data not of the same frequency
freq = header_rows[0][4]
header_rows = [row for row in header_rows if row[4] == freq]
# extract all data of the relevant type from ReportData
rel_indices = tuple(row[0] for row in header_rows)
query = 'SELECT ReportData.Value, ReportData.TimeIndex ' \
'FROM ReportData ' \
'INNER JOIN Time ON ReportData.TimeIndex=Time.TimeIndex ' \
'WHERE ReportData.ReportDataDictionaryIndex IN {} AND ' \
'Time.EnvironmentPeriodIndex=?'.format(rel_indices)
c.execute(query, (run_period_index,))
data = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
# get the analysis period and the reporting frequency from the time table
st_time, end_time = data[0][1], data[-1][1]
run_period, report_frequency, mult = self._extract_run_period(st_time, end_time)
# create the header objects to be used for the resulting data collections
units = header_rows[0][-1] if header_rows[0][-1] != 'J' else 'kWh'
data_type, units = self._data_type_from_unit(units, header_rows[0][3])
headers = []
for row in header_rows:
obj_type = row[1] if 'Surface' not in output_name else 'Surface'
m_data = {'type': row[3], obj_type: row[2]}
headers.append(Header(data_type, units, run_period, m_data))
# format the data such that we have one list for each of the header rows
all_values = self._partition_and_convert_timeseries(data, len(header_rows)) if \
units == 'kWh' else self._partition_timeseries(data, len(header_rows))
# create the final data collections
data_colls = []
if report_frequency == 'Hourly' or isinstance(report_frequency, int):
for head, values in zip(headers, all_values):
data_colls.append(HourlyContinuousCollection(head, values))
elif report_frequency == 'Daily':
for head, values in zip(headers, all_values):
data_colls.append(DailyCollection(
head, values, head.analysis_period.doys_int))
elif report_frequency == 'Monthly':
for head, values in zip(headers, all_values):
data_colls.append(MonthlyCollection(
head, values, head.analysis_period.months_int))
else: # Annual data; just return the values as they are
return all_values
# ensure all imported data gets marked as valid; this increases speed elsewhere
for data in data_colls:
data._validated_a_period = True
return data_colls
[docs]
def tabular_data_by_name(self, table_name, j_to_kwh=True, report_name=None):
"""Get all the data within a table of a Summary Report using the table name.
Args:
table_name: Text string for the name of a table within a summary
report. (eg. 'General').
j_to_kwh: Boolean to note if any data in MJ or GJ should be
converted to kWh upon import of the table. This will also mean
that any area-normalized values will also be converted to kWh/m2.
report_name: An optional text string to indicate the report name from
which the table should be pulled. This is useful in cases where
tables have the same name in different reports. If None, data from
all available tables will be returned. (Default: None).
Returns:
An ordered dictionary representing a matrix (list of lists), where
the keys of the dictionary represent the row names and each value
is a row of the table. The output should mirror how the table appears
in the HTML output.
"""
conn = sqlite3.connect(self.file_path)
try:
# get the cursor and list of fields to be extracted
c = conn.cursor()
fields_to_extract = ['RowName', 'Value']
if j_to_kwh:
fields_to_extract.append('Units')
fields_to_extract_str = ', '.join(fields_to_extract)
# extract the fields using a query
if report_name is None:
query_str = 'SELECT %s FROM TabularDataWithStrings ' \
'WHERE TableName=?' % fields_to_extract_str
c.execute(query_str, (table_name,))
else:
query_str = 'SELECT %s FROM TabularDataWithStrings ' \
'WHERE TableName=? AND ReportName=?' % fields_to_extract_str
c.execute(query_str, (table_name, report_name))
table_data = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
# convert all of the extracted data into a tabular format
table_dict = OrderedDict()
if j_to_kwh:
for item in table_data:
try:
val = float(item[1])
if 'GJ' in item[2]:
val = val / 0.0036
elif 'MJ' in item[2]:
val = val / 3.6
except ValueError: # not a number
val = item[1]
try:
table_dict[item[0]].append(val)
except KeyError:
table_dict[item[0]] = [val]
else:
for item in table_data:
try:
val = float(item[1])
except ValueError: # not a number
val = item[1]
try:
table_dict[item[0]].append(val)
except KeyError:
table_dict[item[0]] = [val]
return table_dict
[docs]
def tabular_column_names(self, table_name, report_name=None):
"""Get the names of the columns for a table of a Summary Report.
Args:
table_name: Text string for the name of a table within a summary
report. (eg. 'General').
report_name: An optional text string to indicate the report name from
which the table should be pulled. This is useful in cases where
tables have the same name in different reports. If None, data from
all available tables will be returned. (Default: None).
Returns:
A list of the column names of the table
"""
conn = sqlite3.connect(self.file_path)
try:
# extract the data from the General table in AllSummary
c = conn.cursor()
if report_name is None:
c.execute('SELECT ColumnName FROM TabularDataWithStrings '
'WHERE TableName=?', (table_name,))
else:
c.execute(
'SELECT ColumnName FROM TabularDataWithStrings '
'WHERE TableName=? AND ReportName=?', (table_name, report_name)
)
table_col_names = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
return list(OrderedDict.fromkeys([item[0] for item in table_col_names]))
def _extract_location(self):
"""Extract a Location object from the SQLite file."""
# extract all of the data from the General table in AllSummary
table_dict = self.tabular_data_by_name('General', False)
general = list(table_dict.values())
if general == []:
return
# convert the extracted data into a Location object
split_id = general[2][0].split(' ')
city = ' '.join(split_id[:-2])
source = split_id[-2]
station_id = split_id[-1].split('=')[-1]
self._location = Location(
city=city, source=source, station_id=station_id,
latitude=general[3][0], longitude=general[4][0],
time_zone=general[6][0], elevation=general[5][0])
def _extract_full_run_periods(self):
"""Extract all of the RunPeriod objects from the SQLite file."""
# extract all of the data from the General table in AllSummary
table_dict = self.tabular_data_by_name('Environment', False)
sim_env = list(table_dict.values())
if sim_env == []:
return
# convert the extracted data into a AnalysisPeriod objects
self._run_periods = []
self._run_period_names = []
for row in sim_env:
st_date = [int(digit) for digit in row[2].split('/')]
end_date = [int(digit) for digit in row[3].split('/')]
if len(st_date) == 3: # it's a true run period
lp_yr = True if st_date[-1] % 4 == 0 else False
aper = AnalysisPeriod(st_date[0], st_date[1], 0, end_date[0],
end_date[1], 23, is_leap_year=lp_yr)
else: # it's a design day period
aper = AnalysisPeriod(st_date[0], st_date[1], 0,
end_date[0], end_date[1], 23)
self._run_periods.append(aper)
self._run_period_names.append(row[0])
def _extract_full_run_period_indices(self):
"""Extract all RunPeriod indices from the Time table of the SQLite file."""
conn = sqlite3.connect(self.file_path)
try:
# extract all of the data from the Time table
c = conn.cursor()
c.execute('SELECT EnvironmentPeriodIndex FROM Time '
'GROUP BY EnvironmentPeriodIndex')
e_periods = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
self._run_period_indices = tuple(ind[0] for ind in e_periods)
def _extract_available_outputs(self):
"""Extract the list of all available outputs from the SQLite file."""
conn = sqlite3.connect(self.file_path)
try:
# extract all indices in the ReportDataDictionary
c = conn.cursor()
c.execute('SELECT Name, IndexGroup, Units, ReportingFrequency '
'FROM ReportDataDictionary')
outputs = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
unique_outputs = set(outputs)
self._available_outputs = tuple(outp[0] for outp in unique_outputs)
self._available_outputs_info = []
for outp in unique_outputs:
outp_dict = {}
outp_dict['output_name'] = outp[0]
outp_dict['object_type'] = outp[1]
outp_dict['units'] = outp[2] if outp[2] != 'J' else 'kWh'
outp_dict['data_type'], outp_dict['units'] = \
self._data_type_from_unit(outp_dict['units'], outp[0])
self._available_outputs_info.append(outp_dict)
self._reporting_frequency = outp[3]
def _extract_timestep(self):
"""Extract an integer for the timestep of the data.
This is done by checking the first entry within the Time table.
"""
conn = sqlite3.connect(self.file_path)
try:
# extract the start and end times from the Time table
c = conn.cursor()
c.execute('SELECT Interval FROM Time')
min_per_step = c.fetchone()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
return int(60 / min_per_step[0])
def _extract_zone_sizes(self, load_type):
"""Get all of the ZoneSize objects of a certain load type.
Args:
load_type: Text for the type of load to retrieve.
This must be either 'Cooling' or 'Heating'.
"""
conn = sqlite3.connect(self.file_path)
try:
# extract the data from the ZoneSizes table
c = conn.cursor()
c.execute('SELECT * FROM ZoneSizes WHERE LoadType=?', (load_type,))
table_data = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
return [ZoneSize(table_row) for table_row in table_data]
def _extract_component_sizes(self, component_type=None):
"""Get all of the ComponentSize objects of a certain type in the model.
Args:
component_type: Text for the type of component to be retrieved.
(eg. 'ZoneHVAC:IdealLoadsAirSystem')
"""
conn = sqlite3.connect(self.file_path)
try:
# extract the data from the ZoneSizes table
c = conn.cursor()
if component_type:
c.execute('SELECT * FROM ComponentSizes WHERE CompType=?',
(component_type,))
else:
c.execute('SELECT * FROM ComponentSizes')
table_data = c.fetchall()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
# group the rows by component name
table_dict = {}
for prop in table_data:
comp_name = prop[2]
try:
table_dict[comp_name].append(prop)
except KeyError:
table_dict[comp_name] = [prop]
# create the ComponentSize objects
return [ComponentSize(table_rows) for table_rows in table_dict.values()]
def _extract_run_period(self, st_time, end_time):
"""Extract the run period object and frequency from the SQLite file.
Args:
st_time: Index for the start time of the data.
end_time: Index for the end time of the data.
Returns:
A tuple with run_period, reporting_frequency, and a boolean for whether
the data was for a design day.
"""
conn = sqlite3.connect(self.file_path)
try:
# extract the start and end times from the Time table
query_str = 'SELECT Year, Month, Day, Interval, IntervalType, ' \
'EnvironmentPeriodIndex FROM Time WHERE TimeIndex=?'
c = conn.cursor()
c.execute(query_str, (st_time,))
start = c.fetchone()
c.execute(query_str, (end_time,))
end = c.fetchone()
conn.close() # ensure connection is always closed
except Exception as e:
conn.close() # ensure connection is always closed
raise Exception(str(e))
# check whether the data was for a design day
multiple_period = True if start[5] != end[5] else False
# set the reporting frequency by the interval type
interval_typ = start[4]
if interval_typ <= 1:
min_per_step = start[3]
aper_timestep = int(60 / min_per_step)
reporting_frequency = aper_timestep
else:
reporting_frequency = self._interval_codes[interval_typ]
aper_timestep = 1
min_per_step = 60
# convert the extracted data into an AnalysisPeriod object
leap_year = True if end[0] != 0 and end[0] % 4 == 0 else False
if reporting_frequency == 'Annual':
return None, reporting_frequency, multiple_period
if reporting_frequency == 'Monthly':
st_date = DateTime(start[1], 1, 0)
else:
st_date = DateTime(start[1], start[2], 0)
end_date = DateTime(end[1], end[2], 0)
end_date = end_date.add_minute(1440 - min_per_step)
run_period = AnalysisPeriod(
st_date.month, st_date.day, st_date.hour, end_date.month, end_date.day,
end_date.hour, aper_timestep, leap_year)
return run_period, reporting_frequency, multiple_period
def _extract_all_run_period(self, reporting_frequency, timestep, leap_year):
    """Extract all run period objects from the Time table in the SQLite file.

    A new run period starts at each row where the EnvironmentPeriodIndex
    differs from that of the row before it.

    Args:
        reporting_frequency: Text for the reporting frequency of the data.
        timestep: Integer to note the timestep of the analysis periods. By the
            time this function is running, this value should have been gotten
            from the _extract_run_period method.
        leap_year: Boolean to note whether the analysis periods are for a
            leap year.

    Returns:
        A list of AnalysisPeriods for all periods that could be obtained from
        the Time table.
    """
    conn = sqlite3.connect(self.file_path)
    try:
        c = conn.cursor()
        c.execute('SELECT Month, Day, EnvironmentPeriodIndex FROM Time')
        timeseries = c.fetchall()
    except Exception as e:
        raise Exception(str(e))
    finally:
        conn.close()  # ensure connection is always closed
    min_per_step = int(60 / timestep)
    is_monthly = reporting_frequency == 'Monthly'

    def start_of(row):
        """Get the start DateTime for a period that begins at the row."""
        if is_monthly:  # monthly data always starts on the first of the month
            return DateTime(row[0], 1, 0)
        return DateTime(row[0], row[1], 0)

    def period_until(st_date, last_row):
        """Get the AnalysisPeriod from a start to the last step of a row's day."""
        end_date = DateTime(last_row[0], last_row[1], 0)
        end_date = end_date.add_minute(1440 - min_per_step)
        return AnalysisPeriod(
            st_date.month, st_date.day, st_date.hour, end_date.month,
            end_date.day, end_date.hour, timestep, leap_year)

    # walk through the rows, closing a period whenever the index changes
    st_date = start_of(timeseries[0])
    env_period = timeseries[0][2]
    run_periods = []
    prev_row = None
    for time_row in timeseries:
        if time_row[2] != env_period:  # start of a new run period
            run_periods.append(period_until(st_date, prev_row))
            st_date = start_of(time_row)
            env_period = time_row[2]
        prev_row = time_row
    # the last period is closed by the final row of the table
    run_periods.append(period_until(st_date, time_row))
    return run_periods
@staticmethod
def _data_type_from_unit(from_unit, data_name=''):
    """Get a Ladybug DataType object instance and units from a unit abbreviation.

    Args:
        from_unit: Text for the unit abbreviation. An empty string yields the
            'Fraction' type with 'fraction' units.
        data_name: Text for the name given to the GenericType that is
            returned when the units are not found in ladybug.datatype.UNITS.

    Returns:
        A tuple with the data type object and the text for its units.
    """
    if from_unit == '':  # dimensionless data type
        fraction_type = ladybug.datatype.TYPESDICT['Fraction']
        return fraction_type(), 'fraction'
    units_by_type = ladybug.datatype.UNITS
    for key in units_by_type:
        if from_unit in units_by_type[key]:
            matched_type = ladybug.datatype.TYPESDICT[key]
            return matched_type(), from_unit
    # the units are not among the known ones; fall back to a generic type
    return GenericType(data_name, from_unit), from_unit
@staticmethod
def _partition_timeseries(data, n_lists):
"""Partition timeseries data that has been retrieved from the SQL file.
Args:
n_lists: An integer for the number of lists to partition the data into.
"""
all_values = []
for i in range(0, len(data), n_lists):
all_values.append([val[0] for val in data[i:i + n_lists]])
return zip(*all_values)
@staticmethod
def _partition_and_convert_timeseries(data, n_lists):
"""Partition data that retrieved from the SQL file + convert it to kWh.
Args:
n_lists: An integer for the number of lists to partition the data into.
"""
all_values = []
for i in range(0, len(data), n_lists):
all_values.append([val[0] / 3600000. for val in data[i:i + n_lists]])
return zip(*all_values)
@staticmethod
def _partition_timeseries_chunks(data, chunks):
"""Partition timeseries data based on a chunking pattern.
Args:
chunks: A list of integers for the chunking pattern (eg. [24, 24, 8760]).
"""
n_lists = int(len(data) / sum(chunks))
zero_cum_chunks = [0] + SQLiteResult._accumulate(chunks)
all_values = []
for j, chunk in enumerate(chunks):
start = zero_cum_chunks[j] * n_lists
day_vals = []
for i in range(0, chunk * n_lists, n_lists):
day_vals.append([val[0] for val in data[start + i:start + i + n_lists]])
all_values.extend(zip(*day_vals))
return all_values
@staticmethod
def _partition_and_convert_timeseries_chunks(data, chunks):
"""Partition timeseries data based on a chunking pattern + convert it to kWh.
Args:
chunks: A list of integers for the chunking pattern (eg. [24, 24, 8760]).
"""
n_lists = int(len(data) / sum(chunks))
zero_sum_chunks = [0] + SQLiteResult._accumulate(chunks)
all_values = []
for j, chunk in enumerate(chunks):
start = zero_sum_chunks[j] * n_lists
day_vals = []
for i in range(0, chunk * n_lists, n_lists):
day_vals.append([val[0] / 3600000. for val in
data[start + i:start + i + n_lists]])
all_values.extend(zip(*day_vals))
return all_values
@staticmethod
def _accumulate(chunks):
"""Cumulatively sum a list of numbers."""
cum_chunks = []
total = 0
for x in chunks:
total += x
cum_chunks.append(total)
return cum_chunks
[docs]
def ToString(self):
"""Overwrite .NET ToString."""
return self.__repr__()
def __repr__(self):
return 'Energy SQLiteResult: {}'.format(self.file_path)
[docs]
class ZoneSize(object):
    """Container for the sizing results that were computed for a single zone.

    Args:
        sql_table_row: A list that represents a row of the SQLite ZoneSizes table.
            This row contains all of the sizing information for a single
            conditioned zone (either heating or cooling). Index 0 is ignored
            and indices 1 through 11 are read in the order of the properties
            listed below.

    Properties:
        * zone_name
        * load_type
        * calculated_design_load
        * final_design_load
        * calculated_design_flow
        * final_design_flow
        * design_day_name
        * peak_date_time
        * peak_temperature
        * peak_humidity_ratio
        * peak_outdoor_air_flow
    """
    __slots__ = ('_zone_name', '_load_type', '_calculated_design_load',
                 '_final_design_load', '_calculated_design_flow',
                 '_final_design_flow', '_design_day_name', '_peak_date_time',
                 '_peak_temperature', '_peak_humidity_ratio',
                 '_peak_outdoor_air_flow')

    # dictionary keys in the order of table columns 1 through 11
    _SERIALIZED_FIELDS = (
        'zone_name', 'load_type', 'calculated_design_load', 'final_design_load',
        'calculated_design_flow', 'final_design_flow', 'design_day_name',
        'peak_date_time', 'peak_temperature', 'peak_humidity_ratio',
        'peak_outdoor_air_flow')

    def __init__(self, sql_table_row):
        """Initialize ZoneSize"""
        row = sql_table_row
        # text columns are coerced to str; numeric columns are stored as given
        self._zone_name, self._load_type = str(row[1]), str(row[2])
        self._calculated_design_load, self._final_design_load = row[3], row[4]
        self._calculated_design_flow, self._final_design_flow = row[5], row[6]
        self._design_day_name = str(row[7])
        self._peak_temperature, self._peak_humidity_ratio = row[9], row[10]
        self._peak_outdoor_air_flow = row[11]

        # column 8 holds the peak time as 'month/day hour:minute:second' text
        try:
            time_text = row[8]
            if '24:00:00' in time_text:
                # strptime does not accept hour 24, so it is swapped for hour 0
                time_text = time_text.replace('24:00:00', '00:00:00')
            parsed = datetime.strptime(time_text, '%m/%d %H:%M:%S')
            peak = DateTime(parsed.month, parsed.day, parsed.hour, parsed.minute)
        except Exception:  # likely a zone with no cooling; a peak value of 0
            peak = None
        self._peak_date_time = peak

    @classmethod
    def from_dict(cls, data):
        """Create a ZoneSize from a dictionary.

        Args:
            data: ZoneSize dictionary following the format below.

        .. code-block:: python

            {
            "type": "ZoneSize",
            "zone_name": str,
            "load_type": str,
            "calculated_design_load": float,
            "final_design_load": float,
            "calculated_design_flow": float,
            "final_design_flow": float,
            "design_day_name": str,
            "peak_date_time": str,
            "peak_temperature": float,
            "peak_humidity_ratio": float,
            "peak_outdoor_air_flow": float
            }
        """
        assert data['type'] == 'ZoneSize', \
            'Expected ZoneSize. Got {}.'.format(data['type'])
        # the leading None stands in for column 0, which __init__ never reads
        table_row = [None] + [data[key] for key in cls._SERIALIZED_FIELDS]
        return cls(table_row)

    def to_dict(self):
        """Get ZoneSize as a dictionary."""
        if self.peak_date_time is None:
            pk_time = None
        else:  # written in the same text format that __init__ parses
            pk_time = self.peak_date_time.strftime('%m/%d %H:%M:%S')
        zone_dict = {'type': 'ZoneSize'}
        for key in self._SERIALIZED_FIELDS:
            zone_dict[key] = pk_time if key == 'peak_date_time' \
                else getattr(self, key)
        return zone_dict

    @property
    def zone_name(self):
        """Get the name of the zone that this sizing information belongs to."""
        return self._zone_name

    @property
    def load_type(self):
        """Get text for the type of load, which is either "Cooling" or "Heating"."""
        return self._load_type

    @property
    def calculated_design_load(self):
        """Get the peak zone load computed by the EnergyPlus sizing calculation.

        Values are always in Watts.
        """
        return self._calculated_design_load

    @property
    def final_design_load(self):
        """Get the peak zone load that is ultimately used to size the equipment.

        Values are always in Watts. This accounts for the heating_factor and
        cooling_factor specified in the SizingParameter of the
        SimulationParameter object.
        """
        return self._final_design_load

    @property
    def calculated_design_flow(self):
        """Get the peak zone flow computed by the EnergyPlus sizing calculation.

        Values are always in m3/s.
        """
        return self._calculated_design_flow

    @property
    def final_design_flow(self):
        """Get the peak zone flow that is ultimately used to size the equipment.

        Values are always in m3/s. This accounts for the heating_factor and
        cooling_factor specified in the SizingParameter of the
        SimulationParameter object.
        """
        return self._final_design_flow

    @property
    def design_day_name(self):
        """Get the name of the design day on which the peak load occurred."""
        return self._design_day_name

    @property
    def peak_date_time(self):
        """Get a DateTime for when the peak occurred (None if it was not parsed)."""
        return self._peak_date_time

    @property
    def peak_temperature(self):
        """Get the outdoor air temperature at the time of the peak load (C)."""
        return self._peak_temperature

    @property
    def peak_humidity_ratio(self):
        """Get the outdoor humidity ratio at the time of the peak (fractional)."""
        return self._peak_humidity_ratio

    @property
    def peak_outdoor_air_flow(self):
        """Get the outdoor air flow into the zone at the time of the peak (m3/s)."""
        return self._peak_outdoor_air_flow

    def ToString(self):
        """Overwrite .NET ToString."""
        return repr(self)

    def __repr__(self):
        template = '{} Zone Size: {} W'
        return template.format(self.load_type, self.calculated_design_load)
[docs]
class ComponentSize(object):
    """Object for holding the sizing results of an individual HVAC component.

    Args:
        sql_table_rows: A list of lists where each sub-list represents a row of
            the SQLite ComponentSizes table. These rows are all expected to have
            the same component name. At least one row is required because the
            component type and name are read from the first row. In each row,
            index 1 is the component type, 2 the component name, 3 the property
            text, 4 the value and 5 the units.

    Properties:
        * component_type
        * component_name
        * descriptions
        * properties
        * values
        * units
        * properties_dict
    """
    # NOTE: the '_properties_dict' slot is never assigned by this class; it is
    # kept so that the attribute layout stays unchanged.
    __slots__ = ('_component_type', '_component_name', '_properties', '_values',
                 '_units', '_properties_dict')

    def __init__(self, sql_table_rows):
        """Initialize ComponentSize"""
        # type and name are shared by all rows, so the first row is used
        self._component_type = str(sql_table_rows[0][1])
        self._component_name = str(sql_table_rows[0][2])
        self._properties = tuple(str(table_row[3]) for table_row in sql_table_rows)
        self._values = tuple(table_row[4] for table_row in sql_table_rows)
        self._units = tuple(str(table_row[5]) for table_row in sql_table_rows)

    @classmethod
    def from_dict(cls, data):
        """Create a ComponentSize from a dictionary.

        The properties, values and units lists are matched by position. If they
        have different lengths, the extra items of the longer lists are ignored.
        Empty lists are accepted and yield a ComponentSize without properties.

        Args:
            data: ComponentSize dictionary following the format below.

        .. code-block:: python

            {
            "type": "ComponentSize",
            "component_type": str,
            "component_name": str,
            "properties": [],
            "values": [],
            "units": []
            }
        """
        assert data['type'] == 'ComponentSize', \
            'Expected ComponentSize. Got {}.'.format(data['type'])
        # rebuild table-like rows; the leading None stands in for column 0,
        # which __init__ never reads
        mtx = [
            [None, data['component_type'], data['component_name'], prop, val, unit]
            for prop, val, unit
            in zip(data['properties'], data['values'], data['units'])
        ]
        if mtx:
            return cls(mtx)
        # without rows, __init__ has no first row to take the type and name
        # from, so the (empty) object is assembled directly
        comp = cls.__new__(cls)
        comp._component_type = str(data['component_type'])
        comp._component_name = str(data['component_name'])
        comp._properties = ()
        comp._values = ()
        comp._units = ()
        return comp

    def to_dict(self):
        """Get ComponentSize as a dictionary."""
        return {
            'type': 'ComponentSize',
            'component_type': self.component_type,
            'component_name': self.component_name,
            'properties': self.properties,
            'values': self.values,
            'units': self.units
        }

    @property
    def component_type(self):
        """Get text for the type of component that this object represents."""
        return self._component_type

    @property
    def component_name(self):
        """Get text for the name of component that this object represents."""
        return self._component_name

    @property
    def descriptions(self):
        """Get a tuple with text descriptions for all component properties.

        Descriptions are formatted as "component_name-property [unit]". They
        are aligned with the values on this object.
        """
        return tuple('{}-{} [{}]'.format(self._component_name, prop, unit)
                     for prop, unit in zip(self._properties, self._units))

    @property
    def properties(self):
        """Get a tuple with text for all component properties.

        They are aligned with the values on this object.
        """
        return self._properties

    @property
    def values(self):
        """Get a tuple with numbers for all component property values.

        They are aligned with the properties on this object.
        """
        return self._values

    @property
    def units(self):
        """Get a tuple with text for all component property units.

        They are aligned with the values on this object.
        """
        return self._units

    @property
    def properties_dict(self):
        """Get a dictionary with the properties as keys and values as the values.

        A new dictionary is built on every access. If a property text occurs
        more than once, the value of its last occurrence is used.
        """
        return dict(zip(self._properties, self._values))

    def ToString(self):
        """Overwrite .NET ToString."""
        return self.__repr__()

    def __repr__(self):
        return 'Component Size: {}'.format(self.component_name)