3 changes: 3 additions & 0 deletions docs/reference/catalog_inspection.rst
@@ -8,6 +8,9 @@ Inspection Methods

Catalog.get_healpix_pixels
Catalog.get_ordered_healpix_pixels
Catalog.get_partition_file_paths
Catalog.get_partition_metadata
Catalog.get_memory_estimate
Catalog.aggregate_column_statistics
Catalog.per_pixel_statistics
Catalog.partitions
42 changes: 42 additions & 0 deletions docs/tutorials/performance.rst
@@ -8,6 +8,48 @@ the `HATS <https://github.com/astronomy-commons/hats>`_ data format,
efficient algorithms,
and `Dask <https://dask.org/>`_ framework for parallel computing.

Estimating Memory Usage
------------------------

When working with large catalogs, especially remote ones, it's useful to estimate how much data will need to be
transferred or loaded before performing computations. LSDB provides methods to help you estimate memory usage
based on the selected columns and spatial regions.

The ``get_memory_estimate()`` method provides an upper bound on the amount of data that will be read from
disk (or transferred over the network) for the currently loaded columns and partitions::

import lsdb

# Load catalog with selected columns
catalog = lsdb.open_catalog("my_catalog", columns=["ra", "dec", "mag"])

# Get memory estimate
estimate = catalog.get_memory_estimate()
print(f"Estimated data size: {estimate['total_mb']:.2f} MB")
print(f"Columns: {estimate['columns']}")
print(f"Number of partitions: {estimate['num_partitions']}")

The estimate is based on the compressed size of the selected columns in the Parquet files, so it is an
upper bound on the amount of data read from disk or transferred over the network. The actual amount may
be lower due to:

- Row filtering from queries or searches
- Dask's lazy evaluation (not all partitions may need to be computed)

Note that the in-memory size after decompression is typically larger than the compressed size reported
by the estimate.
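
To get a feel for the difference, you can compare the estimate against the in-memory size of the
computed result. The following is a minimal sketch, assuming the catalog is small enough to compute
and that the computed result is a pandas-compatible DataFrame supporting ``memory_usage()``::

    import lsdb

    catalog = lsdb.open_catalog("my_catalog", columns=["ra", "dec", "mag"])

    # Upper bound on the compressed bytes read from disk or over the network
    estimated_bytes = catalog.get_memory_estimate()["total_bytes"]

    # In-memory size after computing; typically larger than the compressed
    # estimate because the data is decompressed (illustrative; only do this
    # for catalogs that comfortably fit in memory)
    actual_bytes = catalog.compute().memory_usage(deep=True).sum()

    print(f"Estimated: {estimated_bytes} bytes, in memory: {actual_bytes} bytes")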

You can also get detailed metadata about each partition::

# Get detailed partition metadata
metadata = catalog.get_partition_metadata()
print(metadata[["pixel", "total_size_bytes"]])

# Or get the file paths for each partition
file_paths = catalog.get_partition_file_paths()
for pixel, path in file_paths.items():
print(f"{pixel}: {path}")

This is particularly useful when working with remote catalogs to understand the amount of data transfer
required before starting a computation.
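
The per-partition sizes can also help identify which partitions dominate a transfer. Below is a small
sketch that relies only on the ``get_partition_metadata()`` output shown above, and assumes every
partition footer could be read (no ``None`` sizes)::

    metadata = catalog.get_partition_metadata()

    # The five largest partitions by compressed size
    print(metadata.nlargest(5, "total_size_bytes")[["pixel", "total_size_bytes"]])

    # Total compressed size across all partitions, in megabytes
    total_mb = metadata["total_size_bytes"].sum() / (1024**2)
    print(f"Total compressed size: {total_mb:.2f} MB")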

Here, we demonstrate the results of LSDB performance tests for cross-matching operations,
performed on the `Bridges2 cluster at Pittsburgh Supercomputing Center <https://www.psc.edu/resources/bridges-2/>`_
using a single node with 128 cores and 256 GB of memory.
161 changes: 161 additions & 0 deletions src/lsdb/catalog/dataset/healpix_dataset.py
@@ -322,6 +322,167 @@ def get_partition_index(self, order: int, pixel: int) -> int:
partition_index = self._ddf_pixel_map[hp_pixel]
return partition_index

def get_partition_file_paths(self) -> dict[HealpixPixel, str]:
"""Get the file paths for all partitions in the catalog.

Returns:
Dictionary mapping HEALPix pixels to their parquet file paths.
The paths are returned as strings and may be local file paths or remote URIs.
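
        Example:
            Illustrative only; ``my_catalog`` is a placeholder path.

            >>> catalog = lsdb.open_catalog("my_catalog")
            >>> file_paths = catalog.get_partition_file_paths()
            >>> for pixel, path in file_paths.items():
            ...     print(f"{pixel}: {path}")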
"""
import hats.io as hc_io

pixels = self.get_healpix_pixels()
file_paths = {}
for pixel in pixels:
file_path = hc_io.pixel_catalog_file(
self.hc_structure.catalog_base_dir,
pixel,
npix_suffix=self.hc_structure.catalog_info.npix_suffix,
)
file_paths[pixel] = str(file_path)
return file_paths

def get_partition_metadata(self) -> pd.DataFrame:
"""Get detailed metadata for all partitions including file paths and sizes.

        This method reads the Parquet file metadata for each partition to determine:

        - The file path
        - The total compressed size of the file
        - The compressed size of each column in the file

        This provides an upper bound on the amount of data that would need to be
        transferred or read when computing the catalog, since actual operations may
        filter rows or skip partitions that are never computed.

Returns:
DataFrame with one row per partition, containing:
- pixel: The HEALPix pixel
- file_path: Path to the parquet file
- total_size_bytes: Total compressed size of all data in the file
- column_sizes: Dictionary mapping column names to their compressed sizes in bytes

Note:
For remote catalogs (e.g., HTTP, S3), this will make network requests to read
the parquet file metadata (footers). The actual data is not transferred.
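
        Example:
            Illustrative only; ``my_catalog`` is a placeholder path.

            >>> catalog = lsdb.open_catalog("my_catalog")
            >>> metadata = catalog.get_partition_metadata()
            >>> print(metadata[["pixel", "total_size_bytes"]])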
"""
import pyarrow.parquet as pq

file_paths = self.get_partition_file_paths()
metadata_list = []

for pixel, file_path in file_paths.items():
try:
parquet_file = pq.ParquetFile(file_path)
metadata = parquet_file.metadata

                # Accumulate compressed sizes across all row groups. The row-group
                # total_byte_size reports uncompressed bytes, so the total is built
                # from the compressed column chunk sizes instead.
                total_size = 0
                column_sizes = {}

                for i in range(metadata.num_row_groups):
                    rg = metadata.row_group(i)
                    for j in range(rg.num_columns):
                        col = rg.column(j)
                        col_name = col.path_in_schema
                        column_sizes[col_name] = column_sizes.get(col_name, 0) + col.total_compressed_size
                        total_size += col.total_compressed_size

metadata_list.append(
{
"pixel": pixel,
"file_path": file_path,
"total_size_bytes": total_size,
"column_sizes": column_sizes,
}
)
            except OSError:
                # FileNotFoundError and PermissionError are subclasses of OSError.
                # If metadata for a partition cannot be read, include it with None values
metadata_list.append(
{
"pixel": pixel,
"file_path": file_path,
"total_size_bytes": None,
"column_sizes": None,
}
)

return pd.DataFrame(metadata_list)

def get_memory_estimate(self, include_index: bool = False) -> dict:
"""Estimate the upper bound of memory usage for the loaded columns.

This method provides an estimate of the amount of data that would need to be
read from disk (or transferred over the network) to compute the catalog with
the currently selected columns. The estimate is based on the compressed size
of the columns in the parquet files.

        The actual amount of data read may be lower due to:

        - Row filtering (queries, searches)
        - Dask's lazy evaluation (not all partitions may be computed)

        Note that the in-memory size after decompression is typically larger than
        the compressed size reported here.

Args:
include_index (bool): Whether to include the HEALPix index column in the estimate.
Default is False.

Returns:
Dictionary containing:
- total_bytes: Total estimated bytes across all partitions and selected columns
- total_kb: Total in kilobytes
- total_mb: Total in megabytes
- total_gb: Total in gigabytes
- per_column_bytes: Dictionary of column names to their total sizes
- num_partitions: Number of partitions included in the estimate
- columns: List of columns included in the estimate

Example:
>>> catalog = lsdb.open_catalog("my_catalog", columns=["ra", "dec"])
>>> estimate = catalog.get_memory_estimate()
>>> print(f"Estimated data size: {estimate['total_mb']:.2f} MB")
>>> print(f"Columns: {estimate['columns']}")
"""
partition_metadata = self.get_partition_metadata()

# Get the currently loaded columns (schema columns)
loaded_columns = set(self._ddf.columns)

# Determine index column and add it to loaded columns if requested
index_column = self._ddf.index.name
if include_index and index_column:
# Include the index column if requested
loaded_columns.add(index_column)

# Calculate total size for loaded columns
total_bytes = 0
per_column_bytes = {}

for _, row in partition_metadata.iterrows():
column_sizes = row["column_sizes"]
if column_sizes is None:
continue

for col_name, col_size in column_sizes.items():
if col_name in loaded_columns:
if col_name not in per_column_bytes:
per_column_bytes[col_name] = 0
per_column_bytes[col_name] += col_size
total_bytes += col_size

return {
"total_bytes": total_bytes,
"total_kb": total_bytes / 1024,
"total_mb": total_bytes / (1024**2),
"total_gb": total_bytes / (1024**3),
"per_column_bytes": per_column_bytes,
"num_partitions": len(partition_metadata),
"columns": sorted(list(loaded_columns)),
}

@property
def partitions(self):
"""Returns the partitions of the catalog"""
169 changes: 169 additions & 0 deletions tests/lsdb/catalog/test_memory_estimate.py
@@ -0,0 +1,169 @@
"""Tests for memory estimation and partition metadata methods."""

import numpy as np
import pandas as pd

import lsdb


def test_get_partition_file_paths(small_sky_order1_catalog):
"""Test that we can retrieve file paths for all partitions."""
file_paths = small_sky_order1_catalog.get_partition_file_paths()

# Check that we get a dictionary
assert isinstance(file_paths, dict)

# Check that we have paths for all pixels
pixels = small_sky_order1_catalog.get_healpix_pixels()
assert len(file_paths) == len(pixels)

# Check that all pixels are present
for pixel in pixels:
assert pixel in file_paths

# Check that paths are strings and contain expected patterns
for pixel, path in file_paths.items():
assert isinstance(path, str)
assert "Npix=" in path
assert str(pixel.pixel) in path


def test_get_partition_metadata(small_sky_order1_catalog):
"""Test that we can retrieve detailed metadata for partitions."""
metadata_df = small_sky_order1_catalog.get_partition_metadata()

# Check that we get a DataFrame
assert isinstance(metadata_df, pd.DataFrame)

# Check expected columns
assert "pixel" in metadata_df.columns
assert "file_path" in metadata_df.columns
assert "total_size_bytes" in metadata_df.columns
assert "column_sizes" in metadata_df.columns

# Check that we have metadata for all pixels
pixels = small_sky_order1_catalog.get_healpix_pixels()
assert len(metadata_df) == len(pixels)

    # Check that file sizes are positive integers
    for _, row in metadata_df.iterrows():
        assert row["total_size_bytes"] > 0
        assert isinstance(row["total_size_bytes"], (int, np.integer))

# Check that column sizes are dictionaries
for _, row in metadata_df.iterrows():
assert isinstance(row["column_sizes"], dict)
assert len(row["column_sizes"]) > 0
# All column sizes should be positive
for col_name, col_size in row["column_sizes"].items():
assert col_size > 0
assert isinstance(col_name, str)


def test_get_memory_estimate_all_columns(small_sky_order1_catalog):
"""Test memory estimation with all columns loaded."""
estimate = small_sky_order1_catalog.get_memory_estimate()

# Check that we get a dictionary with expected keys
assert isinstance(estimate, dict)
assert "total_bytes" in estimate
assert "total_kb" in estimate
assert "total_mb" in estimate
assert "total_gb" in estimate
assert "per_column_bytes" in estimate
assert "num_partitions" in estimate
assert "columns" in estimate

# Check that total bytes is positive
assert estimate["total_bytes"] > 0

# Check that conversions are correct
assert estimate["total_kb"] == estimate["total_bytes"] / 1024
assert estimate["total_mb"] == estimate["total_bytes"] / (1024**2)
assert estimate["total_gb"] == estimate["total_bytes"] / (1024**3)

# Check that per-column bytes adds up to total
per_column_sum = sum(estimate["per_column_bytes"].values())
assert per_column_sum == estimate["total_bytes"]

# Check that num_partitions matches catalog
assert estimate["num_partitions"] == small_sky_order1_catalog.npartitions

# Check that columns list is populated
assert len(estimate["columns"]) > 0


def test_get_memory_estimate_subset_columns(small_sky_order1_dir):
"""Test memory estimation with only a subset of columns loaded."""
# Load catalog with only a subset of columns
catalog = lsdb.open_catalog(small_sky_order1_dir, columns=["ra", "dec"])

estimate = catalog.get_memory_estimate()

    # Check that total bytes is positive but smaller than the full catalog
    assert estimate["total_bytes"] > 0
    full_estimate = lsdb.open_catalog(small_sky_order1_dir).get_memory_estimate()
    assert estimate["total_bytes"] < full_estimate["total_bytes"]

# Check that only the loaded columns are included
assert "ra" in estimate["columns"]
assert "dec" in estimate["columns"]
assert "id" not in estimate["columns"]

# Check that per_column_bytes only has the loaded columns
assert "ra" in estimate["per_column_bytes"]
assert "dec" in estimate["per_column_bytes"]
assert "id" not in estimate["per_column_bytes"]


def test_get_memory_estimate_with_index(small_sky_order1_catalog):
"""Test memory estimation including the index column."""
estimate_without_index = small_sky_order1_catalog.get_memory_estimate(include_index=False)
estimate_with_index = small_sky_order1_catalog.get_memory_estimate(include_index=True)

    # The index column should be included only when include_index=True
    index_name = small_sky_order1_catalog._ddf.index.name
    if index_name:
        assert index_name in estimate_with_index["columns"]
        assert index_name not in estimate_without_index["columns"]
        # Including the index column should increase the estimate
        assert estimate_with_index["total_bytes"] > estimate_without_index["total_bytes"]
    else:
        # Without a named index the two estimates should be identical
        assert estimate_with_index["total_bytes"] == estimate_without_index["total_bytes"]


def test_get_memory_estimate_after_search(small_sky_order1_catalog):
"""Test memory estimation after filtering with a search."""
# Perform a cone search that will reduce partitions
filtered_catalog = small_sky_order1_catalog.cone_search(0, 0, 1)

# The filtered catalog should have fewer partitions
assert filtered_catalog.npartitions <= small_sky_order1_catalog.npartitions

# Get memory estimate for filtered catalog
estimate = filtered_catalog.get_memory_estimate()

# Check that estimate is for the filtered partitions
assert estimate["num_partitions"] == filtered_catalog.npartitions

# If fewer partitions, the estimate should be smaller
if filtered_catalog.npartitions < small_sky_order1_catalog.npartitions:
full_estimate = small_sky_order1_catalog.get_memory_estimate()
assert estimate["total_bytes"] < full_estimate["total_bytes"]


def test_partition_metadata_consistency(small_sky_order1_catalog):
"""Test that partition metadata is consistent across calls."""
metadata1 = small_sky_order1_catalog.get_partition_metadata()
metadata2 = small_sky_order1_catalog.get_partition_metadata()

# Should get the same results
pd.testing.assert_frame_equal(metadata1, metadata2)


def test_file_paths_point_to_existing_pixels(small_sky_order1_catalog):
"""Test that file paths reference the correct pixel numbers."""
file_paths = small_sky_order1_catalog.get_partition_file_paths()

for pixel, path in file_paths.items():
# The path should contain the pixel number
assert f"Npix={pixel.pixel}" in path
# The path should contain the order
assert f"Norder={pixel.order}" in path