Source code for honeybee_radiance_postprocess.well.well

"""Functions for WELL post-processing."""
from typing import Tuple, Union
from pathlib import Path
from collections import defaultdict
import json
import itertools
import numpy as np

from ladybug.analysisperiod import AnalysisPeriod
from ladybug.datatype.generic import GenericType
from ladybug.datacollection import HourlyContinuousCollection
from ladybug.header import Header
from honeybee.model import Model
from honeybee.units import conversion_factor_to_meters
from honeybee_radiance.writer import _filter_by_pattern

from ..metrics import da_array2d
from ..annual import occupancy_schedule_8_to_6
from ..results.annual_daylight import AnnualDaylight
from ..util import filter_array, recursive_dict_merge
from ..ies.lm import dynamic_schedule_direct_illuminance
from ..en17037 import en17037_to_folder


def _create_grid_summary(
    grid_info, sda_grid, sda_blinds_up_grid, sda_blinds_down_grid, pass_sda,
    total_floor, area_weighted=True):
    """Create a WELL summary for a single grid.

    Args:
        grid_info: Grid information.
        sda_grid: Spatial Daylight Autonomy.
        pass_sda: The percentage of the sensor points or floor area that
            passes sDA.
        total_floor: The number of sensor points or floor area.
        area_weighted: Boolean to determine if the results are area
            weighted. Defaults to True.

    Returns:
        Tuple:
        -   summary_grid: Summary of each grid individually.
    """
    grid_id = grid_info['full_id']
    grid_name = grid_info['name']
    grid_summary = {
        grid_id: {}
    }

    if area_weighted:
        _grid_summary = {
            grid_id: {
                'name': grid_name,
                'full_id': grid_id,
                'sda': round(sda_grid, 2),
                'sda_blinds_up': round(sda_blinds_up_grid, 2),
                'sda_blinds_down': round(sda_blinds_down_grid, 2),
                'floor_area_passing_sda': round(pass_sda, 2),
                'total_floor_area': round(total_floor, 2)
            }
        }
    else:
        _grid_summary = {
            grid_id: {
                'name': grid_name,
                'full_id': grid_id,
                'sda': round(sda_grid, 2),
                'sda_blinds_up': round(sda_blinds_up_grid, 2),
                'sda_blinds_down': round(sda_blinds_down_grid, 2),
                'sensor_count_passing_sda': int(round(pass_sda, 2)),
                'total_sensor_count': total_floor
            }
        }

    recursive_dict_merge(grid_summary, _grid_summary)

    return grid_summary


def _well_summary(
    pass_sda_grids: list, grids_info: list,
    grid_areas: list, pass_sda_blinds_up_grids: list,
    pass_sda_blinds_down_grids: list) -> Tuple[dict, dict]:
    """Create combined summary and summary for each grid individually.

    Args:
        pass_sda_grids: A list where each sublist is a list of True/False that
            tells if each sensor point passes sDA.
        grids_info: A list of grid information.
        grid_areas: A list where each sublist is the area of each sensor point.
            The alternative is a list of None values for each grid information.

    Returns:
        Tuple:
        -   summary: Summary of of all grids combined.
        -   summary_grid: Summary of each grid individually.
    """
    summary = {}
    summary_grid = {}

    if all(grid_area is not None for grid_area in grid_areas):
        # weighted by mesh face area
        total_area = 0
        total_area_pass_sda = 0
        for (pass_sda, grid_area, grid_info, pass_sda_blinds_up,
             pass_sda_blinds_down) in \
            zip(pass_sda_grids, grid_areas, grids_info,
                pass_sda_blinds_up_grids, pass_sda_blinds_down_grids):
            total_grid_area = grid_area.sum()

            area_pass_sda = grid_area[pass_sda].sum()
            area_pass_sda_blind_up = grid_area[pass_sda_blinds_up].sum()
            area_pass_sda_blinds_down = grid_area[pass_sda_blinds_down].sum()
            sda_grid = area_pass_sda / total_grid_area * 100
            sda_blinds_up_grid = area_pass_sda_blind_up / total_grid_area * 100
            sda_blinds_down_grid = area_pass_sda_blinds_down / total_grid_area * 100

            # grid summary
            grid_summary = \
                _create_grid_summary(
                    grid_info, sda_grid, sda_blinds_up_grid, sda_blinds_down_grid,
                    area_pass_sda, total_grid_area, area_weighted=True
                )

            recursive_dict_merge(summary_grid, grid_summary)

            total_area += total_grid_area
            total_area_pass_sda += area_pass_sda

        summary['sda'] = round(total_area_pass_sda / total_area * 100, 2)
        summary['floor_area_passing_sda'] = total_area_pass_sda
        summary['total_floor_area'] = total_area
    else:
        # assume all sensor points cover the same area
        total_sensor_count = 0
        total_sensor_count_pass_sda = 0
        for (pass_sda, grid_info, pass_sda_blinds_up, pass_sda_blinds_down) in \
            zip(pass_sda_grids, grids_info, pass_sda_blinds_up_grids,
                pass_sda_blinds_down_grids):
            grid_count = grid_info['count']

            sensor_count_pass_sda = pass_sda.sum()
            sensor_count_pass_sda_blinds_up = pass_sda_blinds_up.sum()
            sensor_count_pass_sda_blinds_down = pass_sda_blinds_down.sum()
            sda_grid = sensor_count_pass_sda / grid_count * 100
            sda_blinds_up_grid = sensor_count_pass_sda_blinds_up / grid_count * 100
            sda_blinds_down_grid = sensor_count_pass_sda_blinds_down / grid_count * 100

            # grid summary
            grid_summary = \
                _create_grid_summary(
                    grid_info, sda_grid, sda_blinds_up_grid, sda_blinds_down_grid,
                    sensor_count_pass_sda, grid_count, area_weighted=False
                )

            recursive_dict_merge(summary_grid, grid_summary)

            total_sensor_count += grid_count
            total_sensor_count_pass_sda += sensor_count_pass_sda

        summary['sda'] = round(total_sensor_count_pass_sda / total_sensor_count * 100, 2)
        summary['sensor_count_passing_sda'] = int(total_sensor_count_pass_sda)
        summary['total_sensor_count'] = total_sensor_count

    return summary, summary_grid


[docs] def well_annual_daylight( results: Union[str, AnnualDaylight], daylight_hours: list, sub_folder, grids_filter: str = '*', states_schedule: dict = None): """Calculate credits for WELL L06. Args: results: Path to results folder or a Results class object. daylight_hours: Schedule of daylight hours used for EN 17037 sub_folder: Relative path for a subfolder to write the output. grids_filter: The name of a grid or a pattern to filter the grids. Defaults to '*'. states_schedule: A custom dictionary of shading states. In case this is left empty, the function will calculate a shading schedule by using the shade_transmittance input. If a states schedule is provided it will check that it is complying with the 2% rule. Defaults to None. Returns: Tuple: - well_summary: Summary of WELL analysis. - ies_lm_summary: Summary of IES LM analysis. - ies_lm_summary_grid: Summary of IES LM analysis for each grid. - da_grids: List of daylight autonomy values for each grid. Each item in the list is a NumPy array of DA values. - states_schedule: A dictionary of annual shading schedules for each aperture group. - fail_to_comply: A dictionary with the hoys where the 2% rule failed. - grids_info: Grid information. """ schedule = occupancy_schedule_8_to_6(as_list=True) if not isinstance(results, AnnualDaylight): results = AnnualDaylight(results, schedule=schedule) else: # set schedule to default leed schedule results.schedule = schedule occ_mask = results.occ_mask total_occ = results.total_occ grids_info = results._filter_grids(grids_filter=grids_filter) if not states_schedule: states_schedule, fail_to_comply, shd_trans_dict = dynamic_schedule_direct_illuminance( results, grids_filter=grids_filter, use_states=True) else: raise NotImplementedError( 'Custom input for argument states_schedule is not yet implemented.' ) # check to see if there is a HBJSON with sensor grid meshes for areas grid_areas, units_conversion = [], 1 for base_file in Path(results.folder).parent.iterdir(): if base_file.suffix in ('.hbjson', '.hbpkl'): hb_model = Model.from_file(base_file) units_conversion = conversion_factor_to_meters(hb_model.units) filt_grids = _filter_by_pattern( hb_model.properties.radiance.sensor_grids, filter=grids_filter) for s_grid in filt_grids: if s_grid.mesh is not None: grid_areas.append(s_grid.mesh.face_areas) grid_areas = [np.array(grid) for grid in grid_areas] break if not grid_areas: grid_areas = [None] * len(grids_info) # spatial daylight autonomy l06_da_grids = [] l06_pass_sda_grids = [] l06_pass_sda_blinds_up_grids = [] l06_pass_sda_blinds_down_grids = [] l01_da_grids = [] l01_pass_sda_grids = [] l01_pass_sda_blinds_up_grids = [] l01_pass_sda_blinds_down_grids = [] for grid_info in grids_info: light_paths = [lp[0] for lp in grid_info['light_path']] base_zero_array = np.apply_along_axis(filter_array, 1, np.zeros( (grid_info['count'], len(results.sun_up_hours))), occ_mask) arrays_blinds_up = [base_zero_array.copy()] arrays_blinds_down = [base_zero_array.copy()] # combine total array for all light paths array = results._array_from_states(grid_info, states=states_schedule, zero_array=True) array = np.apply_along_axis(filter_array, 1, array, occ_mask) for light_path in light_paths: # do an extra pass to calculate with blinds always up or down if light_path != '__static_apertures__': array_blinds_up = results._get_array( grid_info, light_path, state=0, res_type='total') array_filter = np.apply_along_axis( filter_array, 1, array_blinds_up, occ_mask) arrays_blinds_up.append(array_filter) array_blinds_down = results._get_array( grid_info, light_path, state=1, res_type='total') array_filter = np.apply_along_axis( filter_array, 1, array_blinds_down, occ_mask) arrays_blinds_down.append(array_filter) else: static_array = results._get_array( grid_info, light_path, state=0, res_type='total') array_filter = np.apply_along_axis( filter_array, 1, static_array, occ_mask) arrays_blinds_up.append(array_filter) arrays_blinds_down.append(array_filter) array_blinds_up = sum(arrays_blinds_up) array_blinds_down = sum(arrays_blinds_down) # calculate da per grid da_grid = da_array2d(array, total_occ=total_occ, threshold=300) l06_da_grids.append(da_grid) da_blinds_up_grid = da_array2d( array_blinds_up, total_occ=total_occ, threshold=300) da_blinds_down_grid = da_array2d( array_blinds_down, total_occ=total_occ, threshold=300) # calculate sda per grid l06_pass_sda_grids.append(da_grid >= 50) l06_pass_sda_blinds_up_grids.append(da_blinds_up_grid >= 50) l06_pass_sda_blinds_down_grids.append(da_blinds_down_grid >= 50) array_blinds_up = sum(arrays_blinds_up) array_blinds_down = sum(arrays_blinds_down) # calculate da per grid da_grid = da_array2d(array, total_occ=total_occ, threshold=200) l01_da_grids.append(da_grid) da_blinds_up_grid = da_array2d( array_blinds_up, total_occ=total_occ, threshold=200) da_blinds_down_grid = da_array2d( array_blinds_down, total_occ=total_occ, threshold=200) # calculate sda per grid l01_pass_sda_grids.append(da_grid >= 40) l01_pass_sda_blinds_up_grids.append(da_blinds_up_grid >= 40) l01_pass_sda_blinds_down_grids.append(da_blinds_down_grid >= 40) # create summaries for all grids and each grid individually l06_ies_lm_summary, l06_ies_lm_summary_grid = _well_summary( l06_pass_sda_grids, grids_info, grid_areas, l06_pass_sda_blinds_up_grids, l06_pass_sda_blinds_down_grids) l01_ies_lm_summary, l01_ies_lm_summary_grid = _well_summary( l01_pass_sda_grids, grids_info, grid_areas, l01_pass_sda_blinds_up_grids, l01_pass_sda_blinds_down_grids) sub_folder = Path(sub_folder) en17037_folder = en17037_to_folder( results, schedule=daylight_hours, sub_folder=sub_folder.joinpath('en17037')) l06_well_summary = [] l01_well_summary = [] l06_well_summary_ies_lm = {} l06_well_summary_en17037 = {} l01_well_summary_ies_lm = {} l01_well_summary_en17037 = {} l06_well_summary_ies_lm['method'] = 'IES LM-83-12' l06_well_summary_en17037['method'] = 'EN 17037' l01_well_summary_ies_lm['method'] = 'IES LM-83-12' l01_well_summary_en17037['method'] = 'EN 17037' l06_combined_da_target = [] l06_combined_da_minimum = [] l01_combined_da_target = [] for grid_info in grids_info: l06_grid_da_target = np.loadtxt(en17037_folder.joinpath('da', 'target_illuminance_300', f'{grid_info["full_id"]}.da')) l06_grid_da_minimum = np.loadtxt(en17037_folder.joinpath('da', 'minimum_illuminance_100', f'{grid_info["full_id"]}.da')) l06_combined_da_target.append(l06_grid_da_target >= 50) l06_combined_da_minimum.append(l06_grid_da_minimum >= 50) array = results._array_from_states( grid_info, res_type='total', zero_array=True) if np.any(array): array = np.apply_along_axis( filter_array, 1, array, occ_mask) l01_grid_da_target = da_array2d(array, total_occ=4380, threshold=200) l01_combined_da_target.append(l01_grid_da_target >= 50) l06_combined_sda_target_illuminance = np.concatenate(l06_combined_da_target).mean() * 100 l06_combined_sda_minimum_illuminance = np.concatenate(l06_combined_da_minimum).mean() * 100 l01_combined_sda_target_illuminance = np.concatenate(l01_combined_da_target).mean() * 100 if l01_combined_sda_target_illuminance > 30: l01_well_summary_en17037['comply'] = True else: l01_well_summary_en17037['comply'] = False if l06_combined_sda_target_illuminance > 50 and l06_combined_sda_minimum_illuminance > 95: l06_well_summary_en17037['points'] = 2 elif l06_combined_sda_target_illuminance > 50: l06_well_summary_en17037['points'] = 1 else: l06_well_summary_en17037['points'] = 0 # credits if not fail_to_comply: if l06_ies_lm_summary['sda'] >= 75: l06_ies_lm_summary['credits'] = 3 l06_well_summary_ies_lm['points'] = 2 elif l06_ies_lm_summary['sda'] >= 55: l06_ies_lm_summary['credits'] = 2 l06_well_summary_ies_lm['points'] = 1 elif l06_ies_lm_summary['sda'] >= 40: l06_ies_lm_summary['credits'] = 1 l06_well_summary_ies_lm['points'] = 0 else: l06_ies_lm_summary['credits'] = 0 l06_well_summary_ies_lm['points'] = 0 if all(grid_summary['sda'] >= 55 for grid_summary in l06_ies_lm_summary_grid.values()): if l06_ies_lm_summary['credits'] <= 2: l06_ies_lm_summary['credits'] += 1 else: l06_ies_lm_summary['credits'] = 'Exemplary performance' if l01_ies_lm_summary['sda'] >= 30: l01_well_summary_ies_lm['comply'] = True else: l01_well_summary_ies_lm['comply'] = False l06_well_summary_ies_lm['sda'] = l06_ies_lm_summary['sda'] l01_well_summary_ies_lm['sda'] = l01_ies_lm_summary['sda'] else: l06_ies_lm_summary['credits'] = 0 fail_to_comply_rooms = ', '.join(list(fail_to_comply.keys())) note = ( '0 credits have been awarded. The following sensor grids have at ' 'least one hour where 2% of the floor area receives direct ' f'illuminance of 1000 lux or more: {fail_to_comply_rooms}.' ) l06_ies_lm_summary['note'] = note l06_well_summary_ies_lm['points'] = 0 l01_ies_lm_summary['note'] = note l01_well_summary_ies_lm['comply'] = False l06_well_summary_ies_lm['total_floor_area'] = sum(np.sum(arr) for arr in grid_areas) l01_well_summary_ies_lm['total_floor_area'] = sum(np.sum(arr) for arr in grid_areas) # convert to datacollection def to_datacollection(aperture_group: str, values: np.ndarray): # convert values to 0 and 1 (0 = no shading, 1 = shading) header = Header(data_type=GenericType(aperture_group, ''), unit='', analysis_period=AnalysisPeriod()) hourly_data = HourlyContinuousCollection(header=header, values=values) return hourly_data.to_dict() states_schedule = {k:to_datacollection(k, v['schedule']) for k, v in states_schedule.to_dict().items()} ies_lm_folder = sub_folder.joinpath('ies_lm') ies_lm_folder.mkdir(parents=True, exist_ok=True) l06_ies_lm_folder = ies_lm_folder.joinpath('l06_ies_lm_summary') l01_ies_lm_folder = ies_lm_folder.joinpath('l01_ies_lm_summary') l06_ies_lm_folder.mkdir(parents=True, exist_ok=True) l01_ies_lm_folder.mkdir(parents=True, exist_ok=True) ies_lm_summary_file = l06_ies_lm_folder.joinpath('ies_lm_summary.json') ies_lm_summary_file.write_text(json.dumps(l06_ies_lm_summary, indent=2)) ies_lm_summary_file = l01_ies_lm_folder.joinpath('ies_lm_summary.json') ies_lm_summary_file.write_text(json.dumps(l01_ies_lm_summary, indent=2)) ies_lm_summary_grid_file = l06_ies_lm_folder.joinpath('ies_lm_summary_grid.json') ies_lm_summary_grid_file.write_text(json.dumps(l06_ies_lm_summary_grid, indent=2)) ies_lm_summary_grid_file = l01_ies_lm_folder.joinpath('ies_lm_summary_grid.json') ies_lm_summary_grid_file.write_text(json.dumps(l01_ies_lm_summary_grid, indent=2)) states_schedule_file = l06_ies_lm_folder.joinpath('states_schedule.json') states_schedule_file.write_text(json.dumps(states_schedule)) states_schedule_file = l01_ies_lm_folder.joinpath('states_schedule.json') states_schedule_file.write_text(json.dumps(states_schedule)) grids_info_file = l06_ies_lm_folder.joinpath('grids_info.json') grids_info_file.write_text(json.dumps(grids_info, indent=2)) grids_info_file = l01_ies_lm_folder.joinpath('grids_info.json') grids_info_file.write_text(json.dumps(grids_info, indent=2)) for (da, grid_info) in \ zip(l06_da_grids, grids_info): grid_id = grid_info['full_id'] da_file = l06_ies_lm_folder.joinpath('results', 'da', f'{grid_id}.da') da_file.parent.mkdir(parents=True, exist_ok=True) np.savetxt(da_file, da, fmt='%.2f') for (da, grid_info) in \ zip(l01_da_grids, grids_info): grid_id = grid_info['full_id'] da_file = l01_ies_lm_folder.joinpath('results', 'da', f'{grid_id}.da') da_file.parent.mkdir(parents=True, exist_ok=True) np.savetxt(da_file, da, fmt='%.2f') da_grids_info_file = l06_ies_lm_folder.joinpath( 'results', 'da', 'grids_info.json') da_grids_info_file.write_text(json.dumps(grids_info, indent=2)) da_grids_info_file = l01_ies_lm_folder.joinpath( 'results', 'da', 'grids_info.json') da_grids_info_file.write_text(json.dumps(grids_info, indent=2)) states_schedule_err_file = \ l06_ies_lm_folder.joinpath('states_schedule_err.json') states_schedule_err_file.write_text(json.dumps(fail_to_comply)) states_schedule_err_file = \ l01_ies_lm_folder.joinpath('states_schedule_err.json') states_schedule_err_file.write_text(json.dumps(fail_to_comply)) l06_well_summary.append(l06_well_summary_ies_lm) l06_well_summary.append(l06_well_summary_en17037) well_summary_file = sub_folder.joinpath('l06_well_summary.json') well_summary_file.write_text(json.dumps(l06_well_summary, indent=2)) l01_well_summary.append(l01_well_summary_ies_lm) l01_well_summary.append(l01_well_summary_en17037) well_summary_file = sub_folder.joinpath('l01_well_summary.json') well_summary_file.write_text(json.dumps(l01_well_summary, indent=2)) return (l06_well_summary, l01_well_summary, states_schedule, fail_to_comply, grids_info)