Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions gnssanalysis/filenames.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

# The collections.abc (rather than typing) versions don't support subscripting until 3.9
# from collections import Iterable
from typing import Iterable, Mapping, Any, Dict, Optional, Tuple, Union, overload
from typing import Iterable, Mapping, Any, Optional, Union, overload
import warnings

import click
Expand Down Expand Up @@ -99,8 +99,8 @@
@click.option("--verbose", is_flag=True)
def determine_file_name_main(
files: Iterable[pathlib.Path],
defaults: Iterable[Tuple[str, str]],
overrides: Iterable[Tuple[str, str]],
defaults: Iterable[tuple[str, str]],
overrides: Iterable[tuple[str, str]],
current_name: bool,
delimiter: str,
verbose: bool,
Expand Down Expand Up @@ -165,8 +165,8 @@ def determine_file_name(
defined as a parameter to maintain syntactic simplicity when calling.

:param pathlib.Path file_path: Path to the file for which to determine name
:param Dict[str, Any] defaults: Default name properties to use when properties can't be determined
:param Dict[str, Any] overrides: Name properties that should override anything detected in the file
:param dict[str, Any] defaults: Default name properties to use when properties can't be determined
:param dict[str, Any] overrides: Name properties that should override anything detected in the file
:raises NotImplementedError: For files that we should support but currently don't (bia, iox, obx, sum, tro)
:return str: Proposed IGS long filename
"""
Expand All @@ -182,7 +182,7 @@ def determine_properties_from_contents_and_filename(
file_path: pathlib.Path,
defaults: Optional[Mapping[str, Any]] = None,
overrides: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Determine the properties of a file based on its contents

The function reads both the existing filename of the provided file as well as
Expand All @@ -204,8 +204,8 @@ def determine_properties_from_contents_and_filename(
- project: str

:param pathlib.Path file_path: Path to the file for which to determine properties
:param Dict[str, Any] defaults: Default name properties to use when properties can't be determined
:param Dict[str, Any] overrides: Name properties that should override anything detected in the file
:param dict[str, Any] defaults: Default name properties to use when properties can't be determined
:param dict[str, Any] overrides: Name properties that should override anything detected in the file
:raises NotImplementedError: For files that we should support but currently don't (bia, iox, obx, sum, tro)
:return str: Dictionary of file properties
"""
Expand Down Expand Up @@ -459,15 +459,15 @@ def convert_nominal_span(nominal_span: str) -> datetime.timedelta:
return datetime.timedelta()


def determine_clk_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
def determine_clk_name_props(file_path: pathlib.Path) -> dict[str, Any]:
"""Determine the IGS filename properties for a CLK files

Like all functions in this series, the function reads both a filename and the files contents
to determine properties that can be used to describe the expected IGS long filename. The
function returns a dictionary with any properties it manages to successfully determine.

:param pathlib.Path file_path: file for which to determine name properties
:return Dict[str, Any]: dictionary containing the extracted name properties
:return dict[str, Any]: dictionary containing the extracted name properties
"""
name_props = {}
try:
Expand Down Expand Up @@ -519,15 +519,15 @@ def determine_clk_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
return name_props


def determine_erp_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
def determine_erp_name_props(file_path: pathlib.Path) -> dict[str, Any]:
"""Determine the IGS filename properties for a ERP files

Like all functions in this series, the function reads both a filename and the files contents
to determine properties that can be used to describe the expected IGS long filename. The
function returns a dictionary with any properties it manages to successfully determine.

:param pathlib.Path file_path: file for which to determine name properties
:return Dict[str, Any]: dictionary containing the extracted name properties
:return dict[str, Any]: dictionary containing the extracted name properties
"""
name_props = {}
try:
Expand Down Expand Up @@ -574,15 +574,15 @@ def determine_erp_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
return name_props


def determine_snx_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
def determine_snx_name_props(file_path: pathlib.Path) -> dict[str, Any]:
"""Determine the IGS filename properties for a SINEX files

Like all functions in this series, the function reads both a filename and the files contents
to determine properties that can be used to describe the expected IGS long filename. The
function returns a dictionary with any properties it manages to successfully determine.

:param pathlib.Path file_path: file for which to determine name properties
:return Dict[str, Any]: dictionary containing the extracted name properties
:return dict[str, Any]: dictionary containing the extracted name properties
"""
name_props = {}
try:
Expand Down Expand Up @@ -668,7 +668,7 @@ def determine_snx_name_props(file_path: pathlib.Path) -> Dict[str, Any]:

def determine_sp3_name_props(
file_path: pathlib.Path, strict_mode: type[StrictMode] = StrictModes.STRICT_WARN
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Determine the IGS filename properties for a SP3 files

Like all functions in this series, the function reads both a filename and the files contents
Expand All @@ -678,7 +678,7 @@ def determine_sp3_name_props(
:param pathlib.Path file_path: file for which to determine name properties
:param type[StrictMode] strict_mode: indicates whether to raise, warn, or silently continue on errors such as
failure to get properties from a filename.
:return Dict[str, Any]: dictionary containing the extracted name properties. May be empty on some errors, if
:return dict[str, Any]: dictionary containing the extracted name properties. May be empty on some errors, if
strict_mode is not set to RAISE.
:raises ValueError: if strict_mode set to RAISE, and unable to statically extract properties from a filename
"""
Expand Down
6 changes: 3 additions & 3 deletions gnssanalysis/gn_aux.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Auxiliary functions"""

import logging as _logging
from typing import overload, Tuple, Union
from typing import overload, Union
import numpy as _np
import pandas as _pd

Expand Down Expand Up @@ -183,7 +183,7 @@ def rms(

def get_std_bounds(
a: _np.ndarray,
axis: Union[None, int, Tuple[int, ...]] = None,
axis: Union[None, int, tuple[int, ...]] = None,
sigma_coeff: int = 3,
):
"""
Expand Down Expand Up @@ -287,7 +287,7 @@ def degminsec2deg(a: Union[_pd.Series, _pd.DataFrame, list, str]) -> Union[_pd.S
assert isinstance(a_series, _pd.Series)
return _series_str_degminsec2deg(a_series).unstack()
else:
raise TypeError("Unsupported input type. Please use either of _pd.Series, _pd.DataFrame, List or str")
raise TypeError("Unsupported input type. Please use either of _pd.Series, _pd.DataFrame, list or str")


def _deg2degminsec(a: _np.ndarray) -> _np.ndarray:
Expand Down
14 changes: 7 additions & 7 deletions gnssanalysis/gn_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import ftplib as _ftplib
from ftplib import FTP_TLS as _FTP_TLS
from pathlib import Path as _Path
from typing import Any, Generator, List, Literal, Optional, Tuple, Union
from typing import Any, Generator, Literal, Optional, Union
from urllib import request as _request
from urllib.error import HTTPError as _HTTPError

Expand Down Expand Up @@ -391,7 +391,7 @@ def generate_product_filename(
version: str = "0",
project: str = "OPS",
content_type: str = None,
) -> Tuple[str, GPSDate, _datetime.datetime]:
) -> tuple[str, GPSDate, _datetime.datetime]:
"""Given a reference datetime and extention of file, generate the IGS filename and GPSDate obj for use in download

:param _datetime.datetime reference_start: Datetime of the start period of interest
Expand All @@ -405,7 +405,7 @@ def generate_product_filename(
:param str version: Version of the file, defaults to "0"
:param str project: IGS project descriptor, defaults to "OPS"
:param str content_type: IGS content specifier - if None set automatically based on file_ext, defaults to None
:return _Tuple[str, GPSDate, _datetime.datetime]: Tuple of filename str, GPSDate and datetime obj (based on shift)
:return tuple[str, GPSDate, _datetime.datetime]: Tuple of filename str, GPSDate and datetime obj (based on shift)
"""
reference_start += _datetime.timedelta(hours=shift)
if type(reference_start == _datetime.date):
Expand Down Expand Up @@ -822,7 +822,7 @@ def download_product_from_cddis(
project_type: str = "OPS",
timespan: _datetime.timedelta = _datetime.timedelta(days=2),
if_file_present: str = "prompt_user",
) -> List[_Path]:
) -> list[_Path]:
"""Download the file/s from CDDIS based on start and end epoch, to the download directory (download_dir)

:param _Path download_dir: Where to download files (local directory)
Expand All @@ -838,7 +838,7 @@ def download_product_from_cddis(
:param _datetime.timedelta timespan: Timespan of the file/s to download, defaults to _datetime.timedelta(days=2)
:param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
:raises FileNotFoundError: Raise error if the specified file cannot be found on CDDIS
:return List[_Path]: Return list of paths of downloaded files
:return list[_Path]: Return list of paths of downloaded files
"""
# Download the correct IGS FIN ERP files
if file_ext == "ERP" and analysis_center == "IGS" and solution_type == "FIN": # get the correct start_epoch
Expand Down Expand Up @@ -1193,12 +1193,12 @@ def download_satellite_metadata_snx(download_dir: _Path, if_file_present: str =
return download_filepath


def download_yaw_files(download_dir: _Path, if_file_present: str = "prompt_user") -> List[_Path]:
def download_yaw_files(download_dir: _Path, if_file_present: str = "prompt_user") -> list[_Path]:
"""Download yaw rate / bias files needed to for Ginan's PEA

:param _Path download_dir: Where to download files (local directory)
:param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
:return List[_Path]: Return list paths of downloaded files
:return list[_Path]: Return list paths of downloaded files
"""
ensure_folders([download_dir])
download_filepaths = []
Expand Down
10 changes: 5 additions & 5 deletions gnssanalysis/gn_io/erp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# from collections.abc import Callable, Iterable
from typing import Callable, Iterable
from io import BytesIO as _BytesIO
from typing import List, TextIO, Union
from typing import TextIO, Union
from urllib import request as _rqs

import numpy as _np
Expand All @@ -17,11 +17,11 @@
from gnssanalysis import gn_io as _gn_io


def normalise_headers(headers: Iterable[str]) -> List[str]:
def normalise_headers(headers: Iterable[str]) -> list[str]:
"""Apply :func: `gn_io.erp.normalise_headers` to all headers in an iterable

:param Iterable[str] headers: Iterable of header strings obtained from an ERP file
:return List[str]: List of normalised headers as per :func: `gn_io.erp.normalise_headers`
:return list[str]: List of normalised headers as per :func: `gn_io.erp.normalise_headers`
"""
return [normalise_header(h) for h in headers]

Expand All @@ -44,15 +44,15 @@ def normalise_header(header: str) -> str:
return get_canonical_header(header)


def merge_hyphen_headers(raw_split_header: Iterable[str]) -> List[str]:
def merge_hyphen_headers(raw_split_header: Iterable[str]) -> list[str]:
"""Take a list of raw headers from an ERP file and merge hyphenated headers that got split

In some ERP files hyphenated headers, such as UTC-TAI, occasionally have spaces before or after
the hyphen/minus sign. This breaks tokenisation as the header gets split into multiple parts.
This function re-merges those header components.

:param Iterable[str] raw_split_header: ERP header line that has been split/tokenized
:return List[str]: List of ERP headers with hyphen-separated headers merged
:return list[str]: List of ERP headers with hyphen-separated headers merged
"""
# Copy to avoid mutating input list
headers = list(raw_split_header)
Expand Down
16 changes: 8 additions & 8 deletions gnssanalysis/gn_io/igslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import glob as _glob
import re as _re
from multiprocessing import Pool as _Pool
from typing import Union, List, Tuple
from typing import Union

import numpy as _np
import pandas as _pd
Expand Down Expand Up @@ -169,7 +169,7 @@ def determine_log_version(data: bytes) -> str:

def extract_id_block(
data: bytes, file_path: str, file_code: str, version: Union[str, None] = None
) -> Union[List[str], _np.ndarray]:
) -> Union[list[str], _np.ndarray]:
"""Extract the site ID block given the bytes object read from an IGS site log file

:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
Expand Down Expand Up @@ -229,12 +229,12 @@ def extract_location_block(data: bytes, file_path: str, version: Union[str, None
return location_block


def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.ndarray]:
def extract_receiver_block(data: bytes, file_path: str) -> Union[list[tuple[bytes]], _np.ndarray]:
"""Extract the location block given the bytes object read from an IGS site log file

:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:return List[Tuple[bytes]] or _np.ndarray: The receiver block of the data. Each list element specifies an receiver.
:return list[tuple[bytes]] or _np.ndarray: The receiver block of the data. Each list element specifies an receiver.
If regex doesn't match, an empty numpy NDArray is returned instead.
"""
receiver_block = _REGEX_REC.findall(data)
Expand All @@ -244,12 +244,12 @@ def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[byte
return receiver_block


def extract_antenna_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.ndarray]:
def extract_antenna_block(data: bytes, file_path: str) -> Union[list[tuple[bytes]], _np.ndarray]:
"""Extract the antenna block given the bytes object read from an IGS site log file

:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:return List[Tuple[bytes]] or _np.ndarray: The antenna block of the data. Each list element specifies an antenna.
:return list[tuple[bytes]] or _np.ndarray: The antenna block of the data. Each list element specifies an antenna.
If regex doesn't match, an empty numpy NDArray is returned instead.
"""
antenna_block = _REGEX_ANT.findall(data)
Expand Down Expand Up @@ -397,13 +397,13 @@ def translate_series(series: _pd.Series, translation: dict) -> _pd.Series:

def gather_metadata(
logs_glob_path: str = "/data/station_logs/station_logs_IGS/*/*.log", rnx_glob_path: str = None, num_threads: int = 1
) -> List[_pd.DataFrame]:
) -> list[_pd.DataFrame]:
"""Parses log files found with glob expressions into pd.DataFrames

:param str logs_glob_path: A glob expression for log files, defaults to "/data/station_logs_IGS/*/*.log"
:param str rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
:param int num_threads: Number of threads to run, defaults to 1
:return List[_pd.DataFrame]: List of DataFrames with [ID, Receiver, Antenna] data
:return list[_pd.DataFrame]: List of DataFrames with [ID, Receiver, Antenna] data
"""
parsed_filenames = find_recent_logs(logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path).values

Expand Down
Loading