diff --git a/.github/workflows/python-cicd-units.yml b/.github/workflows/python-cicd-units.yml index 0551e76..19e81b5 100644 --- a/.github/workflows/python-cicd-units.yml +++ b/.github/workflows/python-cicd-units.yml @@ -8,7 +8,7 @@ jobs: strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12"] + python-version: ["3.10", "3.11", "3.12", "3.13"] name: Build and Test on Python ${{ matrix.python-version }} diff --git a/gnssanalysis/filenames.py b/gnssanalysis/filenames.py index 3789229..40593a6 100644 --- a/gnssanalysis/filenames.py +++ b/gnssanalysis/filenames.py @@ -7,7 +7,7 @@ # The collections.abc (rather than typing) versions don't support subscripting until 3.9 # from collections import Iterable -from typing import Iterable, Literal, Mapping, Any, Optional, Union, overload +from typing import Iterable, Literal, Mapping, Any, Optional, overload import warnings import click @@ -262,7 +262,7 @@ def generate_IGS_long_filename( start_epoch: datetime.datetime, *, end_epoch: datetime.datetime, - timespan: Union[datetime.timedelta, str, None] = ..., + timespan: datetime.timedelta | str | None = ..., solution_type: str = ..., sampling_rate: str = ..., sampling_rate_seconds: Optional[int] = ..., @@ -280,7 +280,7 @@ def generate_IGS_long_filename( start_epoch: datetime.datetime, *, end_epoch: None = ..., - timespan: Union[datetime.timedelta, str], + timespan: datetime.timedelta | str, solution_type: str = ..., sampling_rate: str = ..., sampling_rate_seconds: Optional[int] = ..., @@ -297,7 +297,7 @@ def generate_IGS_long_filename( start_epoch: datetime.datetime, *, end_epoch: Optional[datetime.datetime] = None, - timespan: Union[datetime.timedelta, str, None] = None, + timespan: datetime.timedelta | str | None = None, solution_type: str = "", # TTT sampling_rate: str = "15M", # SMP sampling_rate_seconds: Optional[int] = None, # Not used here, but passed for structural consistency @@ -321,7 +321,7 @@ def generate_IGS_long_filename( :param str format_type: File extension :param datetime.datetime start_epoch: datetime representing initial epoch in file :param Optional[datetime.datetime] end_epoch: datetime representing final epoch in file, defaults to None - :param timespan: Union[datetime.timedelta, str, None] timespan: timedelta representing time range of data in file, + :param datetime.timedelta | str | None timespan: timedelta representing time range of data in file, defaults to None :param str solution_type: Three letter solution type identifier, defaults to "" :param str sampling_rate: Three letter sampling rate string, defaults to "15M" @@ -437,7 +437,7 @@ def nominal_span_string(span_seconds: float) -> str: def convert_nominal_span( nominal_span: str, non_timed_span_output: Literal["none", "timedelta"] = "timedelta", -) -> Union[datetime.timedelta, None]: +) -> datetime.timedelta | None: """Effectively invert :func: `filenames.generate_nominal_span`, turn a span string into a timedelta :param str nominal_span: Three-character span string in IGS format (e.g. 01D, 15M, 01L ?)
@@ -729,7 +729,7 @@ def determine_sp3_name_props( # Next, properties from the filename: try: - props_from_existing_name: Union[dict, None] = determine_properties_from_filename( + props_from_existing_name: dict | None = determine_properties_from_filename( file_path.name, strict_mode=strict_mode ) logging.debug(f"props_from_existing_name =\n{str(props_from_existing_name)}") diff --git a/gnssanalysis/gn_aux.py b/gnssanalysis/gn_aux.py index 72cffbb..a8c48fe 100644 --- a/gnssanalysis/gn_aux.py +++ b/gnssanalysis/gn_aux.py @@ -1,7 +1,7 @@ """Auxiliary functions""" import logging as _logging -from typing import overload, Union +from typing import overload import numpy as _np import pandas as _pd @@ -26,7 +26,7 @@ def rad2arcsec(x: _np.ndarray) -> _np.ndarray: return _np.rad2deg(x) * 3600 -def wrap_radians(x: Union[float, _np.ndarray]) -> Union[float, _np.ndarray]: +def wrap_radians(x: float | _np.ndarray) -> float | _np.ndarray: """Overwrite negative angles in radians with positive coterminal angles :param float or _np.ndarray x: angles in radians @@ -35,7 +35,7 @@ def wrap_radians(x: Union[float, _np.ndarray]) -> Union[float, _np.ndarray]: return x % (2 * _np.pi) -def wrap_degrees(x: Union[float, _np.ndarray]) -> Union[float, _np.ndarray]: +def wrap_degrees(x: float | _np.ndarray) -> float | _np.ndarray: """Overwrite negative angles in decimal degrees with positive coterminal angles :param float or _np.ndarray x: angles in decimal degrees @@ -99,7 +99,7 @@ def unique_cols(df: _pd.DataFrame) -> _np.ndarray: return (a[:, 0][:, None] == a).all(1) -def rm_duplicates_df(df: Union[_pd.DataFrame, _pd.Series], rm_nan_level: Union[int, str, None] = None): +def rm_duplicates_df(df: _pd.DataFrame | _pd.Series, rm_nan_level: int | str | None = None): """ Takes in a clk/sp3/other dataframe and removes any duplicate indices. Optionally, removes level_values from the index which contain NaNs @@ -134,7 +134,7 @@ def rm_duplicates_df(df: Union[_pd.DataFrame, _pd.Series], rm_nan_level: Union[i return df -def get_sampling(arr: _np.ndarray) -> Union[int, None]: +def get_sampling(arr: _np.ndarray) -> int | None: """ Simple function to compute sampling of the J2000 array @@ -170,10 +170,10 @@ def array_equal_unordered(a1: _np.ndarray, a2: _np.ndarray) -> bool: def rms( - arr: Union[_pd.DataFrame, _pd.Series], - axis: Union[None, int] = 0, - level: Union[None, int, str] = None, -) -> Union[_pd.Series, _pd.DataFrame]: + arr: _pd.DataFrame | _pd.Series, + axis: None | int = 0, + level: None | int | str = None, +) -> _pd.Series | _pd.DataFrame: """Trivial function to compute root mean square""" if level is not None: return (arr**2).groupby(axis=axis, level=level).mean() ** 0.5 @@ -183,7 +183,7 @@ def rms( def get_std_bounds( a: _np.ndarray, - axis: Union[None, int, tuple[int, ...]] = None, + axis: None | int | tuple[int, ...] = None, sigma_coeff: int = 3, ): """ @@ -210,7 +210,7 @@ def get_std_bounds( return bounds if axis is None else _np.expand_dims(a=bounds, axis=axis) -def df_quick_select(df: _pd.DataFrame, ind_lvl: Union[str, int], ind_keys, as_mask: bool = False) -> _np.ndarray: +def df_quick_select(df: _pd.DataFrame, ind_lvl: str | int, ind_keys, as_mask: bool = False) -> _np.ndarray: """A faster alternative to do index selection over pandas dataframe, if multiple index levels are being used then better generate masks with this function and add them later into a single mask. 
df.loc(axis=0)[:,:,'IND_KEY',:] is the same as df_quick_select(df, 2, 'IND_KEY'), or, if used as mask: df[df_quick_select(df, 2, 'IND_NAME', as_mask=True)]""" @@ -269,11 +269,11 @@ def degminsec2deg(a: list) -> _pd.Series: ... def degminsec2deg(a: str) -> float: ... -def degminsec2deg(a: Union[_pd.Series, _pd.DataFrame, list, str]) -> Union[_pd.Series, _pd.DataFrame, float]: +def degminsec2deg(a: _pd.Series | _pd.DataFrame | list | str) -> _pd.Series | _pd.DataFrame | float: """Converts degrees/minutes/seconds to decimal degrees. - :param _Union[_pd.Series, _pd.DataFrame, list, str] a: space-delimited string values of degrees/minutes/seconds - :return _Union[_pd.Series, _pd.DataFrame, float]: Series, DataFrame or scalar float decimal degrees, depending on the input + :param _pd.Series | _pd.DataFrame | list | str a: space-delimited string values of degrees/minutes/seconds + :return _pd.Series | _pd.DataFrame | float: Series, DataFrame or scalar float decimal degrees, depending on the input """ if isinstance(a, str): a_single = _np.asarray(a.split(maxsplit=2)).astype(float) @@ -315,7 +315,7 @@ def deg2degminsec(a: list) -> _np.ndarray: ... def deg2degminsec(a: _np.ndarray) -> _np.ndarray: ... -def deg2degminsec(a: Union[_np.ndarray, list, float]) -> Union[_np.ndarray, float]: +def deg2degminsec(a: _np.ndarray | list | float) -> _np.ndarray | float: """Converts decimal degrees to string representation in the form of degrees minutes seconds as in the sinex SITE/ID block. Could be used with multiple columns at once (2D ndarray) @@ -363,7 +363,7 @@ def throw_if_nans(trace_bytes: bytes, nan_to_find=b"-nan", max_reported_nans: in raise ValueError(f"Found nan values (max_nans = {max_reported_nans})\n{nans_bytes.decode()}") -def df_groupby_statistics(df: Union[_pd.Series, _pd.DataFrame], lvl_name: Union[list, str]): +def df_groupby_statistics(df: _pd.Series | _pd.DataFrame, lvl_name: list | str): """Generate AVG/STD/RMS statistics from a dataframe summarizing over levels :param _pd.Series df: an input dataframe or series @@ -404,14 +404,14 @@ def _get_trend(dataset, deg=1): def remove_outliers( dataframe: _pd.DataFrame, - cutoff: Union[int, float, None] = None, - coeff_std: Union[int, float] = 3, + cutoff: int | float | None = None, + coeff_std: int | float = 3, ) -> _pd.DataFrame: """Filters a dataframe with linear data.
Runs detrending of the data to normalize to zero and applies absolute cutoff and std-based filtering :param _pd.DataFrame dataframe: a dataframe to filter the columns - :param _Union[int, float, None] cutoff: an absolute cutoff value to apply over detrended data, defaults to None - :param _Union[int, float] coeff_std: STD coefficient, defaults to 3 + :param int | float | None cutoff: an absolute cutoff value to apply over detrended data, defaults to None + :param int | float coeff_std: STD coefficient, defaults to 3 :return _pd.DataFrame: a filtered dataframe """ detrend = dataframe - _get_trend(dataframe) diff --git a/gnssanalysis/gn_datetime.py b/gnssanalysis/gn_datetime.py index 55bd762..e81f891 100644 --- a/gnssanalysis/gn_datetime.py +++ b/gnssanalysis/gn_datetime.py @@ -6,7 +6,7 @@ from datetime import date as _date from datetime import timedelta as _timedelta from io import StringIO as _StringIO -from typing import Optional, overload, Union +from typing import Optional, overload import numpy as _np import pandas as _pd @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -def derive_gps_week(year: Union[int, str], day_of_year: Union[int, str], weekday_suffix: bool = False) -> str: +def derive_gps_week(year: int | str, day_of_year: int | str, weekday_suffix: bool = False) -> str: """ Convert year, day-of-year to GPS week format: WWWWD or WWWW Based on code from Kristine Larson's gps.py @@ -78,7 +78,7 @@ class GPSDate: # For compatibility, we have accessors called 'ts' and 'timestamp'. _internal_dt64: _np.datetime64 - def __init__(self, time: Union[_np.datetime64, _datetime, _date, str]): + def __init__(self, time: _np.datetime64 | _datetime | _date | str): if isinstance(time, _np.datetime64): self._internal_dt64 = time elif isinstance(time, (_datetime, _date, str)): @@ -172,7 +172,7 @@ def datetime_to_gps_week(dt: _datetime, wkday_suff: bool = False) -> str: return derive_gps_week(yr, doy, weekday_suffix=wkday_suff) -def dt2gpswk(dt: _datetime, wkday_suff: bool = False, both: bool = False) -> Union[str, tuple[str, str]]: +def dt2gpswk(dt: _datetime, wkday_suff: bool = False, both: bool = False) -> str | tuple[str, str]: """ TODO DEPRECATED. Please use datetime_to_gps_week() """ @@ -222,7 +222,7 @@ def gpswkD2dt(gpswkD: str) -> _datetime: def yydoysec2datetime( - arr: Union[_np.ndarray, _pd.Series, list], recenter: bool = False, as_j2000: bool = True, delimiter: str = ":" + arr: _np.ndarray | _pd.Series | list, recenter: bool = False, as_j2000: bool = True, delimiter: str = ":" ) -> _np.ndarray: """Converts snx YY:DOY:SSSSS [snx] or YYYY:DOY:SSSSS [bsx/bia] object Series/ndarray to datetime64.
recenter overrides day seconds value to midday @@ -241,7 +241,7 @@ def yydoysec2datetime( return datetime2j2000(datetime64) if as_j2000 else datetime64 -def datetime2yydoysec(datetime: Union[_np.ndarray, _pd.Series]) -> _np.ndarray: +def datetime2yydoysec(datetime: _np.ndarray | _pd.Series) -> _np.ndarray: """datetime64[s] -> yydoysecond The '2000-01-01T00:00:00' (-43200 J2000 for 00:000:00000) datetime becomes 00:000:00000 as it should, No masking and overriding with year 2100 is needed""" @@ -270,7 +270,7 @@ def gpsweeksec2datetime(gps_week: _np.ndarray, tow: _np.ndarray, as_j2000: bool return datetime -def datetime2gpsweeksec(array: _np.ndarray, as_decimal=False) -> Union[tuple, _np.ndarray]: +def datetime2gpsweeksec(array: _np.ndarray, as_decimal=False) -> tuple | _np.ndarray: if array.dtype == int: ORIGIN = _gn_const.J2000_ORIGIN.astype("int64") - _gn_const.GPS_ORIGIN.astype("int64") gps_time = array + ORIGIN # need int conversion for the case of datetime64 @@ -523,10 +523,10 @@ def round_timedelta(delta, roundto, *, tol=0.5, abs_tol=None): :delta:, :roundto:, and :abs_tol: (if used) must all have the same type. - :param Union[datetime.timedelta, numpy.timedelta64] delta: timedelta to round - :param Union[datetime.timedelta, numpy.timedelta64] roundto: "measuring stick", :delta: is rounded to integer multiples of this value + :param datetime.timedelta | numpy.timedelta64 delta: timedelta to round + :param datetime.timedelta | numpy.timedelta64 roundto: "measuring stick", :delta: is rounded to integer multiples of this value :param float tol: relative tolerance to use for the measure of "near" - :param Union[datetime.timedelta, numpy.timedelta64] abs_tol: absolute tolerance to use for the measure of "near" + :param datetime.timedelta | numpy.timedelta64 abs_tol: absolute tolerance to use for the measure of "near" """ # TODO: Test this with numpy timedeltas, it was written for datetime.timedelta but should work if abs_tol is not None: diff --git a/gnssanalysis/gn_diffaux.py b/gnssanalysis/gn_diffaux.py index 3f29eea..4e2eb5f 100644 --- a/gnssanalysis/gn_diffaux.py +++ b/gnssanalysis/gn_diffaux.py @@ -1,6 +1,6 @@ import logging as _logging from pathlib import Path as _Path -from typing import Literal, Union +from typing import Literal import numpy as _np import pandas as _pd @@ -24,7 +24,7 @@ def _valvar2diffstd(valvar1, valvar2, std_coeff=1): return df_combo -def _diff2msg(diff, tol=None, dt_as_gpsweek: Union[bool, None] = False): +def _diff2msg(diff, tol=None, dt_as_gpsweek: bool | None = False): _pd.set_option("display.max_colwidth", 10000) from_valvar = _np.all(_np.isin(["DIFF", "STD"], diff.columns.get_level_values(0).values)) @@ -103,13 +103,13 @@ def _diff2msg(diff, tol=None, dt_as_gpsweek: Union[bool, None] = False): return msg -def _compare_states(diffstd: _pd.DataFrame, log_lvl: int, tol: Union[float, None] = None, plot: bool = False) -> int: +def _compare_states(diffstd: _pd.DataFrame, log_lvl: int, tol: float | None = None, plot: bool = False) -> int: """_summary_ Args: diffstd (_pd.DataFrame): a difference DataFrame to assess log_lvl (int): logging level of the produced messages - tol (_Union[float, None], optional): Either a float threshold or None to use the present STD values. Defaults to None. + tol (float, optional): Either a float threshold or None to use the present STD values. Defaults to None. plot (bool, optional): So you want a simple plot to terminal? Defaults to False. 
Returns: @@ -142,13 +142,13 @@ def _compare_states(diffstd: _pd.DataFrame, log_lvl: int, tol: Union[float, None return 0 -def _compare_residuals(diffstd: _pd.DataFrame, log_lvl: int, tol: Union[float, None] = None): +def _compare_residuals(diffstd: _pd.DataFrame, log_lvl: int, tol: float | None = None): """Compares extracted POSTFIT residuals from the trace file and generates a comprehensive statistics on the present differences. Alternatively logs an OK message. Args: diffstd (_pd.DataFrame): a difference DataFrame to assess log_lvl (int): logging level of the produced messages - tol (_Union[float, None], optional): Either a float threshold or None to use the present STD values. Defaults to None. + tol (float, optional): Either a float threshold or None to use the present STD values. Defaults to None. Returns: int: status (0 means differences within threshold) @@ -310,8 +310,8 @@ def compare_clk( clk_a: _pd.DataFrame, clk_b: _pd.DataFrame, norm_types: list[str] = ["daily", "epoch"], - ext_dt: Union[_np.ndarray, _pd.Index, None] = None, - ext_svs: Union[_np.ndarray, _pd.Index, None] = None, + ext_dt: _np.ndarray | _pd.Index | None = None, + ext_svs: _np.ndarray | _pd.Index | None = None, ) -> _pd.DataFrame: """ DEPRECATED Please use diff_clk() instead. @@ -333,8 +333,8 @@ def diff_clk( clk_baseline: _pd.DataFrame, clk_test: _pd.DataFrame, norm_types: list = ["daily", "epoch"], - ext_dt: Union[_np.ndarray, _pd.Index, None] = None, - ext_svs: Union[_np.ndarray, _pd.Index, None] = None, + ext_dt: _np.ndarray | _pd.Index | None = None, + ext_svs: _np.ndarray | _pd.Index | None = None, ) -> _pd.DataFrame: """Compares clock dataframes, removed common mode. @@ -343,8 +343,8 @@ def diff_clk( :param _pd.DataFrame clk_baseline: clk dataframe 2 / b :param _pd.DataFrame clk_test: clk dataframe 1 / a :param list[str] norm_types: normalization to apply, defaults to ["daily", "epoch"] - :param _Union[_np.ndarray, _pd.Index, None] ext_dt: external datetime values to filter the clk dfs, defaults to None - :param _Union[_np.ndarray, _pd.Index, None] ext_svs: external satellites to filter the clk dfs, defaults to None + :param _np.ndarray | _pd.Index | None ext_dt: external datetime values to filter the clk dfs, defaults to None + :param _np.ndarray | _pd.Index | None ext_svs: external satellites to filter the clk dfs, defaults to None :raises ValueError: if no common epochs between clk_a and external datetime were found :raises ValueError: if no common epochs between files were found :return _pd.DataFrame: clk differences in the same units as input clk dfs (usually seconds) @@ -412,12 +412,12 @@ def diff_clk( def sisre( sp3_a: _pd.DataFrame, sp3_b: _pd.DataFrame, - clk_a: Union[_pd.DataFrame, None] = None, - clk_b: Union[_pd.DataFrame, None] = None, + clk_a: _pd.DataFrame | None = None, + clk_b: _pd.DataFrame | None = None, norm_types: list[str] = ["daily", "epoch"], output_mode: str = "rms", clean: bool = True, - cutoff: Union[int, float, None] = None, + cutoff: int | float | None = None, use_rms: bool = False, hlm_mode=None, plot: bool = False, @@ -450,12 +450,12 @@ def sisre( def calculate_sisre( sp3_baseline: _pd.DataFrame, sp3_test: _pd.DataFrame, - clk_baseline: Union[_pd.DataFrame, None] = None, # Clk b - clk_test: Union[_pd.DataFrame, None] = None, # Clk a + clk_baseline: _pd.DataFrame | None = None, # Clk b + clk_test: _pd.DataFrame | None = None, # Clk a norm_types: list[str] = ["daily", "epoch"], output_mode: str = "rms", clean: bool = True, - cutoff: Union[int, float, None] = None, + cutoff: int 
| float | None = None, use_rms: bool = False, hlm_mode=None, plot: bool = False, diff --git a/gnssanalysis/gn_download.py b/gnssanalysis/gn_download.py index 767897d..8fb3c2d 100644 --- a/gnssanalysis/gn_download.py +++ b/gnssanalysis/gn_download.py @@ -12,7 +12,6 @@ import concurrent as _concurrent from contextlib import contextmanager as _contextmanager import datetime as _datetime -from itertools import repeat as _repeat import logging import os as _os from copy import deepcopy as _deepcopy @@ -28,9 +27,9 @@ import ftplib as _ftplib from ftplib import FTP_TLS as _FTP_TLS from pathlib import Path as _Path -from typing import Any, Generator, Literal, Optional, Union, Tuple, List +from typing import Any, Generator, Literal, Optional, Tuple, List from urllib import request as _request -from urllib.error import HTTPError as _HTTPError, URLError as _URLError +from urllib.error import HTTPError as _HTTPError import requests as _requests import warnings as _warnings import netrc as _netrc @@ -94,7 +93,6 @@ def __call__(self, bytes_transferred): _sys.stdout.flush() - def get_earthdata_credentials(username: str = None, password: str = None) -> Tuple[str, str]: """ Get NASA Earthdata credentials from .netrc file or direct parameters. @@ -188,7 +186,7 @@ def request_metadata(url: str, max_retries: int = 5, metadata_header: str = "x-a def download_url( - url: str, destfile: Union[str, _os.PathLike], max_retries: int = 5, raise_on_failure: bool = False + url: str, destfile: str | _os.PathLike, max_retries: int = 5, raise_on_failure: bool = False ) -> Optional[_Path]: """ TODO finish docstring @@ -481,9 +479,7 @@ def generate_product_filename( return product_filename, gps_date, reference_start -def check_whether_to_download( - filename: str, download_dir: _Path, if_file_present: str = "prompt_user" -) -> Union[_Path, None]: +def check_whether_to_download(filename: str, download_dir: _Path, if_file_present: str = "prompt_user") -> _Path | None: """Determine whether to download given file (filename) to the desired location (download_dir) based on whether it is already present and what action to take if it is (if_file_present) @@ -537,7 +533,7 @@ def attempt_ftps_download( filename: str, type_of_file: Optional[str] = None, if_file_present: str = "prompt_user", -) -> Union[_Path, None]: +) -> _Path | None: """Attempt download of file (filename) given the ftps client object (ftps) to chosen location (download_dir) :param _Path download_dir: Where to download files (local directory) @@ -698,7 +694,7 @@ def decompress_file(input_filepath: _Path, delete_after_decompression: bool = Fa return output_file -def check_n_download_url(url: str, dwndir, filename: Union[str, None] = None): +def check_n_download_url(url: str, dwndir, filename: str | None = None): """ Download single file given URL to download from. 
Optionally provide filename if different from url name @@ -774,8 +770,8 @@ def ftp_tls(url: str, **kwargs) -> Generator[Any, Any, Any]: def download_file_from_cddis( filename: str, - ftp_folder: Optional[str] = None, # deprecated - url_folder: Optional[str] = None, # preferred + ftp_folder: Optional[str] = None, # deprecated + url_folder: Optional[str] = None, # preferred output_folder: _Path = _Path("."), max_retries: int = 3, decompress: bool = True, @@ -783,7 +779,7 @@ def download_file_from_cddis( username: str = None, password: str = None, note_filetype: Optional[str] = None, -) -> Union[_Path, None]: +) -> _Path | None: """ Download a single file from the CDDIS HTTPS archive using NASA Earthdata authentication :param str filename: Name of the file to download @@ -1057,7 +1053,7 @@ def download_iau2000_variant( download_dir: _Path, iau2000_file_variant: Literal["standard", "daily"], if_file_present: str = "prompt_user", -) -> Union[_Path, None]: +) -> _Path | None: """ Downloads IAU2000 file based on the variant requested ("daily" or "standard" file). Added in approximately version 0.0.58 @@ -1067,7 +1063,7 @@ def download_iau2000_variant( download the recent "daily" file, or the historical "standard" file. :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user" :raises Exception: On download failures. In some cases the underlying exception will be wrapped in the one raised. - :return Union[_Path, None]: _Path to the downloaded file, or None if not downloaded (based on if_file_present + :return _Path | None: _Path to the downloaded file, or None if not downloaded (based on if_file_present setting). """ ensure_folders([download_dir]) @@ -1128,8 +1124,8 @@ def download_iau2000_variant( def get_iau2000_file_variants_for_dates( - start_epoch: Union[_datetime.datetime, None] = None, - end_epoch: Union[_datetime.datetime, None] = None, + start_epoch: _datetime.datetime | None = None, + end_epoch: _datetime.datetime | None = None, preferred_variant: Literal["standard", "daily"] = "daily", legacy_mode: bool = False, # TODO remove once wrapper function (download_iau2000_file()) is removed. ) -> set[Literal["standard", "daily"]]: @@ -1145,8 +1141,8 @@ def get_iau2000_file_variants_for_dates( finals.data https://datacenter.iers.org/versionMetadata.php?filename=latestVersionMeta/10_FINALS.DATA_IAU2000_V2013_0110.txt finals.daily https://datacenter.iers.org/versionMetadata.php?filename=latestVersionMeta/13_FINALS.DAILY_IAU2000_V2013_0113.txt - :param Union[_datetime.datetime, None] start_epoch: Start of date range. Optional if end_epoch is provided. - :param Union[_datetime.datetime, None] end_epoch: End of date range. Optional if start_epoch is provided. + :param _datetime.datetime | None start_epoch: Start of date range. Optional if end_epoch is provided. + :param _datetime.datetime | None end_epoch: End of date range. Optional if start_epoch is provided. :param set[Literal["standard", "daily"] preferred_variant: For date ranges that don't require us to use a specific variant, which variant should we fall back on as a tie-breaker. Defaults to the 'daily' file. 
:param bool legacy_mode: (Deprecated) for backwards compatibility only: limit to only the variant we definately @@ -1231,7 +1227,7 @@ def download_iau2000_file( download_dir: _Path, start_epoch: _datetime.datetime, if_file_present: str = "prompt_user", -) -> Union[_Path, None]: +) -> _Path | None: """ Compatibility wrapper around new functions DEPRECATED since approximately version 0.0.58 @@ -1251,7 +1247,7 @@ def download_iau2000_file( def download_atx( download_dir: _Path, reference_frame: str = "IGS20", if_file_present: str = "prompt_user" -) -> Union[_Path, None]: +) -> _Path | None: """Download the ATX file necessary for running the PEA provided the download directory (download_dir) :param _Path download_dir: Where to download files (local directory) @@ -1290,7 +1286,7 @@ def download_atx( return download_filepath -def download_satellite_metadata_snx(download_dir: _Path, if_file_present: str = "prompt_user") -> Union[_Path, None]: +def download_satellite_metadata_snx(download_dir: _Path, if_file_present: str = "prompt_user") -> _Path | None: """Download the most recent IGS satellite metadata file :param _Path download_dir: Where to download files (local directory) diff --git a/gnssanalysis/gn_frame.py b/gnssanalysis/gn_frame.py index 4a8c263..c0a5e48 100644 --- a/gnssanalysis/gn_frame.py +++ b/gnssanalysis/gn_frame.py @@ -1,7 +1,6 @@ """frame of day generation module""" import logging -from typing import Union import numpy as _np import pandas as _pd @@ -28,10 +27,10 @@ def _get_core_list(core_list_path): def get_frame_of_day( date_or_j2000, - itrf_path_or_df: Union[_pd.DataFrame, str], - discon_path_or_df: Union[_pd.DataFrame, str], - psd_path_or_df: Union[None, _pd.DataFrame, str] = None, - list_path_or_df: Union[None, _pd.DataFrame, str, _np.ndarray] = None, + itrf_path_or_df: _pd.DataFrame | str, + discon_path_or_df: _pd.DataFrame | str, + psd_path_or_df: None | _pd.DataFrame | str = None, + list_path_or_df: None | _pd.DataFrame | str | _np.ndarray = None, ): """Main function to propagate frame into datetime of interest""" diff --git a/gnssanalysis/gn_io/bia.py b/gnssanalysis/gn_io/bia.py index 2c26262..3eaf23b 100644 --- a/gnssanalysis/gn_io/bia.py +++ b/gnssanalysis/gn_io/bia.py @@ -1,7 +1,6 @@ import logging as _logging from io import BytesIO as _BytesIO from pathlib import Path as _Path -from typing import Union as _Union import pandas as _pd from typing_extensions import Literal @@ -12,10 +11,10 @@ from .sinex import _snx_extract_blk -def read_bia(path: _Union[_Path, str, bytes]) -> _pd.DataFrame: +def read_bia(path: _Path | str | bytes) -> _pd.DataFrame: """Reads (Parses already read bytes) .bia/.bsx file at the path provided into a pandas DataFrame - :param _Union[_Path, str] path: path to bia/bsx file to read, could also be a bytes object that path2bytes will pass through + :param _Path | str | bytes path: path to bia/bsx file to read, could also be a bytes object that path2bytes will pass through :return _pd.DataFrame: bia_df DataFrame with bias values """ bia_bytes = path2bytes(path) @@ -76,12 +75,12 @@ def bias_B_to_cB(bia_df: _pd.DataFrame) -> _pd.DataFrame: def get_IF_pairs( - IF_bias_1: _pd.DataFrame, IF_bias_2: _Union[_pd.DataFrame, None] = None, force_C_L: Literal["C", "L", False] = False + IF_bias_1: _pd.DataFrame, IF_bias_2: _pd.DataFrame | None = None, force_C_L: Literal["C", "L", False] = False ) -> _pd.Index: """Analyses the provided bias DataFrames (bias_B_to_cB output) and provides index with signal pairs to use for IF bias summation. 
:param _pd.DataFrame IF_bias_1: bias_B_to_cB output DataFrame - :param _Union[_pd.DataFrame, None] IF_bias_2: bias_B_to_cB output DataFrame 2 if two files are used, e.g. for comparison. Ensures common indices, defaults to None + :param _pd.DataFrame | None IF_bias_2: bias_B_to_cB output DataFrame 2 if two files are used, e.g. for comparison. Ensures common indices, defaults to None :param Literal[C, L, False] force_C_L: whether to force Code (C) signals or Phase (C), defaults to False :return _pd.Index: index with selected signal pairs only """ @@ -98,12 +97,12 @@ def get_IF_pairs( def bias_to_IFbias( - bia_df1: _pd.DataFrame, bia_df2: _Union[_pd.DataFrame, None] = None, force_C_L: Literal["C", "L", False] = False + bia_df1: _pd.DataFrame, bia_df2: _pd.DataFrame | None = None, force_C_L: Literal["C", "L", False] = False ) -> _pd.Series: """Converts bias DataFrame (output of read_bia) or DataFrames to a set of IF bias DataFrames to use for clk values correction :param _pd.DataFrame bia_df1: a DataFrame from read_bia function - :param _Union[_pd.DataFrame, None] bia_df2: a second DataFrame from read_bia function, defaults to None + :param _pd.DataFrame | None bia_df2: a second DataFrame from read_bia function, defaults to None :param Literal[C, L, False] force_C_L: whether to force Code (C) signals or Phase (C), defaults to False :return _pd.Series: a pandas series with IF bias values computed via combination of a pair of signals """ diff --git a/gnssanalysis/gn_io/clk.py b/gnssanalysis/gn_io/clk.py index 9d3f789..41a5001 100644 --- a/gnssanalysis/gn_io/clk.py +++ b/gnssanalysis/gn_io/clk.py @@ -3,7 +3,6 @@ import logging as _logging import re as _re from io import BytesIO as _BytesIO -from typing import Union as _Union import numpy as _np import pandas as _pd @@ -105,7 +104,7 @@ def rm_daily_sv_bias(clk_df_unst: _pd.DataFrame): clk_df_unst -= per_day_mean -def rm_sv_bias(clk_df_unst: _pd.DataFrame, sv: _Union[list, str, _np.ndarray]): +def rm_sv_bias(clk_df_unst: _pd.DataFrame, sv: list | str | _np.ndarray): """Takes an unstacked clk_df and normalizes satellite data (AS) by \ a set satellite clk offsets, specific to constellation - G01 for GPS, R01 per GLONASS etc that are taken from the per_gnss_svs list diff --git a/gnssanalysis/gn_io/common.py b/gnssanalysis/gn_io/common.py index 305ff94..f7401f2 100644 --- a/gnssanalysis/gn_io/common.py +++ b/gnssanalysis/gn_io/common.py @@ -10,12 +10,11 @@ import unlzw3 as _unlzw3 from typing_extensions import Literal as _Literal -from typing import Union as _Union MB = 1024 * 1024 -def path2bytes(path_or_bytes: _Union[_Path, str, bytes]) -> bytes: +def path2bytes(path_or_bytes: _Path | str | bytes) -> bytes: """Main file reading function. Checks file extension and calls appropriate reading function. Passes through bytes if given, thus one may not routinely leave it in the top of the specific file reading function and be able to call it with bytes or str path without additional modifications.
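The gn_io signatures above (e.g. `path2bytes(path_or_bytes: _Path | str | bytes)`) keep working with `isinstance()` at runtime, since Python 3.10+ accepts PEP 604 unions there as well. Below is a minimal, hypothetical sketch of the dispatch pattern such a signature implies, not the library's actual implementation:

```python
# Hypothetical sketch only -- not gnssanalysis' path2bytes. Shows that a PEP 604
# union such as "Path | str | bytes" can be passed straight to isinstance()
# on Python 3.10+, one runtime benefit of the annotation style adopted in this diff.
from pathlib import Path


def to_bytes(path_or_bytes: Path | str | bytes) -> bytes:
    if isinstance(path_or_bytes, bytes):  # raw content: pass it straight through
        return path_or_bytes
    if isinstance(path_or_bytes, Path | str):  # path-like: read the file from disk
        return Path(path_or_bytes).read_bytes()
    raise TypeError(f"Unsupported input type: {type(path_or_bytes)!r}")
```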
diff --git a/gnssanalysis/gn_io/erp.py b/gnssanalysis/gn_io/erp.py index 3ead48b..4dc588a 100644 --- a/gnssanalysis/gn_io/erp.py +++ b/gnssanalysis/gn_io/erp.py @@ -5,9 +5,8 @@ # The collections.abc (rather than typing) versions don't support subscripting until 3.9 # from collections.abc import Callable, Iterable -from typing import Callable, Iterable +from typing import Callable, Iterable, TextIO from io import BytesIO as _BytesIO -from typing import TextIO, Union from urllib import request as _rqs import numpy as _np @@ -162,7 +161,7 @@ def get_canonical_header(header: str) -> str: return header -def get_erp_scaling(normalised_header: str, original_header: str) -> Union[int, float]: +def get_erp_scaling(normalised_header: str, original_header: str) -> int | float: """Get scaling factor to go from ERP stored data to "SI units" Scare quotes around "SI units" because rates are still per day, but in general converts to @@ -170,7 +169,7 @@ def get_erp_scaling(normalised_header: str, original_header: str) -> Union[int, :param str normalised_header: Normalised ERP column header :param str original_header: Original ERP column header (needed for correlation data) - :return Union[int, float]: Scaling factor to (multiplicatively) go from ERP-file data to "SI units" + :return int | float: Scaling factor to (multiplicatively) go from ERP-file data to "SI units" """ if normalised_header in ["Xpole", "Xsig", "Ypole", "Ysig", "Xrt", "Xrtsig", "Yrt", "Yrtsig"]: return 1e-6 @@ -220,11 +219,11 @@ def get_erp_unit_string(normalised_header: str, original_header: str) -> str: def read_erp( - erp_path: Union[str, bytes, os.PathLike], normalise_header_names: bool = True, convert_units: bool = True + erp_path: str | bytes | os.PathLike, normalise_header_names: bool = True, convert_units: bool = True ) -> _pd.DataFrame: """Read an ERP file from disk into a pandas DataFrame - :param Union[str, bytes, os.PathLike] erp_path: Path to ERP file on disk + :param str | bytes | os.PathLike erp_path: Path to ERP file on disk :param bool normalise_header_names: If True, change header names to canonical versions, defaults to True :param bool convert_units: Convert natural ERP file units to "SI units", forces `normalise_header_names`, defaults to True :raises RuntimeError: Raised if the start of the column headers can't be found and the file can't be parsed @@ -316,11 +315,11 @@ def format_erp_column(column_series: _pd.Series, mjd_precision: int = 2) -> _pd. return column_series.apply(formatter) -def write_erp(erp_df: _pd.DataFrame, path: Union[str, bytes, os.PathLike], mjd_precision: int = 2): +def write_erp(erp_df: _pd.DataFrame, path: str | bytes | os.PathLike, mjd_precision: int = 2): """Write an ERP DataFrame to a file on disk :param _pd.DataFrame erp_df: DataFrame of ERP data to write to disk - :param Union[str, bytes, os.PathLike] path: Path to output file + :param str | bytes | os.PathLike path: Path to output file :param int mjd_precision: Number of decimal places to user for MJD column, defaults to 2 """ with open(path, "w") as file: @@ -362,14 +361,14 @@ def write_erp_to_stream(erp_df: _pd.DataFrame, stream: TextIO, mjd_precision: in stream.write("\n") -def read_iau2000(iau2000_path: Union[str, bytes, os.PathLike], use_erp_style_headers: bool = True) -> _pd.DataFrame: +def read_iau2000(iau2000_path: str | bytes | os.PathLike, use_erp_style_headers: bool = True) -> _pd.DataFrame: """Read an IAU2000 file from disk into a pandas DataFrame All columns of data are preserved, included the IERS/Predicted markers. 
Where data is absent in the IAU2000 file a NaN is placed into the pandas DataFrame. The returned DataFrame can either be provided with ERP style headers, eg. Xpole for polar motion, or titles that align closer to the IERS description of the data, eg. PM-x. - :param Union[str, bytes, os.PathLike] iau2000_path: Path to IAU2000 file to read + :param str | bytes | os.PathLike iau2000_path: Path to IAU2000 file to read :param bool use_erp_style_headers: Use headers that align with ERP column names, defaults to True :return _pd.DataFrame: A pandas DataFrame containing the data in the IAU2000 file """ @@ -532,14 +531,14 @@ def get_iau2000_to_erp_scaling(column_header: str, erp_units: bool) -> float: return iau2000_scaling / erp_scaling -def get_iau2000_scaling(column_header: str) -> Union[int, float]: +def get_iau2000_scaling(column_header: str) -> int | float: """Given an IAU2000 column header, return the scaling factor from IAU2000 data to "SI units" Namely the scaling shifts LOD properties from milliseconds to seconds and polar motion rate properties from milliarcseconds to arcseconds. :param str column_header: IAU2000 column header in either ERP-style or IERS-style - :return Union[int, float]: Scaling factor to go from IAU2000 units to "SI units" + :return int | float: Scaling factor to go from IAU2000 units to "SI units" """ if column_header in ["PM-x", "Xpole", "PM-y", "Ypole", "PM-x-B", "Xpole-B", "PM-y-B", "Ypole-B"]: return 1 diff --git a/gnssanalysis/gn_io/igslog.py b/gnssanalysis/gn_io/igslog.py index 7fe741c..a7903ef 100644 --- a/gnssanalysis/gn_io/igslog.py +++ b/gnssanalysis/gn_io/igslog.py @@ -4,7 +4,7 @@ import glob as _glob import re as _re from multiprocessing import Pool as _Pool -from typing import Optional, Union +from typing import Optional import numpy as _np import pandas as _pd @@ -123,14 +123,14 @@ class LogVersionError(Exception): def find_recent_logs( logs_glob_path: str, - rnx_glob_path: Union[str, None] = None, + rnx_glob_path: str | None = None, raise_if_no_logs: bool = True, raise_if_no_rnx: bool = True, ) -> _pd.DataFrame: """Takes glob expression to create list of logs, parses names into site and date and selects most recent ones :param str logs_glob_path: A glob expression for log files, e.g. /data/station_logs_IGS/*/*.log - :param Union[str, None] rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to + :param str | None rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None. A list of station names can also be passed, though this is not officially supported. #TODO clean up. :param bool raise_if_no_logs: raise ValueError if logs glob finds no files. On by default. :param bool raise_if_no_rnx: raise ValueError if rnx glob specified, but finds no files. On by default. 
@@ -188,8 +188,8 @@ def determine_log_version(data: bytes) -> str: def extract_id_block( - data: bytes, file_path: str, file_code: str, version: Union[str, None] = None -) -> Union[list[str], _np.ndarray]: + data: bytes, file_path: str, file_code: str, version: str | None = None +) -> list[str] | _np.ndarray: """Extract the site ID block given the bytes object read from an IGS site log file :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode @@ -222,7 +222,7 @@ def extract_id_block( return id_block -def extract_location_block(data: bytes, file_path: str, version: Union[str, None] = None) -> _np.ndarray: +def extract_location_block(data: bytes, file_path: str, version: str | None = None) -> _np.ndarray: """Extract the location block given the bytes object read from an IGS site log file :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode @@ -249,7 +249,7 @@ def extract_location_block(data: bytes, file_path: str, version: Union[str, None return location_block -def extract_receiver_block(data: bytes, file_path: str) -> Union[list[tuple[bytes]], _np.ndarray]: +def extract_receiver_block(data: bytes, file_path: str) -> list[tuple[bytes]] | _np.ndarray: """Extract the location block given the bytes object read from an IGS site log file :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode @@ -264,7 +264,7 @@ def extract_receiver_block(data: bytes, file_path: str) -> Union[list[tuple[byte return receiver_block -def extract_antenna_block(data: bytes, file_path: str) -> Union[list[tuple[bytes]], _np.ndarray]: +def extract_antenna_block(data: bytes, file_path: str) -> list[tuple[bytes]] | _np.ndarray: """Extract the antenna block given the bytes object read from an IGS site log file :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode @@ -279,13 +279,13 @@ def extract_antenna_block(data: bytes, file_path: str) -> Union[list[tuple[bytes return antenna_block -def parse_igs_log_data(data: bytes, file_path: str, file_code: str) -> Union[_np.ndarray, None]: +def parse_igs_log_data(data: bytes, file_path: str, file_code: str) -> _np.ndarray | None: """Given the bytes object returned opening a IGS log file, parse to produce an ndarray with relevant data :param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode :param str file_path: The path to the file from which the "data" bytes object was obtained :param str file_code: Code from the filename_array passed to the parse_igs_log() function - :return Union[_np.ndarray, None]: Returns array with relevant data from the IGS log file bytes object, + :return _np.ndarray | None: Returns array with relevant data from the IGS log file bytes object, or `None` for unsupported version of the IGS Site log format. """ # Determine the version of the IGS log based on the data, Warn if unrecognised @@ -337,11 +337,11 @@ def parse_igs_log_data(data: bytes, file_path: str, file_code: str) -> Union[_np return _np.concatenate([blk_uni, file_path_arr], axis=1) -def parse_igs_log_file(filename_array: _np.ndarray) -> Union[_np.ndarray, None]: +def parse_igs_log_file(filename_array: _np.ndarray) -> _np.ndarray | None: """Reads igs log file and outputs ndarray with parsed data :param _np.ndarray filename_array: Metadata on input log file. 
Expects ndarray of the form [CODE DATE PATH] - :return Union[_np.ndarray, None]: Returns array with data from the parsed IGS log file, or `None` for unsupported + :return _np.ndarray | None: Returns array with data from the parsed IGS log file, or `None` for unsupported version of the IGS Site log format. """ # Split filename_array out into its three components (CODE, DATE, PATH), discarding the second element (DATE): diff --git a/gnssanalysis/gn_io/nanu.py b/gnssanalysis/gn_io/nanu.py index 213b8a8..d7df706 100644 --- a/gnssanalysis/gn_io/nanu.py +++ b/gnssanalysis/gn_io/nanu.py @@ -1,7 +1,6 @@ import glob import logging as _logging import os as _os -from typing import Union as _Union from datetime import datetime import numpy as _np @@ -52,7 +51,7 @@ def read_nanu(path: str) -> dict: """A parser for Notice Advisory to Navstar Users (NANU) files. Assumes there is only one message per file, that starts with '1.' - :param _Union[str, bytes] path_or_bytes: path to nanu file or a bytes object + :param str | bytes path: path to nanu file or a bytes object # TODO fix inconsisistency above :return dict: nanu values with parameter names as keys """ nanu_bytes = _gn_io.common.path2bytes(path) @@ -75,14 +74,14 @@ def collect_nanus_to_df(glob_expr: str) -> _pd.DataFrame: def get_bad_sv_from_nanu_df( - nanu_df: _pd.DataFrame, up_to_epoch: _Union[_np.datetime64, datetime, str], offset_days: int + nanu_df: _pd.DataFrame, up_to_epoch: _np.datetime64 | datetime | str, offset_days: int ) -> list: """A simple function that analyses an input dataframe NANU collection and outputs a list of SVs that should be excluded for the entered epoch+offset :param _pd.DataFrame nanu_df: a dataframe returned by the collect_nanus_to_df, effectively a _pd.DataFrame call on a list of parsed dicts or a parsed dict - :param _Union[_np.datetime64, datetime, str] up_to_epoch: epoch to analyse NANUs up to + :param _np.datetime64 | datetime | str up_to_epoch: epoch to analyse NANUs up to :param int offset_days: an offset or a length of a planned processing session in days :return list[str]: a list of SVs that should not be used for the specified timeperiod. FIXME Potentially needs to be int? diff --git a/gnssanalysis/gn_io/sinex.py b/gnssanalysis/gn_io/sinex.py index be48fee..b7ffcdf 100644 --- a/gnssanalysis/gn_io/sinex.py +++ b/gnssanalysis/gn_io/sinex.py @@ -8,7 +8,6 @@ from io import BytesIO as _BytesIO from typing import Any as _Any from typing import Iterable as _Iterable -from typing import Union as _Union import numpy as _np import pandas as _pd @@ -59,7 +58,7 @@ def file_desc(pb) -> str: # This is in tension with the existing above function but is what was used by # the filenames functionality and so is ported here for now. 
-def get_header_dict(file_path: _Union[str, bytes, _os.PathLike]) -> dict[str, _Any]: +def get_header_dict(file_path: str | bytes | _os.PathLike) -> dict[str, _Any]: """Extract the data contained in the header of a sinex file The extracted data is returned in a dictionary containing: @@ -72,7 +71,7 @@ def get_header_dict(file_path: _Union[str, bytes, _os.PathLike]) -> dict[str, _A - "estimate_count": str - "contents": list[str] - :param _Union[str, bytes, _os.PathLike] file_path: sinex file from which to read header + :param str | bytes | _os.PathLike file_path: sinex file from which to read header :return dict[str, _Any]: dictionary containing the properties extracted from the header """ with open(file_path, mode="r", encoding="utf-8") as f: @@ -105,10 +104,10 @@ def get_header_dict(file_path: _Union[str, bytes, _os.PathLike]) -> dict[str, _A return {} -def get_available_blocks(file_path: _Union[str, bytes, _os.PathLike]) -> list[str]: +def get_available_blocks(file_path: str | bytes | _os.PathLike) -> list[str]: """Return the blocks available within a sinex file - :param _Union[str, bytes, _os.PathLike] file_path: sinex file to read for blocks + :param str | bytes | _os.PathLike file_path: sinex file to read for blocks :return list[str]: list of names of blocks available in sinex file """ with open(file_path, "r", encoding="utf-8") as f: @@ -157,10 +156,10 @@ def all_notnan(iterable: _Iterable) -> bool: # TODO: Generalise to handle a path or bytes object? -def read_sinex_comment_block(filename: _Union[str, bytes, _os.PathLike]) -> list[str]: +def read_sinex_comment_block(filename: str | bytes | _os.PathLike) -> list[str]: """Extract comments from a provided sinex file - :param Union[str, bytes, os.PathLike] filename: path to sinex file + :param str | bytes | os.PathLike filename: path to sinex file :return list[str]: list containing all lines in sinex comment block """ with open(filename, "r", encoding="utf-8") as f: @@ -275,7 +274,7 @@ def extract_mincon_from_comments(comment_block: _Iterable[str]) -> dict[str, _An # TODO: Generalise to handle a path or bytes object? -def read_sinex_mincon(filename: _Union[str, bytes, _os.PathLike]) -> dict[str, _Any]: +def read_sinex_mincon(filename: str | bytes | _os.PathLike) -> dict[str, _Any]: """Extract PEA-style minimum constraints data from sinex file PEA can place information about the minimum constraint solution applied into a sinex @@ -293,7 +292,7 @@ def read_sinex_mincon(filename: _Union[str, bytes, _os.PathLike]) -> dict[str, _ - "unused": list[str], list of unused stations The entries will only be included if complete data is extracted for them. 
- :param _Union[str, bytes, _os.PathLike] filename: sinex file from which to read minimum constraints data + :param str | bytes | _os.PathLike filename: sinex file from which to read minimum constraints data :return dict[str, _Any]: dictionary containing extracted minimum constraints information """ return extract_mincon_from_comments(read_sinex_comment_block(filename)) @@ -321,7 +320,7 @@ def snx_soln_int_to_str(soln: _pd.Series, nan_as_dash=True) -> _pd.Series: return soln_str -def _get_valid_stypes(stypes: _Union[list[str], set[str]]) -> list[str]: +def _get_valid_stypes(stypes: list[str] | set[str]) -> list[str]: """Returns only stypes in allowed list Fastest if stypes size is small""" allowed_stypes = ["EST", "APR", "NEQ"] @@ -475,14 +474,14 @@ def _get_snx_matrix(path_or_bytes, stypes=("APR", "EST"), verbose=True, snx_head return output, stypes_content -def snxdf2xyzdf(snx_df: _pd.DataFrame, unstack: bool = True, keep_all_soln: _Union[bool, None] = None) -> _pd.DataFrame: +def snxdf2xyzdf(snx_df: _pd.DataFrame, unstack: bool = True, keep_all_soln: bool | None = None) -> _pd.DataFrame: """Provides simple functionality to preprocess different variations of the sinex vector dataframe for further processing and analysis. Args: snxdf (_pd.DataFrame): 'raw' vector dataframe (see _get_snx_vector's format) unstack (bool, optional): whether to unstack TYPE to columns (STAX, STAY and STAZ) or keep as in sinex file. Defaults to True. - keep_all_soln (_Union, optional): drops all the extra solutions if False, leaving just last one (max SOLN for each parameter). If None - keeps all solutions but drops the SOLN index TODO potentially remove the None option. Defaults to None. + keep_all_soln (bool, optional): drops all the extra solutions if False, leaving just last one (max SOLN for each parameter). If None - keeps all solutions but drops the SOLN index TODO potentially remove the None option. Defaults to None. Returns: _pd.DataFrame: a formatted sinex dataframe filtered by STAX, STAY and STAZ types, optionally unstacked by TYPE @@ -512,21 +511,21 @@ def snxdf2xyzdf(snx_df: _pd.DataFrame, unstack: bool = True, keep_all_soln: _Uni def _get_snx_vector( - path_or_bytes: _Union[str, bytes], - stypes: _Union[set[str], list[str]] = set(["EST", "APR"]), + path_or_bytes: str | bytes, + stypes: set[str] | list[str] = set(["EST", "APR"]), format: str = "long", - keep_all_soln: _Union[bool, None] = None, + keep_all_soln: bool | None = None, verbose: bool = True, recenter_epochs: bool = False, snx_header: dict = {}, -) -> _Union[_pd.DataFrame, None]: +) -> _pd.DataFrame | None: """Main function of reading vector data from sinex file. Doesn't support sinex files from EMR AC as APRIORI and ESTIMATE indices are not in sync (APRIORI params might not even exist in he ESTIMATE block). While will parse the file, the alignment of EST and APR values might be wrong. No easy solution was found for the issue thus unsupported for now. TODO parse header and add a warning if EMR agency Args: - path_or_bytes (_Union): _description_ + path_or_bytes (str | bytes): _description_ TODO stypes (tuple, optional): Specifies which blocks to extract: APRIORI, ESTIMATE, NORMAL_EQUATION. Could contain any from "APR","EST" and "NEQ". Defaults to ("EST", "APR"). format (str, optional): format of the output dataframe: one of 'raw', 'wide' and 'long. Defaults to "long". TODO. shall the keys be all-caps and how about creating a pandas subclass (bad idea?) 
or similar - keep_all_soln (_Union, optional): whether to keep all solutions of each parameter or just keep the one with max SOLN. If None then keeps all but drops SOLN index. Defaults to None. TODO do we need None option? + keep_all_soln (bool, optional): whether to keep all solutions of each parameter or just keep the one with max SOLN. If None then keeps all but drops SOLN index. Defaults to None. TODO do we need None option? verbose (bool, optional): logs extra information which might be useful for debugging. Defaults to True. recenter_epochs (bool, optional): overrides the read-in time values with _gn_const.SEC_IN_DAY // 2 so same-day values from different sinex files will align as the actual timestamps could be close to 43200 but not exactly. Defaults to False. diff --git a/gnssanalysis/gn_io/sp3.py b/gnssanalysis/gn_io/sp3.py index 7766f1c..e39b684 100644 --- a/gnssanalysis/gn_io/sp3.py +++ b/gnssanalysis/gn_io/sp3.py @@ -3,7 +3,7 @@ import io as _io import os as _os import re as _re -from typing import Callable, Literal, Mapping, Optional, Union, overload +from typing import Callable, Literal, Mapping, Optional, overload from pathlib import Path import warnings @@ -320,8 +320,8 @@ def get_sp3_comments(sp3_df: _pd.DataFrame) -> list[str]: def update_sp3_comments( sp3_df: _pd.DataFrame, - comment_lines: Union[list[str], None] = None, - comment_string: Union[str, None] = None, + comment_lines: list[str] | None = None, + comment_string: str | None = None, ammend: bool = True, strict_mode: type[StrictMode] = StrictModes.STRICT_RAISE, ) -> None: @@ -344,9 +344,9 @@ def update_sp3_comments( will be no comments). :param _pd.DataFrame sp3_df: SP3 DataFrame on which to update / replace comments (in place). - :param Union[list[str], None] comment_lines: List of comment lines to ammend / overwrite with. SP3 comment line + :param list[str] | None comment_lines: List of comment lines to ammend / overwrite with. SP3 comment line lead-in ('/* ') is optional and will be added if missing. - :param Union[str, None] comment_string: Arbitrary length string to be broken into lines and formatted as SP3 + :param str | None comment_string: Arbitrary length string to be broken into lines and formatted as SP3 comments. This should NOT have SP3 comment line lead-in ('/* ') on it; that will be added. :param bool ammend: Whether to ammend (specifically add additional) comment lines, or delete existing lines and replace with the provided input. Defaults to True. @@ -651,19 +651,19 @@ def _process_sp3_block( return temp_sp3 -def description_for_path_or_bytes(path_or_bytes: Union[str, Path, bytes]) -> Optional[str]: +def description_for_path_or_bytes(path_or_bytes: str | Path | bytes) -> Optional[str]: if isinstance(path_or_bytes, (str, Path)): return str(path_or_bytes) else: return "Data passed as bytes: no path available" -def try_get_sp3_filename(path_or_bytes: Union[str, Path, bytes]) -> Union[str, None]: +def try_get_sp3_filename(path_or_bytes: str | Path | bytes) -> str | None: """ Utility for validation during parsing. Attempts to pull the filename from the path or bytes SP3 source. 
- :param Union[str, Path, bytes] path_or_bytes: path or bytes SP3 source to try and get filename from - :return Union[str, None]: filename if able to extract, otherwise `None` + :param str | Path | bytes path_or_bytes: path or bytes SP3 source to try and get filename from + :return str | None: filename if able to extract, otherwise `None` """ if isinstance(path_or_bytes, bytes): return None @@ -681,7 +681,7 @@ def try_get_sp3_filename(path_or_bytes: Union[str, Path, bytes]) -> Union[str, N def check_epoch_counts_for_discrepancies( draft_sp3_df: _pd.DataFrame, parsed_sp3_header: _pd.Series, - sp3_path_or_bytes: Union[Path, str, bytes, None] = None, + sp3_path_or_bytes: Path | str | bytes | None = None, strict_mode: type[StrictMode] = StrictModes.STRICT_WARN, ): """ @@ -694,12 +694,12 @@ def check_epoch_counts_for_discrepancies( indexes. Header is not added till late in parsing, so it is passed in separately. :param _pd.Series parsed_sp3_header: draft SP3 header, passed in separately as it gets added to the DataFrame later in the SP3 reading process. - :param Union[Path, str, bytes, None] sp3_path_or_bytes: representation of the source SP3 file path or binary data, + :param Path | str | bytes | None sp3_path_or_bytes: representation of the source SP3 file path or binary data, used to determine whether a filename can be found, and extract it if so. :param type[StrictMode] strict_mode: (Default: WARN) indicates whether to raise, warn, or ignore issues found. :raises ValueError: if discrepancies found in number of epochs indicated by SP3 filename/header/contents """ - sp3_filename: Union[str, None] = None + sp3_filename: str | None = None if sp3_path_or_bytes is not None: sp3_filename = try_get_sp3_filename(sp3_path_or_bytes) @@ -727,7 +727,7 @@ def check_epoch_counts_for_discrepancies( # Filename available to validate # Derive epoch count from filename period(as timedelta) / filename sampeleRate(as timedelta). # We shouldn't need sample rate to check for off by one as we're working in epochs this time. - filename_derived_epoch_count: Union[int, None] = None + filename_derived_epoch_count: int | None = None # Try extracting properties from filename filename_props: dict = filenames.determine_properties_from_filename(sp3_filename) @@ -923,7 +923,7 @@ def validate_sp3_comment_lines( def read_sp3( - sp3_path_or_bytes: Union[str, Path, bytes], + sp3_path_or_bytes: str | Path | bytes, pOnly: bool = True, nodata_to_nan: bool = True, drop_offline_sats: bool = False, @@ -939,7 +939,7 @@ def read_sp3( ) -> _pd.DataFrame: """Reads an SP3 file and returns the data as a pandas DataFrame. - :param Union[str, Path, bytes] sp3_path_or_bytes: SP3 file path (as str or Path) or SP3 data as bytes object. + :param str | Path | bytes sp3_path_or_bytes: SP3 file path (as str or Path) or SP3 data as bytes object. :param bool pOnly: If True, only P* values (positions) are included in the DataFrame. Defaults to True. :param bool nodata_to_nan: If True, converts 0.000000 (indicating nodata) to NaN in the SP3 POS column and converts 999999* (indicating nodata) to NaN in the SP3 CLK column. Defaults to True. @@ -1630,17 +1630,17 @@ def gen_sp3_content( def gen_sp3_content( sp3_df: _pd.DataFrame, - in_buf: Union[None, _io.StringIO] = None, + in_buf: None | _io.StringIO = None, sort_outputs: bool = False, continue_on_unhandled_velocity_data: bool = True, -) -> Union[str, None]: +) -> str | None: """ Organises, formats (including nodata values), then writes out SP3 content to a buffer if provided, or returns it otherwise. 
:param pandas.DataFrame sp3_df: The DataFrame containing the SP3 data. :param bool sort_outputs: Whether to sort the outputs. Defaults to False. - :param Union[_io.StringIO, None] in_buf: The buffer to write the SP3 content to. Defaults to None. + :param _io.StringIO | None in_buf: The buffer to write the SP3 content to. Defaults to None. :param bool continue_on_unhandled_velocity_data: If (currently unsupported) velocity data exists in the DataFrame, log a warning and skip velocity data, but write out position data. Set to false to raise an exception instead. :return str or None: Return SP3 content as a string if `in_buf` is None, otherwise write SP3 content to `in_buf`, @@ -1903,14 +1903,14 @@ def merge_attrs(df_list: list[_pd.DataFrame]) -> _pd.Series: def sp3merge( sp3paths: list[str], - clkpaths: Union[list[str], None] = None, + clkpaths: list[str] | None = None, nodata_to_nan: bool = False, strict_mode: type[StrictMode] = StrictModes.STRICT_WARN, ) -> _pd.DataFrame: """Reads in a list of sp3 files and optional list of clk files and merges them into a single sp3 file. :param list[str] sp3paths: The list of paths to the sp3 files. - :param Union[list[str], None] clkpaths: The list of paths to the clk files, or None if no clk files are provided. + :param list[str] | None clkpaths: The list of paths to the clk files, or None if no clk files are provided. :param bool nodata_to_nan: Flag indicating whether to convert nodata values to NaN. :param type[StrictMode] strict_mode: (default: WARN) Strictness with which to check the SP3 files read in, for compliance with the SP3 d spec. diff --git a/gnssanalysis/gn_utils.py b/gnssanalysis/gn_utils.py index f381a37..84ab17d 100644 --- a/gnssanalysis/gn_utils.py +++ b/gnssanalysis/gn_utils.py @@ -6,8 +6,6 @@ import click as _click -from typing import Union - from gnssanalysis.enum_meta_properties import EnumMetaProperties @@ -104,7 +102,7 @@ def get_filetype(path): return suffix -def configure_logging(verbose: bool, output_logger: bool = False) -> Union[_logging.Logger, None]: +def configure_logging(verbose: bool, output_logger: bool = False) -> _logging.Logger | None: """Configure the logger object with the level of verbosity requested and output if desired :param bool verbose: Verbosity of logger object to use for encoding logging strings, True: DEBUG, False: INFO
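Note on the change set as a whole: replacing `typing.Union`/`Optional` with PEP 604 `X | Y` unions means these annotations are evaluated at import time, so the package now assumes Python 3.10+ (unless a module adds `from __future__ import annotations`), which lines up with dropping 3.9 from and adding 3.13 to the CI matrix above. `Optional[...]` remains valid and is still imported where it is kept. A minimal sketch, using a standalone example rather than code from the repo, of the equivalence:

```python
# Equivalence sketch (not from gnssanalysis): the pre- and post-refactor annotation
# styles mean the same thing to a type checker. The PEP 604 form needs Python 3.10+
# at runtime (assuming no "from __future__ import annotations", as this diff suggests),
# matching the CI matrix change from 3.9-3.12 to 3.10-3.13.
import datetime
from typing import Union


def span_old(nominal_span: str) -> Union[datetime.timedelta, None]:
    """Pre-refactor style: typing.Union / typing.Optional."""
    return datetime.timedelta(days=1) if nominal_span == "01D" else None


def span_new(nominal_span: str) -> datetime.timedelta | None:
    """Post-refactor style, as used throughout this diff."""
    return datetime.timedelta(days=1) if nominal_span == "01D" else None


assert span_old("01D") == span_new("01D") == datetime.timedelta(days=1)
```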