12 changes: 12 additions & 0 deletions src/hats_import/catalog/arguments.py
@@ -87,6 +87,11 @@ class ImportArguments(RuntimeArguments):
"""when determining bins for the final partitioning, the maximum number
of rows for a single resulting pixel. we may combine hierarchically until
we near the ``pixel_threshold``"""
byte_pixel_threshold: int | None = None
"""when determining bins for the final partitioning, the maximum number
of rows for a single resulting pixel, expressed in bytes. we may combine hierarchically until
we near the ``byte_pixel_threshold``. if this is set, it will override
``pixel_threshold``."""
drop_empty_siblings: bool = True
"""when determining bins for the final partitioning, should we keep result pixels
at a higher order (smaller area) if the 3 sibling pixels are empty. setting this to
@@ -144,6 +149,13 @@ def _check_arguments(self):
if self.sort_columns:
raise ValueError("When using _healpix_29 for position, no sort columns should be added")

# Validate byte_pixel_threshold
if self.byte_pixel_threshold is not None:
if not isinstance(self.byte_pixel_threshold, int):
raise TypeError("byte_pixel_threshold must be an integer")
if self.byte_pixel_threshold < 0:
raise ValueError("byte_pixel_threshold must be non-negative")

# Basic checks complete - make more checks and create directories where necessary
self.input_paths = find_input_paths(self.input_path, "**/*.*", self.input_file_list)
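
For context, a minimal sketch of how the new option might be used when building import arguments. The paths, column names, and catalog name below are hypothetical, and other required arguments (such as the file reader) are elided; only ``byte_pixel_threshold`` is new in this change.

```python
from hats_import.catalog.arguments import ImportArguments

# Hypothetical paths and column names, for illustration only.
args = ImportArguments(
    output_artifact_name="my_catalog",
    input_path="./raw_catalog_files",
    output_path="./hats_catalogs",
    ra_column="ra",
    dec_column="dec",
    # Cap each resulting pixel at roughly 1 GiB of estimated data,
    # overriding the row-count based pixel_threshold.
    byte_pixel_threshold=1024**3,
)
```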

118 changes: 111 additions & 7 deletions src/hats_import/catalog/map_reduce.py
@@ -1,6 +1,8 @@
"""Import a set of non-hats files using dask for parallelization"""

import pickle
import sys
from collections import defaultdict

import cloudpickle
import hats.pixel_math.healpix_shim as hp
@@ -86,6 +88,7 @@ def map_to_pixels(
ra_column,
dec_column,
use_healpix_29=False,
threshold_mode="row_count",
):
"""Map a file of input objects to their healpix pixels.

@@ -99,6 +102,7 @@
highest_order (int): healpix order to use when mapping
ra_column (str): where to find right ascension data in the dataframe
dec_column (str): where to find declination data in the dataframe
threshold_mode (str): how the partition threshold is measured, either "row_count" (rows per pixel) or "mem_size" (estimated bytes per pixel)

Returns:
one-dimensional numpy array of long integers where the value at each index corresponds
@@ -108,14 +112,24 @@
FileNotFoundError: if the file does not exist, or is a directory
"""
try:
histo = HistogramAggregator(highest_order)

if use_healpix_29:
# Always generate the row-count histogram.
row_count_histo = HistogramAggregator(highest_order)
mem_size_histo = None
if threshold_mode == "mem_size":
mem_size_histo = HistogramAggregator(highest_order)

# Determine which columns to read from the input file. If we're using
# the bytewise/mem_size histogram, we need to read all columns to accurately
# estimate memory usage.
if threshold_mode == "mem_size":
read_columns = None
elif use_healpix_29:
read_columns = [SPATIAL_INDEX_COLUMN]
else:
read_columns = [ra_column, dec_column]

for _, _, mapped_pixels in _iterate_input_file(
# Iterate through the input file in chunks, mapping pixels and updating histograms.
for _, chunk_data, mapped_pixels in _iterate_input_file(
input_file,
pickled_reader_file,
highest_order,
@@ -124,18 +138,108 @@
use_healpix_29,
read_columns,
):
# Always add to row_count histogram.
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
row_count_histo.add(SparseHistogram(mapped_pixel, count_at_pixel, highest_order))

# If using bytewise/mem_size thresholding, also add to mem_size histogram.
if threshold_mode == "mem_size":
data_mem_sizes = _get_mem_size_of_chunk(chunk_data)
pixel_mem_sizes: dict[int, int] = defaultdict(int)
for pixel, mem_size in zip(mapped_pixels, data_mem_sizes, strict=True):
pixel_mem_sizes[pixel] += mem_size

# Turn our dict into two parallel arrays: the pixel ids and their accumulated byte counts
mapped_pixel_ids = np.array(list(pixel_mem_sizes.keys()), dtype=np.int64)
mapped_pixel_mem_sizes = np.array(list(pixel_mem_sizes.values()), dtype=np.int64)

if mem_size_histo is not None:
mem_size_histo.add(
SparseHistogram(mapped_pixel_ids, mapped_pixel_mem_sizes, highest_order)
)

histo.add(SparseHistogram(mapped_pixel, count_at_pixel, highest_order))

histo.to_sparse().to_file(
# Write row_count histogram to file.
row_count_histo.to_sparse().to_file(
ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key)
)
# If using bytewise/mem_size thresholding, also write mem_size histogram to a separate file.
if threshold_mode == "mem_size" and mem_size_histo is not None:
mem_size_histo.to_sparse().to_file(
ResumePlan.partial_histogram_file(
tmp_path=resume_path, mapping_key=mapping_key, which_histogram="mem_size"
)
)
except Exception as exception: # pylint: disable=broad-exception-caught
print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
raise exception
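
To make the ``mem_size`` accumulation step above concrete, here is a small standalone sketch on invented pixel ids and byte counts; it mirrors the per-chunk aggregation that feeds the SparseHistogram.

```python
from collections import defaultdict

import numpy as np

# Invented per-row results for one chunk: the pixel each row maps to,
# and that row's estimated size in bytes.
mapped_pixels = np.array([42, 42, 7, 42, 7])
data_mem_sizes = [128, 96, 200, 64, 150]

pixel_mem_sizes = defaultdict(int)
for pixel, mem_size in zip(mapped_pixels, data_mem_sizes, strict=True):
    pixel_mem_sizes[pixel] += mem_size

# Parallel arrays of pixel ids and accumulated byte counts
# (here pixel 42 -> 288 bytes, pixel 7 -> 350 bytes).
mapped_pixel_ids = np.array(list(pixel_mem_sizes.keys()), dtype=np.int64)
mapped_pixel_mem_sizes = np.array(list(pixel_mem_sizes.values()), dtype=np.int64)
```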


def _get_row_mem_size_data_frame(row):
"""Given a pandas dataframe row (as a tuple), return the memory size of that row.

Args:
row (tuple): a single dataframe row, as yielded by ``itertuples(index=False, name=None)``

Returns:
int: the memory size of the row in bytes
"""
total = 0

# Add the memory overhead of the row object itself.
total += sys.getsizeof(row)

# Then add the size of each item in the row.
for item in row:
if isinstance(item, np.ndarray):
total += item.nbytes + sys.getsizeof(item) # object data + object overhead
else:
total += sys.getsizeof(item)
return total
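
A hedged example of what this helper might report for a simple row; the values are arbitrary and the exact byte count depends on the platform and Python version.

```python
import numpy as np

from hats_import.catalog.map_reduce import _get_row_mem_size_data_frame

# A row as produced by itertuples(index=False, name=None): scalars plus an array column.
row = (10.5, -42.0, np.arange(3))
print(_get_row_mem_size_data_frame(row))  # rough per-row footprint, in bytes
```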


def _get_row_mem_size_pa_table(table, row_index):
"""Given a pyarrow table and a row index, return the memory size of that row.

Args:
table (pa.Table): the pyarrow table
row_index (int): the index of the row to measure

Returns:
int: the memory size of the row in bytes
"""
total = 0

# Add the overhead of the row index value itself.
total += sys.getsizeof(row_index)

# Then add the size of each item in the row.
for column in table.itercolumns():
item = column[row_index]
if isinstance(item, np.ndarray):
total += item.nbytes + sys.getsizeof(item) # object data + object overhead
else:
total += sys.getsizeof(item.as_py())
return total


def _get_mem_size_of_chunk(data):
"""Given a 2D array of data, return a list of memory sizes for each row in the chunk.

Args:
data (pd.DataFrame or pa.Table): the data chunk to measure

Returns:
list[int]: list of memory sizes for each row in the chunk
"""
if isinstance(data, pd.DataFrame):
mem_sizes = [_get_row_mem_size_data_frame(row) for row in data.itertuples(index=False, name=None)]
elif isinstance(data, pa.Table):
mem_sizes = [_get_row_mem_size_pa_table(data, i) for i in range(data.num_rows)]
else:
raise NotImplementedError(f"Unsupported data type {type(data)} for memory size calculation")
return mem_sizes
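
And a short sketch of how the chunk-level helper behaves for both supported input types, assuming pandas and pyarrow are available; the column names are invented.

```python
import pandas as pd
import pyarrow as pa

from hats_import.catalog.map_reduce import _get_mem_size_of_chunk

chunk = pd.DataFrame({"ra": [10.0, 20.0], "dec": [-5.0, 15.0], "flux": [1.2, 3.4]})

# One estimated size per row, in bytes, for a pandas chunk...
sizes_df = _get_mem_size_of_chunk(chunk)
# ...and for the equivalent pyarrow table.
sizes_pa = _get_mem_size_of_chunk(pa.Table.from_pandas(chunk))
assert len(sizes_df) == len(sizes_pa) == 2
```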


def split_pixels(
input_file: UPath,
pickled_reader_file: str,