Source code for sparrowpy.classes.RadiosityFast

"""Module for the radiosity simulation."""
import numpy as np
import deepdiff
import pyfar as pf
import sparrowpy.form_factor.universal as form_factor
from sparrowpy import ( geometry, sound_object )
try:
    import numba
    prange = numba.prange
except ImportError:
    numba = None
    prange = range


[docs] class DirectionalRadiosityFast(): """Radiosity object for directional scattering coefficients.""" _walls_points: np.ndarray _walls_normal: np.ndarray _walls_up_vector: np.ndarray _patches_points: np.ndarray _n_patches: int _patch_to_wall_ids: np.ndarray # geometrical data _visibility_matrix: np.ndarray _visible_patches: np.ndarray _form_factors: np.ndarray _form_factors_tilde: np.ndarray # general data for material and medium data _frequencies: np.ndarray _brdf: np.ndarray _brdf_index: np.ndarray _brdf_incoming_directions: list[pf.Coordinates] _brdf_outgoing_directions: list[pf.Coordinates] _patch_2_brdf_outgoing_index: np.ndarray _air_attenuation: np.ndarray _speed_of_sound: float # etc metadata _etc_time_resolution: float _etc_duration: float # etc results _distance_patches_to_source: np.ndarray _energy_init_source: np.ndarray _energy_exchange_etc: np.ndarray def __init__( self, walls_points:np.ndarray, walls_normal:np.ndarray, walls_up_vector:np.ndarray, patches_points:np.ndarray, n_patches:int, patch_to_wall_ids:np.ndarray, visibility_matrix:np.ndarray=None, visible_patches:np.ndarray=None, form_factors:np.ndarray=None, form_factors_tilde:np.ndarray=None, frequencies:np.ndarray=None, brdf:list[np.ndarray]=None, brdf_index:np.ndarray=None, brdf_incoming_directions:list[pf.Coordinates]=None, brdf_outgoing_directions:list[pf.Coordinates]=None, patch_2_brdf_outgoing_index:np.ndarray=None, air_attenuation:np.ndarray=None, speed_of_sound:float=None, etc_time_resolution:float=None, etc_duration:float=None, distance_patches_to_source:np.ndarray=None, energy_init_source:np.ndarray=None, energy_exchange_etc:np.ndarray=None): """Create a Radiosity object for directional implementation. Parameters ---------- walls_points : np.ndarray edge points of all walls in cartesian of shape (n_walls, n_points, 3) walls_normal : np.ndarray normals of all walls of shape (n_walls, 3) walls_up_vector : np.ndarray uo vector of all walls of shape (n_walls, 3) patches_points : np.ndarray edge points of all patches in cartesian of shape (n_patches, n_points, 3) n_patches : int number of patches patch_to_wall_ids : np.ndarray maps each patch to a wall of shape (n_patches) visibility_matrix : np.ndarray, optional patch to patch boolean visibility matrix, by default None. visible_patches : np.ndarray, optional list of all patches which are visible, by default None form_factors : np.ndarray, optional geometrical form factor from each patch to each other patch, by default None. form_factors_tilde : np.ndarray, optional Form factor including air attenuation and BRDF of shape (n_patches, n_patches, n_outgoing_directions, n_bins), by default None frequencies : np.ndarray, optional frequency vector used for the simulation. , by default None brdf : list[np.ndarray], optional brdf in its raw postprocessed data format, by default None brdf_index : np.ndarray, optional maps brdfs to walls, must be of shape (n_walls, ), by default None brdf_incoming_directions : list[pf.Coordinates], optional incoming direction of brdfs per wall, by default None brdf_outgoing_directions : list[pf.Coordinates], optional outgoing directions of brdfs per wall, by default None patch_2_brdf_outgoing_index: np.ndarray map of patch positions to relative scattering directions indices air_attenuation : np.ndarray, optional air attenuation coefficients for each frequency, needs to be of shape (n_bins), by default None speed_of_sound : float, optional speed of sound in m/s, by default None etc_time_resolution : float, optional time resolution fo the etc, by default None etc_duration : float, optional duration fo the etc in seconds, by default None distance_patches_to_source : np.ndarray, optional distance from the source to each patch, need to be of shape (n_patches), by default None energy_init_source : np.ndarray, optional initial energy from source to patches, need to be of shape (n_patches, n_outgoing_directions, n_bins), by default None energy_exchange_etc : np.ndarray, optional etc of the energy exchange, must be of shape (n_patches, n_outgoing_directions, n_bins, n_samples), by default None """ # convert inputs walls_points = np.atleast_3d(walls_points) walls_up_vector = np.atleast_2d(walls_up_vector) walls_normal = np.atleast_2d(walls_normal) patches_points = np.atleast_3d(patches_points) patch_to_wall_ids = np.atleast_1d(np.array( patch_to_wall_ids, dtype=int)) if frequencies is not None: frequencies = np.array(frequencies) if visible_patches is not None: visible_patches = np.array(visible_patches) if visibility_matrix is not None: visibility_matrix = np.array(visibility_matrix) if form_factors is not None: form_factors = np.array(form_factors) if form_factors_tilde is not None: form_factors_tilde = np.array(form_factors_tilde) if patch_2_brdf_outgoing_index is not None: patch_2_brdf_outgoing_index = np.array(patch_2_brdf_outgoing_index, dtype=np.int64) if brdf is not None: brdf = [np.array(b) for b in brdf] if air_attenuation is not None: air_attenuation = np.array(air_attenuation) if speed_of_sound is not None: speed_of_sound = float(speed_of_sound) if etc_time_resolution is not None: etc_time_resolution = float(etc_time_resolution) if etc_duration is not None: etc_duration = float(etc_duration) if distance_patches_to_source is not None: distance_patches_to_source = np.array(distance_patches_to_source) if energy_init_source is not None: energy_init_source = np.array(energy_init_source) if energy_exchange_etc is not None: energy_exchange_etc = np.array(energy_exchange_etc) self._walls_points = walls_points self._walls_normal = walls_normal self._walls_up_vector = walls_up_vector self._patches_points = patches_points self._n_patches = n_patches self._patch_to_wall_ids = patch_to_wall_ids # geometrical data self._visibility_matrix = visibility_matrix self._visible_patches = visible_patches self._form_factors = form_factors self._form_factors_tilde = form_factors_tilde # general data for material and medium data self._frequencies = frequencies self._brdf = brdf self._brdf_index = brdf_index self._brdf_incoming_directions = brdf_incoming_directions self._brdf_outgoing_directions = brdf_outgoing_directions self._patch_2_brdf_outgoing_index = patch_2_brdf_outgoing_index self._air_attenuation = air_attenuation self._speed_of_sound = speed_of_sound # etc metadata self._etc_time_resolution = etc_time_resolution self._etc_duration = etc_duration # etc results self._distance_patches_to_source = distance_patches_to_source self._energy_init_source = energy_init_source self._energy_exchange_etc = energy_exchange_etc self.check()
[docs] def check(self): """Check the input data for consistency.""" n_walls = self._walls_points.shape[0] if (self._walls_points.shape[0] != n_walls) or \ (self._walls_points.shape[2] != 3): raise ValueError( "Walls need to be of shape (n_walls, n_points, 3)") if self._walls_up_vector.shape != (n_walls, 3): raise ValueError( "Up vector of walls need to be of shape (n_walls, 3)") if self._walls_normal.shape != (n_walls, 3): raise ValueError( "Normal of walls need to be of shape (n_walls, 3)") # input checks Patches if (self._patches_points.shape[0] != self.n_patches) or \ (self._patches_points.shape[2] != 3): raise ValueError( "Patches need to be of shape (n_patches, n_points, 3)") if self._patch_to_wall_ids.shape != (self.n_patches,): raise ValueError( "patch_to_wall_ids need to be of shape (n_patches,)") if any(i not in np.arange( n_walls) for i in set(self._patch_to_wall_ids)): raise ValueError( "patch_to_wall_ids does contain other ids than range(n_walls)") if any(i not in set( self._patch_to_wall_ids) for i in np.arange(n_walls)): raise ValueError( "patch_to_wall_ids does contain other ids than range(n_walls)") # check frequencies n_bins = 1 if self._frequencies is not None: if len(self._frequencies.shape) != 1: raise ValueError( "Frequencies need to be of shape (n_bins,)") n_bins = self._frequencies.size # check form factors if self._form_factors is not None: if self._form_factors.shape != (self.n_patches, self.n_patches): raise ValueError( "form_factors need to be of shape (n_patches, n_patches)") # check brdf n_outgoing_directions = 1 if self._brdf_index is not None: if len(self._brdf_index) != n_walls: raise ValueError( "brdf_index need to be of shape (n_walls,)") if self._brdf_incoming_directions is not None: if any(not isinstance(i, pf.Coordinates) \ for i in self._brdf_incoming_directions): raise ValueError( "brdf_incoming_directions need to be a list of type " "pf.Coordinates") if self._brdf_outgoing_directions is not None: if any(not isinstance(i, pf.Coordinates) \ for i in self._brdf_outgoing_directions): raise ValueError( "brdf_outgoing_directions need to be a list of type " "pf.Coordinates") n_outgoing_directions = self._brdf_outgoing_directions[0].csize # check form_factors_tilde if self._form_factors_tilde is not None: if self._form_factors_tilde.shape != ( self.n_patches, self.n_patches, n_outgoing_directions, n_bins): raise ValueError( "form_factors_tilde need to be of shape " "(n_patches, n_patches, n_outgoing_directions, n_bins)") # check air_attenuation if self._air_attenuation is not None: if len(self._air_attenuation.shape) != 1: raise ValueError( "Air attenuation need to be of shape (n_bins,)") if self._air_attenuation.shape[0] != n_bins: raise ValueError( "Air attenuation need to be of shape (n_bins,)") # check speed_of_sound if self._speed_of_sound is not None: if self._speed_of_sound <= 0: raise ValueError( "Speed of sound must be positive and non-zero") # check etc_time_resolution if self._etc_time_resolution is not None: if self._etc_time_resolution <= 0: raise ValueError( "Time resolution must be positive and non-zero") # check etc_duration if self._etc_duration is not None: if self._etc_duration <= 0: raise ValueError( "Duration must be positive and non-zero") # check distance_patches_to_source if self._distance_patches_to_source is not None: if self._distance_patches_to_source.shape != (self.n_patches,): raise ValueError( "distance_patches_to_source need to be of shape " "(n_patches,)") # check energy_init_source if self._energy_init_source is not None: if self._energy_init_source.shape != ( self.n_patches, n_outgoing_directions, n_bins): raise ValueError( "energy_init_source need to be of shape " "(n_patches, n_outgoing_directions, n_bins)") # check energy_exchange_etc if self._energy_exchange_etc is not None: n_samples = int( self._etc_duration/self._etc_time_resolution) if self._energy_exchange_etc.shape != ( self.n_patches, n_outgoing_directions, n_bins, n_samples): raise ValueError( "energy_exchange_etc need to be of shape " "(n_patches, n_outgoing_directions, n_bins, n_samples)")
[docs] @classmethod def from_polygon( cls, polygon_list, patch_size): """Create a Radiosity object for directional scattering coefficients. Parameters ---------- polygon_list : list[Polygon] list of patches patch_size : float maximal patch size in meters. """ # save wall information walls_points = np.array([p.pts for p in polygon_list]) walls_normal = np.array([p.normal for p in polygon_list]) walls_up_vector = np.array([p.up_vector for p in polygon_list]) # create patches ( patches_points, _, n_patches, patch_to_wall_ids) = geometry._process_patches( walls_points, walls_normal, patch_size, len(polygon_list)) # create radiosity object return cls( walls_points, walls_normal, walls_up_vector, patches_points, n_patches, patch_to_wall_ids)
[docs] def bake_geometry(self): """Bake the geometry by calculating all the form factors. """ # Check the visibility between patches. self._visibility_matrix = geometry._check_patch2patch_visibility( self.patches_center, self.patches_normal, self.patches_points) n_combinations = np.sum(self.visibility_matrix) visible_patches = np.empty((n_combinations, 2), dtype=np.int32) i_counter = 0 for i_source in range(self.n_patches): for i_receiver in range(self.n_patches): if not self.visibility_matrix[i_source, i_receiver]: continue visible_patches[i_counter, 0] = i_source visible_patches[i_counter, 1] = i_receiver i_counter += 1 self._visible_patches = visible_patches self._form_factors = form_factor.patch2patch_ff_universal( self.patches_points, self.patches_normal, self.patches_area, self._visible_patches) # Calculate the form factors with directivity. if self._brdf_incoming_directions is not None: sources_array = np.array( [s.cartesian for s in self._brdf_incoming_directions]) receivers_array = np.array( [s.cartesian for s in self._brdf_outgoing_directions]) scattering_index = np.array(self._brdf_index) scattering = np.array(self._brdf) # preload patch_2_brdf_outgoing_index map with invalid entries self._patch_2_brdf_outgoing_index = ( receivers_array.shape[1] * np.ones((self.n_patches,self.n_patches),dtype=np.int64)) for j in range(self.n_patches): vis = np.where( (self.visibility_matrix+self.visibility_matrix.T)[:,j]) self._patch_2_brdf_outgoing_index[vis,j]=get_scattering_data_receiver_index( pos_i=self.patches_center[vis],pos_j=self.patches_center[j], receivers=receivers_array, wall_id_i=self._patch_to_wall_ids[vis], ) else: sources_array = None receivers_array = None scattering_index = None scattering = None self._patch_2_brdf_outgoing_index = np.zeros( (self.n_patches,self.n_patches), dtype=np.int64) n_bins = 1 if self._frequencies is None else self.n_bins self._form_factors_tilde = \ _form_factors_with_directivity_dim( self.visibility_matrix, self.form_factors, n_bins, self.patches_center, self.patches_area, self._air_attenuation, self._patch_to_wall_ids, scattering, scattering_index, sources_array, receivers_array)
[docs] def init_source_energy( self, source): """Initialize the source energy. Parameters ---------- source : pf.Coordinates, sparrowpy.sound_object.SoundSource definition of the source position for Coordinates object and orientation and directivity for SoundSource object. If no directivity is given, the directivity is set to 1 for all frequencies. """ if isinstance(source, pf.Coordinates): if source.cshape != (1, ): raise ValueError('just one source position is allowed.') source_position = source.cartesian[0] elif isinstance(source, sound_object.SoundSource): source_position = source.position self._source = source patch_to_wall_ids = self._patch_to_wall_ids if self._brdf_incoming_directions is None: frequencies = np.array([0]) if self._frequencies is None else \ self._frequencies self.set_wall_brdf( np.arange(self.n_walls), pf.FrequencyData(np.ones_like(frequencies), frequencies), pf.Coordinates(0, 0, 1, weights=1), pf.Coordinates(0, 0, 1, weights=1)) self._frequencies = frequencies if self._air_attenuation is None: frequencies = np.array([0]) if self._frequencies is None else \ self._frequencies self.set_air_attenuation( pf.FrequencyData(np.zeros_like(frequencies), frequencies)) self._frequencies = frequencies n_bins = self.n_bins vi = np.array( [s.cartesian for s in self._brdf_incoming_directions]) vo = np.array( [s.cartesian for s in self._brdf_outgoing_directions]) brdf = np.array(self._brdf) brdf_index = self._brdf_index patches_center = self.patches_center source_visibility = geometry._check_point2patch_visibility( eval_point=source_position, patches_center=patches_center, surf_points=self.walls_points, surf_normal=self.walls_normal) self._source_visibility = source_visibility energy_0, distance_0 = form_factor._source2patch_energy_universal( source_position, patches_center, self.patches_points, source_visibility, self._air_attenuation, n_bins) # of shape (n_patches, n_directions, n_bins) energy_0_dir = _add_directional( energy_0, source_position, patches_center, n_bins, patch_to_wall_ids, vi, vo, brdf, brdf_index) # add directivity if given if isinstance(source, sound_object.SoundSource): n_patches = patches_center.shape[0] n_directions = vo.shape[1] if source.directivity is not None: directivity = np.zeros((n_patches, n_directions, n_bins)) for i_frequency in range(n_bins): directivity_local = np.real(source.get_directivity( patches_center, self._frequencies[i_frequency])) if n_directions == 1: directivity[:, :, i_frequency] = directivity_local[ :, np.newaxis] else: directivity[:, :, i_frequency] = np.repeat( directivity_local[..., np.newaxis], n_directions, axis=-1) energy_0_dir *= directivity else: energy_0_dir *= 1 self._energy_init_source = energy_0_dir self._distance_patches_to_source = distance_0
[docs] def calculate_energy_exchange( self, speed_of_sound, etc_time_resolution, etc_duration, max_reflection_order=-1, recalculate=False): """Calculate the energy exchange between patches. # todo? make first order to first reflection. """ n_samples = int(etc_duration/etc_time_resolution) patches_center = self.patches_center distance_0 = self._distance_patches_to_source n_patches = self.n_patches distance_i_j = np.empty((n_patches, n_patches)) for i in range(n_patches): for j in range(n_patches): distance_i_j[i, j] = np.linalg.norm( patches_center[i, :]-patches_center[j, :]) energy_0_dir = self._energy_init_source if self._energy_exchange_etc is None or recalculate: # energy exchange etc # of shape (n_patches, n_directions, n_bins, n_samples) if max_reflection_order < 1: self._energy_exchange_etc = \ _energy_exchange_init_energy( n_samples, energy_0_dir, distance_0, speed_of_sound, etc_time_resolution, ) else: self._energy_exchange_etc = _energy_exchange( n_samples, energy_0_dir, distance_0, distance_i_j, self._form_factors_tilde, self._patch_2_brdf_outgoing_index, speed_of_sound, etc_time_resolution, max_reflection_order, self._visible_patches) self._etc_time_resolution = float(etc_time_resolution) self._speed_of_sound = float(speed_of_sound) self._etc_duration = float(etc_duration)
[docs] def collect_energy_receiver_mono(self, receivers, direct_sound=False): """Collect the energy at the receivers. Parameters ---------- receivers : pf.Coordinates receiver Coordinates in of cshape (n_receivers). direct_sound : bool, optional If True, the direct sound is collected as well, by default False. The direct sound includes spreading loss, air attenuation and source directivity. Returns ------- etc : pf.TimeData energy collected at the receiver in cshape (n_receivers, n_bins) """ if not isinstance(direct_sound, bool): raise ValueError( "direct_sound must be of type boolean") etc = self.collect_energy_receiver_patchwise(receivers) etc.time = np.sum(etc.time, axis=1) if direct_sound: direct_sound, n_sample_delay = self.calculate_direct_sound( receivers) # add the direct sound to the etc i_receivers = np.arange(len(n_sample_delay)) etc.time[i_receivers, :, n_sample_delay] += direct_sound return etc
[docs] def calculate_direct_sound(self, receivers): """Calculate the direct sound at the receivers. It includes the spreading loss, air attenuation and source directivity. Parameters ---------- receivers : pf.Coordinates receiver Coordinates in of cshape (n_receivers). Returns ------- direct_sound : np.ndarray energy of the direct sound at the receivers in shape (n_receivers, n_bins) n_sample_delay : np.ndarray number of samples for the delay at the receivers in shape (n_receivers, ) """ if not isinstance(receivers, pf.Coordinates): raise ValueError( "Receiver positions must be of type pf.Coordinates") # calculate distance from source to receivers if isinstance(self._source, pf.Coordinates): source_position = self._source else: source_position = pf.Coordinates(*self._source.position) r = (receivers-source_position).radius # calculate spreading loss for direct sound direct_sound = np.ones( (receivers.cshape[0], self.n_bins), dtype=float) direct_sound *= (1/(4 * np.pi * r**2))[:, np.newaxis] # add air attenuation if self._air_attenuation is not None: for i in range(self.n_bins): direct_sound[:, i] *= np.exp( -self._air_attenuation[i] * r) # add source directivity if isinstance(self._source, sound_object.SoundSource): for i in range(self.n_bins): direct_sound[:, i] *= np.real(self._source.get_directivity( np.squeeze(receivers.cartesian), self._frequencies[i])) # calculate the number of samples for the delay n_sample_delay = np.array( r/self.speed_of_sound/self._etc_time_resolution, dtype=int) return direct_sound, n_sample_delay
[docs] def collect_energy_receiver_patchwise(self, receivers): """Collect the energy for each patch at the receivers without summing up the patches. Parameters ---------- receivers : pf.Coordinates receiver Coordinates in of cshape (n_receivers). Returns ------- etc : pf.TimeData energy collected at the receiver in cshape (n_receivers, n_patches, n_bins) """ if not isinstance(receivers, pf.Coordinates): raise ValueError( "Receiver positions must be of type pf.Coordinates") if receivers.cdim != 1: raise ValueError( "Receiver positions must be of shape (n_receivers, 3)") etc_data = self._collect_energy_patches( receivers.cartesian, propagation_fx=True) times = np.arange(etc_data.shape[-1]) * self._etc_time_resolution return pf.TimeData(etc_data, times)
def _collect_energy_patches( self, receiver_pos, propagation_fx=False): """Collect patch histograms as detected by receiver.""" air_attenuation = self._air_attenuation patches_points = self._patches_points n_patches = self.n_patches n_bins = self.n_bins receiver_pos = np.atleast_2d(receiver_pos) n_receivers = receiver_pos.shape[0] patches_center = self.patches_center patches_receiver_distance = np.empty( [n_receivers, self.n_patches,patches_center.shape[-1]]) E_matrix = np.empty( (n_patches, n_bins, self._energy_exchange_etc.shape[-1])) histogram_out = np.empty(( n_receivers, n_patches, n_bins, self._energy_exchange_etc.shape[-1])) receiver_visibility=np.empty((n_receivers,n_patches),dtype=bool) for i in range(n_receivers): patches_receiver_distance = patches_center - receiver_pos[i] receiver_visibility[i] = geometry._check_point2patch_visibility( eval_point=receiver_pos[i], patches_center=patches_center, surf_points=self.walls_points, surf_normal=self.walls_normal) # geometrical weighting patch_receiver_energy=form_factor._patch2receiver_energy_universal( receiver_pos[i], patches_points, receiver_visibility[i]) # access histograms with correct scattering weighting receivers_array = np.array( [s.cartesian for s in self._brdf_outgoing_directions]) receiver_idx = get_scattering_data_receiver_index( patches_center, receiver_pos[i], receivers_array, self._patch_to_wall_ids) assert receiver_idx.shape[0] == self.n_patches assert len(receiver_idx.shape) == 1 for k in range(n_patches): E_matrix[k,:]= ( self._energy_exchange_etc[k,int(receiver_idx[k]),:] * patch_receiver_energy[k]) if propagation_fx: # accumulate the patch energies towards the receiver # with atmospheric effects (delay, atmospheric attenuation) histogram_out[i] = _collect_receiver_energy( E_matrix, np.linalg.norm(patches_receiver_distance, axis=1), self.speed_of_sound, self._etc_time_resolution, air_attenuation=air_attenuation) else: histogram_out[i] = E_matrix return histogram_out
[docs] def set_air_attenuation(self, air_attenuation:pf.FrequencyData): """Set air attenuation factor in Np/m. Parameters ---------- air_attenuation : pyfar.FrequencyData Air attenuation factor in Np/m. """ self._check_set_frequency(air_attenuation.frequencies) self._air_attenuation = np.atleast_1d(air_attenuation.freq.squeeze())
[docs] def set_wall_brdf( self, wall_indexes:list[int], brdf:pf.FrequencyData, incoming_directions:pf.Coordinates, outgoing_directions:pf.Coordinates): """Set the wall BRDF representing scattering and absorption. For the incoming and outgoing directions, the radius is ignored as it only represents a direction. The BRDF assumes an up vector of (1, 0, 0) and a normal vector of (0, 0, 1). Therefore, the incoming and outgoing directions must not have negative z-components. For each wall, the incoming and outgoing directions are rotated to align with the given wall's normal vector and up vector. Parameters ---------- wall_indexes : list[int] List of wall indices for the given BRDF data. brdf : pf.FrequencyData BRDF data with shape (n_incoming_directions, n_outgoing_directions). incoming_directions : pf.Coordinates Incoming directions of the BRDF data. outgoing_directions : pf.Coordinates Outgoing directions of the BRDF data. """ assert (incoming_directions.z >= 0).all(), \ "Sources must be in the positive half space" assert (outgoing_directions.z >= 0).all(), \ "Receivers must be in the positive half space" self._check_set_frequency(brdf.frequencies) if self._brdf_incoming_directions is None: self._brdf_incoming_directions = np.empty( (self.n_walls), dtype=pf.Coordinates) self._brdf_outgoing_directions = np.empty( (self.n_walls), dtype=pf.Coordinates) self._brdf_index = np.empty((self.n_walls), dtype=np.int64) self._brdf_index.fill(-1) self._brdf = [] for i in wall_indexes: incoming_rot, outgoing_rot = _rotate_coords_to_normal( self.walls_normal[i], self.walls_up_vector[i], incoming_directions, outgoing_directions) self._brdf_incoming_directions[i] = incoming_rot self._brdf_outgoing_directions[i] = outgoing_rot self._brdf.append(brdf.freq*np.pi) self._brdf_index[wall_indexes] = len(self._brdf)-1
def _check_set_frequency(self, frequencies:np.ndarray): """Check if the frequency data matches the radiosity object.""" if self._frequencies is None: self._frequencies = frequencies else: assert self._frequencies.size == frequencies.size, \ "Number of frequency bins do not match" assert (self._frequencies == frequencies).all(), \ "Frequencies do not match"
[docs] def write(self, filename, compress=True): """Write the object to a far file.""" pf.io.write(filename, compress=compress, **self.to_dict())
[docs] @classmethod def from_read(cls, filename): """Read the object to a far file.""" data = pf.io.read(filename) for key, value in data.items(): if isinstance(value, str) and value == 'None': data[key] = None return cls.from_dict(data)
[docs] def to_dict(self) -> dict: """Convert the object to a dictionary.""" dict_out = { 'walls_points': self._walls_points, 'walls_normal': self._walls_normal, 'walls_up_vector': self._walls_up_vector, 'patches_points': self._patches_points, 'n_patches': self._n_patches, 'patch_to_wall_ids': self._patch_to_wall_ids, 'visibility_matrix': self._visibility_matrix, 'visible_patches': self._visible_patches, 'form_factors': self._form_factors, 'form_factors_tilde': self._form_factors_tilde, 'frequencies': self._frequencies, 'brdf': self._brdf, 'brdf_index': self._brdf_index, 'brdf_incoming_directions': self._brdf_incoming_directions, 'brdf_outgoing_directions': self._brdf_outgoing_directions, 'patch_2_brdf_outgoing_index': self._patch_2_brdf_outgoing_index, 'air_attenuation': self._air_attenuation, 'speed_of_sound': self._speed_of_sound, 'etc_time_resolution': self._etc_time_resolution, 'etc_duration': self._etc_duration, 'distance_patches_to_source': self._distance_patches_to_source, 'energy_init_source': self._energy_init_source, 'energy_exchange_etc': self._energy_exchange_etc, } for key, value in dict_out.items(): if value is None: dict_out[key] = 'None' elif isinstance(value, np.ndarray): dict_out[key] = value.tolist() return dict_out
def __eq__(self, other): """Check for equality of two objects.""" if not isinstance(other, DirectionalRadiosityFast): return False return not deepdiff.DeepDiff(self.to_dict(), other.to_dict())
[docs] @classmethod def from_dict(cls, input_dict: dict): """Create an object from a dictionary. Used for read write.""" obj = cls(**input_dict) return obj
@property def n_bins(self): """Return the number of frequency bins.""" if self._frequencies is None: return None return self._frequencies.shape[0] @property def n_walls(self): """Return the number of walls.""" return self._walls_points.shape[0] @property def n_patches(self): """Return the total number of patches.""" return self._n_patches @property def form_factors(self): """Return the form factor.""" return self._form_factors @property def visibility_matrix(self): """Return the visibility matrix.""" return self._visibility_matrix @property def walls_area(self): """Return the area of the walls.""" return geometry._calculate_area(self._walls_points) @property def walls_points(self): """Return the points of the walls.""" return self._walls_points @property def walls_normal(self): """Return the normal of the walls.""" return self._walls_normal @property def walls_center(self): """Return the center of the walls.""" return geometry._calculate_center(self._walls_points) @property def walls_up_vector(self): """Return the up vector of the walls.""" return self._walls_up_vector @property def patches_area(self): """Return the area of the patches.""" return geometry._calculate_area(self._patches_points) @property def patches_center(self): """Return the center of the patches.""" return geometry._calculate_center(self._patches_points) @property def patches_size(self): """Return the size of the patches.""" return geometry._calculate_size(self._patches_points) @property def patches_points(self): """Return the points of the patches.""" return self._patches_points @property def patches_normal(self): """Return the normal of the patches.""" return self._walls_normal[self._patch_to_wall_ids] @property def speed_of_sound(self): """Return the speed of sound in m/s.""" return self._speed_of_sound
def _rotate_coords_to_normal( wall_normal:np.ndarray, wall_up_vector:np.ndarray, sources:pf.Coordinates, receivers:pf.Coordinates): """Rotate the coordinates to the normal vector.""" o1 = pf.Orientations.from_view_up( wall_normal, wall_up_vector) o2 = pf.Orientations.from_view_up([0, 0, 1], [1, 0, 0]) o_diff = o1.inv()*o2 euler = o_diff.as_euler('xyz', True).flatten() receivers_cp = receivers.copy() receivers_cp.rotate('xyz', euler) receivers_cp.radius = 1 sources_cp = sources.copy() sources_cp.rotate('xyz', euler) sources_cp.radius = 1 return sources_cp, receivers_cp def _add_directional( energy_0, source_position: np.ndarray, patches_center: np.ndarray, n_bins:float, patch_to_wall_ids:np.ndarray, sources: np.ndarray, receivers: np.ndarray, scattering: np.ndarray, scattering_index: np.ndarray): """Add scattering and absorption to the initial energy from the source. Parameters ---------- energy_0 : np.ndarray energy of all patches of shape (n_patches, n_bins) source_position : np.ndarray source position of shape (3,) patches_center : np.ndarray center of all patches of shape (n_patches, 3) n_bins : float number of frequency bins. patch_to_wall_ids : np.ndarray indexes from each patch to the wall of shape (n_patches) sources : np.ndarray source positions of shape (n_walls, n_sources, 3) receivers : np.ndarray receiver positions of shape (n_walls, n_receivers, 3) scattering : np.ndarray scattering data of shape (n_walls, n_sources, n_receivers, n_bins) scattering_index : np.ndarray mapping from the wall id to scattering database index (n_walls) Returns ------- energy : np.ndarray energy of all patches of shape (n_patches, n_directions, n_bins) """ n_patches = patches_center.shape[0] n_directions = receivers.shape[1] energy_0_directivity = np.zeros((n_patches, n_directions, n_bins)) for i in prange(n_patches): wall_id_i = int(patch_to_wall_ids[i]) scattering_factor = get_scattering_data_source( source_position, patches_center[i], sources, wall_id_i, scattering, scattering_index) energy_0_directivity[i, :, :] = energy_0[i] \ * np.real(scattering_factor) return energy_0_directivity def _energy_exchange_init_energy( n_samples, energy_0_directivity, distance_0, speed_of_sound, histogram_time_resolution): """Calculate energy exchange between patches. Parameters ---------- n_samples : int number of samples of the histogram. energy_0_directivity : np.ndarray init energy of all patches of shape (n_patches, n_directions, n_bins) distance_0 : np.ndarray distance from the source to all patches of shape (n_patches) speed_of_sound : float speed of sound in m/s. histogram_time_resolution : float time resolution of the histogram in s. Returns ------- E_matrix_total : np.ndarray energy of all patches of shape (n_patches, n_directions, n_bins, n_samples) """ n_patches = energy_0_directivity.shape[0] n_directions = energy_0_directivity.shape[1] n_bins = energy_0_directivity.shape[2] E_matrix_total = np.zeros((n_patches, n_directions, n_bins, n_samples)) for i in prange(n_patches): n_delay_samples = int( distance_0[i]/speed_of_sound/histogram_time_resolution) E_matrix_total[i, :, :, n_delay_samples] += energy_0_directivity[i] return E_matrix_total def _energy_exchange( n_samples, energy_0_directivity, distance_0, distance_ij, form_factors_tilde, patch_2_out_directions, speed_of_sound, histogram_time_resolution, max_order, visible_patches): """Calculate energy exchange between patches. Parameters ---------- n_samples : int number of samples of the histogram. energy_0_directivity : np.ndarray init energy of all patches of shape (n_patches, n_directions, n_bins) distance_0 : np.ndarray distance from the source to all patches of shape (n_patches) distance_ij : np.ndarray distance between all patches of shape (n_patches, n_patches) form_factors_tilde : np.ndarray form factors of shape (n_patches, n_patches, n_directions, n_bins) patch_2_out_directions: np.ndarray patchwise map of patch centers to scattering outgoing directions of shape (n_patches,n_patches) speed_of_sound : float speed of sound in m/s. histogram_time_resolution : float time resolution of the histogram in s. max_order : int maximum order of reflections. visible_patches : np.ndarray indexes of all visible patches of shape (n_visible, 2) Returns ------- E_matrix_total : np.ndarray energy of all patches of shape (n_patches, n_directions, n_bins, n_samples) """ n_patches = form_factors_tilde.shape[0] n_directions = form_factors_tilde.shape[2] n_bins = energy_0_directivity.shape[2] form_factors_tilde = form_factors_tilde[..., np.newaxis] E_matrix_total = _energy_exchange_init_energy( n_samples, energy_0_directivity, distance_0, speed_of_sound, histogram_time_resolution) E_matrix = np.zeros((2, n_patches, n_directions, n_bins, n_samples)) E_matrix[0] += E_matrix_total if max_order == 0: return E_matrix_total for k in range(max_order): current_index = (1+k) % 2 E_matrix[current_index, :, :, :] = 0 for ii in range(visible_patches.shape[0]): for jj in range(2): if jj == 0: i = visible_patches[ii, 0] j = visible_patches[ii, 1] else: j = visible_patches[ii, 0] i = visible_patches[ii, 1] dir_id = patch_2_out_directions[i,j] n_delay_samples = int( distance_ij[i, j]/speed_of_sound/histogram_time_resolution) if n_delay_samples > 0: E_matrix[current_index, j, :, :, n_delay_samples:] += \ form_factors_tilde[i, j] * E_matrix[ current_index-1, i, dir_id, :, :-n_delay_samples] else: E_matrix[current_index, j, :, :, :] += form_factors_tilde[ i, j] * E_matrix[current_index-1, i, dir_id, :, :] E_matrix_total += E_matrix[current_index] return E_matrix_total def _collect_receiver_energy( E_matrix_total, patch_receiver_distance, speed_of_sound, histogram_time_resolution, air_attenuation): """Collect the energy at the receiver. Parameters ---------- E_matrix_total : np.ndarray energy of all patches of shape (n_patches, n_directions, n_bins, n_samples) patch_receiver_distance : np.ndarray distance from the patch to the receiver of shape (n_patches) speed_of_sound : float speed of sound in m/s. histogram_time_resolution : float time resolution of the histogram in s. air_attenuation : np.ndarray air attenuation factor for each frequency bin of shape (n_bins) Returns ------- ir : np.ndarray impulse response of shape (n_samples, n_bins) """ E_mat_out = np.zeros_like(E_matrix_total) n_patches = E_matrix_total.shape[0] n_bins = E_matrix_total.shape[1] for i in prange(n_patches): n_delay_samples = int(np.ceil( patch_receiver_distance[i]/speed_of_sound/histogram_time_resolution)) for j in range(n_bins): E_mat_out[i,j] = np.roll( E_matrix_total[i,j]*np.exp(-air_attenuation[j]*patch_receiver_distance[i]), n_delay_samples) return E_mat_out def _form_factors_with_directivity( visibility_matrix, form_factors, n_bins, patches_center, patches_area, air_attenuation, absorption, absorption_index, patch_to_wall_ids, scattering, scattering_index, sources, receivers): """Calculate the form factors with directivity.""" n_patches = patches_center.shape[0] form_factors_tilde = np.zeros((n_patches, n_patches, n_patches, n_bins)) # loop over previous patches, current and next patch for ii in prange(n_patches**3): h = ii % n_patches i = int(ii/n_patches) % n_patches j = int(ii/n_patches**2) % n_patches visible_hi = visibility_matrix[ h, i] if h < i else visibility_matrix[i, h] visible_ij = visibility_matrix[ i, j] if i < j else visibility_matrix[j, i] if visible_hi and visible_ij: difference_receiver = patches_center[i]-patches_center[j] wall_id_i = int(patch_to_wall_ids[i]) difference_receiver /= np.linalg.norm(difference_receiver) ff = form_factors[i, j] if i<j else (form_factors[j, i] *patches_area[i]/patches_area[j]) distance = np.linalg.norm(difference_receiver) form_factors_tilde[h, i, j, :] = ff if air_attenuation is not None: form_factors_tilde[h, i, j, :] = form_factors_tilde[ h, i, j, :] * np.exp(-air_attenuation * distance) if scattering is not None: scattering_factor = get_scattering_data( patches_center[h], patches_center[i], patches_center[j], sources, receivers, wall_id_i, scattering, scattering_index) form_factors_tilde[h, i, j, :] = form_factors_tilde[ h, i, j, :] * scattering_factor if absorption is not None: source_wall_idx = absorption_index[wall_id_i] form_factors_tilde[h, i, j, :] = form_factors_tilde[ h, i, j, :] * (1-absorption[source_wall_idx]) return form_factors_tilde def _form_factors_with_directivity_dim( visibility_matrix, form_factors, n_bins, patches_center, patches_area, air_attenuation, patch_to_wall_ids, scattering, scattering_index, sources, receivers): """Calculate the form factors with directivity.""" n_patches = patches_center.shape[0] n_directions = receivers.shape[1] if receivers is not None else 1 form_factors_tilde = np.zeros((n_patches, n_patches, n_directions, n_bins)) # loop over previous patches, current and next patch for ii in prange(n_patches**2): i = ii % n_patches j = int(ii/n_patches) % n_patches visible_ij = visibility_matrix[ i, j] if i < j else visibility_matrix[j, i] if visible_ij: difference_receiver = patches_center[i]-patches_center[j] wall_id_i = int(patch_to_wall_ids[i]) difference_receiver /= np.linalg.norm(difference_receiver) ff = form_factors[i, j] if i<j else (form_factors[j, i] *patches_area[j]/patches_area[i]) distance = np.linalg.norm(difference_receiver) form_factors_tilde[i, j, :, :] = ff if air_attenuation is not None: form_factors_tilde[i, j, :, :] = form_factors_tilde[ i, j, :, :] * np.exp(-air_attenuation * distance) if scattering is not None: scattering_factor = get_scattering_data_source( patches_center[i], patches_center[j], sources, wall_id_i, scattering, scattering_index) form_factors_tilde[i, j, :, :] = form_factors_tilde[ i, j, :, :] * scattering_factor return form_factors_tilde ## scattering data access def get_scattering_data_receiver_index( pos_i:np.ndarray, pos_j:np.ndarray, receivers:np.ndarray, wall_id_i:np.ndarray, ): """Get scattering receiver index based on current and next position. Parameters ---------- pos_i : np.ndarray current position of shape (n,3) pos_j : np.ndarray next position of shape (3) receivers : np.ndarray receiver directions of all walls of shape (n_walls, n_receivers, 3) wall_id_i : np.ndarray current wall id to get write directional data Returns ------- scattering_factor: float scattering factor from directivity """ n_patches = pos_i.shape[0] if pos_i.ndim > 1 else 1 receiver_idx = np.empty((n_patches), dtype=np.int64) for i in range(n_patches): difference_receiver = pos_j-pos_i[i] difference_receiver /= np.linalg.norm( difference_receiver) receiver_idx[i] = np.argmin(np.sum( (receivers[wall_id_i[i], :]-difference_receiver)**2, axis=-1), axis=-1) return receiver_idx def get_scattering_data( pos_h:np.ndarray, pos_i:np.ndarray, pos_j:np.ndarray, sources:np.ndarray, receivers:np.ndarray, wall_id_i:np.ndarray, scattering:np.ndarray, scattering_index:np.ndarray): """Get scattering data depending on previous, current and next position. Parameters ---------- pos_h : np.ndarray previous position of shape (3) pos_i : np.ndarray current position of shape (3) pos_j : np.ndarray next position of shape (3) sources : np.ndarray source directions of all walls of shape (n_walls, n_sources, 3) receivers : np.ndarray receiver directions of all walls of shape (n_walls, n_receivers, 3) wall_id_i : np.ndarray current wall id to get write directional data scattering : np.ndarray scattering data of shape (n_scattering, n_sources, n_receivers, n_bins) scattering_index : np.ndarray index of the scattering data of shape (n_walls) Returns ------- scattering_factor: float scattering factor from directivity """ difference_source = pos_h-pos_i difference_receiver = pos_i-pos_j difference_source /= np.linalg.norm(difference_source) difference_receiver /= np.linalg.norm(difference_receiver) source_idx = np.argmin(np.sum( (sources[wall_id_i, :, :]-difference_source)**2, axis=-1)) receiver_idx = np.argmin(np.sum( (receivers[wall_id_i, :]-difference_receiver)**2, axis=-1)) return scattering[scattering_index[wall_id_i], source_idx, receiver_idx, :] def get_scattering_data_source( pos_h:np.ndarray, pos_i:np.ndarray, sources:np.ndarray, wall_id_i:np.ndarray, scattering:np.ndarray, scattering_index:np.ndarray): """Get scattering data depending on previous, current position. Parameters ---------- pos_h : np.ndarray previous position of shape (3) pos_i : np.ndarray current position of shape (3) sources : np.ndarray source directions of all walls of shape (n_walls, n_sources, 3) wall_id_i : np.ndarray current wall id to get write directional data scattering : np.ndarray scattering data of shape (n_scattering, n_sources, n_receivers, n_bins) scattering_index : np.ndarray index of the scattering data of shape (n_walls) Returns ------- scattering_factor: float scattering factor from directivity """ difference_source = pos_h-pos_i difference_source /= np.linalg.norm(difference_source) source_idx = np.argmin(np.sum( (sources[wall_id_i, :, :]-difference_source)**2, axis=-1)) return scattering[scattering_index[wall_id_i], source_idx] if numba is not None: _add_directional = numba.njit(parallel=True)(_add_directional) _energy_exchange_init_energy = numba.njit()(_energy_exchange_init_energy) _collect_receiver_energy = numba.njit()(_collect_receiver_energy) _energy_exchange = numba.njit()(_energy_exchange) _form_factors_with_directivity_dim = numba.njit(parallel=True)( _form_factors_with_directivity_dim) _form_factors_with_directivity = numba.njit(parallel=True)( _form_factors_with_directivity) get_scattering_data_receiver_index = numba.njit()( get_scattering_data_receiver_index) get_scattering_data = numba.njit()(get_scattering_data) get_scattering_data_source = numba.njit()(get_scattering_data_source)