@@ -39,8 +39,9 @@ class ResumePlan(PipelineResumePlan):
3939 SPLITTING_STAGE = "splitting"
4040 REDUCING_STAGE = "reducing"
4141
42- HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
43- HISTOGRAMS_DIR = "histograms"
42+ ROW_COUNT_HISTOGRAM_BINARY_FILE = "row_count_mapping_histogram.npz"
43+ ROW_COUNT_HISTOGRAMS_DIR = "row_count_histograms"
44+
4445 ALIGNMENT_FILE = "alignment.pickle"
4546
4647 # pylint: disable=too-many-arguments
@@ -114,7 +115,7 @@ def gather_plan(self, run_stages: list[str] | None = None):
114115 if self .should_run_mapping :
115116 self .map_files = self .get_remaining_map_keys ()
116117 file_io .make_directory (
117- file_io .append_paths_to_pointer (self .tmp_path , self .HISTOGRAMS_DIR ),
118+ file_io .append_paths_to_pointer (self .tmp_path , self .ROW_COUNT_HISTOGRAMS_DIR ),
118119 exist_ok = True ,
119120 )
120121 if self .should_run_splitting :
@@ -144,7 +145,7 @@ def get_remaining_map_keys(self):
144145 Returns:
145146 list of mapping keys *not* found in files like /resume/path/mapping_key.npz
146147 """
147- prefix = file_io .get_upath (self .tmp_path ) / self .HISTOGRAMS_DIR
148+ prefix = file_io .get_upath (self .tmp_path ) / self .ROW_COUNT_HISTOGRAMS_DIR
148149 map_file_pattern = re .compile (r"map_(\d+).npz" )
149150 done_indexes = [int (map_file_pattern .match (path .name ).group (1 )) for path in prefix .glob ("*.npz" )]
150151 remaining_indexes = list (set (range (0 , len (self .input_paths ))) - (set (done_indexes )))
@@ -157,24 +158,27 @@ def read_histogram(self, healpix_order):
157158 - Otherwise, combine histograms from partials
158159 - Otherwise, return an empty histogram
159160 """
160- file_name = file_io .append_paths_to_pointer (self .tmp_path , self .HISTOGRAM_BINARY_FILE )
161+ file_name = file_io .append_paths_to_pointer (self .tmp_path , self .ROW_COUNT_HISTOGRAM_BINARY_FILE )
162+
163+ # Otherwise, read the histogram from partial histograms and combine.
161164 if not file_io .does_file_or_directory_exist (file_name ):
162- # Read the histogram from partial histograms and combine.
163165 remaining_map_files = self .get_remaining_map_keys ()
164166 if len (remaining_map_files ) > 0 :
165167 raise RuntimeError (f"{ len (remaining_map_files )} map stages did not complete successfully." )
166- histogram_files = file_io .find_files_matching_path (self .tmp_path , self .HISTOGRAMS_DIR , "*.npz" )
168+ histogram_files = file_io .find_files_matching_path (
169+ self .tmp_path , self .ROW_COUNT_HISTOGRAMS_DIR , "*.npz"
170+ )
167171 aggregate_histogram = HistogramAggregator (healpix_order )
168172 for partial_file_name in histogram_files :
169173 partial = SparseHistogram .from_file (partial_file_name )
170174 aggregate_histogram .add (partial )
171175
172- file_name = file_io .append_paths_to_pointer (self .tmp_path , self .HISTOGRAM_BINARY_FILE )
176+ file_name = file_io .append_paths_to_pointer (self .tmp_path , self .ROW_COUNT_HISTOGRAM_BINARY_FILE )
173177 with open (file_name , "wb+" ) as file_handle :
174178 file_handle .write (aggregate_histogram .full_histogram )
175179 if self .delete_resume_log_files :
176180 file_io .remove_directory (
177- file_io .append_paths_to_pointer (self .tmp_path , self .HISTOGRAMS_DIR ),
181+ file_io .append_paths_to_pointer (self .tmp_path , self .ROW_COUNT_HISTOGRAMS_DIR ),
178182 ignore_errors = True ,
179183 )
180184
@@ -200,10 +204,10 @@ def partial_histogram_file(cls, tmp_path, mapping_key: str):
200204 mapping_key (str): unique string for each mapping task (e.g. "map_57")
201205 """
202206 file_io .make_directory (
203- file_io .append_paths_to_pointer (tmp_path , cls .HISTOGRAMS_DIR ),
207+ file_io .append_paths_to_pointer (tmp_path , cls .ROW_COUNT_HISTOGRAMS_DIR ),
204208 exist_ok = True ,
205209 )
206- return file_io .append_paths_to_pointer (tmp_path , cls .HISTOGRAMS_DIR , f"{ mapping_key } .npz" )
210+ return file_io .append_paths_to_pointer (tmp_path , cls .ROW_COUNT_HISTOGRAMS_DIR , f"{ mapping_key } .npz" )
207211
208212 def get_remaining_split_keys (self ):
209213 """Gather remaining keys, dropping successful split tasks from done file names.
@@ -266,7 +270,7 @@ def get_alignment_file(
266270 highest_healpix_order (int): the highest healpix order (e.g. 5-10)
267271 lowest_healpix_order (int): the lowest healpix order (e.g. 1-5). specifying a lowest order
268272 constrains the partitioning to prevent spatially large pixels.
269- threshold (int): the maximum number of objects allowed in a single pixel
273+ pixel_threshold (int): the maximum number of objects allowed in a single pixel
270274 drop_empty_siblings (bool): if 3 of 4 pixels are empty, keep only the non-empty pixel
271275 expected_total_rows (int): number of expected rows found in the dataset.
272276
0 commit comments