From c5a028f54239e195d4e8c0244a07f7881ea68845 Mon Sep 17 00:00:00 2001 From: Olivia Lynn Date: Wed, 8 Oct 2025 13:43:44 -0700 Subject: [PATCH 1/3] Rename histogram directory (histograms/ -> row_count_histograms/); retain legacy support --- src/hats_import/catalog/resume_plan.py | 44 +++++++++++++------ tests/hats_import/catalog/test_map_reduce.py | 4 +- .../catalog/test_run_round_trip.py | 6 +-- 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/src/hats_import/catalog/resume_plan.py b/src/hats_import/catalog/resume_plan.py index a33f4ce7..ba87bc0f 100644 --- a/src/hats_import/catalog/resume_plan.py +++ b/src/hats_import/catalog/resume_plan.py @@ -39,8 +39,11 @@ class ResumePlan(PipelineResumePlan): SPLITTING_STAGE = "splitting" REDUCING_STAGE = "reducing" - HISTOGRAM_BINARY_FILE = "mapping_histogram.npz" - HISTOGRAMS_DIR = "histograms" + LEGACY_HISTOGRAM_BINARY_FILE = "mapping_histogram.npz" + LEGACY_HISTOGRAMS_DIR = "histograms" + ROW_COUNT_HISTOGRAM_BINARY_FILE = "row_count_mapping_histogram.npz" + ROW_COUNT_HISTOGRAMS_DIR = "row_count_histograms" + ALIGNMENT_FILE = "alignment.pickle" # pylint: disable=too-many-arguments @@ -114,7 +117,7 @@ def gather_plan(self, run_stages: list[str] | None = None): if self.should_run_mapping: self.map_files = self.get_remaining_map_keys() file_io.make_directory( - file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR), + file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR), exist_ok=True, ) if self.should_run_splitting: @@ -144,7 +147,12 @@ def get_remaining_map_keys(self): Returns: list of mapping keys *not* found in files like /resume/path/mapping_key.npz """ - prefix = file_io.get_upath(self.tmp_path) / self.HISTOGRAMS_DIR + prefix = file_io.get_upath(self.tmp_path) / self.ROW_COUNT_HISTOGRAMS_DIR + + # Support legacy directory name for histogram files. + if not file_io.does_file_or_directory_exist(prefix): + prefix = file_io.get_upath(self.tmp_path) / self.LEGACY_HISTOGRAMS_DIR + map_file_pattern = re.compile(r"map_(\d+).npz") done_indexes = [int(map_file_pattern.match(path.name).group(1)) for path in prefix.glob("*.npz")] remaining_indexes = list(set(range(0, len(self.input_paths))) - (set(done_indexes))) @@ -157,24 +165,34 @@ def read_histogram(self, healpix_order): - Otherwise, combine histograms from partials - Otherwise, return an empty histogram """ - file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE) - if not file_io.does_file_or_directory_exist(file_name): - # Read the histogram from partial histograms and combine. + file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE) + legacy_file_name = file_io.append_paths_to_pointer(self.tmp_path, self.LEGACY_HISTOGRAM_BINARY_FILE) + + # Fall back to the legacy histogram file name, if needed. + if not file_io.does_file_or_directory_exist(file_name) and file_io.does_file_or_directory_exist( + legacy_file_name + ): + file_name = legacy_file_name + + # Otherwise, read the histogram from partial histograms and combine. + elif not file_io.does_file_or_directory_exist(file_name): remaining_map_files = self.get_remaining_map_keys() if len(remaining_map_files) > 0: raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.") - histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz") + histogram_files = file_io.find_files_matching_path( + self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR, "*.npz" + ) aggregate_histogram = HistogramAggregator(healpix_order) for partial_file_name in histogram_files: partial = SparseHistogram.from_file(partial_file_name) aggregate_histogram.add(partial) - file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE) + file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE) with open(file_name, "wb+") as file_handle: file_handle.write(aggregate_histogram.full_histogram) if self.delete_resume_log_files: file_io.remove_directory( - file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR), + file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR), ignore_errors=True, ) @@ -200,10 +218,10 @@ def partial_histogram_file(cls, tmp_path, mapping_key: str): mapping_key (str): unique string for each mapping task (e.g. "map_57") """ file_io.make_directory( - file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR), + file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR), exist_ok=True, ) - return file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.npz") + return file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR, f"{mapping_key}.npz") def get_remaining_split_keys(self): """Gather remaining keys, dropping successful split tasks from done file names. @@ -266,7 +284,7 @@ def get_alignment_file( highest_healpix_order (int): the highest healpix order (e.g. 5-10) lowest_healpix_order (int): the lowest healpix order (e.g. 1-5). specifying a lowest order constrains the partitioning to prevent spatially large pixels. - threshold (int): the maximum number of objects allowed in a single pixel + pixel_threshold (int): the maximum number of objects allowed in a single pixel drop_empty_siblings (bool): if 3 of 4 pixels are empty, keep only the non-empty pixel expected_total_rows (int): number of expected rows found in the dataset. diff --git a/tests/hats_import/catalog/test_map_reduce.py b/tests/hats_import/catalog/test_map_reduce.py index 5ceda41b..69a2e20d 100644 --- a/tests/hats_import/catalog/test_map_reduce.py +++ b/tests/hats_import/catalog/test_map_reduce.py @@ -87,14 +87,14 @@ def test_read_bad_fileformat(blank_data_file, capsys, tmp_path): def read_partial_histogram(tmp_path, mapping_key): """Helper to read in the former result of a map operation.""" - histogram_file = tmp_path / "histograms" / f"{mapping_key}.npz" + histogram_file = tmp_path / "row_count_histograms" / f"{mapping_key}.npz" hist = SparseHistogram.from_file(histogram_file) return hist.to_array() def test_read_single_fits(tmp_path, formats_fits): """Success case - fits file that exists being read as fits""" - (tmp_path / "histograms").mkdir(parents=True) + (tmp_path / "row_count_histograms").mkdir(parents=True) mr.map_to_pixels( input_file=formats_fits, pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("fits")), diff --git a/tests/hats_import/catalog/test_run_round_trip.py b/tests/hats_import/catalog/test_run_round_trip.py index 5f1f5f50..b6b080da 100644 --- a/tests/hats_import/catalog/test_run_round_trip.py +++ b/tests/hats_import/catalog/test_run_round_trip.py @@ -551,20 +551,20 @@ def assert_stage_level_files_exist(base_intermediate_dir): # `small_sky_object_catalog` at order 0. expected_contents = [ "alignment.pickle", - "histograms", # directory containing sub-histograms "input_paths.txt", # original input paths for subsequent comparison "mapping_done", # stage-level done file - "mapping_histogram.npz", # concatenated histogram file "order_0", # all intermediate parquet files "reader.pickle", # pickled InputReader "reducing", # directory containing task-level done files "reducing_done", # stage-level done file + "row_count_histograms", # directory containing sub-histograms + "row_count_mapping_histogram.npz", # concatenated histogram file "splitting", # directory containing task-level done files "splitting_done", # stage-level done file ] assert_directory_contains(base_intermediate_dir, expected_contents) - checking_dir = base_intermediate_dir / "histograms" + checking_dir = base_intermediate_dir / "row_count_histograms" assert_directory_contains( checking_dir, ["map_0.npz", "map_1.npz", "map_2.npz", "map_3.npz", "map_4.npz", "map_5.npz"] ) From 4c07255d5dabea2d019aeb90d535c349ad279798 Mon Sep 17 00:00:00 2001 From: Olivia Lynn Date: Wed, 8 Oct 2025 13:49:46 -0700 Subject: [PATCH 2/3] Remove legacy dir support in get_remaining_map_keys, as it's just for partials/intermediate files anyway --- src/hats_import/catalog/resume_plan.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/hats_import/catalog/resume_plan.py b/src/hats_import/catalog/resume_plan.py index ba87bc0f..5a3815d6 100644 --- a/src/hats_import/catalog/resume_plan.py +++ b/src/hats_import/catalog/resume_plan.py @@ -148,11 +148,6 @@ def get_remaining_map_keys(self): list of mapping keys *not* found in files like /resume/path/mapping_key.npz """ prefix = file_io.get_upath(self.tmp_path) / self.ROW_COUNT_HISTOGRAMS_DIR - - # Support legacy directory name for histogram files. - if not file_io.does_file_or_directory_exist(prefix): - prefix = file_io.get_upath(self.tmp_path) / self.LEGACY_HISTOGRAMS_DIR - map_file_pattern = re.compile(r"map_(\d+).npz") done_indexes = [int(map_file_pattern.match(path.name).group(1)) for path in prefix.glob("*.npz")] remaining_indexes = list(set(range(0, len(self.input_paths))) - (set(done_indexes))) From f10c017beae98c83467f815aa5463f91f0710253 Mon Sep 17 00:00:00 2001 From: Olivia Lynn Date: Thu, 9 Oct 2025 17:57:28 -0400 Subject: [PATCH 3/3] Remove all legacy support for old histogram dir name --- src/hats_import/catalog/resume_plan.py | 11 +---------- tests/hats_import/catalog/test_run_import.py | 2 +- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/src/hats_import/catalog/resume_plan.py b/src/hats_import/catalog/resume_plan.py index 5a3815d6..1ee8d367 100644 --- a/src/hats_import/catalog/resume_plan.py +++ b/src/hats_import/catalog/resume_plan.py @@ -39,8 +39,6 @@ class ResumePlan(PipelineResumePlan): SPLITTING_STAGE = "splitting" REDUCING_STAGE = "reducing" - LEGACY_HISTOGRAM_BINARY_FILE = "mapping_histogram.npz" - LEGACY_HISTOGRAMS_DIR = "histograms" ROW_COUNT_HISTOGRAM_BINARY_FILE = "row_count_mapping_histogram.npz" ROW_COUNT_HISTOGRAMS_DIR = "row_count_histograms" @@ -161,16 +159,9 @@ def read_histogram(self, healpix_order): - Otherwise, return an empty histogram """ file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE) - legacy_file_name = file_io.append_paths_to_pointer(self.tmp_path, self.LEGACY_HISTOGRAM_BINARY_FILE) - - # Fall back to the legacy histogram file name, if needed. - if not file_io.does_file_or_directory_exist(file_name) and file_io.does_file_or_directory_exist( - legacy_file_name - ): - file_name = legacy_file_name # Otherwise, read the histogram from partial histograms and combine. - elif not file_io.does_file_or_directory_exist(file_name): + if not file_io.does_file_or_directory_exist(file_name): remaining_map_files = self.get_remaining_map_keys() if len(remaining_map_files) > 0: raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.") diff --git a/tests/hats_import/catalog/test_run_import.py b/tests/hats_import/catalog/test_run_import.py index 9f1ed1e2..f80d8e12 100644 --- a/tests/hats_import/catalog/test_run_import.py +++ b/tests/hats_import/catalog/test_run_import.py @@ -144,7 +144,7 @@ def test_resume_dask_runner_diff_pixel_order( ## Now set up our resume files to match previous work. resume_tmp = tmp_path / "tmp" / "resume_catalog" ResumePlan(tmp_path=resume_tmp, progress_bar=False) - SparseHistogram([11], [131], 0).to_dense_file(resume_tmp / "mapping_histogram.npz") + SparseHistogram([11], [131], 0).to_dense_file(resume_tmp / "row_count_mapping_histogram.npz") for file_index in range(0, 5): ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")