diff --git a/gnssanalysis/gn_io/igslog.py b/gnssanalysis/gn_io/igslog.py
index 7fe741c..fa46c56 100644
--- a/gnssanalysis/gn_io/igslog.py
+++ b/gnssanalysis/gn_io/igslog.py
@@ -123,7 +123,8 @@ class LogVersionError(Exception):
 
 def find_recent_logs(
     logs_glob_path: str,
-    rnx_glob_path: Union[str, None] = None,
+    rnx_glob_path: Optional[str] = None,
+    site_list: Optional[list[str]] = None,
     raise_if_no_logs: bool = True,
     raise_if_no_rnx: bool = True,
 ) -> _pd.DataFrame:
@@ -131,7 +132,9 @@ def find_recent_logs(
 
     :param str logs_glob_path: A glob expression for log files, e.g. /data/station_logs_IGS/*/*.log
     :param Union[str, None] rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to
-        None. A list of station names can also be passed, though this is not officially supported. #TODO clean up.
+        None.
+    :param list[str] | None site_list: list of stations/sites to use (an alternative to extracting them from RNX
+        files), defaults to None.
     :param bool raise_if_no_logs: raise ValueError if logs glob finds no files. On by default.
     :param bool raise_if_no_rnx: raise ValueError if rnx glob specified, but finds no files. On by default.
     :return _pd.DataFrame: Returns a dataframe containing information from all station logs processed
@@ -149,23 +152,50 @@ def find_recent_logs(
     logs_df.columns = ["CODE", "DATE", "PATH"]
     logs_df = logs_df[~logs_df.CODE.isna()].sort_values(["CODE", "DATE"])
     recent_logs_df = logs_df[~logs_df.CODE.duplicated(keep="last")]
-    if rnx_glob_path is not None:
-        if isinstance(rnx_glob_path, list):  # If a station list was passed instead of a path?
-            if len(rnx_glob_path) == 0:
-                raise ValueError("(Station) list passed instead of rnx glob, but list was empty")
-            rnx_stations = rnx_glob_path
-        if isinstance(rnx_glob_path, str):
-            rnx_files = sorted(_glob.glob(rnx_glob_path))
-            assert len(rnx_files) != 0, f"No rnx files were found using '{rnx_glob_path}'"
-            if len(rnx_files) == 0:
-                if raise_if_no_rnx:
-                    raise ValueError(f"No RNX files found using glob: '{rnx_glob_path}'")
-                logger.error(f"No RNX files found using glob: '{rnx_glob_path}'")
-            rnx_stations = _pd.Series(rnx_files).str.extract(r"(\w{4})[^\/]+$", expand=False).to_list()
+
+    if site_list is not None and rnx_glob_path is not None:
+        raise ValueError("site_list and rnx_glob_path cannot be used together")
+
+    filter_to_sites: bool = False
+    if site_list is not None and isinstance(site_list, list):
+        if len(site_list) == 0:
+            raise ValueError("Site list passed, but was empty")
+        rnx_stations = site_list
+        filter_to_sites = True
+    elif rnx_glob_path is not None and isinstance(rnx_glob_path, str):
+        rnx_stations = sites_from_rnx(rnx_glob_path, raise_if_no_rnx=raise_if_no_rnx)
+        if len(rnx_stations) == 0:
+            if raise_if_no_rnx:
+                raise ValueError("No sites extracted from RNX")
+            logger.error("No sites extracted from RNX")
+        else:
+            filter_to_sites = True
+
+    if filter_to_sites:
         return recent_logs_df[recent_logs_df.CODE.isin(rnx_stations).values]
     return recent_logs_df
 
 
+def sites_from_rnx(rnx_glob_path: str, raise_if_no_rnx: bool = True) -> list[str]:
+    """
+    Extracts a list of sites / stations from RNX files matching the given glob
+
+    :param str rnx_glob_path: glob expression to match (already downloaded) RNX files
+    :param bool raise_if_no_rnx: raise ValueError if glob does not match any RNX files, default True
+    :returns list[str]: list of site IDs
+    :raises ValueError: if the provided glob doesn't match any RNX files and raise_if_no_rnx is set
+    """
+
+    rnx_files = sorted(_glob.glob(rnx_glob_path))
+    if len(rnx_files) == 0:
+        if raise_if_no_rnx:
+            raise ValueError(f"No RNX files found using glob: '{rnx_glob_path}'")
+        logger.error(f"No RNX files found using glob: '{rnx_glob_path}'")
+        return []
+    rnx_stations: list[str] = _pd.Series(rnx_files).str.extract(r"(\w{4})[^\/]+$", expand=False).to_list()
+    return rnx_stations
+
+
 def determine_log_version(data: bytes) -> str:
     """Given the byes object that results from reading an IGS log file, determine the version ("v1.0" or "v2.0")
 
@@ -416,16 +446,23 @@ def translate_series(series: _pd.Series, translation: dict) -> _pd.Series:
 
 
 def gather_metadata(
-    logs_glob_path: str = "/data/station_logs/station_logs_IGS/*/*.log", rnx_glob_path: str = None, num_threads: int = 1
+    logs_glob_path: str = "/data/station_logs/station_logs_IGS/*/*.log",
+    rnx_glob_path: Optional[str] = None,
+    site_list: Optional[list[str]] = None,
+    num_threads: int = 1,
 ) -> list[_pd.DataFrame]:
     """Parses log files found with glob expressions into pd.DataFrames
 
     :param str logs_glob_path: A glob expression for log files, defaults to "/data/station_logs_IGS/*/*.log"
-    :param str rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
+    :param str | None rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
+    :param list[str] | None site_list: list of sites to use (an alternative to extracting them from RNX files),
+        defaults to None.
     :param int num_threads: Number of threads to run, defaults to 1
     :return list[_pd.DataFrame]: List of DataFrames with [ID, Receiver, Antenna] data
     """
-    parsed_filenames = find_recent_logs(logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path).values
+    parsed_filenames = find_recent_logs(
+        logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path, site_list=site_list
+    ).values
 
     total = parsed_filenames.shape[0]
     if total == 0:
@@ -733,10 +770,11 @@ def meta2string(id_loc_df: _pd.DataFrame, rec_df: _pd.DataFrame, ant_df: _pd.Dat
 
 def write_meta_gather_master(
     logs_glob_path: str,
-    rnx_glob_path: str,
     frame_snx_path: str,
     frame_soln_path: str,
     frame_psd_path: str,
+    rnx_glob_path: Optional[str] = None,
+    site_list: Optional[list[str]] = None,
     frame_datetime: Optional[_np.datetime64] = None,
     out_path: str = "/data/meta_gather.snx",
     num_threads: int = 1,
@@ -744,11 +782,12 @@ def write_meta_gather_master(
     """Create a SNX file of stations, based on given reference frame projected to a datetime using site logs + rnxs
 
     :param str logs_glob_path: A glob path to find desired log files, e.g. "/data/site_logs/*/*.log"
-    :param str rnx_glob_path: A glob path to find desired RNX files (optional), e.g. "/data/rinex-files/*.rnx"
     :param str frame_snx_path: Path to reference frame sinex file, e.g. "/data/itrf2014/ITRF2014-IGS-TRF.SNX.gz"
     :param str frame_soln_path: Path to solution file of reference frame, e.g. "/data/itrf2014/ITRF2014-soln-gnss.snx"
    :param str frame_psd_path: Path to post-seismic deformation file, e.g. "/data/itrf2014/ITRF2014-psd-gnss.snx"
-    :param _np.datetime64 frame_datetime: Datetime to project the dataframe to, defaults to None
+    :param str | None rnx_glob_path: A glob path to find desired RNX files (optional), e.g. "/data/rinex-files/*.rnx"
+    :param list[str] | None site_list: A list of sites to use (optional), rather than extracting them from RNX files.
+    :param _np.datetime64 | None frame_datetime: Datetime to project the dataframe to, defaults to None
     :param str out_path: Path of file to output, defaults to "/data/meta_gather.snx"
     :param int num_threads: Number of threads to run on parsing log / rnx files, defaults to 1
     """
@@ -758,7 +797,7 @@ def write_meta_gather_master(
         frame_datetime = _np.datetime64(frame_datetime)
 
     id_loc_df, rec_df, ant_df = gather_metadata(
-        logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path, num_threads=num_threads
+        logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path, site_list=site_list, num_threads=num_threads
     )
 
     sites_meta = rec_df.CODE.unique()
@@ -791,7 +830,7 @@ def write_meta_gather_master(
         + "+FILE/REFERENCE\n"
         + "DESCRIPTION merged metadata\n"
         + "OUTPUT historical sinex header file\n"
-        + "CONTACT bogdan.matviichuk@ga.gov.au\n"
+        + "CONTACT gnssanalysis@ga.gov.au\n"
         + "SOFTWARE LOG2SNX v0.1.2\n"
         + "HARDWARE AWS\n"
         + "INPUT igs ftp site logs\n"
diff --git a/gnssanalysis/gn_utils.py b/gnssanalysis/gn_utils.py
index f381a37..7117292 100644
--- a/gnssanalysis/gn_utils.py
+++ b/gnssanalysis/gn_utils.py
@@ -435,6 +435,7 @@ def sp3merge(sp3paths, clkpaths, output, nodata_to_nan):
 @_click.command()
 @_click.option("-l", "--logglob", required=True, type=str, help="logs glob path")
 @_click.option("-r", "--rnxglob", type=str, help="rinex glob path")
+@_click.option("-s", "--sitelist", "site_list", type=str, help="list of site IDs")
 @_click.option("-o", "--output", type=str, help="output sinex filepath", default="./metagather.snx")
 @_click.option(
     "-fs",
@@ -470,7 +471,7 @@ def sp3merge(sp3paths, clkpaths, output, nodata_to_nan):
     help="number of threads to run in parallel",
     default=None,
 )
-def log2snx(logglob, rnxglob, outfile, frame_snx, frame_dis, frame_psd, datetime, num_threads):
+def log2snx(logglob, rnxglob, site_list, outfile, frame_snx, frame_dis, frame_psd, datetime, num_threads):
     """
     IGS log files parsing utility. Globs over log files using LOGGLOB expression
     and outputs SINEX metadata file. If provided with frame and frame discontinuity files (soln),
@@ -518,14 +519,12 @@ def log2snx(logglob, rnxglob, outfile, frame_snx, frame_dis, frame_psd, datetime
     from .gn_io import igslog
 
     if isinstance(rnxglob, list):
-        if (len(rnxglob) == 1) and (
-            rnxglob[0].find("*") != -1
-        ):  # it's rnx_glob expression (may be better to check if star is present)
-            rnxglob = rnxglob[0]
+        raise ValueError("rnxglob should be a string, not a list. To pass a site list, use site_list instead")
 
     igslog.write_meta_gather_master(
         logs_glob_path=logglob,
         rnx_glob_path=rnxglob,
+        site_list=site_list,
         out_path=outfile,
         frame_snx_path=frame_snx,
         frame_soln_path=frame_dis,
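
Below the patch, a minimal usage sketch (not part of the diff) of the new site_list pathway. It assumes the package is importable as gnssanalysis; the glob paths are the examples from the docstrings above, and the site codes are placeholders.

    # Illustrative only: paths and site codes are placeholders, not project defaults.
    from gnssanalysis.gn_io import igslog

    # Most recent log per station, filtered to an explicit site list
    # (previously this required passing a list via rnx_glob_path).
    recent_logs = igslog.find_recent_logs(
        logs_glob_path="/data/station_logs_IGS/*/*.log",
        site_list=["ALIC", "HOB2", "STR1"],
    )

    # Or derive the site codes from RINEX files on disk; that extraction now lives
    # in its own helper and can also be called directly.
    site_codes = igslog.sites_from_rnx("/data/pea/exs/data/*.rnx", raise_if_no_rnx=False)

    # Supplying both site_list and rnx_glob_path raises ValueError under the new logic.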