Commit b96c5b2

Import with npix directories (#613)
1 parent 9c0417f commit b96c5b2

File tree: 4 files changed (+58 −2 lines)

- src/hats_import/catalog/arguments.py
- src/hats_import/catalog/map_reduce.py
- src/hats_import/catalog/run_import.py
- tests/hats_import/catalog/test_run_import.py

src/hats_import/catalog/arguments.py

Lines changed: 4 additions & 1 deletion

```diff
@@ -52,7 +52,10 @@ class ImportArguments(RuntimeArguments):
     add_healpix_29: bool = True
     """add the healpix-based hats spatial index field alongside the data"""
     npix_suffix: str = ".parquet"
-    """Suffix for Npix files."""
+    """Suffix for pixel data. When specified as "/" each pixel will have a directory in its name."""
+    npix_parquet_name: str | None = None
+    """Name of the pixel parquet file to be used when npix_suffix=/. By default, it will be named
+    after the pixel with a .parquet extension (e.g. 'Npix=10.parquet')"""
     write_table_kwargs: dict | None = None
     """additional keyword arguments to use when writing files to parquet (e.g. compression schemes)."""
     row_group_kwargs: dict | None = None
```
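
For orientation, a minimal usage sketch of the two arguments together (values mirror the new test below; the input/output paths are hypothetical and the Dask client is assumed, neither is part of this diff):

```python
from dask.distributed import Client

import hats_import.catalog.run_import as runner
from hats_import.catalog.arguments import ImportArguments

# A sketch, not the project's documented example: paths are placeholders.
args = ImportArguments(
    output_artifact_name="my_catalog",
    input_path="data/csv_parts",    # hypothetical directory of CSV shards
    file_reader="csv",
    output_path="catalogs",
    highest_healpix_order=0,
    npix_suffix="/",                # write each pixel as a directory ...
    npix_parquet_name="0.parquet",  # ... containing this file (defaults to 'Npix=<pixel>.parquet')
)

with Client() as client:
    runner.run(args, client)
```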

src/hats_import/catalog/map_reduce.py

Lines changed: 13 additions & 1 deletion

```diff
@@ -11,6 +11,7 @@
 import pyarrow.parquet as pq
 from hats import pixel_math
 from hats.io import file_io, paths
+from hats.io.paths import PARTITION_PIXEL
 from hats.pixel_math.healpix_pixel import HealpixPixel
 from hats.pixel_math.sparse_histogram import HistogramAggregator, SparseHistogram
 from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN, spatial_index_to_healpix
@@ -199,7 +200,7 @@ def split_pixels(
         raise exception
 
 
-# pylint: disable=too-many-positional-arguments
+# pylint: disable=too-many-positional-arguments,too-many-statements
 def reduce_pixel_shards(
     cache_shard_path,
     resume_path,
@@ -218,6 +219,7 @@ def reduce_pixel_shards(
     write_table_kwargs=None,
     row_group_kwargs=None,
     npix_suffix=".parquet",
+    npix_parquet_name=None,
 ):
     """Reduce sharded source pixels into destination pixels.
 
@@ -250,6 +252,9 @@
         row_group_kwargs (dict): additional keyword arguments to use in
             creation of rowgroups when writing files to parquet.
         npix_suffix (str): suffix for Npix files. Defaults to ".parquet".
+        npix_parquet_name (str): name of the pixel parquet file to be used
+            when npix_suffix=/. By default, it will be named after the pixel
+            with a .parquet extension (e.g. 'Npix=10.parquet').
 
     Raises:
         ValueError: if the number of rows written doesn't equal provided
@@ -263,6 +268,13 @@
 
     healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
     destination_file = paths.pixel_catalog_file(output_path, healpix_pixel, npix_suffix=npix_suffix)
+
+    if npix_parquet_name is None:
+        npix_parquet_name = f"{PARTITION_PIXEL}={healpix_pixel.pixel}.parquet"
+    if npix_suffix == "/":
+        destination_file.mkdir(exist_ok=True)
+        destination_file = destination_file / npix_parquet_name
+
     if destination_file.exists():
         rows_written = file_io.read_parquet_metadata(destination_file).num_rows
         if rows_written != destination_pixel_size:
```
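
To make the new destination-path logic concrete, a small sketch using only the calls visible in this diff (the root path string and commented output paths are illustrative, and it is assumed here that `pixel_catalog_file` accepts a plain path string):

```python
from hats.io import paths
from hats.io.paths import PARTITION_PIXEL
from hats.pixel_math.healpix_pixel import HealpixPixel

pixel = HealpixPixel(0, 11)

# Default behavior: the pixel is a single file, e.g. .../Dir=0/Npix=11.parquet
file_path = paths.pixel_catalog_file("catalog_root", pixel, npix_suffix=".parquet")

# New behavior: with npix_suffix="/", the same call yields a directory,
# e.g. .../Dir=0/Npix=11, and the parquet file is written inside it.
dir_path = paths.pixel_catalog_file("catalog_root", pixel, npix_suffix="/")
default_name = f"{PARTITION_PIXEL}={pixel.pixel}.parquet"  # "Npix=11.parquet"
parquet_path = dir_path / default_name                     # .../Npix=11/Npix=11.parquet
```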

src/hats_import/catalog/run_import.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -121,6 +121,7 @@ def run(args, client):
                 write_table_kwargs=args.write_table_kwargs,
                 row_group_kwargs=args.row_group_kwargs,
                 npix_suffix=args.npix_suffix,
+                npix_parquet_name=args.npix_parquet_name,
             )
         )
 
```

tests/hats_import/catalog/test_run_import.py

Lines changed: 40 additions & 0 deletions

```diff
@@ -384,3 +384,43 @@ def test_import_mismatch_expectation(
 
     with pytest.raises(ValueError, match="does not match expectation"):
         runner.run(args, dask_client)
+
+
+@pytest.mark.dask
+def test_import_with_npix_dir(dask_client, small_sky_parts_dir, tmp_path, assert_parquet_file_ids):
+    """Test that we can create a catalog where the partition data
+    is stored inside a directory: npix_suffix=/"""
+    args = ImportArguments(
+        output_artifact_name="small_sky_object_catalog",
+        input_path=small_sky_parts_dir,
+        file_reader="csv",
+        output_path=tmp_path,
+        dask_tmp=tmp_path,
+        tmp_dir=tmp_path,
+        highest_healpix_order=0,
+        pixel_threshold=1000,
+        npix_suffix="/",
+        progress_bar=False,
+    )
+    runner.run(args, dask_client)
+
+    catalog = read_hats(args.catalog_path)
+    assert catalog.catalog_info.total_rows == 131
+    assert len(catalog.get_healpix_pixels()) == 1
+    assert catalog.catalog_info.npix_suffix == "/"
+
+    pix_dir = Path(args.catalog_path) / "dataset" / "Norder=0" / "Dir=0" / "Npix=11"
+    expected_ids = [*range(700, 831)]
+
+    # The file exists and contains the expected object IDs
+    output_file = pix_dir / "Npix=11.parquet"
+    assert_parquet_file_ids(output_file, "id", expected_ids)
+
+    # Try with a custom npix_parquet_name
+    shutil.rmtree(tmp_path / "small_sky_object_catalog")
+    args.npix_parquet_name = "0.parquet"
+    runner.run(args, dask_client)
+
+    # The file exists and contains the expected object IDs
+    output_file = pix_dir / "0.parquet"
+    assert_parquet_file_ids(output_file, "id", expected_ids)
```
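
Based on the paths asserted above, the resulting on-disk layout with npix_suffix="/" looks roughly like this (a sketch, not part of the diff):

```
small_sky_object_catalog/
└── dataset/
    └── Norder=0/
        └── Dir=0/
            └── Npix=11/              <- a directory, because npix_suffix="/"
                └── Npix=11.parquet   <- default name; "0.parquet" when npix_parquet_name="0.parquet"
```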
