Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 62 additions & 23 deletions gnssanalysis/gn_io/igslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,18 @@ class LogVersionError(Exception):

def find_recent_logs(
logs_glob_path: str,
rnx_glob_path: Union[str, None] = None,
rnx_glob_path: Optional[str] = None,
site_list: Optional[list[str]] = None,
raise_if_no_logs: bool = True,
raise_if_no_rnx: bool = True,
) -> _pd.DataFrame:
"""Takes glob expression to create list of logs, parses names into site and date and selects most recent ones

:param str logs_glob_path: A glob expression for log files, e.g. /data/station_logs_IGS/*/*.log
:param Union[str, None] rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to
None. A list of station names can also be passed, though this is not officially supported. #TODO clean up.
None.
:param list[str] | None site_list: list of stations/sites to use (alternately extracts from RNX files), defaults
to None.
:param bool raise_if_no_logs: raise ValueError if logs glob finds no files. On by default.
:param bool raise_if_no_rnx: raise ValueError if rnx glob specified, but finds no files. On by default.
:return _pd.DataFrame: Returns a dataframe containing information from all station logs processed
Expand All @@ -149,23 +152,50 @@ def find_recent_logs(
logs_df.columns = ["CODE", "DATE", "PATH"]
logs_df = logs_df[~logs_df.CODE.isna()].sort_values(["CODE", "DATE"])
recent_logs_df = logs_df[~logs_df.CODE.duplicated(keep="last")]
if rnx_glob_path is not None:
if isinstance(rnx_glob_path, list): # If a station list was passed instead of a path?
if len(rnx_glob_path) == 0:
raise ValueError("(Station) list passed instead of rnx glob, but list was empty")
rnx_stations = rnx_glob_path
if isinstance(rnx_glob_path, str):
rnx_files = sorted(_glob.glob(rnx_glob_path))
assert len(rnx_files) != 0, f"No rnx files were found using '{rnx_glob_path}'"
if len(rnx_files) == 0:
if raise_if_no_rnx:
raise ValueError(f"No RNX files found using glob: '{rnx_glob_path}'")
logger.error(f"No RNX files found using glob: '{rnx_glob_path}'")
rnx_stations = _pd.Series(rnx_files).str.extract(r"(\w{4})[^\/]+$", expand=False).to_list()

if site_list is not None and rnx_glob_path is not None:
raise ValueError("site_list and rnx_glob_path cannot be used together")

filter_to_sites: bool = False
if site_list is not None and isinstance(site_list, list):
if len(site_list) == 0:
raise ValueError("Site list passed, but was empty")
rnx_stations = site_list
filter_to_sites = True
elif rnx_glob_path is not None and isinstance(rnx_glob_path, str):
rnx_stations = sites_from_rnx(rnx_glob_path, raise_if_no_rnx=raise_if_no_rnx)
if len(rnx_stations) == 0:
if raise_if_no_rnx:
raise ValueError("No sites extracted from RNX")
logger.error("No sites extracted from RNX")
else:
filter_to_sites = True

if filter_to_sites == True:
return recent_logs_df[recent_logs_df.CODE.isin(rnx_stations).values]
return recent_logs_df


def sites_from_rnx(rnx_glob_path: str, raise_if_no_rnx: bool = True) -> list[str]:
    """
    Extracts a list of sites / stations from RNX files matching the given glob

    The site ID of each file is the first run of four word characters in the file name (the final path
    component) that is followed by at least one more character, e.g. "ALIC" for
    "ALIC00AUS_R_20220010000_01D_30S_MO.rnx". One entry is returned per matching file, in sorted path order,
    so a site with several RNX files appears several times. Files whose names do not match the pattern are
    left out of the result.

    :param str rnx_glob_path: glob expression to match (already downloaded) RNX files
    :param bool raise_if_no_rnx: raise ValueError if glob does not match any RNX files, default True
    :returns list[str]: list of site IDs (empty if no files matched and raise_if_no_rnx is False)
    :raises ValueError: if provided glob doesn't match any RNX files, and flag is set
    """
    rnx_files = sorted(_glob.glob(rnx_glob_path))
    if not rnx_files:
        if raise_if_no_rnx:
            raise ValueError(f"No RNX files found using glob: '{rnx_glob_path}'")
        logger.error(f"No RNX files found using glob: '{rnx_glob_path}'")
        return []

    # str.extract yields NaN for names that do not match the pattern; drop those so that the result really is
    # a list of strings, as the signature promises.
    site_codes = _pd.Series(rnx_files).str.extract(r"(\w{4})[^\/]+$", expand=False)
    rnx_stations: list[str] = site_codes.dropna().to_list()
    return rnx_stations


def determine_log_version(data: bytes) -> str:
"""Given the bytes object that results from reading an IGS log file, determine the version ("v1.0" or "v2.0")

Expand Down Expand Up @@ -416,16 +446,23 @@ def translate_series(series: _pd.Series, translation: dict) -> _pd.Series:


def gather_metadata(
logs_glob_path: str = "/data/station_logs/station_logs_IGS/*/*.log", rnx_glob_path: str = None, num_threads: int = 1
logs_glob_path: str = "/data/station_logs/station_logs_IGS/*/*.log",
rnx_glob_path: Optional[str] = None,
site_list: Optional[list[str]] = None,
num_threads: int = 1,
) -> list[_pd.DataFrame]:
"""Parses log files found with glob expressions into pd.DataFrames

:param str logs_glob_path: A glob expression for log files, defaults to "/data/station_logs_IGS/*/*.log"
:param str rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
:param str | None rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
:param list[str] | None site_list: list of sites to use (alternately extracts from RNX files), defaults
to None.
:param int num_threads: Number of threads to run, defaults to 1
:return list[_pd.DataFrame]: List of DataFrames with [ID, Receiver, Antenna] data
"""
parsed_filenames = find_recent_logs(logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path).values
parsed_filenames = find_recent_logs(
logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path, site_list=site_list
).values

total = parsed_filenames.shape[0]
if total == 0:
Expand Down Expand Up @@ -733,22 +770,24 @@ def meta2string(id_loc_df: _pd.DataFrame, rec_df: _pd.DataFrame, ant_df: _pd.Dat

def write_meta_gather_master(
logs_glob_path: str,
rnx_glob_path: str,
frame_snx_path: str,
frame_soln_path: str,
frame_psd_path: str,
rnx_glob_path: Optional[str] = None,
site_list: Optional[list[str]] = None,
frame_datetime: Optional[_np.datetime64] = None,
out_path: str = "/data/meta_gather.snx",
num_threads: int = 1,
) -> None:
"""Create a SNX file of stations, based on given reference frame projected to a datetime using site logs + rnxs

:param str logs_glob_path: A glob path to find desired log files, e.g. "/data/site_logs/*/*.log"
:param str rnx_glob_path: A glob path to find desired RNX files (optional), e.g. "/data/rinex-files/*.rnx"
:param str frame_snx_path: Path to reference frame sinex file, e.g. "/data/itrf2014/ITRF2014-IGS-TRF.SNX.gz"
:param str frame_soln_path: Path to solution file of reference frame, e.g. "/data/itrf2014/ITRF2014-soln-gnss.snx"
:param str frame_psd_path: Path to post-seismic deformation file, e.g. "/data/itrf2014/ITRF2014-psd-gnss.snx"
:param _np.datetime64 frame_datetime: Datetime to project the dataframe to, defaults to None
:param str | None rnx_glob_path: A glob path to find desired RNX files (optional), e.g. "/data/rinex-files/*.rnx"
:param list[str] | None site_list: A list of sites to use (optional), rather than extracting from RNX.
:param _np.datetime64 | None frame_datetime: Datetime to project the dataframe to, defaults to None
:param str out_path: Path of file to output, defaults to "/data/meta_gather.snx"
:param int num_threads: Number of threads to run on parsing log / rnx files, defaults to 1
"""
Expand All @@ -758,7 +797,7 @@ def write_meta_gather_master(
frame_datetime = _np.datetime64(frame_datetime)

id_loc_df, rec_df, ant_df = gather_metadata(
logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path, num_threads=num_threads
logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path, site_list=site_list, num_threads=num_threads
)

sites_meta = rec_df.CODE.unique()
Expand Down Expand Up @@ -791,7 +830,7 @@ def write_meta_gather_master(
+ "+FILE/REFERENCE\n"
+ "DESCRIPTION merged metadata\n"
+ "OUTPUT historical sinex header file\n"
+ "CONTACT bogdan.matviichuk@ga.gov.au\n"
+ "CONTACT gnssanalysis@ga.gov.au\n"
+ "SOFTWARE LOG2SNX v0.1.2\n"
+ "HARDWARE AWS\n"
+ "INPUT igs ftp site logs\n"
Expand Down
9 changes: 4 additions & 5 deletions gnssanalysis/gn_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ def sp3merge(sp3paths, clkpaths, output, nodata_to_nan):
@_click.command()
@_click.option("-l", "--logglob", required=True, type=str, help="logs glob path")
@_click.option("-r", "--rnxglob", type=str, help="rinex glob path")
@_click.option("-s", "--sitelist", type=str, help="list of site IDs")
@_click.option("-o", "--output", type=str, help="output sinex filepath", default="./metagather.snx")
@_click.option(
"-fs",
Expand Down Expand Up @@ -470,7 +471,7 @@ def sp3merge(sp3paths, clkpaths, output, nodata_to_nan):
help="number of threads to run in parallel",
default=None,
)
def log2snx(logglob, rnxglob, outfile, frame_snx, frame_dis, frame_psd, datetime, num_threads):
def log2snx(logglob, rnxglob, site_list, outfile, frame_snx, frame_dis, frame_psd, datetime, num_threads):
"""
IGS log files parsing utility. Globs over log files using LOGGLOB expression
and outputs SINEX metadata file. If provided with frame and frame discontinuity files (soln),
Expand Down Expand Up @@ -518,14 +519,12 @@ def log2snx(logglob, rnxglob, outfile, frame_snx, frame_dis, frame_psd, datetime
from .gn_io import igslog

if isinstance(rnxglob, list):
if (len(rnxglob) == 1) and (
rnxglob[0].find("*") != -1
): # it's rnx_glob expression (may be better to check if star is present)
rnxglob = rnxglob[0]
raise ValueError("rnxglob should be a string, not a list. To pass a site list, use site_list instead")

igslog.write_meta_gather_master(
logs_glob_path=logglob,
rnx_glob_path=rnxglob,
site_list=site_list,
out_path=outfile,
frame_snx_path=frame_snx,
frame_soln_path=frame_dis,
Expand Down