2 changes: 1 addition & 1 deletion .github/workflows/python-cicd-units.yml
@@ -8,7 +8,7 @@ jobs:
 
     strategy:
       matrix:
-        python-version: ["3.9", "3.10", "3.11", "3.12"]
+        python-version: ["3.10", "3.11", "3.12", "3.13"]
 
     name: Build and Test on Python ${{ matrix.python-version }}
 
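Note: the matrix bump and the typing changes in the files below go together. The PEP 604 `X | Y` unions this PR introduces are evaluated at runtime in function signatures, and `type.__or__` only exists from Python 3.10, so 3.9 has to leave the test matrix. A minimal illustration (hypothetical function, not from this repo):

```python
# Python 3.10+: built-in types implement | in annotations (PEP 604).
def parse_count(raw: str) -> int | None:
    # Return the parsed integer, or None when the string is not numeric.
    return int(raw) if raw.isdigit() else None

# The same definition on 3.9 fails at import time with
#   TypeError: unsupported operand type(s) for |: 'type' and 'NoneType'
# unless annotations are deferred via `from __future__ import annotations`
# or spelled as typing.Optional[int] / typing.Union[int, None].
```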
14 changes: 7 additions & 7 deletions gnssanalysis/filenames.py
@@ -7,7 +7,7 @@
 
 # The collections.abc (rather than typing) versions don't support subscripting until 3.9
 # from collections import Iterable
-from typing import Iterable, Literal, Mapping, Any, Optional, Union, overload
+from typing import Iterable, Literal, Mapping, Any, Optional, overload
 import warnings
 
 import click
@@ -262,7 +262,7 @@ def generate_IGS_long_filename(
     start_epoch: datetime.datetime,
     *,
     end_epoch: datetime.datetime,
-    timespan: Union[datetime.timedelta, str, None] = ...,
+    timespan: datetime.timedelta | str | None = ...,
     solution_type: str = ...,
     sampling_rate: str = ...,
     sampling_rate_seconds: Optional[int] = ...,
@@ -280,7 +280,7 @@ def generate_IGS_long_filename(
     start_epoch: datetime.datetime,
     *,
     end_epoch: None = ...,
-    timespan: Union[datetime.timedelta, str],
+    timespan: datetime.timedelta | str,
     solution_type: str = ...,
     sampling_rate: str = ...,
     sampling_rate_seconds: Optional[int] = ...,
@@ -297,7 +297,7 @@ def generate_IGS_long_filename(
     start_epoch: datetime.datetime,
     *,
     end_epoch: Optional[datetime.datetime] = None,
-    timespan: Union[datetime.timedelta, str, None] = None,
+    timespan: datetime.timedelta | str | None = None,
     solution_type: str = "",  # TTT
     sampling_rate: str = "15M",  # SMP
     sampling_rate_seconds: Optional[int] = None,  # Not used here, but passed for structural consistency
@@ -321,7 +321,7 @@ def generate_IGS_long_filename(
     :param str format_type: File extension
     :param datetime.datetime start_epoch: datetime representing initial epoch in file
     :param Optional[datetime.datetime] end_epoch: datetime representing final epoch in file, defaults to None
-    :param timespan: Union[datetime.timedelta, str, None] timespan: timedelta representing time range of data in file,
+    :param timespan: datetime.timedelta | str | None timespan: timedelta representing time range of data in file,
        defaults to None
     :param str solution_type: Three letter solution type identifier, defaults to ""
     :param str sampling_rate: Three letter sampling rate string, defaults to "15M"
@@ -437,7 +437,7 @@ def nominal_span_string(span_seconds: float) -> str:
 def convert_nominal_span(
     nominal_span: str,
     non_timed_span_output: Literal["none", "timedelta"] = "timedelta",
-) -> Union[datetime.timedelta, None]:
+) -> datetime.timedelta | None:
     """Effectively invert :func: `filenames.generate_nominal_span`, turn a span string into a timedelta
 
     :param str nominal_span: Three-character span string in IGS format (e.g. 01D, 15M, 01L ?)
@@ -729,7 +729,7 @@ def determine_sp3_name_props(
 
     # Next, properties from the filename:
     try:
-        props_from_existing_name: Union[dict, None] = determine_properties_from_filename(
+        props_from_existing_name: dict | None = determine_properties_from_filename(
             file_path.name, strict_mode=strict_mode
         )
         logging.debug(f"props_from_existing_name =\n{str(props_from_existing_name)}")
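Reviewer aside: the comment retained at the top of this file ("The collections.abc (rather than typing) versions don't support subscripting until 3.9") is now stale — with the 3.10 floor set above, the PEP 585 generics from `collections.abc` are always subscriptable, so the remaining `typing.Iterable`/`typing.Mapping` imports could be migrated in a follow-up. A sketch of what that would look like (not part of this PR; the function is hypothetical):

```python
# On Python >= 3.9 (and this repo now requires >= 3.10), standard-library
# generics are subscriptable directly (PEP 585):
from collections.abc import Iterable, Mapping

def sum_selected(data: Mapping[str, int], keys: Iterable[str]) -> int:
    # Add up the values for the requested keys that are present in the mapping.
    return sum(data[k] for k in keys if k in data)
```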
40 changes: 20 additions & 20 deletions gnssanalysis/gn_aux.py
@@ -1,7 +1,7 @@
 """Auxiliary functions"""
 
 import logging as _logging
-from typing import overload, Union
+from typing import overload
 import numpy as _np
 import pandas as _pd
 
@@ -26,7 +26,7 @@ def rad2arcsec(x: _np.ndarray) -> _np.ndarray:
     return _np.rad2deg(x) * 3600
 
 
-def wrap_radians(x: Union[float, _np.ndarray]) -> Union[float, _np.ndarray]:
+def wrap_radians(x: float | _np.ndarray) -> float | _np.ndarray:
     """Overwrite negative angles in radians with positive coterminal angles
 
     :param float or _np.ndarray x: angles in radians
@@ -35,7 +35,7 @@ def wrap_radians(x: Union[float, _np.ndarray]) -> Union[float, _np.ndarray]:
     return x % (2 * _np.pi)
 
 
-def wrap_degrees(x: Union[float, _np.ndarray]) -> Union[float, _np.ndarray]:
+def wrap_degrees(x: float | _np.ndarray) -> float | _np.ndarray:
     """Overwrite negative angles in decimal degrees with positive coterminal angles
 
     :param float or _np.ndarray x: angles in decimal degrees
@@ -99,7 +99,7 @@ def unique_cols(df: _pd.DataFrame) -> _np.ndarray:
     return (a[:, 0][:, None] == a).all(1)
 
 
-def rm_duplicates_df(df: Union[_pd.DataFrame, _pd.Series], rm_nan_level: Union[int, str, None] = None):
+def rm_duplicates_df(df: _pd.DataFrame | _pd.Series, rm_nan_level: int | str | None = None):
     """
     Takes in a clk/sp3/other dataframe and removes any duplicate indices.
     Optionally, removes level_values from the index which contain NaNs
@@ -134,7 +134,7 @@ def rm_duplicates_df(df: Union[_pd.DataFrame, _pd.Series], rm_nan_level: Union[i
     return df
 
 
-def get_sampling(arr: _np.ndarray) -> Union[int, None]:
+def get_sampling(arr: _np.ndarray) -> int | None:
     """
     Simple function to compute sampling of the J2000 array
 
@@ -170,10 +170,10 @@ def array_equal_unordered(a1: _np.ndarray, a2: _np.ndarray) -> bool:
 
 
 def rms(
-    arr: Union[_pd.DataFrame, _pd.Series],
-    axis: Union[None, int] = 0,
-    level: Union[None, int, str] = None,
-) -> Union[_pd.Series, _pd.DataFrame]:
+    arr: _pd.DataFrame | _pd.Series,
+    axis: None | int = 0,
+    level: None | int | str = None,
+) -> _pd.Series | _pd.DataFrame:
     """Trivial function to compute root mean square"""
     if level is not None:
         return (arr**2).groupby(axis=axis, level=level).mean() ** 0.5
@@ -183,7 +183,7 @@ def rms(
 
 def get_std_bounds(
     a: _np.ndarray,
-    axis: Union[None, int, tuple[int, ...]] = None,
+    axis: None | int | tuple[int, ...] = None,
     sigma_coeff: int = 3,
 ):
     """
@@ -210,7 +210,7 @@ def get_std_bounds(
     return bounds if axis is None else _np.expand_dims(a=bounds, axis=axis)
 
 
-def df_quick_select(df: _pd.DataFrame, ind_lvl: Union[str, int], ind_keys, as_mask: bool = False) -> _np.ndarray:
+def df_quick_select(df: _pd.DataFrame, ind_lvl: str | int, ind_keys, as_mask: bool = False) -> _np.ndarray:
     """A faster alternative to do index selection over pandas dataframe, if multiple index levels are being used then better generate masks with this function and add them later into a single mask.
     df.loc(axis=0)[:,:,'IND_KEY',:] is the same as df_quick_select(df, 2, 'IND_KEY'),
     or, if used as mask: df[df_quick_select(df, 2, 'IND_NAME', as_mask=True)]"""
@@ -269,11 +269,11 @@ def degminsec2deg(a: list) -> _pd.Series: ...
 def degminsec2deg(a: str) -> float: ...
 
 
-def degminsec2deg(a: Union[_pd.Series, _pd.DataFrame, list, str]) -> Union[_pd.Series, _pd.DataFrame, float]:
+def degminsec2deg(a: _pd.Series | _pd.DataFrame | list | str) -> _pd.Series | _pd.DataFrame | float:
     """Converts degrees/minutes/seconds to decimal degrees.
 
-    :param _Union[_pd.Series, _pd.DataFrame, list, str] a: space-delimited string values of degrees/minutes/seconds
-    :return _Union[_pd.Series, _pd.DataFrame, float]: Series, DataFrame or scalar float decimal degrees, depending on the input
+    :param __pd.Series | _pd.DataFrame | list | str a: space-delimited string values of degrees/minutes/seconds
+    :return _pd.Series | _pd.DataFrame | float: Series, DataFrame or scalar float decimal degrees, depending on the input
     """
     if isinstance(a, str):
         a_single = _np.asarray(a.split(maxsplit=2)).astype(float)
@@ -315,7 +315,7 @@ def deg2degminsec(a: list) -> _np.ndarray: ...
 def deg2degminsec(a: _np.ndarray) -> _np.ndarray: ...
 
 
-def deg2degminsec(a: Union[_np.ndarray, list, float]) -> Union[_np.ndarray, float]:
+def deg2degminsec(a: _np.ndarray | list | float) -> _np.ndarray | float:
     """Converts decimal degrees to string representation in the form of degrees minutes seconds
     as in the sinex SITE/ID block. Could be used with multiple columns at once (2D ndarray)
 
@@ -363,7 +363,7 @@ def throw_if_nans(trace_bytes: bytes, nan_to_find=b"-nan", max_reported_nans: in
         raise ValueError(f"Found nan values (max_nans = {max_reported_nans})\n{nans_bytes.decode()}")
 
 
-def df_groupby_statistics(df: Union[_pd.Series, _pd.DataFrame], lvl_name: Union[list, str]):
+def df_groupby_statistics(df: _pd.Series | _pd.DataFrame, lvl_name: list | str):
     """Generate AVG/STD/RMS statistics from a dataframe summarizing over levels
 
     :param _pd.Series df: an input dataframe or series
@@ -404,14 +404,14 @@ def _get_trend(dataset, deg=1):
 
 def remove_outliers(
     dataframe: _pd.DataFrame,
-    cutoff: Union[int, float, None] = None,
-    coeff_std: Union[int, float] = 3,
+    cutoff: int | float | None = None,
+    coeff_std: int | float = 3,
 ) -> _pd.DataFrame:
     """Filters a dataframe with linear data. Runs detrending of the data to normalize to zero and applies absolute cutoff and std-based filtering
 
     :param _pd.DataFrame dataframe: a dataframe to filter the columns
-    :param _Union[int, float, None] cutoff: an absolute cutoff value to apply over detrended data, defaults to None
-    :param _Union[int, float] coeff_std: STD coefficient, defaults to 3
+    :param _int | float | None cutoff: an absolute cutoff value to apply over detrended data, defaults to None
+    :param _int | float coeff_std: STD coefficient, defaults to 3
     :return _pd.DataFrame: a filtered dataframe
     """
     detrend = dataframe - _get_trend(dataframe)
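The `degminsec2deg` / `deg2degminsec` hunks show how the new syntax coexists with `@overload`: the stubs keep per-input return types precise while the implementation accepts the whole union. A condensed sketch of the same pattern with a hypothetical function:

```python
from typing import overload

@overload
def double(x: int) -> int: ...
@overload
def double(x: str) -> str: ...

def double(x: int | str) -> int | str:
    # One runtime body serves both overloads; type checkers use the stubs
    # above to map int -> int and str -> str exactly.
    return x * 2
```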
20 changes: 10 additions & 10 deletions gnssanalysis/gn_datetime.py
@@ -6,7 +6,7 @@
 from datetime import date as _date
 from datetime import timedelta as _timedelta
 from io import StringIO as _StringIO
-from typing import Optional, overload, Union
+from typing import Optional, overload
 
 import numpy as _np
 import pandas as _pd
@@ -17,7 +17,7 @@
 logger = logging.getLogger(__name__)
 
 
-def derive_gps_week(year: Union[int, str], day_of_year: Union[int, str], weekday_suffix: bool = False) -> str:
+def derive_gps_week(year: int | str, day_of_year: int | str, weekday_suffix: bool = False) -> str:
     """
     Convert year, day-of-year to GPS week format: WWWWD or WWWW
     Based on code from Kristine Larson's gps.py
@@ -78,7 +78,7 @@ class GPSDate:
     # For compatibility, we have accessors called 'ts' and 'timestamp'.
     _internal_dt64: _np.datetime64
 
-    def __init__(self, time: Union[_np.datetime64, _datetime, _date, str]):
+    def __init__(self, time: _np.datetime64 | _datetime | _date | str):
         if isinstance(time, _np.datetime64):
             self._internal_dt64 = time
         elif isinstance(time, (_datetime, _date, str)):
@@ -172,7 +172,7 @@ def datetime_to_gps_week(dt: _datetime, wkday_suff: bool = False) -> str:
     return derive_gps_week(yr, doy, weekday_suffix=wkday_suff)
 
 
-def dt2gpswk(dt: _datetime, wkday_suff: bool = False, both: bool = False) -> Union[str, tuple[str, str]]:
+def dt2gpswk(dt: _datetime, wkday_suff: bool = False, both: bool = False) -> str | tuple[str, str]:
     """
     TODO DEPRECATED. Please use datetime_to_gps_week()
     """
@@ -222,7 +222,7 @@ def gpswkD2dt(gpswkD: str) -> _datetime:
 
 
 def yydoysec2datetime(
-    arr: Union[_np.ndarray, _pd.Series, list], recenter: bool = False, as_j2000: bool = True, delimiter: str = ":"
+    arr: _np.ndarray | _pd.Series | list, recenter: bool = False, as_j2000: bool = True, delimiter: str = ":"
 ) -> _np.ndarray:
     """Converts snx YY:DOY:SSSSS [snx] or YYYY:DOY:SSSSS [bsx/bia] object Series/ndarray to datetime64.
     recenter overrides day seconds value to midday
@@ -241,7 +241,7 @@ def yydoysec2datetime(
     return datetime2j2000(datetime64) if as_j2000 else datetime64
 
 
-def datetime2yydoysec(datetime: Union[_np.ndarray, _pd.Series]) -> _np.ndarray:
+def datetime2yydoysec(datetime: _np.ndarray | _pd.Series) -> _np.ndarray:
     """datetime64[s] -> yydoysecond
     The '2000-01-01T00:00:00' (-43200 J2000 for 00:000:00000) datetime becomes 00:000:00000 as it should,
     No masking and overriding with year 2100 is needed"""
@@ -270,7 +270,7 @@ def gpsweeksec2datetime(gps_week: _np.ndarray, tow: _np.ndarray, as_j2000: bool
     return datetime
 
 
-def datetime2gpsweeksec(array: _np.ndarray, as_decimal=False) -> Union[tuple, _np.ndarray]:
+def datetime2gpsweeksec(array: _np.ndarray, as_decimal=False) -> tuple | _np.ndarray:
     if array.dtype == int:
         ORIGIN = _gn_const.J2000_ORIGIN.astype("int64") - _gn_const.GPS_ORIGIN.astype("int64")
         gps_time = array + ORIGIN  # need int conversion for the case of datetime64
@@ -523,10 +523,10 @@ def round_timedelta(delta, roundto, *, tol=0.5, abs_tol=None):
 
     :delta:, :roundto:, and :abs_tol: (if used) must all have the same type.
 
-    :param Union[datetime.timedelta, numpy.timedelta64] delta: timedelta to round
-    :param Union[datetime.timedelta, numpy.timedelta64] roundto: "measuring stick", :delta: is rounded to integer multiples of this value
+    :param datetime.timedelta | numpy.timedelta64 delta: timedelta to round
+    :param datetime.timedelta | numpy.timedelta64 roundto: "measuring stick", :delta: is rounded to integer multiples of this value
     :param float tol: relative tolerance to use for the measure of "near"
-    :param Union[datetime.timedelta, numpy.timedelta64] abs_tol: absolute tolerance to use for the measure of "near"
+    :param datetime.timedelta | numpy.timedelta64 abs_tol: absolute tolerance to use for the measure of "near"
     """
     # TODO: Test this with numpy timedeltas, it was written for datetime.timedelta but should work
     if abs_tol is not None:
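A related 3.10+ nicety, relevant to `GPSDate.__init__` above: `isinstance` accepts `|` unions directly, so the tuple form the code keeps and the union form are interchangeable. Illustrative only — the PR itself leaves the tuple form alone:

```python
import datetime

value = datetime.date(2024, 1, 1)

# Equivalent runtime checks on Python 3.10+:
assert isinstance(value, (datetime.datetime, datetime.date, str))
assert isinstance(value, datetime.datetime | datetime.date | str)
```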
36 changes: 18 additions & 18 deletions gnssanalysis/gn_diffaux.py
@@ -1,6 +1,6 @@
 import logging as _logging
 from pathlib import Path as _Path
-from typing import Literal, Union
+from typing import Literal
 
 import numpy as _np
 import pandas as _pd
@@ -24,7 +24,7 @@ def _valvar2diffstd(valvar1, valvar2, std_coeff=1):
     return df_combo
 
 
-def _diff2msg(diff, tol=None, dt_as_gpsweek: Union[bool, None] = False):
+def _diff2msg(diff, tol=None, dt_as_gpsweek: bool | None = False):
     _pd.set_option("display.max_colwidth", 10000)
     from_valvar = _np.all(_np.isin(["DIFF", "STD"], diff.columns.get_level_values(0).values))
 
@@ -103,13 +103,13 @@ def _diff2msg(diff, tol=None, dt_as_gpsweek: Union[bool, None] = False):
     return msg
 
 
-def _compare_states(diffstd: _pd.DataFrame, log_lvl: int, tol: Union[float, None] = None, plot: bool = False) -> int:
+def _compare_states(diffstd: _pd.DataFrame, log_lvl: int, tol: float | None = None, plot: bool = False) -> int:
     """_summary_
 
     Args:
         diffstd (_pd.DataFrame): a difference DataFrame to assess
         log_lvl (int): logging level of the produced messages
-        tol (_Union[float, None], optional): Either a float threshold or None to use the present STD values. Defaults to None.
+        tol (float, optional): Either a float threshold or None to use the present STD values. Defaults to None.
         plot (bool, optional): So you want a simple plot to terminal? Defaults to False.
 
     Returns:
@@ -142,13 +142,13 @@ def _compare_states(diffstd: _pd.DataFrame, log_lvl: int, tol: Union[float, None
     return 0
 
 
-def _compare_residuals(diffstd: _pd.DataFrame, log_lvl: int, tol: Union[float, None] = None):
+def _compare_residuals(diffstd: _pd.DataFrame, log_lvl: int, tol: float | None = None):
     """Compares extracted POSTFIT residuals from the trace file and generates a comprehensive statistics on the present differences. Alternatively logs an OK message.
 
     Args:
         diffstd (_pd.DataFrame): a difference DataFrame to assess
        log_lvl (int): logging level of the produced messages
-        tol (_Union[float, None], optional): Either a float threshold or None to use the present STD values. Defaults to None.
+        tol (float, optional): Either a float threshold or None to use the present STD values. Defaults to None.
 
     Returns:
         int: status (0 means differences within threshold)
@@ -310,8 +310,8 @@ def compare_clk(
     clk_a: _pd.DataFrame,
     clk_b: _pd.DataFrame,
     norm_types: list[str] = ["daily", "epoch"],
-    ext_dt: Union[_np.ndarray, _pd.Index, None] = None,
-    ext_svs: Union[_np.ndarray, _pd.Index, None] = None,
+    ext_dt: _np.ndarray | _pd.Index | None = None,
+    ext_svs: _np.ndarray | _pd.Index | None = None,
 ) -> _pd.DataFrame:
     """
     DEPRECATED Please use diff_clk() instead.
@@ -333,8 +333,8 @@ def diff_clk(
     clk_baseline: _pd.DataFrame,
     clk_test: _pd.DataFrame,
     norm_types: list = ["daily", "epoch"],
-    ext_dt: Union[_np.ndarray, _pd.Index, None] = None,
-    ext_svs: Union[_np.ndarray, _pd.Index, None] = None,
+    ext_dt: _np.ndarray | _pd.Index | None = None,
+    ext_svs: _np.ndarray | _pd.Index | None = None,
 ) -> _pd.DataFrame:
     """Compares clock dataframes, removed common mode.
 
@@ -343,8 +343,8 @@ def diff_clk(
     :param _pd.DataFrame clk_baseline: clk dataframe 2 / b
     :param _pd.DataFrame clk_test: clk dataframe 1 / a
     :param list[str] norm_types: normalization to apply, defaults to ["daily", "epoch"]
-    :param _Union[_np.ndarray, _pd.Index, None] ext_dt: external datetime values to filter the clk dfs, defaults to None
-    :param _Union[_np.ndarray, _pd.Index, None] ext_svs: external satellites to filter the clk dfs, defaults to None
+    :param _np.ndarray | _pd.Index | None ext_dt: external datetime values to filter the clk dfs, defaults to None
+    :param _np.ndarray | _pd.Index | None ext_svs: external satellites to filter the clk dfs, defaults to None
     :raises ValueError: if no common epochs between clk_a and external datetime were found
     :raises ValueError: if no common epochs between files were found
     :return _pd.DataFrame: clk differences in the same units as input clk dfs (usually seconds)
@@ -412,12 +412,12 @@ def diff_clk(
 def sisre(
     sp3_a: _pd.DataFrame,
     sp3_b: _pd.DataFrame,
-    clk_a: Union[_pd.DataFrame, None] = None,
-    clk_b: Union[_pd.DataFrame, None] = None,
+    clk_a: _pd.DataFrame | None = None,
+    clk_b: _pd.DataFrame | None = None,
     norm_types: list[str] = ["daily", "epoch"],
     output_mode: str = "rms",
     clean: bool = True,
-    cutoff: Union[int, float, None] = None,
+    cutoff: int | float | None = None,
     use_rms: bool = False,
     hlm_mode=None,
     plot: bool = False,
@@ -450,12 +450,12 @@ def sisre(
 def calculate_sisre(
     sp3_baseline: _pd.DataFrame,
     sp3_test: _pd.DataFrame,
-    clk_baseline: Union[_pd.DataFrame, None] = None,  # Clk b
-    clk_test: Union[_pd.DataFrame, None] = None,  # Clk a
+    clk_baseline: _pd.DataFrame | None = None,  # Clk b
+    clk_test: _pd.DataFrame | None = None,  # Clk a
     norm_types: list[str] = ["daily", "epoch"],
     output_mode: str = "rms",
     clean: bool = True,
-    cutoff: Union[int, float, None] = None,
+    cutoff: int | float | None = None,
     use_rms: bool = False,
     hlm_mode=None,
     plot: bool = False,
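The signature edits in this file are purely notational: `X | None` denotes the same type as `typing.Optional[X]`, so callers of `sisre` / `calculate_sisre` see no behavioural change. A quick self-check, assuming Python 3.10+:

```python
import typing

# PEP 604 unions compare equal to their typing-module spellings:
assert (int | None) == typing.Optional[int]
assert (int | float | None) == typing.Union[int, float, None]
```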