73 changes: 73 additions & 0 deletions ingestify/application/dataset_store.py
@@ -67,6 +67,8 @@ def get_dataset_collection(
provider: Optional[str] = None,
dataset_id: Optional[str] = None,
metadata_only: Optional[bool] = False,
page: Optional[int] = None,
page_size: Optional[int] = None,
**selector,
) -> DatasetCollection:
if "selector" in selector:
@@ -89,9 +91,80 @@ def get_dataset_collection
provider=provider,
metadata_only=metadata_only,
selector=selector,
page=page,
page_size=page_size,
)
return dataset_collection

def iter_dataset_collection(
self,
dataset_type: Optional[str] = None,
provider: Optional[str] = None,
dataset_id: Optional[str] = None,
metadata_only: Optional[bool] = False,
page_size: int = 1000,
yield_dataset_collection: bool = False,
**selector,
):
"""
Iterate through all datasets matching the criteria with automatic pagination.

Examples:
```
# Iterate through individual datasets
for dataset in store.iter_dataset_collection(dataset_type="match", provider="statsbomb"):
process(dataset)

# Iterate through DatasetCollection objects (pages)
for collection in store.iter_dataset_collection(
dataset_type="match",
provider="statsbomb",
yield_dataset_collection=True
):
process_collection(collection)
```

Args:
dataset_type: Optional dataset type filter
provider: Optional provider filter
dataset_id: Optional dataset ID filter
metadata_only: Whether to fetch only metadata
page_size: Number of datasets to fetch per page
yield_dataset_collection: If True, yields entire DatasetCollection objects
instead of individual Dataset objects
**selector: Additional selector criteria

Yields:
If yield_dataset_collection is False (default): Dataset objects one by one
If yield_dataset_collection is True: DatasetCollection objects (pages)
"""
page = 1
while True:
collection = self.get_dataset_collection(
dataset_type=dataset_type,
provider=provider,
dataset_id=dataset_id,
metadata_only=metadata_only,
page=page,
page_size=page_size,
**selector,
)

if not collection or len(collection) == 0:
break

if yield_dataset_collection:
yield collection
else:
for dataset in collection:
yield dataset

# If we got fewer results than page_size, we've reached the end
if len(collection) < page_size:
break

page += 1

#
# def destroy_dataset(self, dataset_id: str):
# dataset = self.dataset_repository.
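For callers that prefer to drive paging themselves, the new `page`/`page_size` arguments on `get_dataset_collection` can be used directly; `iter_dataset_collection` is essentially the loop below. A minimal sketch, assuming an already-configured `store` — the `"match"`/`"statsbomb"` filter values and the `process` callable are placeholders:

```
page = 1
page_size = 500
while True:
    collection = store.get_dataset_collection(
        dataset_type="match",
        provider="statsbomb",
        page=page,
        page_size=page_size,
    )
    if len(collection) == 0:
        break  # no more rows for this selector
    for dataset in collection:
        process(dataset)
    if len(collection) < page_size:
        break  # a short page means this was the last one
    page += 1
```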
9 changes: 0 additions & 9 deletions ingestify/domain/models/dataset/collection.py
@@ -19,9 +19,6 @@ def __init__(
}
self.metadata = metadata

def loaded(self):
return self.metadata.count == len(self.datasets)

def get(self, dataset_identifier: Identifier) -> Dataset:
return self.datasets.get(dataset_identifier.key)

@@ -31,12 +28,6 @@ def __len__(self):
def __iter__(self):
return iter(self.datasets.values())

def get_dataset_by_id(self, dataset_id):
for dataset in self:
if dataset.dataset_id == dataset_id:
return dataset
return None

def first(self):
try:
return next(iter(self.datasets.values()))
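With `loaded()` and `get_dataset_by_id()` removed from `DatasetCollection`, an id lookup can go through the store's existing `dataset_id` filter, or through a plain loop over an already-loaded collection. A sketch under the assumption that `store` and the `"dataset-5"` id are illustrative:

```
# Filter at the store level via the existing dataset_id argument.
collection = store.get_dataset_collection(dataset_id="dataset-5")
dataset = collection.first()

# Or do the in-memory lookup the removed helper used to do,
# returning None on a miss just like get_dataset_by_id did.
dataset = next(
    (d for d in collection if d.dataset_id == "dataset-5"),
    None,
)
```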
15 changes: 14 additions & 1 deletion ingestify/infra/store/dataset/sqlalchemy/repository.py
@@ -159,6 +159,10 @@ def _upsert(
entities: list[dict],
immutable_rows: bool = False,
):
if not entities:
# Nothing to do
return

dialect = self.dialect.name
if dialect == "mysql":
from sqlalchemy.dialects.mysql import insert
@@ -395,6 +399,8 @@ def get_dataset_collection(
dataset_id: Optional[Union[str, List[str]]] = None,
selector: Optional[Union[Selector, List[Selector]]] = None,
metadata_only: bool = False,
page: Optional[int] = None,
page_size: Optional[int] = None,
) -> DatasetCollection:
def apply_query_filter(query):
return self._filter_query(
@@ -410,9 +416,16 @@ def apply_query_filter(query):
# Use a contextmanager to make sure it's closed afterwards

if not metadata_only:
# Apply sorting by created_at in ascending order
dataset_query = apply_query_filter(
self.session.query(dataset_table.c.dataset_id)
)
).order_by(dataset_table.c.created_at.asc())

# Apply pagination if both page and page_size are provided
if page is not None and page_size is not None:
offset = (page - 1) * page_size
dataset_query = dataset_query.offset(offset).limit(page_size)

self._debug_query(dataset_query)
dataset_ids = [row.dataset_id for row in dataset_query]
datasets = self._load_datasets(dataset_ids)
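The repository translates the 1-based `page` into an OFFSET/LIMIT pair, and the `order_by(created_at.asc())` added above is what keeps offset-based pages stable: without a deterministic order, rows could be repeated or skipped between pages. A small sketch of the arithmetic — the `page_window` helper name is illustrative, not part of the PR:

```
def page_window(page: int, page_size: int) -> tuple[int, int]:
    """Return (offset, limit) for a 1-based page number."""
    return (page - 1) * page_size, page_size

assert page_window(1, 1000) == (0, 1000)     # first page starts at row 0
assert page_window(3, 1000) == (2000, 1000)  # third page skips the first 2000 rows
```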
81 changes: 81 additions & 0 deletions ingestify/tests/test_pagination.py
@@ -0,0 +1,81 @@
import pytest
from datetime import datetime, timedelta
import pytz

from ingestify.domain import Dataset, Identifier
from ingestify.main import get_engine


def test_iter_dataset_collection(config_file):
"""Test iteration over datasets with pagination using iter_dataset_collection."""
# Get engine from the fixture
engine = get_engine(config_file, "main")
store = engine.store
bucket = store.bucket

# Create 30 datasets with different creation times
now = datetime.now(pytz.utc)

# Save datasets with ascending created_at timestamps
for i in range(30):
dataset = Dataset(
bucket=bucket,
dataset_id=f"dataset-{i}",
name=f"Dataset {i}",
state="COMPLETE",
identifier=Identifier(test_id=i),
dataset_type="test",
provider="test-provider",
metadata={},
created_at=now
+ timedelta(minutes=i), # Each dataset created 1 minute apart
updated_at=now + timedelta(minutes=i),
last_modified_at=now + timedelta(minutes=i),
)
store.dataset_repository.save(bucket, dataset)

# Test iteration with small page_size (yields individual datasets)
dataset_ids = []
for dataset in store.iter_dataset_collection(
dataset_type="test",
provider="test-provider",
page_size=5, # Small page size to force multiple pages
):
dataset_ids.append(dataset.dataset_id)

# Should get all 30 datasets
assert len(dataset_ids) == 30

# Make sure we have all datasets from 0 to 29
expected_ids = [f"dataset-{i}" for i in range(30)]
assert set(dataset_ids) == set(expected_ids)

# Test iteration yielding entire DatasetCollection objects
collections = []
for collection in store.iter_dataset_collection(
dataset_type="test",
provider="test-provider",
page_size=5, # Small page size to force multiple pages
yield_dataset_collection=True,
):
collections.append(collection)

# Should have 6 collections (30 datasets / 5 per page = 6 pages)
assert len(collections) == 6

# Verify total dataset count across all collections
total_datasets = sum(len(collection) for collection in collections)
assert total_datasets == 30

# Test iteration with a filter that returns fewer results
filtered_dataset_ids = []
for dataset in store.iter_dataset_collection(
dataset_type="test",
provider="test-provider",
test_id=5, # Only get dataset with test_id=5
page_size=10,
):
filtered_dataset_ids.append(dataset.dataset_id)

assert len(filtered_dataset_ids) == 1
assert filtered_dataset_ids[0] == "dataset-5"
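The expected page count in the collection-yielding assertion is plain ceiling division over the values used by the test:

```
import math

total_datasets = 30
page_size = 5
expected_pages = math.ceil(total_datasets / page_size)  # 30 / 5 divides evenly
assert expected_pages == 6
```

Because 30 is an exact multiple of 5, the last page is full, so the iterator issues one extra query that comes back empty before it stops; the test still sees exactly 6 non-empty collections.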