"""Post-processing Results class."""
import json
from pathlib import Path
from itertools import islice, cycle
from typing import Tuple, Union, List
import numpy as np
import itertools
from ladybug.analysisperiod import AnalysisPeriod
from ladybug.datacollection import HourlyContinuousCollection
from ladybug.datatype.generic import GenericType
from ladybug.datatype.base import DataTypeBase
from ladybug.dt import DateTime
from ladybug.header import Header
from ..annual import occupancy_schedule_8_to_6
from ..metrics import (average_values_array2d, cumulative_values_array2d,
peak_values_array2d)
from ..util import filter_array, hoys_mask, check_array_dim, \
_filter_grids_by_pattern
from .. import type_hints
from ..dynamic import DynamicSchedule, ApertureGroupSchedule
class _ResultsFolder(object):
"""Base class for ResultsFolder.
This class includes properties that are independent of the results.
Args:
folder: Path to results folder.
Properties:
* folder
* grids_info
* sun_up_hours
* sun_down_hours
* light_paths
* default_states
* grid_states
* timestep
* study_hours
"""
__slots__ = ('_folder', '_grids_info', '_sun_up_hours', '_sun_down_hours',
'_sun_up_hours_mask', '_sun_down_hours_mask', '_datetimes',
'_light_paths', '_default_states', '_grid_states', '_timestep',
'_study_hours')
def __init__(self, folder: Union[str, Path]):
"""Initialize ResultsFolder."""
self._folder = Path(folder).absolute().as_posix()
self._timestep, self._study_hours = self._get_study_info()
self.grids_info = self._get_grids_info()
self.sun_up_hours = self._get_sun_up_hours()
self._sun_up_hours_mask = self._get_sun_up_hours_mask()
self._sun_down_hours_mask = self._get_sun_down_hours_mask()
self._datetimes = self._get_datetimes()
self._light_paths = self._get_light_paths()
self._default_states = self._get_default_states()
self._grid_states = self._get_grid_states()
@property
def folder(self):
"""Return full path to results folder as a string."""
return self._folder
@property
def grids_info(self):
"""Return grids information as list of dictionaries for each grid."""
return self._grids_info
@grids_info.setter
def grids_info(self, grids_info):
assert isinstance(grids_info, list), \
f'Grids information must be a list. Got object of type: {type(grids_info)}.'
for grid_info in grids_info:
assert isinstance(grid_info, dict), \
'Object in grids information must be a dictionary. ' \
f'Got object of type {type(grid_info)}.'
if 'light_path' in grid_info.keys():
_grid_info = []
for light_path in grid_info['light_path']:
if Path(self.folder, light_path[0]).exists():
_grid_info.append((light_path))
grid_info['light_path'] = _grid_info
if not grid_info['light_path']:
# if light path is empty
grid_info['light_path'] = [['__static_apertures__']]
else:
# if light path key is nonexistent
grid_info['light_path'] = [['__static_apertures__']]
self._grids_info = grids_info
@property
def sun_up_hours(self):
"""Return sun up hours."""
return self._sun_up_hours
@sun_up_hours.setter
def sun_up_hours(self, sun_up_hours):
assert isinstance(sun_up_hours, list), \
f'Sun up hours must be a list. Got object of type: {type(sun_up_hours)}'
self._sun_up_hours = sun_up_hours
self.sun_down_hours = np.setdiff1d(self.study_hours, sun_up_hours).tolist()
@property
def sun_up_hours_mask(self):
"""Return sun up hours masking array."""
return self._sun_up_hours_mask
@property
def sun_down_hours(self):
"""Return sun down hours."""
return self._sun_down_hours
@sun_down_hours.setter
def sun_down_hours(self, sun_down_hours):
assert isinstance(sun_down_hours, list), \
f'Sun down hours must be a list. Got object of type: {type(sun_down_hours)}'
self._sun_down_hours = sun_down_hours
@property
def sun_down_hours_mask(self):
"""Return sun down hours masking array."""
return self._sun_down_hours_mask
@property
def datetimes(self):
"""Return DateTimes for sun up hours."""
return self._datetimes
@property
def light_paths(self):
"""Return the identifiers of the light paths."""
return self._light_paths
@property
def default_states(self):
"""Return default states as a dictionary."""
return self._default_states
@property
def grid_states(self):
"""Return grid states as a dictionary."""
return self._grid_states
@property
def timestep(self):
"""Return timestep as an integer."""
return self._timestep
@property
def study_hours(self):
"""Return study hours as a list."""
return self._study_hours
def _get_light_paths(self) -> list:
"""Find all light paths in grids_info."""
lp = []
for grid_info in self.grids_info:
try:
light_paths = grid_info['light_path']
except KeyError:
grid_info['light_path'] = [['__static_apertures__']]
light_paths = grid_info['light_path']
for light_path in light_paths:
for elem in light_path:
if elem in lp:
continue
if elem == '__static_apertures__':
lp.insert(0, elem)
else:
lp.append(elem)
if not light_paths and '__static_apertures__' not in lp:
lp.insert(0, '__static_apertures__')
return lp
def _get_default_states(self) -> dict:
"""Set default state to 0 for all light paths."""
default_states = {}
for light_path in self.light_paths:
default_states[light_path] = [0]
return default_states
def _get_grid_states(self) -> dict:
"""Read grid_states.json if available."""
info = Path(self.folder, 'grid_states.json')
if info.is_file():
with open(info) as data_f:
data = json.load(data_f)
return data
else:
# only static results
return {}
def _get_study_info(self) -> Tuple[int, list]:
"""Read study info file."""
study_info_file = Path(self.folder).joinpath('study_info.json')
if study_info_file.exists():
with open(study_info_file) as file:
study_info = json.load(file)
if study_info['timestep'] == 1:
study_info['study_hours'] = \
list(map(int, study_info['study_hours']))
else:
study_info = {}
study_info['timestep'] = 1
study_info['study_hours'] = AnalysisPeriod().hoys
return study_info['timestep'], study_info['study_hours']
def _get_datetimes(self) -> List[DateTime]:
"""Get a list of DateTimes of the sun up hours."""
datetimes = [
DateTime.from_hoy(hoy) for hoy in list(map(float, self.sun_up_hours))
]
return datetimes
def _get_grids_info(self) -> List[dict]:
"""Get grids info from folder."""
info = Path(self.folder, 'grids_info.json')
with open(info) as data_f:
grids = json.load(data_f)
return grids
def _get_sun_up_hours(self) -> List[float]:
"""Get sun up hours from folder."""
suh_fp = Path(self.folder, 'sun-up-hours.txt')
sun_up_hours = np.loadtxt(suh_fp, dtype=float).tolist()
if self.timestep == 1:
sun_up_hours = list(map(int, sun_up_hours))
return sun_up_hours
def _get_sun_up_hours_mask(self) -> List[int]:
"""Get a sun up hours masking array of the study hours."""
sun_up_hours_mask = \
np.where(np.isin(self.study_hours, self.sun_up_hours))[0]
return sun_up_hours_mask
def _get_sun_down_hours_mask(self) -> List[int]:
"""Get a sun down hours masking array of the study hours."""
sun_down_hours_mask = \
np.where(~np.isin(self.study_hours, self.sun_up_hours))[0]
return sun_down_hours_mask
def __repr__(self):
return f'{self.__class__.__name__}: {self.folder}'
[docs]
class Results(_ResultsFolder):
"""Results class.
Args:
folder: Path to results folder.
schedule: 8760 values as a list. Values must be either 0 or 1. Values of 1
indicates occupied hours. If no schedule is provided a default schedule
will be used. (Default: None).
load_arrays: Set to True to load all NumPy arrays. If False the arrays will be
loaded only once they are needed. In both cases the loaded array(s) will be
stored in a dictionary under the arrays property. (Default: False).
Properties:
* schedule
* occ_pattern
* total_occ
* sun_down_occ_hours
* occ_mask
* arrays
* valid_states
* datatype
* unit
"""
__slots__ = ('_schedule', '_occ_pattern', '_total_occ', '_sun_down_occ_hours',
'_occ_mask', '_arrays', '_valid_states', '_datatype', '_unit')
def __init__(self, folder, datatype: DataTypeBase = None,
schedule: list = None, unit: str = None,
load_arrays: bool = False):
"""Initialize Results."""
_ResultsFolder.__init__(self, folder)
self.schedule = schedule
self._arrays = self._load_arrays() if load_arrays else {}
self._valid_states = self._get_valid_states()
self.datatype = datatype
self.unit = unit
@property
def schedule(self):
"""Return schedule."""
return self._schedule
@schedule.setter
def schedule(self, schedule):
self._schedule = schedule if schedule else \
occupancy_schedule_8_to_6(timestep=self.timestep, as_list=True)
self._update_occ()
@property
def occ_pattern(self):
"""Return a filtered version of the annual schedule that only includes the
sun-up-hours."""
return self._occ_pattern
@property
def total_occ(self):
"""Return an integer for the total occupied hours of the schedule."""
return self._total_occ
@property
def sun_down_occ_hours(self):
"""Return an integer for the number of occupied hours where the sun is down and
there's no possibility of being daylit or experiencing glare."""
return self._sun_down_occ_hours
@property
def occ_mask(self):
"""Return an occupancy mask as a NumPy array that can be used to mask the
results."""
return self._occ_mask
@property
def arrays(self):
"""Return a dictionary of all the NumPy arrays that have been loaded."""
return self._arrays
@arrays.setter
def arrays(self, value):
self._arrays = value
@property
def valid_states(self):
"""Return a dictionary with valid states. Each light path is represented by a
key-value pair where the light path identifier is the key and the value is a list
of valid states, e.g., [0, 1, 2, ...]."""
return self._valid_states
@property
def datatype(self):
"""Return a Ladybug DataType object."""
return self._datatype
@datatype.setter
def datatype(self, value):
if value is not None:
assert isinstance(value, DataTypeBase), \
f'data_type must be a Ladybug DataType. Got {type(value)}'
else:
value = GenericType('Generic', '')
self._datatype = value
@property
def unit(self):
"""Return unit of hourly values."""
return self._unit
@unit.setter
def unit(self, value):
self._unit = value
[docs]
def total(
self, hoys: list = [], states: DynamicSchedule = None,
grids_filter: str = '*', res_type: str = 'total'
) -> type_hints.total:
"""Get summed values for each sensor.
Args:
hoys: An optional numbers or list of numbers to select the hours of
the year (HOYs) for which results will be computed. Defaults to [].
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with total values and grid information.
"""
grids_info = self._filter_grids(grids_filter=grids_filter)
mask = hoys_mask(self.sun_up_hours, hoys)
total = []
for grid_info in grids_info:
array = self._array_from_states(grid_info, states=states, res_type=res_type)
if np.any(array):
array_filter = np.apply_along_axis(
filter_array, 1, array, mask=mask
)
array_total = array_filter.sum(axis=1)
else:
array_total = np.zeros(grid_info['count'])
total.append(array_total)
return total, grids_info
[docs]
def point_in_time(
self, datetime: Union[float, DateTime], states: DynamicSchedule = None,
grids_filter: str = '*', res_type: str = 'total'
) -> type_hints.point_in_time:
"""Get point in time values.
Args:
datetime: Hour of the year as a float or DateTime object.
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with point in time values and grid information.
"""
grids_info = self._filter_grids(grids_filter=grids_filter)
if isinstance(datetime, float):
dt = DateTime.from_hoy(datetime)
elif isinstance(datetime, DateTime):
dt = datetime
else:
error_message = (
f'Input datetime must be of type {int} or {DateTime}. '
f'Received {type(DateTime)}.'
)
raise ValueError(error_message)
idx = self._index_from_datetime(dt)
pit_values = []
for grid_info in grids_info:
if idx:
array = self._array_from_states(
grid_info, states=states, res_type=res_type)
pit_values.append(array[:, idx])
else:
# datetime not in sun up hours, add zeros
pit_values.append(np.zeros(grid_info['count']))
return pit_values, grids_info
[docs]
def point_in_time_to_folder(
self, target_folder: str, datetime: Union[float, DateTime],
states: DynamicSchedule = None, grids_filter: str = '*',
res_type: str = 'total'
) -> type_hints.point_in_time:
"""Get point in time values and write the values to a folder.
Args:
target_folder: Folder path to write annual metrics in. Usually this
folder is called 'metrics'.
datetime: Hour of the year as a float or DateTime object.
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with point in time values and grid information.
"""
folder = Path(target_folder)
folder.mkdir(parents=True, exist_ok=True)
pit_values, grids_info = self.point_in_time(
datetime=datetime, states=states,
grids_filter=grids_filter, res_type=res_type)
metric_folder = folder.joinpath('point_in_time')
for count, grid_info in enumerate(grids_info):
d = pit_values[count]
full_id = grid_info['full_id']
output_file = metric_folder.joinpath(f'{full_id}.pit')
output_file.parent.mkdir(parents=True, exist_ok=True)
np.savetxt(output_file, d, fmt='%.2f')
info_file = metric_folder.joinpath('grids_info.json')
info_file.write_text(json.dumps(grids_info))
return pit_values, grids_info
[docs]
def average_values(
self, hoys: list = [], states: DynamicSchedule = None, grids_filter: str = '*',
res_type: str = 'total') -> type_hints.average_values:
"""Get average values for each sensor over a given period.
The hoys input can be used to filter the data for a particular time
period.
Args:
hoys: An optional numbers or list of numbers to select the hours of
the year (HOYs) for which results will be computed. Defaults to [].
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with the average value for each sensor and grid information.
"""
grids_info = self._filter_grids(grids_filter=grids_filter)
full_length = len(self.study_hours) if len(hoys) == 0 else len(hoys)
mask = hoys_mask(self.sun_up_hours, hoys)
average_values = []
for grid_info in grids_info:
array = self._array_from_states(grid_info, states=states, res_type=res_type)
if np.any(array):
array_filter = np.apply_along_axis(
filter_array, 1, array, mask=mask)
results = average_values_array2d(
array_filter, full_length)
else:
results = np.zeros(grid_info['count'])
average_values.append(results)
return average_values, grids_info
[docs]
def average_values_to_folder(
self, target_folder: str, hoys: list = [], states: DynamicSchedule = None,
grids_filter: str = '*', res_type: str = 'total'):
"""Get average values for each sensor over a given period and write the
values to a folder.
Args:
target_folder: Folder path to write annual metrics in. Usually this
folder is called 'metrics'.
hoys: An optional numbers or list of numbers to select the hours of
the year (HOYs) for which results will be computed. Defaults to [].
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
res_type: Type of results to load. Defaults to 'total'.
"""
folder = Path(target_folder)
folder.mkdir(parents=True, exist_ok=True)
average_values, grids_info = self.average_values(
hoys=hoys, states=states,
grids_filter=grids_filter, res_type=res_type)
metric_folder = folder.joinpath('average_values')
for count, grid_info in enumerate(grids_info):
d = average_values[count]
full_id = grid_info['full_id']
output_file = metric_folder.joinpath(f'{full_id}.average')
output_file.parent.mkdir(parents=True, exist_ok=True)
np.savetxt(output_file, d, fmt='%.2f')
info_file = metric_folder.joinpath('grids_info.json')
info_file.write_text(json.dumps(grids_info))
[docs]
def cumulative_values(
self, hoys: list = [], states: DynamicSchedule = None,
t_step_multiplier: float = 1, grids_filter: str = '*',
res_type: str = 'total') -> type_hints.cumulative_values:
"""Get cumulative values for each sensor over a given period.
The hoys input can be used to filter the data for a particular time
period.
Args:
hoys: An optional numbers or list of numbers to select the hours of
the year (HOYs) for which results will be computed. Defaults to [].
states: A dictionary of states. Defaults to None.
t_step_multiplier: A value that will be multiplied with the timestep.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with the cumulative value for each sensor and grid
information.
"""
grids_info = self._filter_grids(grids_filter=grids_filter)
mask = hoys_mask(self.sun_up_hours, hoys)
cumulative_values = []
for grid_info in grids_info:
array = self._array_from_states(grid_info, states=states, res_type=res_type)
if np.any(array):
array_filter = np.apply_along_axis(
filter_array, 1, array, mask=mask)
results = cumulative_values_array2d(
array_filter, self.timestep, t_step_multiplier)
else:
results = np.zeros(grid_info['count'])
cumulative_values.append(results)
return cumulative_values, grids_info
[docs]
def cumulative_values_to_folder(
self, target_folder: str, hoys: list = [],
states: DynamicSchedule = None, t_step_multiplier: float = 1,
grids_filter: str = '*', res_type: str = 'total'):
"""Get cumulative values for each sensor over a given period and write
the values to a folder.
Args:
target_folder: Folder path to write annual metrics in. Usually this
folder is called 'metrics'.
hoys: An optional numbers or list of numbers to select the hours of
the year (HOYs) for which results will be computed. Defaults to [].
states: A dictionary of states. Defaults to None.
t_step_multiplier: A value that will be multiplied with the timestep.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
res_type: Type of results to load. Defaults to 'total'.
"""
folder = Path(target_folder)
folder.mkdir(parents=True, exist_ok=True)
cumulative_values, grids_info = self.cumulative_values(
hoys=hoys, states=states, t_step_multiplier=t_step_multiplier,
grids_filter=grids_filter, res_type=res_type
)
metric_folder = folder.joinpath('cumulative_values')
for count, grid_info in enumerate(grids_info):
d = cumulative_values[count]
full_id = grid_info['full_id']
output_file = metric_folder.joinpath(f'{full_id}.cumulative')
output_file.parent.mkdir(parents=True, exist_ok=True)
np.savetxt(output_file, d, fmt='%.2f')
info_file = metric_folder.joinpath('grids_info.json')
info_file.write_text(json.dumps(grids_info))
[docs]
def peak_values(
self, hoys: list = [], states: DynamicSchedule = None, grids_filter: str = '*',
coincident: bool = False, res_type: str = 'total'
) -> type_hints.peak_values:
"""Get peak values for each sensor over a given period.
The hoys input can be used to filter the data for a particular time
period.
Args:
hoys: An optional numbers or list of numbers to select the hours of
the year (HOYs) for which results will be computed. Defaults to [].
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
coincident: Boolean to indicate whether output values represent the peak
value for each sensor throughout the entire analysis (False) or they
represent the highest overall value across each sensor grid at a
particular timestep (True). Defaults to False.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with the peak value for each sensor and grid information.
"""
grids_info = self._filter_grids(grids_filter=grids_filter)
mask = hoys_mask(self.sun_up_hours, hoys)
filt_suh = hoys if len(hoys) != 0 else self.sun_up_hours
peak_values = []
max_hoys = []
for grid_info in grids_info:
max_i = None
array = self._array_from_states(grid_info, states=states, res_type=res_type)
if np.any(array):
array_filter = np.apply_along_axis(
filter_array, 1, array, mask=mask)
results, max_i = peak_values_array2d(
array_filter, coincident=coincident)
else:
results = np.zeros(grid_info['count'])
peak_values.append(results)
if max_i:
max_hoys.append(filt_suh[max_i])
else:
max_hoys.append(None)
return peak_values, max_hoys, grids_info
[docs]
def peak_values_to_folder(
self, target_folder: str, hoys: list = [], states: DynamicSchedule = None,
grids_filter: str = '*', coincident: bool = False, res_type='total'):
"""Get peak values for each sensor over a given period and write the
values to a folder.
Args:
target_folder: Folder path to write peak values in. Usually this
folder is called 'metrics'.
hoys: An optional numbers or list of numbers to select the hours of
the year (HOYs) for which results will be computed. Defaults to [].
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
coincident: Boolean to indicate whether output values represent the peak
value for each sensor throughout the entire analysis (False) or they
represent the highest overall value across each sensor grid at a
particular timestep (True). Defaults to False.
res_type: Type of results to load. Defaults to 'total'.
"""
folder = Path(target_folder)
folder.mkdir(parents=True, exist_ok=True)
peak_values, max_hoys, grids_info = self.peak_values(
hoys=hoys, states=states, grids_filter=grids_filter,
coincident=coincident, res_type=res_type)
metric_folder = folder.joinpath('peak_values')
for count, grid_info in enumerate(grids_info):
d = peak_values[count]
full_id = grid_info['full_id']
output_file = metric_folder.joinpath(f'{full_id}.peak')
output_file.parent.mkdir(parents=True, exist_ok=True)
np.savetxt(output_file, d, fmt='%.2f')
max_hoys_file = metric_folder.joinpath('max_hoys.txt')
max_hoys_file.write_text('\n'.join(str(h) for h in max_hoys))
info_file = metric_folder.joinpath('grids_info.json')
info_file.write_text(json.dumps(grids_info))
def _array_to_annual_data(
self, grid_info, states: DynamicSchedule = None,
sensor_index: list = None, res_type: str = 'total'
) -> Tuple[List[HourlyContinuousCollection], dict, list]:
"""Get annual data for one or multiple sensors.
Args:
grid_info: Grid information of the grid.
states: A dictionary of states. Defaults to None.
sensor_index: A list of sensor indices as integers. Defaults to None.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with Data Collections for each sensor, grid information,
and a list of the sensors.
"""
analysis_period = AnalysisPeriod(timestep=self.timestep)
# if no sensor_index, create list with all sensors
if not sensor_index:
sensor_index = [range(grid_info['count'])]
data_collections = []
array = self._array_from_states(grid_info, states=states, res_type=res_type)
for idx in sensor_index:
if np.any(array):
values = array[idx, :]
else:
values = np.zeros(len(self.sun_up_hours))
annual_array = Results.values_to_annual(
self.sun_up_hours, values, self.timestep, self.study_hours)
header = Header(self.datatype, self.unit, analysis_period)
header.metadata['sensor grid'] = grid_info['full_id']
header.metadata['sensor index'] = idx
data_collections.append(
HourlyContinuousCollection(header, annual_array.tolist()))
return data_collections, grid_info, sensor_index
[docs]
def annual_data(
self, states: DynamicSchedule = None, grids_filter: str = '*',
sensor_index: dict = None, res_type: str = 'total'
) -> type_hints.annual_data:
"""Get annual data for one or multiple sensors.
Args:
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
sensor_index: A dictionary with grids as keys and a list of sensor
indices as values. Defaults to None.
res_type: Type of results to load. Defaults to 'total'.
Returns:
Tuple: A tuple with Data Collections for each sensor, grid information,
and a dictionary of the sensors.
"""
grids_info = self._filter_grids(grids_filter=grids_filter)
analysis_period = AnalysisPeriod(timestep=self.timestep)
# if no sensor_index, create dict with all sensors
if not sensor_index:
sensor_index = {}
for grid_info in grids_info:
sensor_index[grid_info['full_id']] = \
[i for i in range(grid_info['count'])]
data_collections = []
for grid_info in grids_info:
data_collections_grid = []
grid_id = grid_info['full_id']
array = self._array_from_states(grid_info, states=states, res_type=res_type)
indices = sensor_index[grid_id]
for idx in indices:
if np.any(array):
values = array[idx, :]
else:
values = np.zeros(len(self.sun_up_hours))
annual_array = Results.values_to_annual(
self.sun_up_hours, values, self.timestep)
header = Header(self.datatype, self.unit, analysis_period)
header.metadata['sensor grid'] = grid_id
header.metadata['sensor index'] = idx
data_collections_grid.append(
HourlyContinuousCollection(header, annual_array.tolist()))
data_collections.append(data_collections_grid)
return data_collections, grids_info, sensor_index
[docs]
def annual_data_to_folder(
self, target_folder: str, states: DynamicSchedule = None, grids_filter: str = '*',
sensor_index: dict = None, res_type: str = 'total'):
"""Get annual data for one or multiple sensors and write the data to a
folder as Data Collections.
Args:
target_folder: Folder path to write annual metrics in. Usually this
folder is called 'metrics'.
states: A dictionary of states. Defaults to None.
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
sensor_index: A dictionary with grids as keys and a list of sensor
indices as values. Defaults to None.
res_type: Type of results to load. Defaults to 'total'.
"""
folder = Path(target_folder)
folder.mkdir(parents=True, exist_ok=True)
data_collections, grids_info, sensor_index = self.annual_data(
states=states, grids_filter=grids_filter, sensor_index=sensor_index,
res_type=res_type)
metric_folder = folder.joinpath('datacollections')
for count, grid_info in enumerate(grids_info):
grid_collections = data_collections[count]
for data_collection in grid_collections:
grid_id = grid_info['full_id']
sensor_id = data_collection.header.metadata['sensor index']
data_dict = data_collection.to_dict()
data_file = metric_folder.joinpath(f'{grid_id}_{sensor_id}.json')
data_file.parent.mkdir(parents=True, exist_ok=True)
data_file.write_text(json.dumps(data_dict))
[docs]
@staticmethod
def values_to_annual(
hours: Union[List[float], np.ndarray],
values: Union[List[float], np.ndarray],
timestep: int, base_value: int = 0,
dtype: np.dtype = np.float32) -> np.ndarray:
"""Map a 1D NumPy array based on a set of hours to an annual array.
This method creates an array with a base value of length 8760 and
replaces the base value with the input 'values' at the indices of the
input 'hours'.
Args:
hours: A list of hours. This can be a regular list or a 1D NumPy
array.
values: A list of values to map to an annual array. This can be a
regular list or a 1D NumPy array.
timestep: Time step of the simulation.
base_value: A value that will be applied for all the base array.
dtype: A NumPy dtype for the annual array.
Returns:
A 1D NumPy array.
"""
values = np.array(values)
check_array_dim(values, 1)
hours = np.array(hours)
assert hours.shape == values.shape
full_ap = AnalysisPeriod(timestep=timestep)
indices = np.where(np.isin(full_ap.hoys, hours))[0]
annual_array = np.repeat(base_value, 8760 * timestep).astype(dtype)
annual_array[indices] = values
return annual_array
def _index_from_datetime(self, datetime: DateTime) -> Union[int, None]:
"""Returns the index of the input datetime in the list of datetimes
from the datetimes property.
If the DateTime is not in the list, the function will return None.
Args:
datetime: A DateTime object.
Returns:
Index as an integer or None.
"""
assert isinstance(datetime, DateTime), \
f'Expected DateTime object but received {type(datetime)}'
try:
index = self.datetimes.index(datetime)
except Exception:
# DateTime not in sun up hours
index = None
return index
def _get_array(
self, grid_info: dict, light_path: str, state: int = 0,
res_type: str = 'total', extension: str = '.npy') -> np.ndarray:
"""Get an array for a given grid, light path, and state.
The array will be fetched from the 'arrays' property if it has been
loaded already.
Args:
grid_info: Grid information.
light_path: Identifier of the light path.
state: State as an integer. E.g., 0 for the default state.
Defaults to 0.
res_type: Type of results to load. Defaults to 'total'.
extension: File extension of the array to load. Defaults to '.npy'.
Returns:
np.ndarray: A NumPy array of a given grid, light path, and state.
"""
grid_id = grid_info['full_id']
state_identifier = self._state_identifier(grid_id, light_path, state=state)
try:
array = self.arrays[grid_id][light_path][state_identifier][res_type]
except Exception:
array = self._load_array(
grid_info, light_path, state=state, res_type=res_type,
extension=extension
)
return array
def _load_array(
self, grid_info: dict, light_path: str, state: int = 0,
res_type: str = 'total', extension: str = '.npy') -> np.ndarray:
"""Load a NumPy file to an array.
This method will also update the arrays property value.
Args:
grid_info: Grid information.
light_path: Identifier of the light path.
state: State as an integer. E.g., 0 for the default state.
Defaults to 0.
res_type: Which type of result to return a file for. E.g., 'total' for total
illuminance or 'direct' for direct illuminance.
extension: File extension of the array to load. Defaults to '.npy'.
Returns:
np.ndarray: A NumPy array of a given grid, light path, and state
from a NumPy file.
"""
grid_id = grid_info['full_id']
def merge_dicts(array_dict, arrays):
for key, value in array_dict.items():
if isinstance(value, dict):
node = arrays.setdefault(key, {})
merge_dicts(value, node)
else:
arrays[key] = value
return arrays
state_identifier = self._state_identifier(grid_id, light_path, state=state)
file = self._get_file(grid_id, light_path, state_identifier, res_type,
extension=extension)
array = np.load(file)
array_dict = {grid_id: {light_path: {state_identifier: {res_type: array}}}}
arrays = merge_dicts(array_dict, self.arrays)
self.arrays = arrays
return array
def _state_identifier(
self, grid_id: str, light_path: str, state: int = 0) -> Union[str, None]:
"""Get the state identifier from a light path and state integer.
Args:
grid_id: Grid identifier.
light_path: Identifier of the light path.
state: State as an integer. E.g., 0 for the default state.
Defaults to 0.
Returns:
State identifier. For static apertures the identifier is 'default',
and for other light paths it is the light path identifier preceded
by the state integer, e.g., '0_light_path'. If the state is -1 the
state identifier will be None.
"""
# TODO: Figure out if there is a better way to handle the states.
# I.e., state integer <--> state identifier.
valid_states = self.valid_states[light_path]
if state in valid_states:
if light_path == '__static_apertures__':
state_identifier = 'default'
else:
state_identifier = self.grid_states[grid_id][light_path][state]
return state_identifier
elif state == -1:
return None
else:
error_message = (
f'State of {light_path} must be any of {valid_states} for on '
f'or -1 for off. Received state {state}.'
)
raise ValueError(error_message)
def _get_file(
self, grid_id: str, light_path: str, state_identifier: str,
res_type: str = 'total', extension: str = '.npy') -> Path:
"""Return the path of a results file.
Args:
grid_id: Grid identifier.
light_path: Identifier of the light path.
state_identifier: State identifier.
res_type: Which type of result to return a file for. E.g., 'total' for total
illuminance or 'direct' for direct illuminance.
extension: File extension of the array to load. Defaults to '.npy'.
Returns:
Path to a NumPy file.
"""
file = Path(self.folder, light_path, state_identifier,
res_type, grid_id + extension)
if not file.is_file():
raise FileNotFoundError(f'File {file} not found in the results folder.')
return file
def _validate_dynamic_states(self, states: dict) -> dict:
"""Validate dynamic states and return states dictionary.
If all light paths in the dictionary have 8760 values, the states
dictionary is returned as is. If some light paths have less than 8760
values, pattern of values will be repeated until it reaches a length of
8760.
Args:
states: A dictionary of states.
Returns:
dict: A dictionary of states.
"""
if all(len(values) == 8760 for values in states.values()):
return states
for light_path, values in states.items():
if len(values) < 8760:
states[light_path] = list(islice(cycle(values), 8760))
elif len(values) > 8760:
error_message = (
f'The light path {light_path} has {len(values)} values in '
f'its states schedule. Maximum allowed number of values '
f'is 8760.'
)
raise ValueError(error_message)
return states
def _validate_states(self, states: dict) -> dict:
"""Validate states and return states dictionary.
If all light paths in the dictionary have integers only as values, the
states dictionary is returned as is. If some light paths have values
that are not integers, these values will be mapped as integers if
possible, e.g., if the values are strings ('0', '1', etc.) instead of
integers.
Args:
states: A dictionary of states.
Returns:
dict: A dictionary of states.
"""
if all(isinstance(v, int) for values in states.values() for v in values):
return states
for light_path, values in states.items():
try:
states[light_path] = list(map(int, values))
except ValueError as err:
error_message = (
f'Failed to convert states schedule for light path '
f'{light_path} to integers.'
)
raise ValueError(error_message) from err
return states
def _filter_grid_states(self, grid_info, states: DynamicSchedule = None) -> DynamicSchedule:
"""Filter a dictionary of states by grid. Only light paths relevant to
the given grid will be returned.
Args:
grid_info: Grid information.
states: A dictionary of states. Light paths as keys and lists of
8760 values for each key. The values should be integers
matching the states or -1 for off. Default to None.
Returns:
dict: A filtered states dictionary.
"""
light_paths = []
for lp in grid_info['light_path']:
for _lp in lp:
if _lp == '__static_apertures__' and len(lp) > 1:
pass
else:
light_paths.append(_lp)
if states:
states = states.filter_by_identifiers(light_paths)
else:
default_states = self.default_states
states = DynamicSchedule()
for light_path in light_paths:
ap_group_schedule = ApertureGroupSchedule(
light_path, default_states[light_path], is_static=True)
states.add_aperture_group_schedule(ap_group_schedule)
return states
def _array_from_states(
self, grid_info, states: DynamicSchedule = None,
res_type: str = 'total', zero_array: bool = False
) -> np.ndarray:
"""Create an array for a given grid by the states settings.
Args:
grid_info: Grid information of the grid.
states: A dictionary of states. Light paths as keys and lists of 8760 values
for each key. The values should be integers matching the states or -1 for
off.
res_type: Which type of result to create an array for. E.g., 'total'
for total illuminance or 'direct' for direct illuminance.
zero_array: Boolean to note if a 2D zero array should be created if
the array of the grid is zero. This is the case if the
illuminance of the grid is zero. (Default: False).
Returns:
A NumPy array based on the states settings.
"""
# get states that are relevant for the grid
states = self._filter_grid_states(grid_info, states=states)
arrays = []
for light_path, gr_schedule in states.dynamic_schedule.items():
if gr_schedule.is_static:
state = gr_schedule.schedule[0]
# if state is -1 we continue since it is "turned off"
if state == -1:
continue
# load static array (state is static)
array = self._get_array(
grid_info, light_path, state=state, res_type=res_type)
arrays.append(array)
else:
# create default 0 array, we will add to this later
array = np.zeros((grid_info['count'], len(self.sun_up_hours)))
# slice states to match sun up hours
states_array = np.array(gr_schedule.schedule)[
list(map(int, self.sun_up_hours))]
for state in np.unique(states_array):
if state == -1:
# if state is -1 we continue since it is "turned off"
continue
# load static array (state is static)
_array = self._get_array(
grid_info, light_path, state=state, res_type=res_type)
# get indices and add values to base array
states_indicies = states_array == state
array[:, states_indicies] += _array[:, states_indicies]
arrays.append(array)
array = sum(arrays)
if not np.any(array):
if zero_array:
array = np.zeros((grid_info['count'], len(self.sun_up_hours)))
else:
array = np.array([])
return array
def _update_occ(self):
"""Set properties related to occupancy."""
occ_mask = np.array(self.schedule, dtype=int)[self.sun_up_hours_mask]
sun_down_sch = \
np.array(self.schedule, dtype=int)[self.sun_down_hours_mask].sum()
self._occ_mask = occ_mask
self._total_occ = sum(self.schedule)
self._sun_down_occ_hours = sun_down_sch
def _filter_grids(self, grids_filter: str = '*') -> list:
"""Return grids information.
Args:
grids_filter: The name of a grid or a pattern to filter the grids.
Defaults to '*'.
Returns:
list: List of grid information for filtered grids.
"""
if grids_filter != '*':
grids_info = \
_filter_grids_by_pattern(self.grids_info, grids_filter)
else:
grids_info = self.grids_info
return grids_info
def _load_arrays(self) -> dict:
"""Load all the NumPy arrays in the results folder."""
arrays = {}
grids_info = self.grids_info
for grid_info in grids_info:
grid_id = grid_info['full_id']
light_paths = grid_info['light_path']
arrays[grid_id] = {}
for light_path in light_paths:
light_path = light_path[0]
arrays[grid_id][light_path] = {}
light_path_folder = Path(self.folder, light_path)
for state_folder in Path(light_path_folder).iterdir():
state = state_folder.name
arrays[grid_id][light_path][state] = {}
for res_type_folder in Path(state_folder).iterdir():
res_type = res_type_folder.name
file = Path(res_type_folder, grid_id + '.npy')
array = np.load(file)
arrays[grid_id][light_path][state][res_type] = array
return arrays
def _get_valid_states(self) -> dict:
"""Returns a dictionary with valid states for each light path.
For each light path there will be a key (identifier of the light path)
and its value will be a list of valid states as integers.
Example of output format:
{
'__static_apertures__': [0],
'Room1_North': [0, 1],
'Room1_South': [0, 1],
'Room2_North1': [0, 1],
'Room2_North2': [0, 1]
}
Returns:
dict: Valid states integers for each light path.
"""
valid_states = {}
grid_states = self.grid_states
if '__static_apertures__' in self.light_paths:
valid_states['__static_apertures__'] = [0]
for light_paths in grid_states.values():
for light_path, states in light_paths.items():
if light_path not in valid_states:
valid_states[light_path] = list(range(len(states)))
return valid_states
def _light_paths_from_grid_info(self, grid_info: Union[dict, str]) -> list:
if isinstance(grid_info, str):
for _grid_info in self.grids_info:
if _grid_info['full_id'] == grid_info:
grid_info = _grid_info
break
else:
raise Exception(f'Grid info with full_id "{grid_info}" not found.')
light_paths = []
for lp in grid_info['light_path']:
for _lp in lp:
if _lp == '__static_apertures__' and len(lp) > 1:
pass
else:
light_paths.append(_lp)
return light_paths
def _get_state_combinations(self, grid_info: Union[dict, str]) -> List[dict]:
light_paths = self._light_paths_from_grid_info(grid_info)
valid_states = self._get_valid_states()
filtered_states = {lp: valid_states[lp] for lp in light_paths}
keys, values = zip(*filtered_states.items())
combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]
return combinations