Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5b178d2
NPI-4460 misc supporting changes: update read_clk() to support byte i…
treefern Feb 9, 2026
85f0d34
NPI-4460 implement baselining in test_clk
treefern Feb 9, 2026
106b475
NPI-4460 implement baselining in test_igslog - needs further work
treefern Feb 18, 2026
c4faa43
NPI-4460 update class names for IGS log parse tests, add baseline files
treefern Feb 19, 2026
4442833
Merge branch 'main' into NPI-4460-update-unit-tests-to-use-baselining
treefern Feb 19, 2026
0209762
NPI-4460 clean up comments
treefern Feb 23, 2026
233baf1
NPI-4460 improve unexpected warning output
treefern Feb 23, 2026
1b4afb5
NPI-4460 start of baselining for test_sp3 - not yet complete
treefern Feb 23, 2026
167afe3
NPI-4460 add baselining to test_update_sp3_comments()
treefern Feb 24, 2026
4db660a
NPI-4460 add warnings to baselining lines
treefern Feb 24, 2026
9b99511
NPI-4460 update overlong sp3 content line test to ensure only first 5…
treefern Feb 24, 2026
102cf02
NPI-4460 tidy up existing unit test
treefern Feb 24, 2026
f8d74bf
NPI-4460 add baselining for sample DF generator. Note this is not a f…
treefern Feb 24, 2026
5f62d5b
NPI-4460 add baseline for standalone sp3 comment validation
treefern Feb 24, 2026
06e49d5
NPI-4460 add baseline for unsupported SP3 velocity handling
treefern Feb 24, 2026
d1b1740
NPI-4460 add baseline for SP3 velocity interpolation. Note this is NO…
treefern Feb 24, 2026
2c8d31b
NPI-4460 add baseline for SP3 offline sat removal
treefern Feb 24, 2026
596b898
NPI-4460 add baseline for SP3 offline sat removal and fix function to…
treefern Mar 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions gnssanalysis/gn_io/clk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""RINEX CLK file parsing function"""

import logging as _logging
from pathlib import Path
import re as _re
from io import BytesIO as _BytesIO
from typing import Union as _Union
Expand All @@ -15,8 +16,8 @@
_RE_LINE = _re.compile(rb"(AS[ ]G.+)") # GPS SV line (other GNSS may not have STD)


def read_clk(clk_path):
content = _gn_io.common.path2bytes(str(clk_path))
def read_clk(clk_path_or_bytes: _Union[Path, str, bytes]) -> _pd.DataFrame:
content = _gn_io.common.path2bytes(clk_path_or_bytes)
data_b = content.find(b"END OF HEADER") + 13
data_b += content[data_b : data_b + 20].find(b"\n") + 1

Expand All @@ -32,7 +33,7 @@ def read_clk(clk_path):
clk_cols += [10]
clk_names += ["STD"]

clk_df = _pd.read_csv(
clk_df = _pd.read_csv( # TODO consider updating to read_fwf()
_BytesIO(data),
sep="\\s+", # delim_whitespace is deprecated
header=None,
Expand Down Expand Up @@ -77,12 +78,12 @@ def get_sv_clocks(clk_df: _pd.DataFrame) -> _pd.Series:
:raises IndexError: Raise error if the dataframe is not indexed correctly
:return _pd.Series: Retrieved satellite clocks
"""
if clk_df.index.names == ['A', 'J2000', 'CODE']:
if clk_df.index.names == ["A", "J2000", "CODE"]:
# fastest method to grab a specific category!, same as clk_df.EST.loc['AS'] but >6 times faster
AS_cat_code = clk_df.index.levels[0].categories.get_loc("AS")
mask = clk_df.index.codes[0] == AS_cat_code
return _pd.Series(data=clk_df.values[:, 0][mask], index=clk_df.index.droplevel(0)[mask])
elif clk_df.index.names == ['J2000', 'PRN']:
elif clk_df.index.names == ["J2000", "PRN"]:
return _pd.Series(data=clk_df[("EST", "CLK")].values, index=clk_df.index)
else:
raise IndexError("Incorrect index names of dataframe")
Expand Down
6 changes: 3 additions & 3 deletions gnssanalysis/gn_io/sp3.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,17 +407,17 @@ def remove_offline_sats(sp3_df: _pd.DataFrame, df_friendly_name: str = "") -> _p
offline_sats = sp3_df[mask_either].index.get_level_values(1).unique()

# Using that list of offline / partially offline sats, remove all entries for those sats from the SP3 DataFrame:
sp3_df = sp3_df.drop(offline_sats, level=1, errors="ignore")
sp3_df_cleaned = sp3_df.drop(offline_sats, level=1, errors="ignore")

if len(offline_sats) > 0:
# Update the internal representation of the SP3 header to match the change
remove_svs_from_header(sp3_df, offline_sats.values)
remove_svs_from_header(sp3_df_cleaned, offline_sats.values)
logger.info(
f"Dropped offline / nodata sats from {df_friendly_name} SP3 DataFrame (including header): {offline_sats.values}"
)
else:
logger.info(f"No offline / nodata sats detected to be dropped from {df_friendly_name} SP3 DataFrame")
return sp3_df
return sp3_df_cleaned


def filter_by_svs(
Expand Down
27 changes: 25 additions & 2 deletions gnssanalysis/gn_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,28 @@ def __exit__(self, type, value, traceback):
print(self.readout)


def stringify_warnings(captured_warnings: list[warnings.WarningMessage]) -> str:
    """
    Convenience function to convert a list of warning messages to a single multi-line string.
    E.g. output:
        Warning message #1: Some warning
        Warning message #2: Some other warning
        ...

    :param captured_warnings: list of warning message objects (e.g. from UnitTest's _AssertWarnsContext.warnings)
    :type captured_warnings: list[warnings.WarningMessage]
    :return: rendered string for multi-line log output (empty string for an empty input list)
    :rtype: str
    """
    # Numbering is 1-based for human-readable log output. str.join avoids the
    # quadratic cost of repeated string concatenation in a loop.
    return "".join(
        f"Warning message #{i}: {w.message}\n" for i, w in enumerate(captured_warnings, start=1)
    )


def sha256(bytes_to_hash: bytes) -> str:
"""
Convenience wrapper to quickly call hashlib.sha256 and return a hex digest string
Expand Down Expand Up @@ -1126,7 +1148,7 @@ def ensure_unique_objects(objects: list[object]) -> None:

@staticmethod
def create_baseline( # Was baseline_pickled_df_list_and_hash()
current_object_list: list[object],
current_object_list: list, # Any kind of object is ok
# These are used to describe the calling class and function, and are inferred automatically. If needed they
# can be explicitly set here:
subdir: Optional[_pathlib.Path] = None,
Expand Down Expand Up @@ -1203,7 +1225,8 @@ def create_baseline( # Was baseline_pickled_df_list_and_hash()

@staticmethod
def verify( # Was create_and_verify_pickled_df_list()
current_object_list: list[object],
current_object_list: list, # Can be any type of object (though diff output only supported for some types)
# TODO update to output notice rather than crashing, if type encountered we can't print a diff for.
# parent_dir: _pathlib.Path = BASELINE_DATAFRAME_RECORDS_DIR_ROOT_RELATIVE,
# Option to strictly enforce that a baseline must exist for anything this function is invoked to check:
raise_for_missing_baseline: bool = False,
Expand Down
98 changes: 80 additions & 18 deletions tests/test_clk.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from pyfakefs.fake_filesystem_unittest import TestCase
from pandas import DataFrame
from unittest import TestCase

import gnssanalysis.gn_io.clk as clk
import gnssanalysis.gn_diffaux as gn_diffaux
from gnssanalysis.gn_utils import UnitTestBaseliner, stringify_warnings

from test_datasets.clk_test_data import (
# first dataset is a truncated version of file IGS0OPSRAP_20240400000_01D_05M_CLK.CLK:
Expand All @@ -12,18 +14,13 @@


class TestClk(TestCase):
def setUp(self):
self.setUpPyfakefs()
self.fs.reset()

def test_clk_read(self):
self.fs.reset()
file_paths = ["/fake/dir/file0.clk", "/fake/dir/file1.clk"]
self.fs.create_file(file_paths[0], contents=input_data_igs)
self.fs.create_file(file_paths[1], contents=input_data_gfz)
clk_df_igs: DataFrame = clk.read_clk(clk_path_or_bytes=input_data_igs)
clk_df_gfz: DataFrame = clk.read_clk(clk_path_or_bytes=input_data_gfz)

clk_df_igs = clk.read_clk(clk_path=file_paths[0])
clk_df_gfz = clk.read_clk(clk_path=file_paths[1])
# To help detect changes / regressions, check the dataframe we constructed against the stored hash.
# If they differ, load stored DF from pickle and print the difference.

self.assertEqual(len(clk_df_igs), 93, msg="Check that data generally read into df as expected")
self.assertEqual(len(clk_df_gfz), 90, msg="Check that data generally read into df as expected")
Expand All @@ -34,17 +31,24 @@ def test_clk_read(self):
self.assertEqual(clk_df_igs["EST"].iloc[-1], -0.0006105557076344, msg="Check last datapoint is correct")
self.assertEqual(clk_df_gfz["EST"].iloc[-1], -0.000610553573006, msg="Check last datapoint is correct")

# Baseline (manually) to disk
# UnitTestBaseliner.mode = "baseline"
# UnitTestBaseliner.record_baseline([clk_df_igs, clk_df_gfz])

# Verify against on disk baseline
self.assertTrue(UnitTestBaseliner.verify([clk_df_igs, clk_df_gfz]), "Hash verify should succeed")

def test_diff_clk(self):
"""
Note this also tests the now deprecated version, compare_clk()
"""
self.fs.reset() # Reset pyfakefs to delete any files which may have persisted from a previous test
file_paths = ["/fake/dir/file0.clk", "/fake/dir/file1.clk"]
self.fs.create_file(file_paths[0], contents=input_data_igs)
self.fs.create_file(file_paths[1], contents=input_data_gfz)

clk_df_igs = clk.read_clk(clk_path=file_paths[0])
clk_df_gfz = clk.read_clk(clk_path=file_paths[1])
# List of dataframes created during this test, to compare against baselined results on disk (regression check).
dfs_to_verify: list[object] = []

# Don't include these in the baseline, as test_clk_read() already looks after that.
clk_df_igs = clk.read_clk(clk_path_or_bytes=input_data_igs)
clk_df_gfz = clk.read_clk(clk_path_or_bytes=input_data_gfz)

# Deprecated version
# Ensure deprecation warnings are raised, but don't print them.
Expand All @@ -60,6 +64,7 @@ def test_diff_clk(self):
result_epoch_G07 = gn_diffaux.compare_clk(clk_a=clk_df_igs, clk_b=clk_df_gfz, norm_types=["epoch", "G07"])
result_daily_G08 = gn_diffaux.compare_clk(clk_a=clk_df_igs, clk_b=clk_df_gfz, norm_types=["daily", "G08"])
result_G09_G11 = gn_diffaux.compare_clk(clk_a=clk_df_igs, clk_b=clk_df_gfz, norm_types=["G09", "G11"])

captured_warnings = warning_assessor.warnings
self.assertEqual(
"compare_clk() is deprecated. Please use diff_clk() and note that the clk inputs are in opposite order",
Expand All @@ -68,7 +73,10 @@ def test_diff_clk(self):
self.assertEqual(
len(captured_warnings),
9,
"Expected exactly 9 warnings. Check what other warnings are being raised!",
"Expected exactly 9 warnings. Check what other warnings are being raised! Full list below:\n"
+ stringify_warnings(captured_warnings),
# Passing the converted warning strings to the assert may not be very efficient. Consider changing if
# it slows things down.
)

# Test index is as expected
Expand All @@ -79,12 +87,29 @@ def test_diff_clk(self):
self.assertEqual(result_epoch_only["G04"].iloc[0], 2.7128617820053325e-12, msg="Check datapoint is correct")
self.assertEqual(result_sv_only["G05"].iloc[0], 1.1623200004470119e-10, msg="Check datapoint is correct")
self.assertEqual(result_G06["G06"].iloc[0], 0.0, msg="Check datapoint is correct")
self.assertEqual(result_daily_epoch_G04["G07"].iloc[0], 1.3071733365871419e-11, msg="Check datapoint is correct")
self.assertEqual(
result_daily_epoch_G04["G07"].iloc[0], 1.3071733365871419e-11, msg="Check datapoint is correct"
)
self.assertEqual(result_epoch_G07["G08"].iloc[0], -3.3217389966032004e-11, msg="Check datapoint is correct")
self.assertEqual(result_daily_G08["G09"].iloc[-1], 1.3818666534399365e-12, msg="Check datapoint is correct")
self.assertEqual(result_G09_G11["G11"].iloc[-1], 0.0, msg="Check datapoint is correct")
self.assertEqual(result_G09_G11["G01"].iloc[-1], 8.94520000606358e-11, msg="Check datapoint is correct")

# Add all these output DFs to the list to be compared against the baseline on disk
dfs_to_verify.extend(
[
result_default,
result_daily_only,
result_epoch_only,
result_sv_only,
result_G06,
result_daily_epoch_G04,
result_epoch_G07,
result_daily_G08,
result_G09_G11,
]
)

# New version (clk order flipped)
result_default = gn_diffaux.diff_clk(clk_baseline=clk_df_gfz, clk_test=clk_df_igs)
result_daily_only = gn_diffaux.diff_clk(clk_baseline=clk_df_gfz, clk_test=clk_df_igs, norm_types=["daily"])
Expand Down Expand Up @@ -117,3 +142,40 @@ def test_diff_clk(self):
self.assertEqual(result_daily_G08["G09"].iloc[-1], 1.3818666534399365e-12, msg="Check datapoint is correct")
self.assertEqual(result_G09_G11["G11"].iloc[-1], 0.0, msg="Check datapoint is correct")
self.assertEqual(result_G09_G11["G01"].iloc[-1], 8.94520000606358e-11, msg="Check datapoint is correct")

dfs_to_verify.extend(
[
result_default,
result_daily_only,
result_epoch_only,
result_sv_only,
result_G06,
result_daily_epoch_G04,
result_epoch_G07,
result_daily_G08,
result_G09_G11,
]
)

# Baseline establishment (manual use only). DO NOT commit this enabled:
# UnitTestBaseliner.mode = "baseline"
# UnitTestBaseliner.record_baseline(dfs_to_verify)

# Verify all dataframes against recorded baseline on disk
self.assertTrue(
UnitTestBaseliner.verify(dfs_to_verify), "Validation should succeed (unless in baselining mode)"
)


# if __name__ == "__main__":
# # For debugger use

# logging.basicConfig(format="%(levelname)s: %(message)s")
# logger = logging.getLogger()
# logger.setLevel(logging.DEBUG)

# os.chdir("./tests")

# test_clk = TestClk()
# test_clk.test_diff_clk()
# test_clk.test_clk_read()
66 changes: 61 additions & 5 deletions tests/test_igslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from pyfakefs.fake_filesystem_unittest import TestCase

from gnssanalysis.gn_io import igslog
from gnssanalysis.gn_utils import UnitTestBaseliner
from test_datasets.sitelog_test_data import (
abmf_site_log_v1 as v1_data,
abmf_site_log_v2 as v2_data,
aggo_site_log_v2 as aggo_v2_data,
)


class TestRegex(unittest.TestCase):
class TestIgsLogRegex(unittest.TestCase):
"""
Test the various regex expressions used in the parsing of IGS log files
"""
Expand All @@ -20,13 +21,15 @@ def test_determine_log_version(self):
self.assertEqual(igslog.determine_log_version(v2_data), "v2.0")

# Check that LogVersionError is raised on wrong data
self.assertRaises(igslog.LogVersionError, igslog.determine_log_version, b"Wrong data")
with self.assertRaises(igslog.LogVersionError):
igslog.determine_log_version(b"Wrong data")

def test_extract_id_block(self):
# Ensure the extract of ID information works and gives correct dome number:
self.assertEqual(igslog.extract_id_block(v1_data, "/example/path", "ABMF", "v1.0"), ["ABMF", "97103M001"])
self.assertEqual(igslog.extract_id_block(v2_data, "/example/path", "ABMF", "v2.0"), ["ABMF", "97103M001"])
# Check automatic version determination works as expected:
# Check that automatic version determination is used when a version is not provided. This
# leverages determine_log_version() which is already tested above:
self.assertEqual(igslog.extract_id_block(v1_data, "/example/path", "ABMF"), ["ABMF", "97103M001"])

# Check LogVersionError is raised on no data:
Expand All @@ -42,6 +45,7 @@ def test_extract_id_block(self):
def test_extract_location_block(self):
# Version 1 Location description results:
v1_location_block = igslog.extract_location_block(v1_data, "/example/path", "v1.0")
# NOTE: this test cannot currently support baselining. This will be addressed in NPI-4492
self.assertEqual(v1_location_block.group(1), b"Les Abymes")
self.assertEqual(v1_location_block.group(2), b"Guadeloupe")

Expand Down Expand Up @@ -86,6 +90,16 @@ def test_extract_receiver_block(self):
# Last receiver should not have an end date assigned (i.e. current):
self.assertEqual(v2_receiver_block[-1][-1], b"")

objs_to_verify: list[object] = [v1_receiver_block, v2_receiver_block]

# Baseline (manually)
# UnitTestBaseliner.mode = "baseline"
# UnitTestBaseliner.create_baseline(objs_to_verify)

# Verify
self.assertTrue(UnitTestBaseliner.verify(objs_to_verify), "Hash verification should pass")
# TODO update verify() to support required datatypes, so it does not crash if hash changes

def test_extract_antenna_block(self):
# Testing version 1:
v1_antenna_block = igslog.extract_antenna_block(v1_data, "/example/path")
Expand All @@ -101,8 +115,18 @@ def test_extract_antenna_block(self):
# Last antenna should not have an end date assigned (i.e. current):
self.assertEqual(v2_antenna_block[-1][-1], b"")

objs_to_verify: list[object] = [v1_antenna_block, v2_antenna_block]

class TestDataParsing(unittest.TestCase):
# Baseline (manually)
# UnitTestBaseliner.mode = "baseline"
# UnitTestBaseliner.create_baseline(objs_to_verify)

# Verify
self.assertTrue(UnitTestBaseliner.verify(objs_to_verify), "Hash verification should pass")
# TODO update verify() to support required datatypes, so it does not crash if hash changes


class TestIgsLogDataParsing(unittest.TestCase):
"""
Test the integrated functions that gather and parse information from IGS log files
"""
Expand All @@ -122,8 +146,18 @@ def test_parse_igs_log_data(self):
# Check last antenna type:
self.assertEqual(v2_data_parsed[-1][2], "TRM57971.00")

objs_to_verify: list[object] = [v1_data_parsed, v2_data_parsed]

# Baseline (manually)
# UnitTestBaseliner.mode = "baseline"
# UnitTestBaseliner.create_baseline(objs_to_verify)

class TestFileParsing(TestCase):
# Verify
self.assertTrue(UnitTestBaseliner.verify(objs_to_verify), "Hash verification should pass")
# TODO update verify() to support required datatypes, so it does not crash if hash changes


class TestIgsLogFileParsing(TestCase):
"""
Test gather_metadata()
"""
Expand Down Expand Up @@ -158,3 +192,25 @@ def test_gather_metadata(self):
self.assertEqual(record_3.CODE, "AGGO")
# Antenna info: test for antenna serial number
self.assertEqual(result[2]["S/N"][4], "726722")

# As the gather_metadata() function we are testing here, reads from a filesystem and outputs a DataFrame,
# running it without pyfakefs isn't practical. So we temporarily suspend patching in order to run baselining.
# See docs here:
# https://pytest-pyfakefs.readthedocs.io/en/latest/convenience.html#suspending-patching

# Pause fake filesystem patching to allow access to baseline files.
self.fs.pause()

# Create a generic (object rather than DF) list, and copy elements across
dfs_to_verify: list[object] = []
dfs_to_verify.extend(result)

# Baseline (manually)
# UnitTestBaseliner.mode = "baseline"
# UnitTestBaseliner.create_baseline(dfs_to_verify)

# Verify
self.assertTrue(UnitTestBaseliner.verify(dfs_to_verify), "Hash verification should pass")

# Ensure pyfakefs is re-enabled before further tests run
self.fs.resume()
Loading
Loading