diff --git a/src/ansys/dpf/core/data_sources.py b/src/ansys/dpf/core/data_sources.py
index 1283012bf8c..a78856c7878 100644
--- a/src/ansys/dpf/core/data_sources.py
+++ b/src/ansys/dpf/core/data_sources.py
@@ -20,12 +20,14 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 
-"""Data Sources."""
+"""Data sources."""
+
+from __future__ import annotations
 
 import os
 from pathlib import Path
 import traceback
-from typing import Union
+from typing import TYPE_CHECKING, Union
 import warnings
 
 from ansys.dpf.core import errors, server as server_module
@@ -38,10 +40,17 @@
     integral_types,
 )
 
+if TYPE_CHECKING:  # pragma: no cover
+    from ansys.dpf import core as dpf
+    from ansys.dpf.core import server_types
+    from ansys.dpf.core.server_types import AnyServerType
+    from ansys.grpc.dpf import data_sources_pb2
+
 
 class DataSources:
-    """Contains files with analysis results.
+    """Manages paths to files as sources of data.
 
+    Use this object to declare data inputs for DPF and define their locations.
     An extension key (``'rst'`` for example) is used to choose which files represent
     results files versus accessory files. You can set a result file path when
     initializing this class.
@@ -49,11 +58,11 @@ class DataSources:
 
     Parameters
     ----------
-    result_path : str or os.PathLike object, optional
-        Path of the result. The default is ``None``.
-    data_sources : ansys.grpc.dpf.data_sources_pb2.DataSources
-        gRPC data sources message. The default is ``None``.
-    server : server.DPFServer, optional
+    result_path:
+        Path of the result.
+    data_sources:
+        gRPC data sources message.
+    server:
         Server with the channel connected to the remote or local instance.
         The default is ``None``, in which case an attempt is made to use the
        global server.
@@ -63,13 +72,20 @@ class DataSources:
     Initialize a model from a result path.
 
     >>> from ansys.dpf import core as dpf
-    >>> my_data_sources = dpf.DataSources('file.rst')
+    >>> # Create the DataSources object with a main file path
+    >>> my_data_sources = dpf.DataSources(result_path='file.rst')
+    >>> # Get the path to the main result file
     >>> my_data_sources.result_files
     ['file.rst']
 
     """
 
-    def __init__(self, result_path=None, data_sources=None, server=None):
+    def __init__(
+        self,
+        result_path: Union[str, os.PathLike] = None,
+        data_sources: Union[dpf.DataSources, int, data_sources_pb2.DataSources] = None,
+        server: AnyServerType = None,
+    ):
         """Initialize a connection with the server."""
         # step 1: get server
         self._server = server_module.get_or_create_server(
@@ -113,26 +129,33 @@ def __init__(self, result_path=None, data_sources=None, server=None):
         if result_path is not None:
             self.set_result_file_path(result_path)
 
-    def set_result_file_path(self, filepath, key=""):
-        """Add a result file path to the data sources.
+    def set_result_file_path(
+        self,
+        filepath: Union[str, os.PathLike],
+        key: str = "",
+    ) -> None:
+        """Set the main result file path in the data sources.
 
         Parameters
         ----------
-        filepath : str or os.PathLike object
+        filepath:
             Path to the result file.
-        key : str, optional
+        key:
             Extension of the file, which is used as a key for choosing the correct
-            plugin when a result is requested by an operator. The default is ``""``,
-            in which case the key is found directly.
+            plugin when a result is requested by an operator.
+            Overrides the default key detection logic.
 
         Examples
         --------
         Create a data source and set the result file path.
 
        >>> from ansys.dpf import core as dpf
-        >>> data_sources = dpf.DataSources()
-        >>> data_sources.set_result_file_path('/tmp/file.rst')
-        >>> data_sources.result_files
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result file can be found
+        >>> my_data_sources.set_result_file_path(filepath='/tmp/file.rst', key='rst')
+        >>> # Get the path to the main result file
+        >>> my_data_sources.result_files
         ['...tmp...file.rst']
 
         """
@@ -154,8 +177,38 @@ def set_result_file_path(self, filepath, key=""):
             self._api.data_sources_set_result_file_path_with_key_utf8(self, str(filepath), key)
 
     @staticmethod
-    def guess_result_key(filepath: str) -> str:
-        """Guess result key for files without a file extension."""
+    def guess_result_key(filepath: Union[str, os.PathLike]) -> str:
+        """Guess result key for files without a file extension.
+
+        Parameters
+        ----------
+        filepath:
+            Path to the file.
+
+        Returns
+        -------
+        str:
+            Extension key name.
+
+        Examples
+        --------
+        Give the result key for the result file at the given path.
+
+        >>> from ansys.dpf import core as dpf
+        >>> from ansys.dpf.core import examples
+        >>>
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Download the result files
+        >>> path = examples.download_d3plot_beam()
+        >>> # Define the path where the main result file can be found
+        >>> my_data_sources.set_result_file_path(filepath=path[0])
+        >>> # Detect the result key for the file in the given path
+        >>> my_file_key = my_data_sources.guess_result_key(filepath=path[0])
+        >>> print(my_file_key)
+        d3plot
+
+        """
         result_keys = ["d3plot", "binout"]
         base_name = Path(filepath).name
         # Handle files without extension
@@ -165,8 +218,40 @@ def guess_result_key(filepath: str) -> str:
         return ""
 
     @staticmethod
-    def guess_second_key(filepath: str) -> str:
-        """For files with an h5 or cff extension, look for another extension."""
+    def guess_second_key(filepath: Union[str, os.PathLike]) -> str:
+        """For files with an h5 or cff extension, look for another extension.
+
+        Parameters
+        ----------
+        filepath:
+            Path to the file.
+
+        Returns
+        -------
+        str:
+            First extension key name.
+
+        Examples
+        --------
+        Find the first extension key of a result file with multiple extension keys.
+
+        >>> from ansys.dpf import core as dpf
+        >>> from ansys.dpf.core import examples
+        >>>
+        >>> # Download the result files
+        >>> paths = examples.download_fluent_axial_comp()
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the extension key for the file in the given path.
+        >>> # The paths are given in a dictionary, so to choose the correct file
+        >>> # you need to give as arguments:
+        >>> # - the list label
+        >>> # - the file index in that list
+        >>> my_file_key = my_data_sources.guess_second_key(filepath=paths["cas"][0])
+        >>> print(my_file_key)
+        cas
+
+        """
         # These files usually end with .cas.h5 or .dat.h5
         accepted = ["cas", "dat"]
         new_split = Path(filepath).suffixes
@@ -176,9 +261,9 @@ def guess_second_key(filepath: str) -> str:
         return new_key
 
     def set_domain_result_file_path(
         self, path: Union[str, os.PathLike], domain_id: int, key: Union[str, None] = None
-    ):
-        """Add a result file path by domain.
+    ) -> None:
+        """Set a result file path for a specific domain.
 
         This method is used to handle files created by a distributed solve.
@@ -194,10 +279,15 @@ def set_domain_result_file_path(
 
         Examples
         --------
+        Set the main result file paths in the data sources, each in its respective domain.
+
         >>> from ansys.dpf import core as dpf
-        >>> data_sources = dpf.DataSources()
-        >>> data_sources.set_domain_result_file_path('/tmp/file0.sub', 0)
-        >>> data_sources.set_domain_result_file_path('/tmp/file1.sub', 1)
+        >>>
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result data can be found and specify its domain
+        >>> my_data_sources.set_domain_result_file_path(path='/tmp/file0.rst', key='rst', domain_id=0)
+        >>> my_data_sources.set_domain_result_file_path(path='/tmp/file1.rst', key='rst', domain_id=1)
 
         """
         path = Path(path)
@@ -208,31 +298,44 @@ def set_domain_result_file_path(
         else:
             self._api.data_sources_set_domain_result_file_path_utf8(self, str(path), domain_id)
 
-    def add_file_path(self, filepath, key="", is_domain: bool = False, domain_id=0):
-        """Add a file path to the data sources.
+    def add_file_path(
+        self,
+        filepath: Union[str, os.PathLike],
+        key: str = "",
+        is_domain: bool = False,
+        domain_id: int = 0,
+    ) -> None:
+        """Add an accessory file path to the data sources.
 
         Files not added as result files are accessory files, which contain accessory
         information not present in the result files.
 
         Parameters
         ----------
-        filepath : str or os.PathLike object
+        filepath:
             Path of the file.
-        key : str, optional
+        key:
             Extension of the file, which is used as a key for choosing the correct
-            plugin when a result is requested by an operator. The default is ``""``,
-            in which case the key is found directly.
-        is_domain: bool, optional
-            Whether the file path is the domain path. The default is ``False``.
-        domain_id: int, optional
-            Domain ID for the distributed files. The default is ``0``. For this
-            parameter to be taken into account, ``domain_path=True`` must be set.
+            plugin when a result is requested by an operator.
+            Overrides the default key detection logic.
+        is_domain:
+            Whether the file path is the domain path.
+        domain_id:
+            Domain ID for the distributed files.
+            For this parameter to be taken into account, ``is_domain=True`` must be set.
 
         Examples
         --------
+        Add an accessory file to the DataSources object.
+
         >>> from ansys.dpf import core as dpf
-        >>> data_sources = dpf.DataSources()
-        >>> data_sources.add_file_path('/tmp/ds.dat')
+        >>>
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result file can be found
+        >>> my_data_sources.set_result_file_path(filepath='/tmp/file.cas', key='cas')
+        >>> # Add the accessory file to the DataSources object
+        >>> my_data_sources.add_file_path(filepath='/tmp/ds.dat', key='dat')
 
         """
         # The filename needs to be a fully qualified file name
@@ -255,8 +358,10 @@ def add_file_path(self, filepath, key="", is_domain: bool = False, domain_id=0):
         else:
             self._api.data_sources_add_file_path_with_key_utf8(self, str(filepath), key)
 
-    def add_domain_file_path(self, filepath, key, domain_id):
-        """Add a file path to the data sources.
+    def add_domain_file_path(
+        self, filepath: Union[str, os.PathLike], key: str, domain_id: int
+    ) -> None:
+        """Add an accessory file path to the data sources in the given domain.
 
         Files not added as result files are accessory files, which contain accessory
         information not present in the result files.
@@ -273,9 +378,16 @@ def add_domain_file_path(self, filepath, key, domain_id):
 
         Examples
         --------
+        Add an accessory file for a specific domain.
+
         >>> from ansys.dpf import core as dpf
-        >>> data_sources = dpf.DataSources()
-        >>> data_sources.add_domain_file_path('/tmp/ds.dat', "dat", 1)
+        >>>
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result data can be found and specify its domain
+        >>> my_data_sources.set_domain_result_file_path(path='/tmp/ds.cas', key='cas', domain_id=1)
+        >>> # Add the accessory file to the DataSources object and specify its domain
+        >>> my_data_sources.add_domain_file_path(filepath='/tmp/ds.dat', key='dat', domain_id=1)
 
         """
         # The filename needs to be a fully qualified file name
@@ -287,7 +399,12 @@ def add_domain_file_path(self, filepath, key, domain_id):
             self, str(filepath), key, domain_id
         )
 
-    def add_file_path_for_specified_result(self, filepath, key="", result_key=""):
+    def add_file_path_for_specified_result(
+        self,
+        filepath: Union[str, os.PathLike],
+        key: str = "",
+        result_key: str = "",
+    ) -> None:
         """Add a file path for a specified result file key to the data sources.
 
         This method can be used when results files with different keys (extensions) are
@@ -296,13 +413,13 @@ def add_file_path_for_specified_result(self, filepath, key="", result_key=""):
 
         Parameters
         ----------
-        filepath : str or os.PathLike object
+        filepath:
             Path of the file.
-        key : str, optional
+        key:
             Extension of the file, which is used as a key for choosing the correct
-            plugin when a result is requested by an operator. The default is ``""``,
-            in which case the key is found directly.
-        result_key: str, optional
+            plugin when a result is requested by an operator.
+            Overrides the default key detection logic.
+        result_key:
             Extension of the results file that the specified file path belongs to.
             The default is ``""``, in which case the key is found directly.
         """
@@ -316,19 +433,42 @@ def add_file_path_for_specified_result(self, filepath, key="", result_key=""):
             self, str(filepath), key, result_key
         )
 
-    def add_upstream(self, upstream_data_sources, result_key=""):
-        """Add upstream data sources.
+    def add_upstream(self, upstream_data_sources: DataSources, result_key: str = "") -> None:
+        """Add upstream data sources to the main DataSources object.
 
-        This is used to add a set of path creating an upstream for
+        This is used to add a set of paths creating an upstream for
         recursive workflows.
 
         Parameters
         ----------
-        upstream_data_sources : DataSources
+        upstream_data_sources:
             Set of paths creating an upstream for recursive workflows.
+        result_key:
+            Extension of the result file group for the upstream data source.
 
-        result_key: str, optional
-            Extension of the result file group with which this upstream belongs
+        Examples
+        --------
+        Add upstream data to the main DataSources object of an expansion analysis.
+
+        >>> from ansys.dpf import core as dpf
+        >>> from ansys.dpf.core import examples
+        >>>
+        >>> # Download the result files
+        >>> paths = examples.download_msup_files_to_dict()
+        >>> # Create the main DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result data can be found
+        >>> my_data_sources.set_result_file_path(filepath=paths["rfrq"], key='rfrq')
+        >>>
+        >>> # Create the DataSources object for the upstream data
+        >>> my_data_sources_upstream = dpf.DataSources()
+        >>> # Define the path where the main upstream data can be found
+        >>> my_data_sources_upstream.set_result_file_path(filepath=paths["mode"], key='mode')
+        >>> # Add the additional upstream data to the upstream DataSources object
+        >>> my_data_sources_upstream.add_file_path(filepath=paths["rst"], key='rst')
+        >>>
+        >>> # Add the upstream DataSources to the main DataSources object
+        >>> my_data_sources.add_upstream(upstream_data_sources=my_data_sources_upstream)
 
         """
         if result_key == "":
@@ -338,45 +478,147 @@ def add_upstream(self, upstream_data_sources, result_key=""):
             self, upstream_data_sources, result_key
         )
 
-    def add_upstream_for_domain(self, upstream_data_sources, domain_id):
-        """Add an upstream data sources for a given domain.
+    def add_upstream_for_domain(self, upstream_data_sources: DataSources, domain_id: int) -> None:
+        """Add upstream data sources to the main DataSources object for a given domain.
 
         This is used to add a set of path creating an upstream for
         recursive workflows in a distributed solve.
 
         Parameters
         ----------
-        upstream_data_sources : DataSources
+        upstream_data_sources:
             Set of paths creating an upstream for recursive workflows.
-
-        domain_id: int
+        domain_id:
             Domain id for distributed files.
 
+        Examples
+        --------
+        Add upstream data to the main DataSources object of a distributed expansion analysis.
+
+        >>> import os
+        >>>
+        >>> from ansys.dpf import core as dpf
+        >>> from ansys.dpf.core import examples
+        >>>
+        >>> # Download the result files
+        >>> paths = examples.find_distributed_msup_folder()
+        >>> # Create the main DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result file can be found and specify its domain
+        >>> # The helper returns the path to a folder, so os.path.join is used to build each file path
+        >>> my_data_sources.set_domain_result_file_path(path=os.path.join(paths, "file_load_1.rfrq"), key='rfrq', domain_id=0)
+        >>> # Add the additional result file to the DataSources object and specify its domain
+        >>> my_data_sources.add_domain_file_path(filepath=os.path.join(paths, "file_load_2.rfrq"), key='rfrq', domain_id=1)
+        >>>
+        >>> # Create the DataSources object for the first and second upstream files
+        >>> my_data_sources_upstream_g0 = dpf.DataSources()
+        >>> my_data_sources_upstream_g1 = dpf.DataSources()
+        >>> # Define the path where the main upstream files can be found
+        >>> my_data_sources_upstream_g0.set_result_file_path(filepath=os.path.join(paths, "file0.mode"), key='mode')
+        >>> my_data_sources_upstream_g1.set_result_file_path(filepath=os.path.join(paths, "file1.mode"), key='mode')
+        >>> # Add the additional upstream files to the upstream DataSources objects
+        >>> my_data_sources_upstream_g0.add_file_path(filepath=os.path.join(paths, "file0.rst"), key='rst')
+        >>> my_data_sources_upstream_g1.add_file_path(filepath=os.path.join(paths, "file1.rst"), key='rst')
+        >>>
+        >>> # Add the upstream DataSources to the main DataSources object and specify its domain
+        >>> my_data_sources.add_upstream_for_domain(upstream_data_sources=my_data_sources_upstream_g0, domain_id=0)
+        >>> my_data_sources.add_upstream_for_domain(upstream_data_sources=my_data_sources_upstream_g1, domain_id=1)
+
         """
         self._api.data_sources_add_upstream_domain_data_sources(
             self, upstream_data_sources, domain_id
         )
 
     @property
-    def result_key(self):
+    def result_key(self) -> str:
         """Result key used by the data sources.
 
         Returns
         -------
-        str
+        str:
             Result key.
 
+        Examples
+        --------
+        >>> from ansys.dpf import core as dpf
+        >>>
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result file can be found
+        >>> my_data_sources.set_result_file_path(filepath='/tmp/file.rst', key='rst')
+        >>> # Get the result key of the main result file
+        >>> my_data_sources.result_key
+        'rst'
+
         """
         return self._api.data_sources_get_result_key(self)
 
     @property
-    def result_files(self):
+    def result_files(self) -> list[str]:
         """List of result files contained in the data sources.
 
         Returns
         -------
-        list
-            List of result files.
+        list:
+            List of result file paths.
+
+        Examples
+        --------
+        Get the path to the result file set using
+        :func:`set_result_file_path() <ansys.dpf.core.data_sources.DataSources.set_result_file_path>`.
+
+        >>> from ansys.dpf import core as dpf
+        >>>
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result file can be found
+        >>> my_data_sources.set_result_file_path(filepath='/tmp/file.cas', key='cas')
+        >>> # Add the accessory file to the DataSources object
+        >>> my_data_sources.add_file_path(filepath='/tmp/ds.dat', key='dat')
+        >>> # Get the path to the main result file
+        >>> my_data_sources.result_files
+        ['...tmp...file.cas']
+
+        If you added an upstream result file, it is not listed in the main ``DataSources`` object. You have to
+        check directly in the ``DataSources`` object created to define the upstream data.
+
+        >>> from ansys.dpf import core as dpf
+        >>>
+        >>> # Create the main DataSources object with a main file path
+        >>> my_data_sources = dpf.DataSources(result_path='/tmp/file.rfrq')
+        >>>
+        >>> # Create the DataSources object for the upstream data
+        >>> my_data_sources_upstream = dpf.DataSources(result_path='/tmp/file.mode')
+        >>> # Add the additional upstream data to the upstream DataSources object
+        >>> my_data_sources_upstream.add_file_path(filepath='/tmp/file.rst', key='rst')
+        >>>
+        >>> # Add the upstream DataSources to the main DataSources object
+        >>> my_data_sources.add_upstream(upstream_data_sources=my_data_sources_upstream)
+        >>>
+        >>> # Get the path to the main result file of the main DataSources object
+        >>> my_data_sources.result_files
+        ['...tmp...file.rfrq']
+
+        If you check the ``DataSources`` object created to define the upstream data, only the file
+        set as its result file is listed.
+
+        >>> # Get the path to the upstream file of the upstream DataSources object
+        >>> my_data_sources_upstream.result_files
+        ['...tmp...file.mode']
+
+        If you have a ``DataSources`` object with more than one domain, a list of ``None`` values,
+        one per domain, is returned.
+
+        >>> from ansys.dpf import core as dpf
+        >>>
+        >>> # Create the DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result data can be found and specify its domain
+        >>> my_data_sources.set_domain_result_file_path(path='/tmp/file0.rst', key='rst', domain_id=0)
+        >>> my_data_sources.set_domain_result_file_path(path='/tmp/file1.rst', key='rst', domain_id=1)
+        >>>
+        >>> # Get the path to the main result files of the DataSources object
+        >>> my_data_sources.result_files
+        [None, None]
+
         """
         result_key = self.result_key
         if result_key == "":
@@ -395,14 +637,46 @@ def result_files(self):
 
     @version_requires("7.0")
     def register_namespace(self, result_key: str, namespace: str):
-        """Add a link from this ``result_key`` to this ``namespace`` in the DataSources.
+        """Associate a ``result_key`` with a ``namespace`` for this ``DataSources`` instance.
+
+        The ``result_key`` to ``namespace`` mapping of a ``DataSources`` instance is used by
+        source operators to redirect the call to a specific implementation of the operator.
+
+        Most public source operators in the documentation are solver-independent interfaces.
+        Plugins bring solver-specific implementations of these operators and record them using a
+        combination of the namespace, the file extension, and the operator name:
+        ``namespace::key::operator_name``.
 
-        This ``result_key`` to ``namespace`` mapping is used by source operators
-        to find internal operators to call.
+        For example, if the namespace associated with the file extension 'rst' is 'mapdl'
+        (which is the case in the default mapping), the 'displacement' source operator tries to
+        call the operator ``mapdl::rst::displacement``.
+
+        This function is useful when creating custom operators or plugins for files with extensions
+        unknown to the DPF framework, or to override the default extension-to-namespace association.
+
+        Parameters
+        ----------
+        result_key:
+            Extension of the file, which is used as a key for choosing the correct
+            plugin when a result is requested by an operator.
+        namespace:
+            Namespace to associate the file extension with.
 
         Notes
         -----
-        Available with server's version starting at 7.0.
+        Available with server version starting at 7.0.
+
+        Examples
+        --------
+        >>> from ansys.dpf import core as dpf
+        >>>
+        >>> # Create the main DataSources object
+        >>> my_data_sources = dpf.DataSources()
+        >>> # Define the path where the main result data can be found
+        >>> my_data_sources.set_result_file_path(filepath=r'file.extension', key='extension')
+        >>> # Define the namespace for the results in the given path
+        >>> my_data_sources.register_namespace(result_key='extension', namespace='namespace')
+
         """
         self._api.data_sources_register_namespace(self, result_key, namespace)
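
To make the redirection described in ``register_namespace`` concrete, here is a minimal sketch of the consumer side, assuming a local ``file.rst`` result and the default ``'rst'``-to-``'mapdl'`` mapping that the docstring mentions (``dpf.Model`` and ``model.results.displacement`` are standard PyDPF-Core entry points; the resolved internal name shown in the comments follows the ``namespace::key::operator_name`` rule and is illustrative, not an additional API):

>>> from ansys.dpf import core as dpf
>>>
>>> # 'rst' is associated with the 'mapdl' namespace in the default mapping,
>>> # so no register_namespace() call is needed in this case
>>> my_data_sources = dpf.DataSources(result_path='file.rst')
>>> model = dpf.Model(my_data_sources)
>>> # Requesting the displacement result resolves to the internal operator
>>> # 'mapdl::rst::displacement' through the namespace::key::operator_name rule
>>> displacement_op = model.results.displacement()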