diff --git a/docs/reference/catalog_inspection.rst b/docs/reference/catalog_inspection.rst
index 13f0a494b..ce29440e5 100644
--- a/docs/reference/catalog_inspection.rst
+++ b/docs/reference/catalog_inspection.rst
@@ -8,6 +8,9 @@ Inspection Methods
     Catalog.get_healpix_pixels
     Catalog.get_ordered_healpix_pixels
+    Catalog.get_partition_file_paths
+    Catalog.get_partition_metadata
+    Catalog.get_memory_estimate
     Catalog.aggregate_column_statistics
     Catalog.per_pixel_statistics
     Catalog.partitions
diff --git a/docs/tutorials/performance.rst b/docs/tutorials/performance.rst
index 1f1ac0f7b..5289a7bc7 100644
--- a/docs/tutorials/performance.rst
+++ b/docs/tutorials/performance.rst
@@ -8,6 +8,48 @@
 the `HATS <https://github.com/astronomy-commons/hats>`_ data format, efficient algorithms, and
 `Dask <https://www.dask.org/>`_ framework for parallel computing.
 
+Estimating Memory Usage
+-----------------------
+
+When working with large catalogs, especially remote ones, it's useful to estimate how much data will need to be
+transferred or loaded before performing computations. LSDB provides methods to help you estimate memory usage
+based on the selected columns and spatial regions.
+
+The ``get_memory_estimate()`` method provides an upper bound on the amount of data that will be read from
+disk (or transferred over the network) for the currently loaded columns and partitions::
+
+    import lsdb
+
+    # Load catalog with selected columns
+    catalog = lsdb.open_catalog("my_catalog", columns=["ra", "dec", "mag"])
+
+    # Get memory estimate
+    estimate = catalog.get_memory_estimate()
+    print(f"Estimated data size: {estimate['total_mb']:.2f} MB")
+    print(f"Columns: {estimate['columns']}")
+    print(f"Number of partitions: {estimate['num_partitions']}")
+
+The estimate is based on the compressed size of the columns in the Parquet files, so it reflects the data
+read from disk or transferred over the network rather than the in-memory size, which is typically larger
+after decompression. The amount of data actually read may be lower due to:
+
+- Row filtering from queries or searches
+- Dask's lazy evaluation (not all partitions may need to be computed)
+
+You can also get detailed metadata about each partition::
+
+    # Get detailed partition metadata
+    metadata = catalog.get_partition_metadata()
+    print(metadata[["pixel", "total_size_bytes"]])
+
+    # Or get the file paths for each partition
+    file_paths = catalog.get_partition_file_paths()
+    for pixel, path in file_paths.items():
+        print(f"{pixel}: {path}")
+
+This is particularly useful for remote catalogs, where it helps you understand how much data transfer a
+computation will require before you start it.
+
 Here, we demonstrate the results of LSDB performance tests for cross-matching operations, performed
 on the `Bridges2 cluster at Pittsburgh Supercomputing Center
 <https://www.psc.edu/resources/bridges-2/>`_ using a single node with 128 cores and 256 GB of memory.
diff --git a/src/lsdb/catalog/dataset/healpix_dataset.py b/src/lsdb/catalog/dataset/healpix_dataset.py
index 0bb556c47..2fdabc62f 100644
--- a/src/lsdb/catalog/dataset/healpix_dataset.py
+++ b/src/lsdb/catalog/dataset/healpix_dataset.py
@@ -322,6 +322,167 @@ def get_partition_index(self, order: int, pixel: int) -> int:
         partition_index = self._ddf_pixel_map[hp_pixel]
         return partition_index
 
+    def get_partition_file_paths(self) -> dict[HealpixPixel, str]:
+        """Get the file paths for all partitions in the catalog.
+
+        Returns:
+            Dictionary mapping HEALPix pixels to their parquet file paths.
+            The paths are returned as strings and may be local file paths or remote URIs.
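+
+        Example:
+            Illustrative usage (a sketch; ``catalog`` stands for any already-opened catalog)::
+
+                paths = catalog.get_partition_file_paths()
+                for pixel, path in paths.items():
+                    print(pixel, path)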
+        """
+        import hats.io as hc_io
+
+        pixels = self.get_healpix_pixels()
+        file_paths = {}
+        for pixel in pixels:
+            file_path = hc_io.pixel_catalog_file(
+                self.hc_structure.catalog_base_dir,
+                pixel,
+                npix_suffix=self.hc_structure.catalog_info.npix_suffix,
+            )
+            file_paths[pixel] = str(file_path)
+        return file_paths
+
+    def get_partition_metadata(self) -> pd.DataFrame:
+        """Get detailed metadata for all partitions, including file paths and sizes.
+
+        This method reads the Parquet file metadata for each partition to determine:
+
+        - The file path
+        - The total compressed size of the file
+        - The compressed size of each column in the file
+
+        This provides an upper bound on the amount of data that would need to be
+        transferred or read when computing the catalog, since actual operations may
+        read only a subset of columns or filter rows.
+
+        Returns:
+            DataFrame with one row per partition, containing:
+
+            - pixel: The HEALPix pixel
+            - file_path: Path to the parquet file
+            - total_size_bytes: Total compressed size of all data in the file
+            - column_sizes: Dictionary mapping column names to their compressed sizes in bytes
+
+        Note:
+            For remote catalogs (e.g., HTTP, S3), this will make network requests to read
+            the parquet file metadata (footers). The actual data is not transferred.
+        """
+        import pyarrow.parquet as pq
+
+        file_paths = self.get_partition_file_paths()
+        metadata_list = []
+
+        for pixel, file_path in file_paths.items():
+            try:
+                parquet_file = pq.ParquetFile(file_path)
+                metadata = parquet_file.metadata
+
+                # Accumulate compressed column sizes across all row groups
+                total_size = 0
+                column_sizes = {}
+
+                for i in range(metadata.num_row_groups):
+                    rg = metadata.row_group(i)
+
+                    # Sum the compressed size of each column chunk
+                    for j in range(rg.num_columns):
+                        col = rg.column(j)
+                        col_name = col.path_in_schema
+                        if col_name not in column_sizes:
+                            column_sizes[col_name] = 0
+                        column_sizes[col_name] += col.total_compressed_size
+                        total_size += col.total_compressed_size
+
+                metadata_list.append(
+                    {
+                        "pixel": pixel,
+                        "file_path": file_path,
+                        "total_size_bytes": total_size,
+                        "column_sizes": column_sizes,
+                    }
+                )
+            except OSError:
+                # If we can't read the metadata for a partition (file not found, permission
+                # denied, or other I/O errors), include it with None values
+                metadata_list.append(
+                    {
+                        "pixel": pixel,
+                        "file_path": file_path,
+                        "total_size_bytes": None,
+                        "column_sizes": None,
+                    }
+                )
+
+        return pd.DataFrame(metadata_list)
+
+    def get_memory_estimate(self, include_index: bool = False) -> dict:
+        """Estimate the upper bound of memory usage for the loaded columns.
+
+        This method provides an estimate of the amount of data that would need to be
+        read from disk (or transferred over the network) to compute the catalog with
+        the currently selected columns. The estimate is based on the compressed size
+        of the columns in the parquet files, so the in-memory size after decompression
+        is typically larger than this estimate.
+
+        The amount of data actually read may be lower due to:
+
+        - Row filtering (queries, searches)
+        - Dask's lazy evaluation (not all partitions may be computed)
+
+        Args:
+            include_index (bool): Whether to include the HEALPix index column in the estimate.
+                Default is False.
+
+        Returns:
+            Dictionary containing:
+
+            - total_bytes: Total estimated bytes across all partitions and selected columns
+            - total_kb: Total in kilobytes
+            - total_mb: Total in megabytes
+            - total_gb: Total in gigabytes
+            - per_column_bytes: Dictionary mapping column names to their total compressed sizes
+            - num_partitions: Number of partitions included in the estimate
+            - columns: List of columns included in the estimate
+
+        Example:
+            >>> catalog = lsdb.open_catalog("my_catalog", columns=["ra", "dec"])
+            >>> estimate = catalog.get_memory_estimate()
+            >>> print(f"Estimated data size: {estimate['total_mb']:.2f} MB")
+            >>> print(f"Columns: {estimate['columns']}")
+        """
+        partition_metadata = self.get_partition_metadata()
+
+        # Get the currently loaded columns (schema columns)
+        loaded_columns = set(self._ddf.columns)
+
+        # Add the index column to the loaded columns if requested
+        index_column = self._ddf.index.name
+        if include_index and index_column:
+            loaded_columns.add(index_column)
+
+        # Calculate the total size for the loaded columns
+        total_bytes = 0
+        per_column_bytes = {}
+
+        for _, row in partition_metadata.iterrows():
+            column_sizes = row["column_sizes"]
+            if column_sizes is None:
+                continue
+
+            for col_name, col_size in column_sizes.items():
+                if col_name in loaded_columns:
+                    if col_name not in per_column_bytes:
+                        per_column_bytes[col_name] = 0
+                    per_column_bytes[col_name] += col_size
+                    total_bytes += col_size
+
+        return {
+            "total_bytes": total_bytes,
+            "total_kb": total_bytes / 1024,
+            "total_mb": total_bytes / (1024**2),
+            "total_gb": total_bytes / (1024**3),
+            "per_column_bytes": per_column_bytes,
+            "num_partitions": len(partition_metadata),
+            "columns": sorted(loaded_columns),
+        }
+
     @property
     def partitions(self):
         """Returns the partitions of the catalog"""
diff --git a/tests/lsdb/catalog/test_memory_estimate.py b/tests/lsdb/catalog/test_memory_estimate.py
new file mode 100644
index 000000000..be913a9bd
--- /dev/null
+++ b/tests/lsdb/catalog/test_memory_estimate.py
@@ -0,0 +1,169 @@
+"""Tests for memory estimation and partition metadata methods."""
+
+import pandas as pd
+
+import lsdb
+
+
+def test_get_partition_file_paths(small_sky_order1_catalog):
+    """Test that we can retrieve file paths for all partitions."""
+    file_paths = small_sky_order1_catalog.get_partition_file_paths()
+
+    # Check that we get a dictionary
+    assert isinstance(file_paths, dict)
+
+    # Check that we have paths for all pixels
+    pixels = small_sky_order1_catalog.get_healpix_pixels()
+    assert len(file_paths) == len(pixels)
+
+    # Check that all pixels are present
+    for pixel in pixels:
+        assert pixel in file_paths
+
+    # Check that paths are strings and contain expected patterns
+    for pixel, path in file_paths.items():
+        assert isinstance(path, str)
+        assert "Npix=" in path
+        assert str(pixel.pixel) in path
+
+
+def test_get_partition_metadata(small_sky_order1_catalog):
+    """Test that we can retrieve detailed metadata for partitions."""
+    metadata_df = small_sky_order1_catalog.get_partition_metadata()
+
+    # Check that we get a DataFrame
+    assert isinstance(metadata_df, pd.DataFrame)
+
+    # Check expected columns
+    assert "pixel" in metadata_df.columns
+    assert "file_path" in metadata_df.columns
+    assert "total_size_bytes" in metadata_df.columns
+    assert "column_sizes" in metadata_df.columns
+
+    # Check that we have metadata for all pixels
+    pixels = small_sky_order1_catalog.get_healpix_pixels()
+    assert len(metadata_df) == len(pixels)
+
+    # Check that file sizes are positive integers
+    for _, row in metadata_df.iterrows():
+        assert row["total_size_bytes"] > 0
+        assert isinstance(row["total_size_bytes"], int)
+
+    # Check that column sizes are dictionaries
+    for _, row in metadata_df.iterrows():
+        assert isinstance(row["column_sizes"], dict)
+        assert len(row["column_sizes"]) > 0
+        # All column sizes should be positive
+        for col_name, col_size in row["column_sizes"].items():
+            assert col_size > 0
+            assert isinstance(col_name, str)
+
+
+def test_get_memory_estimate_all_columns(small_sky_order1_catalog):
+    """Test memory estimation with all columns loaded."""
+    estimate = small_sky_order1_catalog.get_memory_estimate()
+
+    # Check that we get a dictionary with expected keys
+    assert isinstance(estimate, dict)
+    assert "total_bytes" in estimate
+    assert "total_kb" in estimate
+    assert "total_mb" in estimate
+    assert "total_gb" in estimate
+    assert "per_column_bytes" in estimate
+    assert "num_partitions" in estimate
+    assert "columns" in estimate
+
+    # Check that total bytes is positive
+    assert estimate["total_bytes"] > 0
+
+    # Check that the unit conversions are consistent
+    assert estimate["total_kb"] == estimate["total_bytes"] / 1024
+    assert estimate["total_mb"] == estimate["total_bytes"] / (1024**2)
+    assert estimate["total_gb"] == estimate["total_bytes"] / (1024**3)
+
+    # Check that per-column bytes adds up to total
+    per_column_sum = sum(estimate["per_column_bytes"].values())
+    assert per_column_sum == estimate["total_bytes"]
+
+    # Check that num_partitions matches catalog
+    assert estimate["num_partitions"] == small_sky_order1_catalog.npartitions
+
+    # Check that columns list is populated
+    assert len(estimate["columns"]) > 0
+
+
+def test_get_memory_estimate_subset_columns(small_sky_order1_dir):
+    """Test memory estimation with only a subset of columns loaded."""
+    # Load catalog with only a subset of columns
+    catalog = lsdb.open_catalog(small_sky_order1_dir, columns=["ra", "dec"])
+
+    estimate = catalog.get_memory_estimate()
+
+    # Check that total bytes is positive
+    assert estimate["total_bytes"] > 0
+
+    # Check that only the loaded columns are included
+    assert "ra" in estimate["columns"]
+    assert "dec" in estimate["columns"]
+    assert "id" not in estimate["columns"]
+
+    # Check that per_column_bytes only has the loaded columns
+    assert "ra" in estimate["per_column_bytes"]
+    assert "dec" in estimate["per_column_bytes"]
+    assert "id" not in estimate["per_column_bytes"]
+
+
+def test_get_memory_estimate_with_index(small_sky_order1_catalog):
+    """Test memory estimation including the index column."""
+    estimate_without_index = small_sky_order1_catalog.get_memory_estimate(include_index=False)
+    estimate_with_index = small_sky_order1_catalog.get_memory_estimate(include_index=True)
+
+    # The estimate with index should be larger
+    assert estimate_with_index["total_bytes"] > estimate_without_index["total_bytes"]
+
+    # The index column should only be included when include_index=True
+    index_name = small_sky_order1_catalog._ddf.index.name
+    if index_name:
+        assert index_name in estimate_with_index["columns"]
+        assert index_name not in estimate_without_index["columns"]
+
+
+def test_get_memory_estimate_after_search(small_sky_order1_catalog):
+    """Test memory estimation after filtering with a search."""
+    # Perform a cone search that will reduce the number of partitions
+    filtered_catalog = small_sky_order1_catalog.cone_search(0, 0, 1)
+
+    # The filtered catalog should not have more partitions than the original
+    assert filtered_catalog.npartitions <= small_sky_order1_catalog.npartitions
+
+    # Get the memory estimate for the filtered catalog
+    estimate = filtered_catalog.get_memory_estimate()
+
+    # Check that the estimate covers only the filtered partitions
+    assert estimate["num_partitions"] == filtered_catalog.npartitions
+
+    # If there are fewer partitions, the estimate should be smaller
+    if filtered_catalog.npartitions < small_sky_order1_catalog.npartitions:
+        full_estimate = small_sky_order1_catalog.get_memory_estimate()
+        assert estimate["total_bytes"] < full_estimate["total_bytes"]
+
+
+def test_partition_metadata_consistency(small_sky_order1_catalog):
+    """Test that partition metadata is consistent across calls."""
+    metadata1 = small_sky_order1_catalog.get_partition_metadata()
+    metadata2 = small_sky_order1_catalog.get_partition_metadata()
+
+    # Should get the same results
+    pd.testing.assert_frame_equal(metadata1, metadata2)
+
+
+def test_file_paths_reference_correct_pixels(small_sky_order1_catalog):
+    """Test that file paths reference the correct pixel numbers."""
+    file_paths = small_sky_order1_catalog.get_partition_file_paths()
+
+    for pixel, path in file_paths.items():
+        # The path should contain the pixel number
+        assert f"Npix={pixel.pixel}" in path
+        # The path should contain the order
+        assert f"Norder={pixel.order}" in path
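+
+
+def test_partition_metadata_matches_file_paths(small_sky_order1_catalog):
+    """Illustrative consistency check (a sketch, assuming the same fixture as above):
+    each row returned by get_partition_metadata should point at the same file that
+    get_partition_file_paths reports for that pixel."""
+    file_paths = small_sky_order1_catalog.get_partition_file_paths()
+    metadata_df = small_sky_order1_catalog.get_partition_metadata()
+
+    # Each metadata row should reference a known pixel and its reported path
+    for _, row in metadata_df.iterrows():
+        assert row["pixel"] in file_paths
+        assert row["file_path"] == file_paths[row["pixel"]]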