diff --git a/CHANGELOG.md b/CHANGELOG.md index addfbb13ce..b0632f2f4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Access field decay values in `SimulationData` via `sim_data.field_decay` as `TimeDataArray`. +- Selective simulation capabilities to `TerminalComponentModeler` via `run_only` and `element_mappings` fields, allowing users to run fewer simulations and extract only needed scattering matrix elements. ### Changed - By default, batch downloads will skip files that already exist locally. To force re-downloading and replace existing files, pass the `replace_existing=True` argument to `Batch.load()`, `Batch.download()`, or `BatchData.load()`. diff --git a/tests/test_plugins/smatrix/terminal_component_modeler_def.py b/tests/test_plugins/smatrix/terminal_component_modeler_def.py index f4b7e665d5..d6fd322733 100644 --- a/tests/test_plugins/smatrix/terminal_component_modeler_def.py +++ b/tests/test_plugins/smatrix/terminal_component_modeler_def.py @@ -276,7 +276,7 @@ def make_port(center, direction, type, name) -> Union[CoaxialLumpedPort, WavePor inner_diameter=2 * Rinner, normal_axis=2, direction=direction, - name=name, + name="coax" + name, num_grid_cells=port_cells, impedance=reference_impedance, ) @@ -311,7 +311,7 @@ def make_port(center, direction, type, name) -> Union[CoaxialLumpedPort, WavePor center=center, size=[2 * Router, 2 * Router, 0], direction=direction, - name=name, + name="wave" + name, mode_spec=td.ModeSpec(num_modes=1), mode_index=0, voltage_integral=voltage_integral, @@ -321,9 +321,9 @@ def make_port(center, direction, type, name) -> Union[CoaxialLumpedPort, WavePor return port center_src1 = [0, 0, -length / 2] - port_1 = make_port(center_src1, direction="+", type=port_types[0], name="coax_port_1") + port_1 = make_port(center_src1, direction="+", type=port_types[0], name="_1") center_src2 = [0, 0, length / 2] - port_2 = make_port(center_src2, direction="-", type=port_types[1], name="coax_port_2") + port_2 = make_port(center_src2, direction="-", type=port_types[1], name="_2") ports = [port_1, port_2] freqs = np.linspace(freq_start, freq_stop, 100) diff --git a/tests/test_plugins/smatrix/test_component_modeler.py b/tests/test_plugins/smatrix/test_component_modeler.py index 74064826d9..6cc78d192c 100644 --- a/tests/test_plugins/smatrix/test_component_modeler.py +++ b/tests/test_plugins/smatrix/test_component_modeler.py @@ -374,6 +374,35 @@ def test_mapping_exclusion(monkeypatch): _test_mappings(element_mappings, s_matrix) +def test_mapping_with_run_only(): + """Make sure that the Modeler is correctly validated when both run_only and + element_mappings are provided.""" + ports = make_ports() + + EXCLUDE_INDEX = ("right_bot", 0) + element_mappings = [] + run_only = [] + # add a mapping to each element in the row of EXCLUDE_INDEX + for port in ports: + for mode_index in range(port.mode_spec.num_modes): + row_index = (port.name, mode_index) + run_only.append(row_index) + if row_index != EXCLUDE_INDEX: + mapping = ((row_index, row_index), (row_index, EXCLUDE_INDEX), +1) + element_mappings.append(mapping) + + # add the self-self coupling element to complete row + mapping = ((("right_bot", 1), ("right_bot", 1)), (EXCLUDE_INDEX, EXCLUDE_INDEX), +1) + element_mappings.append(mapping) + + # Will pass, since run_only covers all source indices in element_mapping + _ = make_component_modeler(element_mappings=element_mappings, run_only=run_only) + + run_only.remove(EXCLUDE_INDEX) + with pytest.raises(pydantic.ValidationError): + _ = make_component_modeler(element_mappings=element_mappings, run_only=run_only) + + def test_batch_filename(tmp_path): modeler = make_component_modeler() path = modeler._batch_path diff --git a/tests/test_plugins/smatrix/test_terminal_component_modeler.py b/tests/test_plugins/smatrix/test_terminal_component_modeler.py index c601d34e76..d7eca990f9 100644 --- a/tests/test_plugins/smatrix/test_terminal_component_modeler.py +++ b/tests/test_plugins/smatrix/test_terminal_component_modeler.py @@ -900,3 +900,44 @@ def test_get_combined_antenna_parameters_data(monkeypatch, tmp_path): assert not np.allclose( antenna_params.radiation_efficiency, single_port_params.radiation_efficiency ) + + +def test_run_only_and_element_mappings(monkeypatch, tmp_path): + """Checks the terminal component modeler works when running with a subset of excitations.""" + z_grid = td.UniformGrid(dl=1 * 1e3) + xy_grid = td.UniformGrid(dl=0.1 * 1e3) + grid_spec = td.GridSpec(grid_x=xy_grid, grid_y=xy_grid, grid_z=z_grid) + modeler = make_coaxial_component_modeler( + path_dir=str(tmp_path), port_types=(CoaxialLumpedPort, WavePort), grid_spec=grid_spec + ) + port0_idx = modeler.network_index(modeler.ports[0]) + port1_idx = modeler.network_index(modeler.ports[1]) + modeler_run1 = modeler.updated_copy(run_only=(port0_idx,)) + + # Make sure the smatrix and impedance calculations work for reduced simulations + s_matrix = run_component_modeler(monkeypatch, modeler_run1) + with pytest.raises(ValueError): + TerminalComponentModeler._validate_square_matrix(s_matrix, "test_method") + _ = modeler_run1.port_reference_impedances + + assert len(modeler_run1.sim_dict) == 1 + S11 = (port0_idx, port0_idx) + S21 = (port1_idx, port0_idx) + S12 = (port0_idx, port1_idx) + S22 = (port1_idx, port1_idx) + element_mappings = ((S11, S22, 1),) + modeler_with_mappings = modeler.updated_copy(element_mappings=element_mappings) + assert len(modeler_with_mappings.sim_dict) == 2 + + # Column 1 is mapped to column 2, resulting in one simulation + element_mappings = ((S11, S22, 1), (S21, S12, 1)) + modeler_with_mappings = modeler.updated_copy(element_mappings=element_mappings) + s_matrix = run_component_modeler(monkeypatch, modeler_with_mappings) + assert np.all(s_matrix.values[:, 0, 0] == s_matrix.values[:, 1, 1]) + assert np.all(s_matrix.values[:, 0, 1] == s_matrix.values[:, 1, 0]) + assert len(modeler_with_mappings.sim_dict) == 1 + + # Mapping is incomplete, so two simulations are run + element_mappings = ((S11, S22, 1), (S12, S21, 1)) + modeler_with_mappings = modeler.updated_copy(element_mappings=element_mappings) + assert len(modeler_with_mappings.sim_dict) == 2 diff --git a/tidy3d/plugins/smatrix/component_modelers/base.py b/tidy3d/plugins/smatrix/component_modelers/base.py index e4b77ad7ab..a02b97b43e 100644 --- a/tidy3d/plugins/smatrix/component_modelers/base.py +++ b/tidy3d/plugins/smatrix/component_modelers/base.py @@ -4,7 +4,7 @@ import os from abc import ABC, abstractmethod -from typing import Optional, Union, get_args +from typing import Generic, Optional, TypeVar, Union, get_args import numpy as np import pydantic.v1 as pd @@ -13,7 +13,8 @@ from tidy3d.components.data.data_array import DataArray from tidy3d.components.data.sim_data import SimulationData from tidy3d.components.simulation import Simulation -from tidy3d.components.types import FreqArray +from tidy3d.components.types import Complex, FreqArray +from tidy3d.components.validators import assert_unique_names from tidy3d.config import config from tidy3d.constants import HERTZ from tidy3d.exceptions import SetupError, Tidy3dKeyError @@ -31,8 +32,12 @@ LumpedPortType = Union[LumpedPort, CoaxialLumpedPort] TerminalPortType = Union[LumpedPortType, WavePort] +# Generic type variables for matrix indices and elements +IndexType = TypeVar("IndexType") +ElementType = TypeVar("ElementType") -class AbstractComponentModeler(ABC, Tidy3dBaseModel): + +class AbstractComponentModeler(ABC, Generic[IndexType, ElementType], Tidy3dBaseModel): """Tool for modeling devices and computing port parameters.""" simulation: Simulation = pd.Field( @@ -108,6 +113,24 @@ class AbstractComponentModeler(ABC, Tidy3dBaseModel): "fields that were not used to create the task will cause errors.", ) + run_only: Optional[tuple[IndexType, ...]] = pd.Field( + None, + title="Run Only", + description="Set of matrix indices that define the simulations to run. " + "If ``None``, simulations will be run for all indices in the scattering matrix. " + "If a tuple is given, simulations will be run only for the given matrix indices.", + ) + + element_mappings: tuple[tuple[ElementType, ElementType, Complex], ...] = pd.Field( + (), + title="Element Mappings", + description="Tuple of S matrix element mappings, each described by a tuple of " + "(input_element, output_element, coefficient), where the coefficient is the " + "element_mapping coefficient describing the relationship between the input and output " + "matrix element. If all elements of a given column of the scattering matrix are defined " + "by ``element_mappings``, the simulation corresponding to this column is skipped automatically.", + ) + @pd.root_validator(pre=False) def _warn_deprecation_2_10(cls, values): log.warning( @@ -139,6 +162,30 @@ def _warn_rf_license(cls, val): ) return val + @pd.validator("element_mappings", always=True) + def _validate_element_mappings(cls, element_mappings, values): + """ + Validate that each source index referenced in element_mappings is included in run_only. + """ + run_only = values.get("run_only") + if run_only is None: + return element_mappings + + valid_set = set(run_only) + invalid_indices = set() + for mapping in element_mappings: + input_element = mapping[0] + output_element = mapping[1] + for source_index in [input_element[1], output_element[1]]: + if source_index not in valid_set: + invalid_indices.add(source_index) + if invalid_indices: + raise SetupError( + f"'element_mappings' references source index(es) {invalid_indices} " + f"that are not present in run_only: {run_only}." + ) + return element_mappings + @staticmethod def _task_name(port: Port, mode_index: Optional[int] = None) -> str: """The name of a task, determined by the port of the source and mode index, if given.""" @@ -238,6 +285,44 @@ def get_port_by_name(self, port_name: str) -> Port: raise Tidy3dKeyError(f'Port "{port_name}" not found.') return ports[0] + @property + @abstractmethod + def matrix_indices_monitor(self) -> tuple[IndexType, ...]: + """Abstract property for all matrix indices that will be used to collect data.""" + + @cached_property + def matrix_indices_source(self) -> tuple[IndexType, ...]: + """Tuple of all the source matrix indices, which may be less than the total number of ports.""" + if self.run_only is not None: + return self.run_only + return self.matrix_indices_monitor + + @cached_property + def matrix_indices_run_sim(self) -> tuple[IndexType, ...]: + """Tuple of all the matrix indices that will be used to run simulations.""" + + if not self.element_mappings: + return self.matrix_indices_source + + # all the (i, j) pairs in `S_ij` that are tagged as covered by `element_mappings` + elements_determined_by_map = [element_out for (_, element_out, _) in self.element_mappings] + + # loop through rows of the full s matrix and record rows that still need running. + source_indices_needed = [] + for col_index in self.matrix_indices_source: + # loop through columns and keep track of whether each element is covered by mapping. + matrix_elements_covered = [] + for row_index in self.matrix_indices_monitor: + element = (row_index, col_index) + element_covered_by_map = element in elements_determined_by_map + matrix_elements_covered.append(element_covered_by_map) + + # if any matrix elements in row still not covered by map, a source is needed for row. + if not all(matrix_elements_covered): + source_indices_needed.append(col_index) + + return source_indices_needed + @abstractmethod def _construct_smatrix(self, batch_data: BatchData) -> DataArray: """Post process :class:`.BatchData` to generate scattering matrix.""" @@ -316,3 +401,5 @@ def sim_data_by_task_name(self, task_name: str) -> SimulationData: sim_data = self.batch_data[task_name] config.logging_level = log_level_cache return sim_data + + _unique_port_names = assert_unique_names("ports") diff --git a/tidy3d/plugins/smatrix/component_modelers/modal.py b/tidy3d/plugins/smatrix/component_modelers/modal.py index 251d48252a..c21706a516 100644 --- a/tidy3d/plugins/smatrix/component_modelers/modal.py +++ b/tidy3d/plugins/smatrix/component_modelers/modal.py @@ -15,9 +15,8 @@ from tidy3d.components.simulation import Simulation from tidy3d.components.source.field import ModeSource from tidy3d.components.source.time import GaussianPulse -from tidy3d.components.types import Ax, Complex +from tidy3d.components.types import Ax from tidy3d.components.viz import add_ax_if_none, equal_aspect -from tidy3d.exceptions import SetupError from tidy3d.plugins.smatrix.ports.modal import ModalPortDataArray, Port from tidy3d.web.api.container import BatchData @@ -27,7 +26,7 @@ Element = tuple[MatrixIndex, MatrixIndex] # the 'ij' in S_ij -class ComponentModeler(AbstractComponentModeler): +class ComponentModeler(AbstractComponentModeler[MatrixIndex, Element]): """ Tool for modeling devices and computing scattering matrix elements. @@ -47,52 +46,6 @@ class ComponentModeler(AbstractComponentModeler): "For each input mode, one simulation will be run with a modal source.", ) - element_mappings: tuple[tuple[Element, Element, Complex], ...] = pd.Field( - (), - title="Element Mappings", - description="Mapping between elements of the scattering matrix, " - "as specified by pairs of ``(port name, mode index)`` matrix indices, where the " - "first element of the pair is the output and the second element of the pair is the input." - "Each item of ``element_mappings`` is a tuple of ``(element1, element2, c)``, where " - "the scattering matrix ``Smatrix[element2]`` is set equal to ``c * Smatrix[element1]``." - "If all elements of a given column of the scattering matrix are defined by " - " ``element_mappings``, the simulation corresponding to this column " - "is skipped automatically.", - ) - - run_only: Optional[tuple[MatrixIndex, ...]] = pd.Field( - None, - title="Run Only", - description="If given, a tuple of matrix indices, specified by (:class:`.Port`, ``int``)," - " to run only, excluding the other rows from the scattering matrix. " - "If this option is used, " - "the data corresponding to other inputs will be missing in the resulting matrix.", - ) - """Finally, to exclude some rows of the scattering matrix, one can supply a ``run_only`` parameter to the - :class:`ComponentModeler`. ``run_only`` contains the scattering matrix indices that the user wants to run as a - source. If any indices are excluded, they will not be run.""" - - verbose: bool = pd.Field( - False, - title="Verbosity", - description="Whether the :class:`.ComponentModeler` should print status and progressbars.", - ) - - callback_url: str = pd.Field( - None, - title="Callback URL", - description="Http PUT url to receive simulation finish event. " - "The body content is a json file with fields " - "``{'id', 'status', 'name', 'workUnit', 'solverVersion'}``.", - ) - - @pd.validator("simulation", always=True) - def _sim_has_no_sources(cls, val): - """Make sure simulation has no sources as they interfere with tool.""" - if len(val.sources) > 0: - raise SetupError("'ComponentModeler.simulation' must not have any sources.") - return val - @cached_property def sim_dict(self) -> dict[str, Simulation]: """Generate all the :class:`.Simulation` objects for the S matrix calculation.""" @@ -121,39 +74,6 @@ def matrix_indices_monitor(self) -> tuple[MatrixIndex, ...]: matrix_indices.append((port.name, mode_index)) return tuple(matrix_indices) - @cached_property - def matrix_indices_source(self) -> tuple[MatrixIndex, ...]: - """Tuple of all the source matrix indices (port, mode_index) in the Component Modeler.""" - if self.run_only is not None: - return self.run_only - return self.matrix_indices_monitor - - @cached_property - def matrix_indices_run_sim(self) -> tuple[MatrixIndex, ...]: - """Tuple of all the source matrix indices (port, mode_index) in the Component Modeler.""" - - if self.element_mappings is None or self.element_mappings == {}: - return self.matrix_indices_source - - # all the (i, j) pairs in `S_ij` that are tagged as covered by `element_mappings` - elements_determined_by_map = [element_out for (_, element_out, _) in self.element_mappings] - - # loop through rows of the full s matrix and record rows that still need running. - source_indices_needed = [] - for col_index in self.matrix_indices_source: - # loop through columns and keep track of whether each element is covered by mapping. - matrix_elements_covered = [] - for row_index in self.matrix_indices_monitor: - element = (row_index, col_index) - element_covered_by_map = element in elements_determined_by_map - matrix_elements_covered.append(element_covered_by_map) - - # if any matrix elements in row still not covered by map, a source is needed for row. - if not all(matrix_elements_covered): - source_indices_needed.append(col_index) - - return source_indices_needed - @cached_property def port_names(self) -> tuple[list[str], list[str]]: """List of port names for inputs and outputs, respectively.""" diff --git a/tidy3d/plugins/smatrix/component_modelers/terminal.py b/tidy3d/plugins/smatrix/component_modelers/terminal.py index 0e5b9cad0a..44fa3fcaaf 100644 --- a/tidy3d/plugins/smatrix/component_modelers/terminal.py +++ b/tidy3d/plugins/smatrix/component_modelers/terminal.py @@ -30,8 +30,11 @@ from .base import AbstractComponentModeler, TerminalPortType +NetworkIndex = str # the 'i' in S_ij +NetworkElement = tuple[NetworkIndex, NetworkIndex] # the 'ij' in S_ij -class TerminalComponentModeler(AbstractComponentModeler): + +class TerminalComponentModeler(AbstractComponentModeler[NetworkIndex, NetworkElement]): """Tool for modeling two-terminal multiport devices and computing port parameters with lumped and wave ports.""" @@ -49,6 +52,18 @@ class TerminalComponentModeler(AbstractComponentModeler): "These monitor will be included in every simulation and record the radiated fields. ", ) + assume_ideal_excitation: bool = pd.Field( + False, + title="Assume Ideal Excitation", + description="If ``True``, only the excited port is assumed to have incident power, so the " + "vector of incident power wave amplitudes (a) is assumed to be all zeros except for the " + "entry associated with the excited port. This choice simplifies the calculation of the " + "scattering matrix. If ``False``, every entry in the vector of incident power wave " + "amplitudes (a) is calculated explicitly. This choice requires a matrix inversion when " + "calculating the scattering matrix, but may lead to more accurate scattering parameters " + "when there are reflections from simulation boundaries. ", + ) + @pd.root_validator(pre=False) def _warn_rf_license(cls, values): log.warning( @@ -95,12 +110,70 @@ def plot_sim_eps( sim_plot = self.simulation.copy(update={"sources": plot_sources}) return sim_plot.plot_eps(x=x, y=y, z=z, ax=ax, **kwargs) + @staticmethod + def network_index(port: TerminalPortType, mode_index: Optional[int] = None) -> NetworkIndex: + """Converts the port, and a ``mode_index`` when the port is a :class:`.WavePort``, to a unique string specifier. + + Parameters + ---------- + port : ``TerminalPortType`` + The port to convert to an index. + mode_index : Optional[int] + Selects a single mode from those supported by the ``port``, which is only used when + the ``port`` is a :class:`.WavePort`` + + Returns + ------- + NetworkIndex + A unique string that is used to identify the row/column of the scattering matrix. + """ + # Currently the mode_index is ignored, but will be supported once multimodal WavePorts are enabled. + return f"{port.name}" + + @cached_property + def network_dict(self) -> dict[NetworkIndex, tuple[TerminalPortType, int]]: + """Dictionary associating each unique ``NetworkIndex`` to a port and mode index.""" + network_dict = {} + for port in self.ports: + mode_index = None + if isinstance(port, WavePort): + mode_index = port.mode_index + key = TerminalComponentModeler.network_index(port, mode_index) + network_dict[key] = (port, mode_index) + return network_dict + + @cached_property + def matrix_indices_monitor(self) -> tuple[NetworkIndex, ...]: + """Tuple of all the possible matrix indices.""" + matrix_indices = [] + for port in self.ports: + if isinstance(port, WavePort): + matrix_indices.append(self.network_index(port, port.mode_index)) + else: + matrix_indices.append(self.network_index(port)) + return tuple(matrix_indices) + @cached_property def sim_dict(self) -> dict[str, Simulation]: """Generate all the :class:`.Simulation` objects for the port parameter calculation.""" - sim_dict = {} + # Now, create simulations with wave port sources and mode solver monitors for computing port modes + for network_index in self.matrix_indices_run_sim: + task_name, sim_with_src = self._add_source_to_sim(network_index) + sim_dict[task_name] = sim_with_src + + # Check final simulations for grid size at ports + for _, sim in sim_dict.items(): + TerminalComponentModeler._check_grid_size_at_ports(sim, self._lumped_ports) + TerminalComponentModeler._check_grid_size_at_wave_ports(sim, self._wave_ports) + return sim_dict + + @cached_property + def base_sim(self) -> Simulation: + """The base simulation with all grid refinement options, port loads (if present), and monitors added, + which is only missing the source excitations. + """ # internal mesh override and snapping points are automatically generated from lumped elements. lumped_resistors = [port.to_load() for port in self._lumped_ports] @@ -156,35 +229,25 @@ def sim_dict(self) -> dict[str, Simulation]: } # This is the new default simulation will all shared components added - sim_wo_source = sim_wo_source.copy(update=update_dict) + return sim_wo_source.copy(update=update_dict) - # Next, simulations are generated that include the source corresponding with the excitation port - for port in self._lumped_ports: - port_source = port.to_source( - self._source_time, snap_center=snap_centers[port.name], grid=sim_wo_source.grid - ) - task_name = self._task_name(port=port) - sim_dict[task_name] = sim_wo_source.updated_copy(sources=[port_source]) - - # Now, create simulations with wave port sources and mode solver monitors for computing port modes - for wave_port in self._wave_ports: + def _add_source_to_sim(self, source_index: NetworkIndex) -> tuple[str, Simulation]: + """Adds the source corresponding to the ``source_index`` to the base simulation.""" + port, mode_index = self.network_dict[source_index] + if isinstance(port, WavePort): # Source is placed just before the field monitor of the port - mode_src_pos = wave_port.center[wave_port.injection_axis] + self._shift_value_signed( - wave_port + mode_src_pos = port.center[port.injection_axis] + self._shift_value_signed(port) + port_source = port.to_source(self._source_time, snap_center=mode_src_pos) + else: + port_center_on_axis = port.center[port.injection_axis] + new_port_center = snap_coordinate_to_grid( + self.base_sim.grid, port_center_on_axis, port.injection_axis ) - port_source = wave_port.to_source(self._source_time, snap_center=mode_src_pos) - - update_dict = {"sources": [port_source]} - - task_name = self._task_name(port=wave_port) - sim_dict[task_name] = sim_wo_source.copy(update=update_dict) - - # Check final simulations for grid size at ports - for _, sim in sim_dict.items(): - TerminalComponentModeler._check_grid_size_at_ports(sim, self._lumped_ports) - TerminalComponentModeler._check_grid_size_at_wave_ports(sim, self._wave_ports) - - return sim_dict + port_source = port.to_source( + self._source_time, snap_center=new_port_center, grid=self.base_sim.grid + ) + task_name = self._task_name(port=port, mode_index=mode_index) + return (task_name, self.base_sim.updated_copy(sources=[port_source])) @cached_property def _source_time(self): @@ -195,21 +258,27 @@ def _source_time(self): def _construct_smatrix(self) -> TerminalPortDataArray: """Post process :class:`.BatchData` to generate scattering matrix.""" - return self._internal_construct_smatrix(batch_data=self.batch_data) + return self._internal_construct_smatrix( + batch_data=self.batch_data, assume_ideal_excitation=self.assume_ideal_excitation + ) - def _internal_construct_smatrix(self, batch_data: BatchData) -> TerminalPortDataArray: + def _internal_construct_smatrix( + self, batch_data: BatchData, assume_ideal_excitation: bool = True + ) -> TerminalPortDataArray: """Post process :class:`.BatchData` to generate scattering matrix, for internal use only.""" - port_names = [port.name for port in self.ports] + monitor_indices = list(self.matrix_indices_monitor) + source_indices = list(self.matrix_indices_source) + run_source_indices = list(self.matrix_indices_run_sim) values = np.zeros( - (len(self.freqs), len(port_names), len(port_names)), + (len(self.freqs), len(monitor_indices), len(source_indices)), dtype=complex, ) coords = { "f": np.array(self.freqs), - "port_out": port_names, - "port_in": port_names, + "port_out": monitor_indices, + "port_in": source_indices, } a_matrix = TerminalPortDataArray(values, coords=coords) b_matrix = a_matrix.copy(deep=True) @@ -218,14 +287,36 @@ def _internal_construct_smatrix(self, batch_data: BatchData) -> TerminalPortData port_impedances = self._port_reference_impedances(batch_data=batch_data) # loop through source ports - for port_in in self.ports: - sim_data = batch_data[self._task_name(port=port_in)] + for source_index in run_source_indices: + port, mode_index = self.network_dict[source_index] + sim_data = batch_data[self._task_name(port=port, mode_index=mode_index)] a, b = self.compute_power_wave_amplitudes_at_each_port(port_impedances, sim_data) - indexer = {"f": a.f, "port_in": port_in.name, "port_out": a.port} + indexer = {"f": a.f, "port_out": a.port, "port_in": source_index} a_matrix.loc[indexer] = a b_matrix.loc[indexer] = b - s_matrix = self.ab_to_s(a_matrix, b_matrix) + # If excitation is assumed ideal, a_matrix is assumed to be diagonal + # and the explicit inverse can be avoided. When only a subset of excitations + # have been run, we cannot find the inverse anyways so must make this assumption. + if len(monitor_indices) == len(run_source_indices) and not assume_ideal_excitation: + s_matrix = self.ab_to_s(a_matrix, b_matrix) + else: + a_diag = np.diagonal(a_matrix, axis1=1, axis2=2) + # Scale each column by the corresponding diagonal entry + s_matrix = b_matrix / a_diag[:, np.newaxis, :] + + # element can be determined by user-defined mapping + for (row_in, col_in), (row_out, col_out), mult_by in self.element_mappings: + coords_from = { + "port_in": col_in, + "port_out": row_in, + } + coords_to = { + "port_in": col_out, + "port_out": row_out, + } + s_matrix.loc[coords_to] = mult_by * s_matrix.loc[coords_from].values + return s_matrix @pd.validator("simulation") @@ -298,14 +389,14 @@ def compute_power_wave_amplitudes_at_each_port( tuple[:class:`.PortDataArray`, :class:`.PortDataArray`] Incident (a) and reflected (b) power wave amplitudes at each port. """ - port_names = [port.name for port in self.ports] + network_indices = list(self.matrix_indices_monitor) values = np.zeros( - (len(self.freqs), len(port_names)), + (len(self.freqs), len(network_indices)), dtype=complex, ) coords = { "f": np.array(self.freqs), - "port": port_names, + "port": network_indices, } V_matrix = PortDataArray(values, coords=coords) @@ -313,9 +404,10 @@ def compute_power_wave_amplitudes_at_each_port( a = V_matrix.copy(deep=True) b = V_matrix.copy(deep=True) - for port_out in self.ports: - V_out, I_out = self.compute_port_VI(port_out, sim_data) - indexer = {"port": port_out.name} + for network_index in network_indices: + port, mode_index = self.network_dict[network_index] + V_out, I_out = self.compute_port_VI(port, sim_data) + indexer = {"port": network_index} V_matrix.loc[indexer] = V_out I_matrix.loc[indexer] = I_out @@ -413,6 +505,7 @@ def ab_to_s( a_matrix: TerminalPortDataArray, b_matrix: TerminalPortDataArray ) -> TerminalPortDataArray: """Get the scattering matrix given the power wave matrices.""" + TerminalComponentModeler._validate_square_matrix(a_matrix, "ab_to_s") # Ensure dimensions are ordered properly a_matrix = a_matrix.transpose(*TerminalPortDataArray._dims) b_matrix = b_matrix.transpose(*TerminalPortDataArray._dims) @@ -431,7 +524,7 @@ def s_to_z( s_matrix: TerminalPortDataArray, reference: Union[complex, PortDataArray] ) -> DataArray: """Get the impedance matrix given the scattering matrix and a reference impedance.""" - + TerminalComponentModeler._validate_square_matrix(s_matrix, "s_to_z") # Ensure dimensions are ordered properly z_matrix = s_matrix.transpose(*TerminalPortDataArray._dims).copy(deep=True) s_vals = z_matrix.values @@ -466,25 +559,27 @@ def _port_reference_impedances(self, batch_data: BatchData) -> PortDataArray: """Tabulates the reference impedance of each port at each frequency using the supplied :class:`.BatchData`. """ - port_names = [port.name for port in self.ports] - values = np.zeros( - (len(self.freqs), len(port_names)), + (len(self.freqs), len(self.matrix_indices_monitor)), dtype=complex, ) - coords = {"f": np.array(self.freqs), "port": port_names} + coords = {"f": np.array(self.freqs), "port": list(self.matrix_indices_monitor)} port_impedances = PortDataArray(values, coords=coords) - for port in self.ports: + # Mode solver data is available in each SimulationData, + # so just choose the first available one + sim_data = next(iter(batch_data.values())) + for network_index in self.matrix_indices_monitor: + port, mode_index = self.network_dict[network_index] if isinstance(port, WavePort): - # Mode solver data for each wave port is stored in its associated SimulationData - sim_data_port = batch_data[self._task_name(port=port)] # WavePorts have a port impedance calculated from its associated modal field distribution # and is frequency dependent. - impedances = port.compute_port_impedance(sim_data_port).values - port_impedances.loc[{"port": port.name}] = impedances.squeeze() + impedances = port.compute_port_impedance(sim_data).values + port_impedances.loc[{"port": network_index}] = impedances.squeeze() else: # LumpedPorts have a constant reference impedance - port_impedances.loc[{"port": port.name}] = np.full(len(self.freqs), port.impedance) + port_impedances.loc[{"port": network_index}] = np.full( + len(self.freqs), port.impedance + ) port_impedances = TerminalComponentModeler._set_port_data_array_attributes(port_impedances) return port_impedances @@ -524,6 +619,32 @@ def _check_port_impedance_sign(Z_numpy: np.ndarray): "github repository." ) + @staticmethod + def _validate_square_matrix(matrix: TerminalPortDataArray, method_name: str) -> None: + """Check if the matrix has equal input and output port dimensions. + + Parameters + ---------- + matrix : TerminalPortDataArray + Matrix to validate + method_name : str + Name of the calling method for error message + + Raises + ------ + DataError + If the matrix is not square (unequal input/output dimensions). + """ + n_out = len(matrix.port_out) + n_in = len(matrix.port_in) + if n_out != n_in: + raise Tidy3dError( + f"Cannot compute {method_name}: number of input ports ({n_in}) " + f"!= the number of output ports ({n_out}). This usually means the 'TerminalComponentModeler' " + "was run with only a subset of port excitations. Please ensure that the `run_only` field in " + "the 'TerminalComponentModeler' is not being used." + ) + def get_radiation_monitor_by_name(self, monitor_name: str) -> DirectivityMonitor: """Find and return a :class:`.DirectivityMonitor` monitor by its name. @@ -561,7 +682,7 @@ def _monitor_data_at_port_amplitude( a_raw, _ = self.compute_power_wave_amplitudes_at_each_port( self.port_reference_impedances, sim_data ) - a_raw_port = a_raw.sel(port=port.name) + a_raw_port = a_raw.sel(port=self.network_index(port)) if not isinstance(a_port, FreqDataArray): freqs = list(monitor_data.monitor.freqs) array_vals = a_port * np.ones(len(freqs)) @@ -599,11 +720,11 @@ def get_antenna_metrics_data( # Use the first port as default if none specified if port_amplitudes is None: port_amplitudes = {self.ports[0].name: None} - port_names = [port.name for port in self.ports] + # Check port names, and create map from port to amplitude port_dict = {} for key in port_amplitudes.keys(): - port = self.get_port_by_name(port_name=key) + port, _ = self.network_dict[key] port_dict[port] = port_amplitudes[key] # Get the radiation monitor, use first as default # if none specified @@ -614,8 +735,10 @@ def get_antenna_metrics_data( # Create data arrays for holding the superposition of all port power wave amplitudes f = list(rad_mon.freqs) - coords = {"f": f, "port": port_names} - a_sum = PortDataArray(np.zeros((len(f), len(port_names)), dtype=complex), coords=coords) + coords = {"f": f, "port": list(self.matrix_indices_monitor)} + a_sum = PortDataArray( + np.zeros((len(f), len(self.matrix_indices_monitor)), dtype=complex), coords=coords + ) b_sum = a_sum.copy() # Retrieve associated simulation data combined_directivity_data = None @@ -629,7 +752,7 @@ def get_antenna_metrics_data( # Select a possible subset of frequencies a = a.sel(f=f) b = b.sel(f=f) - a_raw = a.sel(port=port.name) + a_raw = a.sel(port=self.network_index(port)) if amplitude is None: # No scaling performed when amplitude is None