|
| 1 | +# SP3 file trimming / editing utility. Intended for test purposes only, including creation of SP3 test data for unit |
| 2 | +# tests, avoiding the need to store excessively large files in the repo. |
| 3 | + |
| 4 | +import argparse |
| 5 | +from datetime import timedelta |
| 6 | +from typing import Optional |
| 7 | +from gnssanalysis.filenames import convert_nominal_span, determine_properties_from_filename |
| 8 | +from gnssanalysis.gn_io.sp3 import ( |
| 9 | + filter_by_svs, |
| 10 | + read_sp3, |
| 11 | + trim_to_first_n_epochs, |
| 12 | + write_sp3, |
| 13 | + remove_offline_sats, |
| 14 | + trim_df, |
| 15 | +) |
| 16 | +import logging |
| 17 | + |
| 18 | +logger = logging.getLogger(__name__) |
| 19 | + |
| 20 | + |
#### Configuration ####
# Module-level settings consumed by trim_sp3(). All default to None / False,
# meaning the corresponding trimming step is skipped.

# Constrain to x SVs, specific SV names, a single constellation letter, any combination, or neither
trim_to_sv_names: Optional[list[str]] = None  # ["G02", "G03", "G19"]
trim_to_sv_count: Optional[int] = None  # 1
trim_to_sat_letter: Optional[str] = None  # "E"

# How many epochs to include in the trimmed file (offset from start).
# Mutually exclusive with trim_to_first_n_time; if both are set, this one takes precedence.
trim_to_num_epochs: Optional[int] = None  # 3

# Trim off this time onwards, leaving only the start of the file
trim_to_first_n_time: Optional[timedelta] = None

# Whether to drop satellites that are offline (updates the SP3 header accordingly)
drop_offline_sats: bool = False

#### End configuration ####
| 37 | + |
| 38 | + |
def trim_sp3(src_path: str, dest_path: str) -> None:
    """Trim/filter the SP3 file at src_path and write the result to dest_path.

    Behaviour is driven by the module-level configuration constants. Steps, each optional:
      1. Trim to the first `trim_to_num_epochs` epochs, OR to the first
         `trim_to_first_n_time` of data (mutually exclusive; epoch count wins).
      2. Filter SVs by max count, explicit names, and/or constellation letter.
      3. Drop offline satellites if `drop_offline_sats` is set.
    Finally writes the trimmed SP3 file and re-reads it as a sanity check.

    :param str src_path: path of the source SP3 file to read.
    :param str dest_path: path to write the trimmed SP3 file to.
    """
    # Filename (last path segment) drives sample-rate determination below.
    filename = src_path.rsplit("/", 1)[-1]
    # Fixed: previously printed the literal placeholder text rather than the filename.
    print(f"Filename is: {filename}")

    # Determine sample rate (needed for trimming).
    # Raw data would be: determine_sp3_name_props() - that retrieves in seconds. But we want to be more
    # generally applicable, so not just SP3 here ideally.
    sample_rate_raw: timedelta | None = convert_nominal_span(
        determine_properties_from_filename(filename)["sampling_rate"], non_timed_span_output="none"
    )
    if sample_rate_raw is None:
        print("Warning: failed to determine sample rate, may be a non-timed unit i.e. 'U'")
    else:
        print(f"sample_rate is: {sample_rate_raw}")

    # Load
    print("Loading SP3 into DataFrame (Pos data only, strict mode, warn only)...")
    sp3_df = read_sp3(
        src_path,
        # check_header_vs_filename_vs_content_discrepancies=True,
        # continue_on_discrepancies=True,
    )
    print("Read done.")

    # Trim to first x epochs, or first x amount of time (mutually exclusive options)
    if trim_to_num_epochs is not None:
        print(f"Trimming to first {trim_to_num_epochs} epochs")
        sp3_df = trim_to_first_n_epochs(sp3_df=sp3_df, epoch_count=trim_to_num_epochs, sp3_filename=filename)
    elif trim_to_first_n_time is not None:
        print(f"Trimming to first: {trim_to_first_n_time} (timedelta)")
        sp3_df = trim_df(sp3_df, keep_first_delta_amount=trim_to_first_n_time)

    # Filter to chosen SVs or number of SVs...
    print(
        "Applying SV filters (max count: "
        f"{trim_to_sv_count}, limit to names: {trim_to_sv_names}, limit to constellation: {trim_to_sat_letter})..."
    )
    sp3_df = filter_by_svs(
        sp3_df,
        filter_by_count=trim_to_sv_count,
        filter_by_name=trim_to_sv_names,
        filter_to_sat_letter=trim_to_sat_letter,
    )

    # Drop offline sats if requested
    if drop_offline_sats:
        # Was an f-string with no placeholders; plain string is equivalent.
        print("Dropping offline sats (and updating header accordingly)...")
        sp3_df = remove_offline_sats(sp3_df)

    # Write out
    print(
        "Writing out new SP3 file... "
        'CAUTION: please check output header for consistency. It is based on metadata in .attrs["HEADER"], not the '
        "contents of the dataframe, and may not have been updated for all changes."
    )
    write_sp3(sp3_df, dest_path)

    # Sanity check: ensure the file we just wrote can be parsed back in.
    # Return value intentionally discarded (previously bound to an unused local).
    print("Testing re-read of the output file (strict mode, warn only)...")
    read_sp3(
        dest_path,
        pOnly=False,
        # check_header_vs_filename_vs_content_discrepancies=True,
        # continue_on_discrepancies=True
    )
| 107 | + |
| 108 | + |
def parse_arguments():
    """Build the CLI parser and return the parsed arguments.

    Both -i/--inpath and -o/--outpath are required string arguments.
    """
    arg_parser = argparse.ArgumentParser(description="Trim SP3 files for testing purposes")
    arg_parser.add_argument("-i", "--inpath", type=str, required=True, help="Input filepath")
    arg_parser.add_argument("-o", "--outpath", type=str, required=True, help="Output filepath")
    return arg_parser.parse_args()
| 121 | + |
| 122 | + |
if __name__ == "__main__":
    # CLI entry point: parse paths, report them, then run the trim.
    args = parse_arguments()
    print(f"Parsed args: in path: '{args.inpath}', out path: '{args.outpath}'")
    trim_sp3(src_path=args.inpath, dest_path=args.outpath)