Commits (23)
97deea2  feat: primitive parquet reader with page pruning (kszucs, Jun 10, 2025)
3677975  add poetry build for libviewer (lhoestq, Jul 7, 2025)
b2d49be  add libviewer to rows (lhoestq, Jul 7, 2025)
92b633d  refactor: only extract metadata and don't try to calculate offset index (kszucs, Sep 8, 2025)
d5d4397  ci: update dockerfiles to include the rust toolchain and libviewer (kszucs, Sep 15, 2025)
faeeb0c  chore: add cargo files to the rows dev docker image (kszucs, Sep 15, 2025)
1b23098  chore: add build-essentials (kszucs, Sep 15, 2025)
26242ff  chore: pin python to 3.12.11 in libviewer and update lockfile (kszucs, Sep 23, 2025)
c01f7bc  feat: use PageIndexPolicy to optionally read offset index (kszucs, Oct 10, 2025)
1536cd0  feat: support querying RowsIndex with page pruning (kszucs, Oct 10, 2025)
1492897  build: add libviewer as a dependency to libcommon (kszucs, Oct 11, 2025)
b15d7fc  style: ruff format libcommon changes (kszucs, Oct 11, 2025)
d0e7930  chore: use query_with_page_pruning from the rows endpoint (kszucs, Oct 11, 2025)
c6579ec  chore: fix mypy errors (kszucs, Oct 11, 2025)
c56021e  style: import Sequence from collections.abc (kszucs, Oct 11, 2025)
25943b5  build: don't use libviewer as an editable dependency (kszucs, Oct 11, 2025)
9f15627  build: try to configure poetry to properly install libviewer (kszucs, Oct 12, 2025)
92b0667  ci: temporarily disable poetry cache (kszucs, Oct 12, 2025)
802a823  style: fixx ruff check errors (kszucs, Oct 12, 2025)
191add5  build: relock projects depending on libcommon (kszucs, Oct 12, 2025)
ff056c6  build: add rust toolchain to more dockerfiles (kszucs, Oct 12, 2025)
c48408d  build: copy the entire libviewer directory in dockerfiles because poe… (kszucs, Oct 12, 2025)
7458864  build: add cargo to PATH (kszucs, Oct 12, 2025)
4 changes: 4 additions & 0 deletions .dockerignore
@@ -14,3 +14,7 @@ assets
**/.pytest_cache
**/.coverage
**/coverage.xml
**/target
**/*.rs.bk
**/.cargo/registry
**/.cargo/git
1 change: 1 addition & 0 deletions Makefile
@@ -49,6 +49,7 @@ e2e:

.PHONY: install
install:
$(MAKE) -C libs/libviewer install
$(MAKE) -C libs/libcommon install
$(MAKE) -C libs/libapi install
$(MAKE) -C jobs/cache_maintenance install
163 changes: 114 additions & 49 deletions libs/libcommon/src/libcommon/parquet_utils.py
@@ -454,42 +454,39 @@ def query(self, offset: int, length: int) -> pa.Table:

@staticmethod
def from_parquet_metadata_items(
parquet_file_metadata_items: list[ParquetFileMetadataItem],
features: Optional[Features],
parquet_files: list[ParquetFileMetadataItem],
Review comment (PR author):
The parquet_file_metadata_items and parquet_files_metadata variable names were confusing, given the extensive use of and separation between data and metadata files, so I renamed both to parquet_files.

features: Features,
parquet_metadata_directory: StrPath,
httpfs: HTTPFileSystem,
hf_token: Optional[str],
max_arrow_data_in_memory: int,
unsupported_features: list[FeatureType] = [],
) -> "ParquetIndexWithMetadata":
if not parquet_file_metadata_items:
if not parquet_files:
raise EmptyParquetMetadataError("No parquet files found.")
if not features:
raise ValueError("Features must be provided.")

partial = parquet_export_is_partial(parquet_file_metadata_items[0]["url"])
partial = parquet_export_is_partial(parquet_files[0]["url"])

with StepProfiler(
method="parquet_index_with_metadata.from_parquet_metadata_items",
step="get the index from parquet metadata",
):
try:
parquet_files_metadata = sorted(
parquet_file_metadata_items, key=lambda parquet_file_metadata: parquet_file_metadata["filename"]
)
parquet_files_urls = [parquet_file_metadata["url"] for parquet_file_metadata in parquet_files_metadata]
parquet_files_urls = [f["url"] for f in parquet_files]
Review comment (PR author):
The parquet files used to be sorted here, but the page-pruning reader requires them to be sorted as well, so I moved the sorting to the new RowsIndex._init_dataset_info() method below.

metadata_paths = [
os.path.join(parquet_metadata_directory, parquet_file_metadata["parquet_metadata_subpath"])
for parquet_file_metadata in parquet_files_metadata
os.path.join(parquet_metadata_directory, f["parquet_metadata_subpath"])
for f in parquet_files
]
num_bytes = [parquet_file_metadata["size"] for parquet_file_metadata in parquet_files_metadata]
num_rows = [parquet_file_metadata["num_rows"] for parquet_file_metadata in parquet_files_metadata]
num_bytes = [f["size"] for f in parquet_files]
num_rows = [f["num_rows"] for f in parquet_files]
except Exception as e:
raise ParquetResponseFormatError(f"Could not parse the list of parquet files: {e}") from e

with StepProfiler(
method="parquet_index_with_metadata.from_parquet_metadata_items", step="get the dataset's features"
):
if features is None: # config-parquet version<6 didn't have features
features = Features.from_arrow_schema(pq.read_schema(metadata_paths[0]))
supported_columns, unsupported_columns = get_supported_unsupported_columns(
features,
unsupported_features=unsupported_features,
@@ -520,56 +517,109 @@ def __init__(
parquet_metadata_directory: StrPath,
max_arrow_data_in_memory: int,
unsupported_features: list[FeatureType] = [],
data_store="hf://"
Review comment (PR author):
It is supposed to correspond to https, but we cannot pass the Python filesystem object down to Rust, so we need to use a URI instead.
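
A minimal usage sketch of what this means for callers, assuming the get_rows_index() signature added further down in this diff; indexer and local_parquet_root are illustrative names, and only the file:// form actually appears in the tests below:

```python
# Sketch: the Rust reader only receives a URI string, so the caller picks the
# store by URI instead of handing over an fsspec filesystem object.

# production: the page-pruning reader fetches parquet data from the Hub
rows_index = indexer.get_rows_index("user/dataset", "default", "train", data_store="hf://")

# tests: the same code path reads from a local directory instead
rows_index = indexer.get_rows_index(
    "ds_sharded",
    "default",
    "train",
    data_store=f"file://{local_parquet_root}",  # hypothetical local path
)
```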

):
self.dataset = dataset
self.config = config
self.split = split
self.httpfs = httpfs
self.parquet_index = self._init_parquet_index(

self._init_dataset_info(parquet_metadata_directory)
self._init_parquet_index(
hf_token=hf_token,
httpfs=httpfs,
parquet_metadata_directory=parquet_metadata_directory,
max_arrow_data_in_memory=max_arrow_data_in_memory,
unsupported_features=unsupported_features,
)
self._init_viewer_index(
data_store=data_store,
metadata_store=f"file://{parquet_metadata_directory}"
)

def _init_dataset_info(self, parquet_metadata_directory: StrPath):
Review comment (PR author):
Pulled some logic out of _init_parquet_index(): this method is now responsible for querying the mongo cache and parsing out the revision, the parquet_files (sorted), and the features; when the features are absent, we read the first file's parquet metadata to get the corresponding Arrow schema, which is also needed to build an empty result set.

# get the list of parquet files and features
with StepProfiler(method="rows_index._get_dataset_metadata", step="all"):
response = get_previous_step_or_raise(
kind=CONFIG_PARQUET_METADATA_KIND,
dataset=self.dataset,
config=self.config,
split=None
)

# set the revision of the dataset
self.revision = response["dataset_git_revision"]

# get the list of parquet files
parquet_files = response["content"]["parquet_files_metadata"]
# filter only files for the current split and config
parquet_files = [f for f in parquet_files if f["split"] == self.split and f["config"] == self.config]
# sort by filename to have a deterministic order
parquet_files = sorted(parquet_files, key=lambda x: x["filename"])
if not parquet_files:
raise EmptyParquetMetadataError("No parquet files found.")
self.parquet_files = parquet_files

# retrieve the features from the mongo response
features = response["content"].get("features")
if features:
self.features = Features.from_dict(features)
else:
# config-parquet version<6 didn't have features
first_metadata_file = os.path.join(
parquet_metadata_directory, parquet_files[0]["parquet_metadata_subpath"]
)
arrow_schema = pq.read_schema(first_metadata_file)
self.features = Features.from_arrow_schema(arrow_schema)

def _init_parquet_index(
self,
httpfs: HfFileSystem,
hf_token: Optional[str],
parquet_metadata_directory: StrPath,
max_arrow_data_in_memory: int,
unsupported_features: list[FeatureType] = [],
) -> ParquetIndexWithMetadata:
with StepProfiler(method="rows_index._init_parquet_index", step="all"):
# get the list of parquet files
with StepProfiler(method="rows_index._init_parquet_index", step="get list of parquet files for split"):
response = get_previous_step_or_raise(
kind=CONFIG_PARQUET_METADATA_KIND,
dataset=self.dataset,
config=self.config,
split=None,
)
self.revision = response["dataset_git_revision"]
content = response["content"]
if content.get("features"): # config-parquet-metadata version<2 didn't have features
features = Features.from_dict(content["features"])
else:
features = None
logging.info(
f"Create ParquetIndexWithMetadata for dataset={self.dataset}, config={self.config}, split={self.split}"
)
return ParquetIndexWithMetadata.from_parquet_metadata_items(
[
parquet_item
for parquet_item in content["parquet_files_metadata"]
if parquet_item["split"] == self.split and parquet_item["config"] == self.config
],
features=features,
parquet_metadata_directory=parquet_metadata_directory,
httpfs=self.httpfs,
hf_token=hf_token,
max_arrow_data_in_memory=max_arrow_data_in_memory,
unsupported_features=unsupported_features,
)
unsupported_features: list[FeatureType] = []
) -> None:
logging.info(
f"Create ParquetIndexWithMetadata for dataset={self.dataset}, config={self.config}, split={self.split}"
)
self.parquet_index = ParquetIndexWithMetadata.from_parquet_metadata_items(
parquet_files=self.parquet_files,
features=self.features,
parquet_metadata_directory=parquet_metadata_directory,
httpfs=httpfs,
hf_token=hf_token,
max_arrow_data_in_memory=max_arrow_data_in_memory,
unsupported_features=unsupported_features,
)

def _init_viewer_index(self, data_store: str, metadata_store: str) -> None:
logging.info(
f"Create libviewer.Dataset for dataset={self.dataset}, config={self.config}, split={self.split}"
)
try:
from libviewer import Dataset
Review comment (PR author):
This should be mandatory, but I still need to update the build environments to include the Rust toolchain.

except ImportError as err:
raise ImportError(
"libviewer is not installed. Please install it with `pip install libviewer` to use page pruning."
) from err

# construct the required parquet_files list for libviewer.Dataset
files = []
for f in self.parquet_files:
files.append({
"path": f"{f['config']}/{f['split']}/{f['filename']}",
"size": f["size"],
"num_rows": f["num_rows"],
"metadata_path": f["parquet_metadata_subpath"]
})

self.viewer_index = Dataset(
name=self.dataset,
files=files,
revision=self.revision,
data_store=data_store,
metadata_store=metadata_store,
)

# note that this cache size is global for the class, not per instance
@lru_cache(maxsize=1)
@@ -592,6 +642,19 @@ def query(self, offset: int, length: int) -> pa.Table:
)
return self.parquet_index.query(offset=offset, length=length)

def query_with_page_pruning(self, offset: int, length: int) -> pa.Table:
logging.info(
f"Query libviewer.Dataset for dataset={self.dataset}, config={self.config},"
f" split={self.split}, offset={offset}, length={length}, with page pruning"
)
# IndexError is not ideal but .query() raises it for invalid offsets
if offset < 0:
raise IndexError("Offset must be non-negative")
if length < 0:
raise IndexError("Length must be non-negative")
batches, _files_to_index = self.viewer_index.sync_scan(offset=offset, limit=length)
return pa.Table.from_batches(batches, schema=self.features.arrow_schema)

# note that this cache size is global for the class, not per instance
@lru_cache(maxsize=1)
def query_truncated_binary(self, offset: int, length: int) -> tuple[pa.Table, list[str]]:
@@ -638,6 +701,7 @@ def get_rows_index(
dataset: str,
config: str,
split: str,
data_store="hf://"
Review comment (PR author):
Required for testing purposes.

I also noticed that Indexer only serves as a RowsIndex factory; maybe we should instantiate RowsIndex objects directly so we wouldn't need to wire the parameters through.
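
A rough sketch of that suggestion, assuming the RowsIndex constructor shown earlier in this diff; the surrounding variables are illustrative and this is not code from the PR:

```python
# Hypothetical call-site: construct the RowsIndex directly where it is needed,
# so new parameters such as data_store don't have to be wired through Indexer.
rows_index = RowsIndex(
    dataset=dataset,
    config=config,
    split=split,
    httpfs=httpfs,
    hf_token=hf_token,
    parquet_metadata_directory=parquet_metadata_directory,
    max_arrow_data_in_memory=max_arrow_data_in_memory,
    unsupported_features=unsupported_features,
    data_store=data_store,
)
```

The unsupported_features filtering that Indexer performs (the filter_features check just below) would still need a home, so this is a trade-off rather than a pure simplification.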

) -> RowsIndex:
filter_features = (
self.all_columns_supported_datasets_allow_list != "all"
@@ -653,4 +717,5 @@
parquet_metadata_directory=self.parquet_metadata_directory,
max_arrow_data_in_memory=self.max_arrow_data_in_memory,
unsupported_features=unsupported_features,
data_store=data_store
)
31 changes: 30 additions & 1 deletion libs/libcommon/tests/test_parquet_utils.py
@@ -355,7 +355,8 @@ def rows_index_with_parquet_metadata(
) -> Generator[RowsIndex, None, None]:
with ds_sharded_fs.open("default/train/0003.parquet") as f:
with patch("libcommon.parquet_utils.HTTPFile", return_value=f):
yield indexer.get_rows_index("ds_sharded", "default", "train")
data_store = f"file://{ds_sharded_fs.local_root_dir}"
yield indexer.get_rows_index("ds_sharded", "default", "train", data_store=data_store)


@pytest.fixture
@@ -465,6 +466,34 @@ def test_rows_index_query_with_parquet_metadata(
rows_index_with_parquet_metadata.query(offset=-1, length=2)


def test_rows_index_query_with_page_pruning(
rows_index_with_parquet_metadata: RowsIndex, ds_sharded: Dataset
) -> None:
try:
import libviewer
except ImportError:
pytest.skip("libviewer is not installed")

assert isinstance(rows_index_with_parquet_metadata.viewer_index, libviewer.Dataset)

result = rows_index_with_parquet_metadata.query_with_page_pruning(offset=1, length=3)
assert result.to_pydict() == ds_sharded[1:4]

result = rows_index_with_parquet_metadata.query_with_page_pruning(offset=1, length=0)
assert result.to_pydict() == ds_sharded[:0]

result = rows_index_with_parquet_metadata.query_with_page_pruning(offset=999999, length=1)
assert result.to_pydict() == ds_sharded[:0]

result = rows_index_with_parquet_metadata.query_with_page_pruning(offset=1, length=99999999)
assert result.to_pydict() == ds_sharded[1:]

with pytest.raises(IndexError):
rows_index_with_parquet_metadata.query_with_page_pruning(offset=0, length=-1)
with pytest.raises(IndexError):
rows_index_with_parquet_metadata.query_with_page_pruning(offset=-1, length=2)


def test_rows_index_query_with_too_big_rows(rows_index_with_too_big_rows: RowsIndex, ds_sharded: Dataset) -> None:
with pytest.raises(TooBigRows):
rows_index_with_too_big_rows.query(offset=0, length=3)