diff --git a/src/hats_import/catalog/resume_plan.py b/src/hats_import/catalog/resume_plan.py
index a33f4ce7..1ee8d367 100644
--- a/src/hats_import/catalog/resume_plan.py
+++ b/src/hats_import/catalog/resume_plan.py
@@ -39,8 +39,9 @@ class ResumePlan(PipelineResumePlan):
     SPLITTING_STAGE = "splitting"
     REDUCING_STAGE = "reducing"
 
-    HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
-    HISTOGRAMS_DIR = "histograms"
+    ROW_COUNT_HISTOGRAM_BINARY_FILE = "row_count_mapping_histogram.npz"
+    ROW_COUNT_HISTOGRAMS_DIR = "row_count_histograms"
+    ALIGNMENT_FILE = "alignment.pickle"
 
     # pylint: disable=too-many-arguments
     def __init__(
@@ -114,7 +115,7 @@ def gather_plan(self, run_stages: list[str] | None = None):
         if self.should_run_mapping:
             self.map_files = self.get_remaining_map_keys()
             file_io.make_directory(
-                file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
+                file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR),
                 exist_ok=True,
             )
         if self.should_run_splitting:
@@ -144,7 +145,7 @@ def get_remaining_map_keys(self):
         Returns:
             list of mapping keys *not* found in files like /resume/path/mapping_key.npz
         """
-        prefix = file_io.get_upath(self.tmp_path) / self.HISTOGRAMS_DIR
+        prefix = file_io.get_upath(self.tmp_path) / self.ROW_COUNT_HISTOGRAMS_DIR
         map_file_pattern = re.compile(r"map_(\d+).npz")
         done_indexes = [int(map_file_pattern.match(path.name).group(1)) for path in prefix.glob("*.npz")]
         remaining_indexes = list(set(range(0, len(self.input_paths))) - (set(done_indexes)))
@@ -157,24 +158,27 @@ def read_histogram(self, healpix_order):
         - Otherwise, combine histograms from partials
         - Otherwise, return an empty histogram
         """
-        file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
+        file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE)
+
+        # Otherwise, read the histogram from partial histograms and combine.
         if not file_io.does_file_or_directory_exist(file_name):
-            # Read the histogram from partial histograms and combine.
             remaining_map_files = self.get_remaining_map_keys()
             if len(remaining_map_files) > 0:
                 raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
-            histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz")
+            histogram_files = file_io.find_files_matching_path(
+                self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR, "*.npz"
+            )
             aggregate_histogram = HistogramAggregator(healpix_order)
             for partial_file_name in histogram_files:
                 partial = SparseHistogram.from_file(partial_file_name)
                 aggregate_histogram.add(partial)
 
-            file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
+            file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE)
             with open(file_name, "wb+") as file_handle:
                 file_handle.write(aggregate_histogram.full_histogram)
             if self.delete_resume_log_files:
                 file_io.remove_directory(
-                    file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
+                    file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR),
                     ignore_errors=True,
                 )
 
@@ -200,10 +204,10 @@ def partial_histogram_file(cls, tmp_path, mapping_key: str):
             mapping_key (str): unique string for each mapping task (e.g. "map_57")
         """
         file_io.make_directory(
-            file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR),
+            file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR),
             exist_ok=True,
         )
-        return file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.npz")
+        return file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR, f"{mapping_key}.npz")
 
     def get_remaining_split_keys(self):
         """Gather remaining keys, dropping successful split tasks from done file names.
@@ -266,7 +270,7 @@ def get_alignment_file(
         highest_healpix_order (int): the highest healpix order (e.g. 5-10)
         lowest_healpix_order (int): the lowest healpix order (e.g. 1-5). specifying a lowest order
             constrains the partitioning to prevent spatially large pixels.
-        threshold (int): the maximum number of objects allowed in a single pixel
+        pixel_threshold (int): the maximum number of objects allowed in a single pixel
         drop_empty_siblings (bool): if 3 of 4 pixels are empty, keep only the non-empty pixel
         expected_total_rows (int): number of expected rows found in the dataset.
 
diff --git a/tests/hats_import/catalog/test_map_reduce.py b/tests/hats_import/catalog/test_map_reduce.py
index 5ceda41b..69a2e20d 100644
--- a/tests/hats_import/catalog/test_map_reduce.py
+++ b/tests/hats_import/catalog/test_map_reduce.py
@@ -87,14 +87,14 @@ def test_read_bad_fileformat(blank_data_file, capsys, tmp_path):
 
 def read_partial_histogram(tmp_path, mapping_key):
     """Helper to read in the former result of a map operation."""
-    histogram_file = tmp_path / "histograms" / f"{mapping_key}.npz"
+    histogram_file = tmp_path / "row_count_histograms" / f"{mapping_key}.npz"
     hist = SparseHistogram.from_file(histogram_file)
     return hist.to_array()
 
 
 def test_read_single_fits(tmp_path, formats_fits):
     """Success case - fits file that exists being read as fits"""
-    (tmp_path / "histograms").mkdir(parents=True)
+    (tmp_path / "row_count_histograms").mkdir(parents=True)
     mr.map_to_pixels(
         input_file=formats_fits,
         pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("fits")),
diff --git a/tests/hats_import/catalog/test_run_import.py b/tests/hats_import/catalog/test_run_import.py
index 9f1ed1e2..f80d8e12 100644
--- a/tests/hats_import/catalog/test_run_import.py
+++ b/tests/hats_import/catalog/test_run_import.py
@@ -144,7 +144,7 @@ def test_resume_dask_runner_diff_pixel_order(
     ## Now set up our resume files to match previous work.
     resume_tmp = tmp_path / "tmp" / "resume_catalog"
     ResumePlan(tmp_path=resume_tmp, progress_bar=False)
-    SparseHistogram([11], [131], 0).to_dense_file(resume_tmp / "mapping_histogram.npz")
+    SparseHistogram([11], [131], 0).to_dense_file(resume_tmp / "row_count_mapping_histogram.npz")
 
     for file_index in range(0, 5):
         ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
diff --git a/tests/hats_import/catalog/test_run_round_trip.py b/tests/hats_import/catalog/test_run_round_trip.py
index 043a26e4..6133fcf3 100644
--- a/tests/hats_import/catalog/test_run_round_trip.py
+++ b/tests/hats_import/catalog/test_run_round_trip.py
@@ -551,20 +551,20 @@ def assert_stage_level_files_exist(base_intermediate_dir):
     # `small_sky_object_catalog` at order 0.
     expected_contents = [
         "alignment.pickle",
-        "histograms",  # directory containing sub-histograms
         "input_paths.txt",  # original input paths for subsequent comparison
         "mapping_done",  # stage-level done file
-        "mapping_histogram.npz",  # concatenated histogram file
         "order_0",  # all intermediate parquet files
         "reader.pickle",  # pickled InputReader
         "reducing",  # directory containing task-level done files
         "reducing_done",  # stage-level done file
+        "row_count_histograms",  # directory containing sub-histograms
+        "row_count_mapping_histogram.npz",  # concatenated histogram file
         "splitting",  # directory containing task-level done files
         "splitting_done",  # stage-level done file
     ]
     assert_directory_contains(base_intermediate_dir, expected_contents)
 
-    checking_dir = base_intermediate_dir / "histograms"
+    checking_dir = base_intermediate_dir / "row_count_histograms"
     assert_directory_contains(
         checking_dir, ["map_0.npz", "map_1.npz", "map_2.npz", "map_3.npz", "map_4.npz", "map_5.npz"]
     )
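
Reviewer note: the sketch below is illustrative only, not part of the patch. It walks the renamed resume artifacts end to end: a partial map histogram lands under "row_count_histograms/<mapping_key>.npz", and read_histogram() aggregates partials into "row_count_mapping_histogram.npz". The ResumePlan(tmp_path=..., progress_bar=False) and SparseHistogram([11], [131], 0) usages mirror the tests in this diff; the import paths, the to_file() writer, and the temporary directory are assumptions for the example.

    # Illustrative sketch only -- exercises the renamed resume artifacts.
    import tempfile
    from pathlib import Path

    from hats_import.catalog.resume_plan import ResumePlan  # assumed module path
    from hats_import.catalog.sparse_histogram import SparseHistogram  # assumed module path

    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)

        # Partial map results now land under "row_count_histograms/<mapping_key>.npz"
        # (previously "histograms/<mapping_key>.npz").
        partial_file = ResumePlan.partial_histogram_file(tmp_path, "map_0")
        SparseHistogram([11], [131], 0).to_file(partial_file)  # to_file(): assumed sparse writer

        # With no combined file on disk yet, read_histogram() aggregates the partials
        # and persists the result as "row_count_mapping_histogram.npz"
        # (previously "mapping_histogram.npz").
        plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)
        full_histogram = plan.read_histogram(healpix_order=0)
        print(full_histogram.sum())  # expect 131, the single partial's count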