diff --git a/src/hats_import/catalog/arguments.py b/src/hats_import/catalog/arguments.py
index ca0d1ad6..600b51d7 100644
--- a/src/hats_import/catalog/arguments.py
+++ b/src/hats_import/catalog/arguments.py
@@ -84,6 +84,11 @@ class ImportArguments(RuntimeArguments):
     """when determining bins for the final partitioning, the maximum number
     of rows for a single resulting pixel. we may combine hierarchically until
     we near the ``pixel_threshold``"""
+    byte_pixel_threshold: int | None = None
+    """when determining bins for the final partitioning, the maximum total data size
+    for a single resulting pixel, expressed in bytes. we may combine hierarchically until
+    we near the ``byte_pixel_threshold``. if this is set, it will override
+    ``pixel_threshold``."""
     drop_empty_siblings: bool = True
     """when determining bins for the final partitioning, should we keep result pixels
     at a higher order (smaller area) if the 3 sibling pixels are empty. setting this to
@@ -141,6 +146,13 @@ def _check_arguments(self):
             if self.sort_columns:
                 raise ValueError("When using _healpix_29 for position, no sort columns should be added")
 
+        # Validate byte_pixel_threshold
+        if self.byte_pixel_threshold is not None:
+            if not isinstance(self.byte_pixel_threshold, int):
+                raise TypeError("byte_pixel_threshold must be an integer")
+            if self.byte_pixel_threshold < 0:
+                raise ValueError("byte_pixel_threshold must be non-negative")
+
         # Basic checks complete - make more checks and create directories where necessary
         self.input_paths = find_input_paths(self.input_path, "**/*.*", self.input_file_list)
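This makes it possible to bound partitions by estimated in-memory size rather than by row count. A minimal usage sketch; the paths, reader, and threshold value are illustrative, and the remaining fields follow the existing `ImportArguments` API:

```python
from hats_import.catalog.arguments import ImportArguments

# Illustrative: cap each output pixel at roughly 300 MiB of estimated
# in-memory data. When byte_pixel_threshold is set, it overrides the
# row-count-based pixel_threshold.
args = ImportArguments(
    output_artifact_name="my_catalog",
    input_path="/data/input_csvs",
    file_reader="csv",
    ra_column="ra",
    dec_column="dec",
    byte_pixel_threshold=300 * 1024 * 1024,
    output_path="/data/catalogs",
)
```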
diff --git a/src/hats_import/catalog/map_reduce.py b/src/hats_import/catalog/map_reduce.py
index d3c1bd62..c1e8e4e2 100644
--- a/src/hats_import/catalog/map_reduce.py
+++ b/src/hats_import/catalog/map_reduce.py
@@ -1,6 +1,8 @@
 """Import a set of non-hats files using dask for parallelization"""
 
 import pickle
+import sys
+from collections import defaultdict
 
 import cloudpickle
 import hats.pixel_math.healpix_shim as hp
@@ -85,6 +87,7 @@ def map_to_pixels(
     ra_column,
     dec_column,
     use_healpix_29=False,
+    threshold_mode="row_count",
 ):
     """Map a file of input objects to their healpix pixels.
 
@@ -98,6 +101,7 @@
         highest_order (int): healpix order to use when mapping
         ra_column (str): where to find right ascension data in the dataframe
         dec_column (str): where to find declination data in the dataframe
+        threshold_mode (str): mode for thresholding, either "row_count" or "mem_size".
 
     Returns:
         one-dimensional numpy array of long integers where the value at each index corresponds
@@ -107,14 +111,24 @@
         FileNotFoundError: if the file does not exist, or is a directory
     """
     try:
-        histo = HistogramAggregator(highest_order)
-
-        if use_healpix_29:
+        # Always generate the row-count histogram.
+        row_count_histo = HistogramAggregator(highest_order)
+        mem_size_histo = None
+        if threshold_mode == "mem_size":
+            mem_size_histo = HistogramAggregator(highest_order)
+
+        # Determine which columns to read from the input file. If we're using
+        # the bytewise/mem_size histogram, we need to read all columns to accurately
+        # estimate memory usage.
+        if threshold_mode == "mem_size":
+            read_columns = None
+        elif use_healpix_29:
             read_columns = [SPATIAL_INDEX_COLUMN]
         else:
             read_columns = [ra_column, dec_column]
 
-        for _, _, mapped_pixels in _iterate_input_file(
+        # Iterate through the input file in chunks, mapping pixels and updating histograms.
+        for _, chunk_data, mapped_pixels in _iterate_input_file(
             input_file,
             pickled_reader_file,
             highest_order,
@@ -123,18 +137,108 @@
             use_healpix_29,
             read_columns,
         ):
+            # Always add to the row_count histogram.
             mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
+            row_count_histo.add(SparseHistogram(mapped_pixel, count_at_pixel, highest_order))
+
+            # If using bytewise/mem_size thresholding, also add to the mem_size histogram.
+            if threshold_mode == "mem_size":
+                data_mem_sizes = _get_mem_size_of_chunk(chunk_data)
+                pixel_mem_sizes: dict[int, int] = defaultdict(int)
+                for pixel, mem_size in zip(mapped_pixels, data_mem_sizes, strict=True):
+                    pixel_mem_sizes[pixel] += mem_size
+
+                # Turn our dict into two parallel arrays, sorted so the pixel ids are increasing.
+                sorted_pixels = sorted(pixel_mem_sizes)
+                mapped_pixel_ids = np.array(sorted_pixels, dtype=np.int64)
+                mapped_pixel_mem_sizes = np.array(
+                    [pixel_mem_sizes[pixel] for pixel in sorted_pixels], dtype=np.int64
+                )
+
+                if mem_size_histo is not None:
+                    mem_size_histo.add(
+                        SparseHistogram(mapped_pixel_ids, mapped_pixel_mem_sizes, highest_order)
+                    )
 
-            histo.add(SparseHistogram(mapped_pixel, count_at_pixel, highest_order))
-
-        histo.to_sparse().to_file(
+        # Write the row_count histogram to file.
+        row_count_histo.to_sparse().to_file(
             ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key)
         )
+        # If using bytewise/mem_size thresholding, also write the mem_size histogram to a separate file.
+        if threshold_mode == "mem_size" and mem_size_histo is not None:
+            mem_size_histo.to_sparse().to_file(
+                ResumePlan.partial_histogram_file(
+                    tmp_path=resume_path, mapping_key=mapping_key, which_histogram="mem_size"
+                )
+            )
     except Exception as exception:  # pylint: disable=broad-exception-caught
         print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
         raise exception
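To make the per-chunk bookkeeping concrete, here is the mem_size aggregation step in isolation, with made-up values. The sorted parallel arrays mirror the `(indexes, values)` form that `SparseHistogram` receives from `np.unique` on the row-count path:

```python
from collections import defaultdict

import numpy as np

# One chunk's mapped healpix pixels and per-row byte estimates (made-up values).
mapped_pixels = np.array([7, 3, 7, 7, 3])
data_mem_sizes = [120, 80, 130, 110, 90]

# Accumulate bytes per pixel, then emit parallel arrays with increasing pixel ids.
pixel_mem_sizes: dict[int, int] = defaultdict(int)
for pixel, mem_size in zip(mapped_pixels, data_mem_sizes, strict=True):
    pixel_mem_sizes[pixel] += mem_size

sorted_pixels = sorted(pixel_mem_sizes)
mapped_pixel_ids = np.array(sorted_pixels, dtype=np.int64)  # [3, 7]
mapped_pixel_mem_sizes = np.array(
    [pixel_mem_sizes[pixel] for pixel in sorted_pixels], dtype=np.int64
)  # [170, 360]
```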
+
+
+def _get_row_mem_size_data_frame(row):
+    """Given a pandas dataframe row (as a tuple), return the memory size of that row.
+
+    Args:
+        row (tuple): the row from the dataframe
+
+    Returns:
+        int: the memory size of the row in bytes
+    """
+    total = 0
+
+    # Add the memory overhead of the row tuple itself.
+    total += sys.getsizeof(row)
+
+    # Then add the size of each item in the row.
+    for item in row:
+        if isinstance(item, np.ndarray):
+            total += item.nbytes + sys.getsizeof(item)  # array buffer + ndarray object overhead
+        else:
+            total += sys.getsizeof(item)
+    return total
+
+
+def _get_row_mem_size_pa_table(table, row_index):
+    """Given a pyarrow table and a row index, return the memory size of that row.
+
+    Args:
+        table (pa.Table): the pyarrow table
+        row_index (int): the index of the row to measure
+
+    Returns:
+        int: the memory size of the row in bytes
+    """
+    total = 0
+
+    # Add a small constant for per-row overhead (the size of the index integer).
+    total += sys.getsizeof(row_index)
+
+    # Then add the size of each item in the row.
+    for column in table.itercolumns():
+        item = column[row_index]
+        if isinstance(item, np.ndarray):
+            total += item.nbytes + sys.getsizeof(item)  # array buffer + ndarray object overhead
+        else:
+            # item is typically a pyarrow scalar; measure its Python representation.
+            total += sys.getsizeof(item.as_py())
+    return total
+
+
+def _get_mem_size_of_chunk(data):
+    """Given a chunk of tabular data, return a list of memory sizes for each row in the chunk.
+
+    Args:
+        data (pd.DataFrame or pa.Table): the data chunk to measure
+
+    Returns:
+        list[int]: list of memory sizes for each row in the chunk
+    """
+    if isinstance(data, pd.DataFrame):
+        mem_sizes = [_get_row_mem_size_data_frame(row) for row in data.itertuples(index=False, name=None)]
+    elif isinstance(data, pa.Table):
+        mem_sizes = [_get_row_mem_size_pa_table(data, i) for i in range(data.num_rows)]
+    else:
+        raise NotImplementedError(f"Unsupported data type {type(data)} for memory size calculation")
+    return mem_sizes
 
 
 def split_pixels(
     input_file: UPath,
     pickled_reader_file: str,
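A quick sanity check of the helpers above on a tiny pandas chunk. The absolute numbers are approximations built from `sys.getsizeof` and `ndarray.nbytes`, so they vary by platform and Python version; only the one-estimate-per-row shape is stable. The import of the private helper here is just for the demonstration:

```python
import numpy as np
import pandas as pd

from hats_import.catalog.map_reduce import _get_mem_size_of_chunk

# A two-row chunk with scalar columns and a nested-array column.
chunk = pd.DataFrame(
    {
        "ra": [10.0, 20.0],
        "dec": [-5.0, 15.0],
        "flux": [np.array([1.0, 2.0]), np.array([3.0])],
    }
)

mem_sizes = _get_mem_size_of_chunk(chunk)
assert len(mem_sizes) == len(chunk)  # one estimate per row
assert all(size > 0 for size in mem_sizes)
```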
diff --git a/src/hats_import/catalog/resume_plan.py b/src/hats_import/catalog/resume_plan.py
index 1ee8d367..4543ba42 100644
--- a/src/hats_import/catalog/resume_plan.py
+++ b/src/hats_import/catalog/resume_plan.py
@@ -23,13 +23,16 @@ class ResumePlan(PipelineResumePlan):
     """Container class for holding the state of each file in the pipeline plan."""
 
     input_paths: list[UPath] = field(default_factory=list)
-    """resolved list of all files that will be used in the importer"""
+    """Resolved list of all files that will be used in the importer"""
     map_files: list[tuple[str, str]] = field(default_factory=list)
-    """list of files (and job keys) that have yet to be mapped"""
+    """List of files (and job keys) that have yet to be mapped"""
     split_keys: list[tuple[str, str]] = field(default_factory=list)
-    """set of files (and job keys) that have yet to be split"""
+    """Set of files (and job keys) that have yet to be split"""
     destination_pixel_map: dict[HealpixPixel, int] | None = None
     """Destination pixels and their expected final count"""
+    threshold_mode: str = "row_count"
+    """Which mode to use for partitioning: 'row_count' or 'mem_size'.
+    Determines whether an additional mem_size histogram is created."""
     should_run_mapping: bool = True
     should_run_splitting: bool = True
     should_run_reducing: bool = True
@@ -41,6 +44,8 @@ class ResumePlan(PipelineResumePlan):
 
     ROW_COUNT_HISTOGRAM_BINARY_FILE = "row_count_mapping_histogram.npz"
     ROW_COUNT_HISTOGRAMS_DIR = "row_count_histograms"
+    MEM_SIZE_HISTOGRAM_BINARY_FILE = "mem_size_mapping_histogram.npz"
+    MEM_SIZE_HISTOGRAMS_DIR = "mem_size_histograms"
 
     ALIGNMENT_FILE = "alignment.pickle"
 
@@ -63,6 +68,10 @@ def __init__(
             if import_args.debug_stats_only:
                 run_stages = ["mapping", "finishing"]
             self.input_paths = import_args.input_paths
+
+            # Set threshold_mode based on byte_pixel_threshold
+            if hasattr(import_args, "byte_pixel_threshold") and import_args.byte_pixel_threshold is not None:
+                self.threshold_mode = "mem_size"
         else:
             super().__init__(
                 resume=resume,
@@ -118,6 +127,13 @@ def gather_plan(self, run_stages: list[str] | None = None):
                 file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR),
                 exist_ok=True,
             )
+            # If using mem_size thresholding, gather those keys too.
+            if self.threshold_mode == "mem_size":
+                self.get_remaining_map_keys(which_histogram="mem_size")
+                file_io.make_directory(
+                    file_io.append_paths_to_pointer(self.tmp_path, self.MEM_SIZE_HISTOGRAMS_DIR),
+                    exist_ok=True,
+                )
         if self.should_run_splitting:
             if not (mapping_done or self.should_run_mapping):
                 raise ValueError("mapping must be complete before splitting")
@@ -139,46 +155,88 @@
             )
             step_progress.update(1)
 
-    def get_remaining_map_keys(self):
+    def get_remaining_map_keys(self, which_histogram: str = "row_count"):
         """Gather remaining keys, dropping successful mapping tasks from histogram names.
 
+        Args:
+            which_histogram (str): Which histogram to check for completed tasks, either 'row_count'
+                or 'mem_size'. Defaults to 'row_count'.
+
         Returns:
-            list of mapping keys *not* found in files like /resume/path/mapping_key.npz
+            list of tuple: The mapping keys *not* found in files like /resume/path/mapping_key.npz,
+            along with their corresponding input file paths.
+
+        Raises:
+            ValueError: If `which_histogram` is not recognized, or if `which_histogram` is
+                'mem_size' but the threshold_mode is not 'mem_size'.
         """
-        prefix = file_io.get_upath(self.tmp_path) / self.ROW_COUNT_HISTOGRAMS_DIR
+        if which_histogram == "row_count":
+            prefix = file_io.get_upath(self.tmp_path) / self.ROW_COUNT_HISTOGRAMS_DIR
+        elif which_histogram == "mem_size" and self.threshold_mode == "mem_size":
+            prefix = file_io.get_upath(self.tmp_path) / self.MEM_SIZE_HISTOGRAMS_DIR
+        elif which_histogram == "mem_size":
+            raise ValueError("Cannot get remaining mem_size map keys when threshold_mode is not 'mem_size'.")
+        else:
+            raise ValueError(f"Unrecognized which_histogram value: {which_histogram}")
+
         map_file_pattern = re.compile(r"map_(\d+).npz")
         done_indexes = [int(map_file_pattern.match(path.name).group(1)) for path in prefix.glob("*.npz")]
         remaining_indexes = list(set(range(0, len(self.input_paths))) - (set(done_indexes)))
         return [(f"map_{key}", self.input_paths[key]) for key in remaining_indexes]
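The remaining-keys computation is identical for both histogram kinds; only the directory prefix changes. A standalone sketch of that computation, with made-up file names:

```python
import re

# Four input files; partial histograms already exist for tasks 0 and 2.
input_paths = ["a.csv", "b.csv", "c.csv", "d.csv"]
done_files = ["map_0.npz", "map_2.npz"]

map_file_pattern = re.compile(r"map_(\d+).npz")
done_indexes = [int(map_file_pattern.match(name).group(1)) for name in done_files]
remaining_indexes = sorted(set(range(len(input_paths))) - set(done_indexes))

# Tasks 1 and 3 still need to run.
assert [(f"map_{key}", input_paths[key]) for key in remaining_indexes] == [
    ("map_1", "b.csv"),
    ("map_3", "d.csv"),
]
```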
 
-    def read_histogram(self, healpix_order):
-        """Return histogram with healpix_order'd shape
+    def read_histogram(self, healpix_order, which_histogram: str = "row_count"):
+        """Returns a histogram with the specified Healpix order's shape.
+
+        This method attempts the following steps in order:
+        1. Tries to locate and return a combined histogram.
+        2. If a combined histogram is unavailable, combines partial histograms to create one.
+        3. If no partial histograms are found, returns an empty histogram.
+
+        Args:
+            healpix_order (int): The desired Healpix order for the histogram.
+            which_histogram (str): Which histogram to read, either "row_count" or "mem_size".
+                Defaults to "row_count".
 
-        - Try to find a combined histogram
-        - Otherwise, combine histograms from partials
-        - Otherwise, return an empty histogram
+        Returns:
+            numpy.ndarray: A one-dimensional array representing the histogram with the
+            specified Healpix order.
+
+        Raises:
+            RuntimeError: If there are incomplete mapping stages.
+            ValueError: If the histogram from the previous execution is incompatible with
+                the highest Healpix order, or if `which_histogram` is invalid.
         """
-        file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE)
+        if which_histogram == "row_count":
+            histogram_binary_file = self.ROW_COUNT_HISTOGRAM_BINARY_FILE
+            histogram_directory = self.ROW_COUNT_HISTOGRAMS_DIR
+        elif which_histogram == "mem_size" and self.threshold_mode == "mem_size":
+            histogram_binary_file = self.MEM_SIZE_HISTOGRAM_BINARY_FILE
+            histogram_directory = self.MEM_SIZE_HISTOGRAMS_DIR
+        elif which_histogram == "mem_size":
+            raise ValueError("Cannot read mem_size histogram when threshold_mode is not 'mem_size'.")
+        else:
+            raise ValueError(f"Unrecognized which_histogram value: {which_histogram}")
 
-        # Otherwise, read the histogram from partial histograms and combine.
+        file_name = file_io.append_paths_to_pointer(self.tmp_path, histogram_binary_file)
+
+        # If no combined file exists, read the partial histograms and combine them.
         if not file_io.does_file_or_directory_exist(file_name):
             remaining_map_files = self.get_remaining_map_keys()
             if len(remaining_map_files) > 0:
                 raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
-            histogram_files = file_io.find_files_matching_path(
-                self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR, "*.npz"
-            )
+            histogram_files = file_io.find_files_matching_path(self.tmp_path, histogram_directory, "*.npz")
             aggregate_histogram = HistogramAggregator(healpix_order)
             for partial_file_name in histogram_files:
                 partial = SparseHistogram.from_file(partial_file_name)
                 aggregate_histogram.add(partial)
 
-            file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE)
+            file_name = file_io.append_paths_to_pointer(self.tmp_path, histogram_binary_file)
             with open(file_name, "wb+") as file_handle:
                 file_handle.write(aggregate_histogram.full_histogram)
             if self.delete_resume_log_files:
                 file_io.remove_directory(
-                    file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR),
+                    file_io.append_paths_to_pointer(self.tmp_path, histogram_directory),
                     ignore_errors=True,
                 )
 
@@ -194,7 +252,7 @@ def read_histogram(self, healpix_order):
         return full_histogram
 
     @classmethod
-    def partial_histogram_file(cls, tmp_path, mapping_key: str):
+    def partial_histogram_file(cls, tmp_path, mapping_key: str, which_histogram: str = "row_count"):
         """File name for writing a histogram file to a special intermediate directory.
 
         As a side effect, this method may create the special intermediate directory.
 
@@ -202,12 +260,24 @@ def partial_histogram_file(cls, tmp_path, mapping_key: str):
         Args:
             tmp_path (str): where to write intermediate resume files.
             mapping_key (str): unique string for each mapping task (e.g. "map_57")
+            which_histogram (str): which histogram to write, either "row_count" or "mem_size".
+                Defaults to "row_count".
+
+        Returns:
+            str: Full path to the partial histogram file.
         """
+        if which_histogram == "row_count":
+            histograms_dir = cls.ROW_COUNT_HISTOGRAMS_DIR
+        elif which_histogram == "mem_size":
+            histograms_dir = cls.MEM_SIZE_HISTOGRAMS_DIR
+        else:
+            raise ValueError(f"Unrecognized which_histogram value: {which_histogram}")
+
         file_io.make_directory(
-            file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR),
+            file_io.append_paths_to_pointer(tmp_path, histograms_dir),
             exist_ok=True,
         )
-        return file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR, f"{mapping_key}.npz")
+        return file_io.append_paths_to_pointer(tmp_path, histograms_dir, f"{mapping_key}.npz")
 
     def get_remaining_split_keys(self):
         """Gather remaining keys, dropping successful split tasks from done file names.
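With this dispatch in place, the same mapping key lands in a different intermediate directory per histogram kind. A sketch; the tmp path is illustrative, and note that each call creates its intermediate directory as a side effect:

```python
from hats_import.catalog.resume_plan import ResumePlan

# Both calls create their intermediate directory if it does not exist yet.
row_file = ResumePlan.partial_histogram_file("/tmp/resume", "map_57")
# -> /tmp/resume/row_count_histograms/map_57.npz

mem_file = ResumePlan.partial_histogram_file(
    "/tmp/resume", "map_57", which_histogram="mem_size"
)
# -> /tmp/resume/mem_size_histograms/map_57.npz
```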
diff --git a/src/hats_import/catalog/run_import.py b/src/hats_import/catalog/run_import.py
index e44df3fa..46f2d269 100644
--- a/src/hats_import/catalog/run_import.py
+++ b/src/hats_import/catalog/run_import.py
@@ -52,8 +52,28 @@ def run(args, client):
             )
         resume_plan.wait_for_mapping(futures)
 
+        # If we are partitioning by memory size, run the mapping for the mem_size histogram as well.
+        if resume_plan.threshold_mode == "mem_size":
+            futures = []
+            for key, file_path in resume_plan.map_files:
+                futures.append(
+                    client.submit(
+                        mr.map_to_pixels,
+                        input_file=file_path,
+                        resume_path=resume_plan.tmp_path,
+                        pickled_reader_file=pickled_reader_file,
+                        mapping_key=key,
+                        highest_order=args.mapping_healpix_order,
+                        ra_column=args.ra_column,
+                        dec_column=args.dec_column,
+                        use_healpix_29=args.use_healpix_29,
+                        threshold_mode="mem_size",
+                    )
+                )
+            resume_plan.wait_for_mapping(futures)
+
     with resume_plan.print_progress(total=2, stage_name="Binning") as step_progress:
-        raw_histogram = resume_plan.read_histogram(args.mapping_healpix_order)
+        raw_histogram = resume_plan.read_histogram(args.mapping_healpix_order, which_histogram="row_count")
         total_rows = int(raw_histogram.sum())
         if args.expected_total_rows > 0 and args.expected_total_rows != total_rows:
             raise ValueError(