diff --git a/gnssanalysis/filenames.py b/gnssanalysis/filenames.py
index 972ebcb..58a7e7d 100644
--- a/gnssanalysis/filenames.py
+++ b/gnssanalysis/filenames.py
@@ -7,7 +7,7 @@
 # The collections.abc (rather than typing) versions don't support subscripting until 3.9
 # from collections import Iterable
-from typing import Iterable, Mapping, Any, Dict, Optional, Tuple, Union, overload
+from typing import Iterable, Mapping, Any, Optional, Union, overload
 
 import warnings
 
 import click
@@ -99,8 +99,8 @@
 @click.option("--verbose", is_flag=True)
 def determine_file_name_main(
     files: Iterable[pathlib.Path],
-    defaults: Iterable[Tuple[str, str]],
-    overrides: Iterable[Tuple[str, str]],
+    defaults: Iterable[tuple[str, str]],
+    overrides: Iterable[tuple[str, str]],
     current_name: bool,
     delimiter: str,
     verbose: bool,
@@ -165,8 +165,8 @@ def determine_file_name(
     defined as a parameter to maintain syntactic simplicity when calling.
 
     :param pathlib.Path file_path: Path to the file for which to determine name
-    :param Dict[str, Any] defaults: Default name properties to use when properties can't be determined
-    :param Dict[str, Any] overrides: Name properties that should override anything detected in the file
+    :param dict[str, Any] defaults: Default name properties to use when properties can't be determined
+    :param dict[str, Any] overrides: Name properties that should override anything detected in the file
     :raises NotImplementedError: For files that we should support but currently don't (bia, iox, obx, sum, tro)
     :return str: Proposed IGS long filename
     """
@@ -182,7 +182,7 @@ def determine_properties_from_contents_and_filename(
     file_path: pathlib.Path,
     defaults: Optional[Mapping[str, Any]] = None,
     overrides: Optional[Mapping[str, Any]] = None,
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
     """Determine the properties of a file based on its contents
 
     The function reads both the existing filename of the provided file as well as
@@ -204,8 +204,8 @@
     - project: str
 
     :param pathlib.Path file_path: Path to the file for which to determine properties
-    :param Dict[str, Any] defaults: Default name properties to use when properties can't be determined
-    :param Dict[str, Any] overrides: Name properties that should override anything detected in the file
+    :param dict[str, Any] defaults: Default name properties to use when properties can't be determined
+    :param dict[str, Any] overrides: Name properties that should override anything detected in the file
     :raises NotImplementedError: For files that we should support but currently don't (bia, iox, obx, sum, tro)
     :return str: Dictionary of file properties
     """
@@ -459,7 +459,7 @@ def convert_nominal_span(nominal_span: str) -> datetime.timedelta:
     return datetime.timedelta()
 
 
-def determine_clk_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
+def determine_clk_name_props(file_path: pathlib.Path) -> dict[str, Any]:
     """Determine the IGS filename properties for a CLK files
 
     Like all functions in this series, the function reads both a filename and the files contents
@@ -467,7 +467,7 @@ def determine_clk_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     function returns a dictionary with any properties it manages to successfully determine.
 
     :param pathlib.Path file_path: file for which to determine name properties
-    :return Dict[str, Any]: dictionary containing the extracted name properties
+    :return dict[str, Any]: dictionary containing the extracted name properties
     """
     name_props = {}
     try:
@@ -519,7 +519,7 @@ def determine_clk_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     return name_props
 
 
-def determine_erp_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
+def determine_erp_name_props(file_path: pathlib.Path) -> dict[str, Any]:
     """Determine the IGS filename properties for a ERP files
 
     Like all functions in this series, the function reads both a filename and the files contents
@@ -527,7 +527,7 @@ def determine_erp_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     function returns a dictionary with any properties it manages to successfully determine.
 
     :param pathlib.Path file_path: file for which to determine name properties
-    :return Dict[str, Any]: dictionary containing the extracted name properties
+    :return dict[str, Any]: dictionary containing the extracted name properties
     """
     name_props = {}
     try:
@@ -574,7 +574,7 @@ def determine_erp_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     return name_props
 
 
-def determine_snx_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
+def determine_snx_name_props(file_path: pathlib.Path) -> dict[str, Any]:
     """Determine the IGS filename properties for a SINEX files
 
     Like all functions in this series, the function reads both a filename and the files contents
@@ -582,7 +582,7 @@ def determine_snx_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     function returns a dictionary with any properties it manages to successfully determine.
 
     :param pathlib.Path file_path: file for which to determine name properties
-    :return Dict[str, Any]: dictionary containing the extracted name properties
+    :return dict[str, Any]: dictionary containing the extracted name properties
     """
     name_props = {}
    try:
@@ -668,7 +668,7 @@ def determine_snx_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
 
 def determine_sp3_name_props(
     file_path: pathlib.Path, strict_mode: type[StrictMode] = StrictModes.STRICT_WARN
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
     """Determine the IGS filename properties for a SP3 files
 
     Like all functions in this series, the function reads both a filename and the files contents
@@ -678,7 +678,7 @@ def determine_sp3_name_props(
     :param pathlib.Path file_path: file for which to determine name properties
     :param type[StrictMode] strict_mode: indicates whether to raise, warn, or silently continue on errors such as
         failure to get properties from a filename.
-    :return Dict[str, Any]: dictionary containing the extracted name properties. May be empty on some errors, if
+    :return dict[str, Any]: dictionary containing the extracted name properties. May be empty on some errors, if
         strict_mode is not set to RAISE.
     :raises ValueError: if strict_mode set to RAISE, and unable to statically extract properties from a filename
     """
diff --git a/gnssanalysis/gn_aux.py b/gnssanalysis/gn_aux.py
index 881327a..72cffbb 100644
--- a/gnssanalysis/gn_aux.py
+++ b/gnssanalysis/gn_aux.py
@@ -1,7 +1,7 @@
 """Auxiliary functions"""
 
 import logging as _logging
-from typing import overload, Tuple, Union
+from typing import overload, Union
 
 import numpy as _np
 import pandas as _pd
@@ -183,7 +183,7 @@ def rms(
 
 def get_std_bounds(
     a: _np.ndarray,
-    axis: Union[None, int, Tuple[int, ...]] = None,
+    axis: Union[None, int, tuple[int, ...]] = None,
     sigma_coeff: int = 3,
 ):
     """
@@ -287,7 +287,7 @@ def degminsec2deg(a: Union[_pd.Series, _pd.DataFrame, list, str]) -> Union[_pd.S
         assert isinstance(a_series, _pd.Series)
         return _series_str_degminsec2deg(a_series).unstack()
     else:
-        raise TypeError("Unsupported input type. Please use either of _pd.Series, _pd.DataFrame, List or str")
+        raise TypeError("Unsupported input type. Please use one of _pd.Series, _pd.DataFrame, list or str")
 
 
 def _deg2degminsec(a: _np.ndarray) -> _np.ndarray:
diff --git a/gnssanalysis/gn_download.py b/gnssanalysis/gn_download.py
index a1a639c..fca407c 100644
--- a/gnssanalysis/gn_download.py
+++ b/gnssanalysis/gn_download.py
@@ -28,7 +28,7 @@
 import ftplib as _ftplib
 from ftplib import FTP_TLS as _FTP_TLS
 from pathlib import Path as _Path
-from typing import Any, Generator, List, Literal, Optional, Tuple, Union
+from typing import Any, Generator, Literal, Optional, Union
 from urllib import request as _request
 from urllib.error import HTTPError as _HTTPError
@@ -391,7 +391,7 @@
     version: str = "0",
     project: str = "OPS",
     content_type: str = None,
-) -> Tuple[str, GPSDate, _datetime.datetime]:
+) -> tuple[str, GPSDate, _datetime.datetime]:
     """Given a reference datetime and extention of file, generate the IGS filename and GPSDate obj for use in download
 
     :param _datetime.datetime reference_start: Datetime of the start period of interest
@@ -405,7 +405,7 @@
     :param str version: Version of the file, defaults to "0"
     :param str project: IGS project descriptor, defaults to "OPS"
     :param str content_type: IGS content specifier - if None set automatically based on file_ext, defaults to None
-    :return _Tuple[str, GPSDate, _datetime.datetime]: Tuple of filename str, GPSDate and datetime obj (based on shift)
+    :return tuple[str, GPSDate, _datetime.datetime]: Tuple of filename str, GPSDate and datetime obj (based on shift)
     """
     reference_start += _datetime.timedelta(hours=shift)
     if type(reference_start == _datetime.date):
@@ -822,7 +822,7 @@
     project_type: str = "OPS",
     timespan: _datetime.timedelta = _datetime.timedelta(days=2),
     if_file_present: str = "prompt_user",
-) -> List[_Path]:
+) -> list[_Path]:
     """Download the file/s from CDDIS based on start and end epoch, to the download directory (download_dir)
 
     :param _Path download_dir: Where to download files (local directory)
@@ -838,7 +838,7 @@
     :param _datetime.timedelta timespan: Timespan of the file/s to download, defaults to _datetime.timedelta(days=2)
     :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
     :raises FileNotFoundError: Raise error if the specified file cannot be found on CDDIS
-    :return List[_Path]: Return list of paths of downloaded files
+    :return list[_Path]: Return list of paths of downloaded files
     """
     # Download the correct IGS FIN ERP files
     if file_ext == "ERP" and analysis_center == "IGS" and solution_type == "FIN":
         # get the correct start_epoch
@@ -1193,12 +1193,12 @@ def download_satellite_metadata_snx(download_dir: _Path, if_file_present: str =
     return download_filepath
 
 
-def download_yaw_files(download_dir: _Path, if_file_present: str = "prompt_user") -> List[_Path]:
+def download_yaw_files(download_dir: _Path, if_file_present: str = "prompt_user") -> list[_Path]:
     """Download yaw rate / bias files needed to for Ginan's PEA
 
     :param _Path download_dir: Where to download files (local directory)
     :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
-    :return List[_Path]: Return list paths of downloaded files
+    :return list[_Path]: Return list of paths of downloaded files
     """
     ensure_folders([download_dir])
     download_filepaths = []
diff --git a/gnssanalysis/gn_io/erp.py b/gnssanalysis/gn_io/erp.py
index 2bc008b..3ead48b 100644
--- a/gnssanalysis/gn_io/erp.py
+++ b/gnssanalysis/gn_io/erp.py
@@ -7,7 +7,7 @@
 # from collections.abc import Callable, Iterable
 from typing import Callable, Iterable
 from io import BytesIO as _BytesIO
-from typing import List, TextIO, Union
+from typing import TextIO, Union
 from urllib import request as _rqs
 
 import numpy as _np
@@ -17,11 +17,11 @@
 from gnssanalysis import gn_io as _gn_io
 
 
-def normalise_headers(headers: Iterable[str]) -> List[str]:
+def normalise_headers(headers: Iterable[str]) -> list[str]:
     """Apply :func: `gn_io.erp.normalise_headers` to all headers in an iterable
 
     :param Iterable[str] headers: Iterable of header strings obtained from an ERP file
-    :return List[str]: List of normalised headers as per :func: `gn_io.erp.normalise_headers`
+    :return list[str]: List of normalised headers as per :func: `gn_io.erp.normalise_header`
     """
     return [normalise_header(h) for h in headers]
@@ -44,7 +44,7 @@ def normalise_header(header: str) -> str:
     return get_canonical_header(header)
 
 
-def merge_hyphen_headers(raw_split_header: Iterable[str]) -> List[str]:
+def merge_hyphen_headers(raw_split_header: Iterable[str]) -> list[str]:
     """Take a list of raw headers from an ERP file and merge hyphenated headers that got split
 
     In some ERP files hyphenated headers, such as UTC-TAI, occasionally have spaces before or after
@@ -52,7 +52,7 @@ def merge_hyphen_headers(raw_split_header: Iterable[str]) -> List[str]:
     This function re-merges those header components.
 
     :param Iterable[str] raw_split_header: ERP header line that has been split/tokenized
-    :return List[str]: List of ERP headers with hyphen-separated headers merged
+    :return list[str]: List of ERP headers with hyphen-separated headers merged
     """
     # Copy to avoid mutating input list
     headers = list(raw_split_header)
diff --git a/gnssanalysis/gn_io/igslog.py b/gnssanalysis/gn_io/igslog.py
index 0b13916..d6fc02a 100644
--- a/gnssanalysis/gn_io/igslog.py
+++ b/gnssanalysis/gn_io/igslog.py
@@ -4,7 +4,7 @@
 import glob as _glob
 import re as _re
 from multiprocessing import Pool as _Pool
-from typing import Union, List, Tuple
+from typing import Union
 
 import numpy as _np
 import pandas as _pd
@@ -169,7 +169,7 @@ def determine_log_version(data: bytes) -> str:
 
 def extract_id_block(
     data: bytes, file_path: str, file_code: str, version: Union[str, None] = None
-) -> Union[List[str], _np.ndarray]:
+) -> Union[list[str], _np.ndarray]:
     """Extract the site ID block given the bytes object read from an IGS site log file
 
     :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
@@ -229,12 +229,12 @@ def extract_location_block(data: bytes, file_path: str, version: Union[str, None
     return location_block
 
 
-def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.ndarray]:
+def extract_receiver_block(data: bytes, file_path: str) -> Union[list[tuple[bytes]], _np.ndarray]:
     """Extract the location block given the bytes object read from an IGS site log file
 
     :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
     :param str file_path: The path to the file from which the "data" bytes object was obtained
-    :return List[Tuple[bytes]] or _np.ndarray: The receiver block of the data. Each list element specifies an receiver.
+    :return list[tuple[bytes]] or _np.ndarray: The receiver block of the data. Each list element specifies a receiver.
         If regex doesn't match, an empty numpy NDArray is returned instead.
     """
     receiver_block = _REGEX_REC.findall(data)
@@ -244,12 +244,12 @@ def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[byte
     return receiver_block
 
 
-def extract_antenna_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.ndarray]:
+def extract_antenna_block(data: bytes, file_path: str) -> Union[list[tuple[bytes]], _np.ndarray]:
     """Extract the antenna block given the bytes object read from an IGS site log file
 
     :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
     :param str file_path: The path to the file from which the "data" bytes object was obtained
-    :return List[Tuple[bytes]] or _np.ndarray: The antenna block of the data. Each list element specifies an antenna.
+    :return list[tuple[bytes]] or _np.ndarray: The antenna block of the data. Each list element specifies an antenna.
         If regex doesn't match, an empty numpy NDArray is returned instead.
     """
     antenna_block = _REGEX_ANT.findall(data)
@@ -397,13 +397,13 @@ def translate_series(series: _pd.Series, translation: dict) -> _pd.Series:
 
 
 def gather_metadata(
     logs_glob_path: str = "/data/station_logs/station_logs_IGS/*/*.log", rnx_glob_path: str = None, num_threads: int = 1
-) -> List[_pd.DataFrame]:
+) -> list[_pd.DataFrame]:
     """Parses log files found with glob expressions into pd.DataFrames
 
     :param str logs_glob_path: A glob expression for log files, defaults to "/data/station_logs_IGS/*/*.log"
     :param str rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
     :param int num_threads: Number of threads to run, defaults to 1
-    :return List[_pd.DataFrame]: List of DataFrames with [ID, Receiver, Antenna] data
+    :return list[_pd.DataFrame]: List of DataFrames with [ID, Receiver, Antenna] data
     """
     parsed_filenames = find_recent_logs(logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path).values
diff --git a/gnssanalysis/gn_io/sinex.py b/gnssanalysis/gn_io/sinex.py
index 69663e2..be48fee 100644
--- a/gnssanalysis/gn_io/sinex.py
+++ b/gnssanalysis/gn_io/sinex.py
@@ -1,4 +1,4 @@
-"""IO functions for various formats used: trace, sinex etc """
+"""IO functions for various formats used: trace, sinex etc"""
 
 import logging as _logging
 import math as _math
@@ -6,10 +6,8 @@
 import re as _re
 import zlib as _zlib
 from io import BytesIO as _BytesIO
-from typing import Any as _Any, Union
-from typing import Dict as _Dict
+from typing import Any as _Any
 from typing import Iterable as _Iterable
-from typing import List as _List
 from typing import Union as _Union
 
 import numpy as _np
@@ -61,7 +59,7 @@ def file_desc(pb) -> str:
 
 # This is in tension with the existing above function but is what was used by
 # the filenames functionality and so is ported here for now.
-def get_header_dict(file_path: _Union[str, bytes, _os.PathLike]) -> _Dict[str, _Any]:
+def get_header_dict(file_path: _Union[str, bytes, _os.PathLike]) -> dict[str, _Any]:
     """Extract the data contained in the header of a sinex file
 
     The extracted data is returned in a dictionary containing:
@@ -75,7 +73,7 @@ def get_header_dict(file_path: _Union[str, bytes, _os.PathLike]) -> _Dict[str, _
     - "contents": list[str]
 
     :param _Union[str, bytes, _os.PathLike] file_path: sinex file from which to read header
-    :return _Dict[str, _Any]: dictionary containing the properties extracted from the header
+    :return dict[str, _Any]: dictionary containing the properties extracted from the header
     """
     with open(file_path, mode="r", encoding="utf-8") as f:
         header_line = f.readline()
@@ -107,20 +105,20 @@
     return {}
 
 
-def get_available_blocks(file_path: _Union[str, bytes, _os.PathLike]) -> _List[str]:
+def get_available_blocks(file_path: _Union[str, bytes, _os.PathLike]) -> list[str]:
     """Return the blocks available within a sinex file
 
     :param _Union[str, bytes, _os.PathLike] file_path: sinex file to read for blocks
-    :return _List[str]: list of names of blocks available in sinex file
+    :return list[str]: list of names of blocks available in sinex file
     """
     with open(file_path, "r", encoding="utf-8") as f:
         return [line[1:-1].strip() for line in f if line.startswith("+")]
 
 
-def includes_noncrd_block(block_labels: _List[str]) -> bool:
+def includes_noncrd_block(block_labels: list[str]) -> bool:
     """Check whether list of block names includes at least one non-coordinate block
 
-    :param _List[str] block_labels: list of block names to check
+    :param list[str] block_labels: list of block names to check
     :return bool: whether any block names correspond to non-coordinate blocks
     """
     return any(is_noncrd_block(b) for b in block_labels)
@@ -159,11 +157,11 @@ def all_notnan(iterable: _Iterable) -> bool:
 
 
 # TODO: Generalise to handle a path or bytes object?
-def read_sinex_comment_block(filename: _Union[str, bytes, _os.PathLike]) -> _List[str]:
+def read_sinex_comment_block(filename: _Union[str, bytes, _os.PathLike]) -> list[str]:
     """Extract comments from a provided sinex file
 
     :param Union[str, bytes, os.PathLike] filename: path to sinex file
-    :return List[str]: list containing all lines in sinex comment block
+    :return list[str]: list containing all lines in sinex comment block
     """
     with open(filename, "r", encoding="utf-8") as f:
         # Find start of "+FILE/COMMENT"
@@ -179,7 +177,7 @@ def read_sinex_comment_block(filename: _Union[str, bytes, _os.PathLike]) -> _Lis
     return comment_lines
 
 
-def extract_mincon_from_comments(comment_block: _Iterable[str]) -> _Dict[str, _Any]:
+def extract_mincon_from_comments(comment_block: _Iterable[str]) -> dict[str, _Any]:
     """Extract PEA-style minimum constraints data from sinex comments
 
     PEA can place information about the minimum constraint solution applied into a sinex
@@ -198,7 +196,7 @@ def extract_mincon_from_comments(comment_block: _Iterable[str]) -> _Dict[str, _A
     The entries will only be included if complete data is extracted for them.
 
     :param _Iterable[str] comment_block: iterable containing comment lines to parse
-    :return _Dict[str, _Any]: dictionary containing extracted minimum constraints information
+    :return dict[str, _Any]: dictionary containing extracted minimum constraints information
     """
     # Initialise the places where we'll store our output data
     unused = []
@@ -269,7 +267,7 @@
         transform["rotation_uncertainty"] = rotation_uncertainty
 
     # Set up return data dictionary
-    mincon_dict: _Dict[str, _Any] = {"used": used, "unused": unused}
+    mincon_dict: dict[str, _Any] = {"used": used, "unused": unused}
     if len(transform) != 0:
         mincon_dict["transform"] = transform
 
@@ -277,7 +275,7 @@
 
 # TODO: Generalise to handle a path or bytes object?
-def read_sinex_mincon(filename: _Union[str, bytes, _os.PathLike]) -> _Dict[str, _Any]:
+def read_sinex_mincon(filename: _Union[str, bytes, _os.PathLike]) -> dict[str, _Any]:
    """Extract PEA-style minimum constraints data from sinex file
 
     PEA can place information about the minimum constraint solution applied into a sinex
@@ -296,7 +294,7 @@ def read_sinex_mincon(filename: _Union[str, bytes, _os.PathLike]) -> _Dict[str,
     The entries will only be included if complete data is extracted for them.
 
     :param _Union[str, bytes, _os.PathLike] filename: sinex file from which to read minimum constraints data
-    :return _Dict[str, _Any]: dictionary containing extracted minimum constraints information
+    :return dict[str, _Any]: dictionary containing extracted minimum constraints information
     """
     return extract_mincon_from_comments(read_sinex_comment_block(filename))
@@ -323,7 +321,7 @@ def snx_soln_int_to_str(soln: _pd.Series, nan_as_dash=True) -> _pd.Series:
     return soln_str
 
 
-def _get_valid_stypes(stypes: Union[list[str], set[str]]) -> _List[str]:
+def _get_valid_stypes(stypes: _Union[list[str], set[str]]) -> list[str]:
     """Returns only stypes in allowed list
     Fastest if stypes size is small"""
     allowed_stypes = ["EST", "APR", "NEQ"]
@@ -515,13 +513,13 @@ def snxdf2xyzdf(snx_df: _pd.DataFrame, unstack: bool = True, keep_all_soln: _Uni
 
 def _get_snx_vector(
     path_or_bytes: _Union[str, bytes],
-    stypes: Union[set[str], list[str]] = set(["EST", "APR"]),
+    stypes: _Union[set[str], list[str]] = set(["EST", "APR"]),
     format: str = "long",
     keep_all_soln: _Union[bool, None] = None,
     verbose: bool = True,
     recenter_epochs: bool = False,
     snx_header: dict = {},
-) -> Union[_pd.DataFrame, None]:
+) -> _Union[_pd.DataFrame, None]:
     """Main function of reading vector data from sinex file. Doesn't support sinex files from EMR AC as APRIORI and ESTIMATE indices are not in sync (APRIORI params might not even exist in he ESTIMATE block).
     While will parse the file, the alignment of EST and APR values might be wrong. No easy solution was found for the issue thus unsupported for now. TODO parse header and add a warning if EMR agency
     Args:
diff --git a/gnssanalysis/gn_io/sp3.py b/gnssanalysis/gn_io/sp3.py
index 6487435..88b8ac1 100644
--- a/gnssanalysis/gn_io/sp3.py
+++ b/gnssanalysis/gn_io/sp3.py
@@ -3,7 +3,7 @@
 import io as _io
 import os as _os
 import re as _re
-from typing import Callable, Literal, Mapping, Optional, Union, List, Tuple, overload
+from typing import Callable, Literal, Mapping, Optional, Union, overload
 from pathlib import Path
 import warnings
@@ -492,13 +492,13 @@ def filter_by_svs(
     return sp3_df
 
 
-def mapparm(old: Tuple[float, float], new: Tuple[float, float]) -> Tuple[float, float]:
+def mapparm(old: tuple[float, float], new: tuple[float, float]) -> tuple[float, float]:
     """
     Evaluate the offset and scale factor needed to map values from the old range to the new range.
 
-    :param Tuple[float, float] old: The range of values to be mapped from.
-    :param Tuple[float, float] new: The range of values to be mapped to.
-    :return Tuple[float, float]: The offset and scale factor for the mapping.
+    :param tuple[float, float] old: The range of values to be mapped from.
+    :param tuple[float, float] new: The range of values to be mapped to.
+    :return tuple[float, float]: The offset and scale factor for the mapping.
     """
     old_range = old[1] - old[0]
     new_range = new[1] - new[0]
@@ -606,8 +606,8 @@ def _check_column_alignment_of_sp3_block(
 def _process_sp3_block(
     date: str,
     data: str,
-    widths: List[int] = _SP3_DEF_PV_WIDTH,
-    names: List[str] = _SP3_DEF_PV_NAME,
+    widths: list[int] = _SP3_DEF_PV_WIDTH,
+    names: list[str] = _SP3_DEF_PV_NAME,
     strict_mode: type[StrictMode] = StrictModes.STRICT_WARN,
     ignore_short_data_lines: bool = True,
 ) -> _pd.DataFrame:
@@ -619,8 +619,8 @@
     :param str date: The date of the SP3 data block.
     :param str data: The SP3 data block.
-    :param List[int] widths: The widths of the columns in the SP3 data block.
-    :param List[str] names: The names of the columns in the SP3 data block.
+    :param list[int] widths: The widths of the columns in the SP3 data block.
+    :param list[str] names: The names of the columns in the SP3 data block.
     :param type[StrictMode] strict_mode: (default: WARN) level of strictness with which to check for SP3d format
         compliance. StrictModes.STRICT_RAISE will raise an exception if a format issue is detected
         (except if ignore_short_data_lines is enabled). Set to StrictModes.STRICT_OFF to neither warn nor raise.
@@ -1009,7 +1009,7 @@ def read_sp3(
 
     if strict_mode != StrictModes.STRICT_OFF:
         # Check no (non-comment) line is overlong (>80 chars not counting \n)
-        sp3_lines: List[str] = content.decode("utf-8", errors="ignore").split("\n")
+        sp3_lines: list[str] = content.decode("utf-8", errors="ignore").split("\n")
         overlong_lines_found: int = 0
         for line in sp3_lines:
             if len(line) > _SP3_MAX_WIDTH:
@@ -1204,12 +1204,12 @@ def _reformat_df(sp3_df: _pd.DataFrame) -> _pd.DataFrame:
     return sp3_df
 
 
-def _split_sp3_content(content: bytes) -> Tuple[List[str], _np.ndarray]:
+def _split_sp3_content(content: bytes) -> tuple[list[str], _np.ndarray]:
     """
     Split the content of an SP3 file into date lines and data blocks.
 
     :param bytes content: The content of the SP3 file.
-    :return Tuple[List[str], _np.ndarray]: The date lines and data blocks.
+    :return tuple[list[str], _np.ndarray]: The date lines and data blocks.
     """
     pattern = _re.compile(r"^\*(.+)$", _re.MULTILINE)
     blocks = pattern.split(content[: content.rfind(b"EOF")].decode())
@@ -1841,10 +1841,10 @@ def write_sp3(sp3_df: _pd.DataFrame, path: str) -> None:
         file.write(content)
 
 
-def merge_attrs(df_list: List[_pd.DataFrame]) -> _pd.Series:
+def merge_attrs(df_list: list[_pd.DataFrame]) -> _pd.Series:
     """Merges attributes of a list of sp3 dataframes into a single set of attributes.
 
-    :param List[pd.DataFrame] df_list: The list of sp3 dataframes.
+    :param list[pd.DataFrame] df_list: The list of sp3 dataframes.
     :return _pd.Series: The merged attributes.
     """
     df = _pd.concat(list(map(lambda obj: obj.attrs["HEADER"], df_list)), axis=1)
@@ -1888,15 +1888,15 @@ def merge_attrs(df_list: List[_pd.DataFrame]) -> _pd.Series:
 
 
 def sp3merge(
-    sp3paths: List[str],
-    clkpaths: Union[List[str], None] = None,
+    sp3paths: list[str],
+    clkpaths: Union[list[str], None] = None,
     nodata_to_nan: bool = False,
     strict_mode: type[StrictMode] = StrictModes.STRICT_WARN,
 ) -> _pd.DataFrame:
     """Reads in a list of sp3 files and optional list of clk files and merges them into a single sp3 file.
 
-    :param List[str] sp3paths: The list of paths to the sp3 files.
-    :param Union[List[str], None] clkpaths: The list of paths to the clk files, or None if no clk files are provided.
+    :param list[str] sp3paths: The list of paths to the sp3 files.
+    :param Union[list[str], None] clkpaths: The list of paths to the clk files, or None if no clk files are provided.
     :param bool nodata_to_nan: Flag indicating whether to convert nodata values to NaN.
     :param type[StrictMode] strict_mode: (default: WARN) Strictness with which to check the SP3 files read in, for
         compliance with the SP3 d spec.
diff --git a/gnssanalysis/gn_transform.py b/gnssanalysis/gn_transform.py
index 6ea465e..922a5f3 100644
--- a/gnssanalysis/gn_transform.py
+++ b/gnssanalysis/gn_transform.py
@@ -2,7 +2,6 @@
 
 import numpy as _np
 import pandas as _pd
-from typing import Tuple
 
 from . import gn_aux as _gn_aux
 from . import gn_const as _gn_const
@@ -12,13 +11,13 @@ def gen_helm_aux(
     pt1: _np.ndarray,
     pt2: _np.ndarray,
     dropna: bool = True,
-) -> Tuple[_np.ndarray, _np.ndarray]:
+) -> tuple[_np.ndarray, _np.ndarray]:
     """Aux function for helmert values inversion.
 
     :param _np.ndarray pt1: The first set of points.
     :param _np.ndarray pt2: The second set of points.
     :param bool dropna: Whether to drop NaN values in input data, defaults to True.
-    :return Tuple[_np.ndarray, _np.ndarray]: A tuple containing the design matrix and right hand side of the equation for least square estimation.
+    :return tuple[_np.ndarray, _np.ndarray]: A tuple containing the design matrix and right hand side of the equation for least squares estimation.
     """
     if dropna:
         mask = ~_np.isnan(pt1).any(axis=1) & ~_np.isnan(pt2).any(axis=1)
diff --git a/gnssanalysis/gn_utils.py b/gnssanalysis/gn_utils.py
index ab2d626..f381a37 100644
--- a/gnssanalysis/gn_utils.py
+++ b/gnssanalysis/gn_utils.py
@@ -6,7 +6,7 @@
 
 import click as _click
 
-from typing import List, Union
+from typing import Union
 
 from gnssanalysis.enum_meta_properties import EnumMetaProperties
@@ -123,10 +123,10 @@ def configure_logging(verbose: bool, output_logger: bool = False) -> Union[_logg
         return None
 
 
-def ensure_folders(paths: List[_pathlib.Path]):
+def ensure_folders(paths: list[_pathlib.Path]):
     """Ensures the folders in the input list exist in the file system - if not, create them
 
-    :param List[_pathlib.Path] paths: list of pathlib.Path/s to check
+    :param list[_pathlib.Path] paths: list of pathlib.Path/s to check
     """
     for path in paths:
         if not isinstance(path, _pathlib.Path):
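Reviewer note on the pattern applied throughout this patch (commentary, not part of the diff): every hunk makes the same substitution. PEP 585 builtin generics (list[...], dict[...], tuple[...]) replace the deprecated typing.List, typing.Dict and typing.Tuple aliases, while typing.Union and typing.Optional are kept because the X | Y union syntax only arrives in Python 3.10. Builtin generics are only subscriptable at runtime from Python 3.9 (as the comment retained at the top of filenames.py notes), so the change assumes the package's minimum supported Python is at least 3.9. A minimal sketch of the resulting annotation style; the functions below are hypothetical illustrations, not code from gnssanalysis:

# Hypothetical illustration of the annotation style adopted in this diff.
# Requires Python >= 3.9: list[str] / dict[str, int] are subscripted at runtime
# when each def statement is executed, which fails on 3.8 and earlier.
from typing import Optional, Union  # Union/Optional still come from typing before 3.10


def tally_prefixes(lines: list[str], sep: Optional[str] = None) -> dict[str, int]:
    """Count how many lines start with each first token (illustrative only)."""
    counts: dict[str, int] = {}
    for line in lines:
        tokens = line.split(sep)  # sep=None splits on any whitespace
        if tokens and tokens[0]:
            counts[tokens[0]] = counts.get(tokens[0], 0) + 1
    return counts


def clamp(value: float, bounds: tuple[float, float]) -> Union[int, float]:
    """Clamp value into the closed interval given by bounds."""
    low, high = bounds
    return min(max(value, low), high)

On 3.8 and earlier, merely defining either function raises TypeError ("'type' object is not subscriptable") at import time, which is why dropping the typing aliases implies the 3.9 floor.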