11 changes: 10 additions & 1 deletion .github/workflows/build-run-tests.yml
@@ -32,7 +32,16 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v6
with:
-        python-version: '3.x'
+        python-version: '3.12'

- name: Install uv
uses: astral-sh/setup-uv@v7
with:
enable-cache: true

- name: Run Data Loader Tests
working-directory: integrations/python/dataloader
run: make sync all

- name: Install dependencies
run: pip install -r scripts/python/requirements.txt
3 changes: 3 additions & 0 deletions .gitignore
@@ -15,6 +15,9 @@ hs_err_pid*
*.iws
.idea

# VS Code
.vscode/

# LinkedIn / Gradle / Hardware
.cache
.gradle
25 changes: 25 additions & 0 deletions integrations/python/dataloader/.gitignore
@@ -0,0 +1,25 @@
# Python
__pycache__/
*.py[cod]
*.so

# Virtual environments
.venv/
venv/

# Distribution / packaging
build/
dist/
*.egg-info/

# Testing
.pytest_cache/
.coverage
htmlcov/

# IDEs
.vscode/
.idea/

# OS
.DS_Store
54 changes: 54 additions & 0 deletions integrations/python/dataloader/CLAUDE.md
@@ -0,0 +1,54 @@
# Claude Instructions for OpenHouse DataLoader

## Project Overview

Python library for distributed data loading of OpenHouse tables. Uses DataFusion for query execution and PyIceberg for table access.

## Common Commands

```bash
make sync # Install dependencies
make check # Run lint + format checks
make test # Run tests
make all # Run all checks and tests
make format # Auto-format code
```

## Workflows
When making a change, run `make all` to ensure all tests and checks pass.

## Project Structure

```
src/openhouse/dataloader/
├── __init__.py # Public API exports
├── data_loader.py # Main API: OpenHouseDataLoader
├── data_loader_splits.py # DataLoaderSplits (iterable of splits)
├── data_loader_split.py # DataLoaderSplit (single callable split)
├── table_identifier.py # TableIdentifier dataclass
├── table_transformer.py # TableTransformer ABC (internal)
└── udf_registry.py # UDFRegistry ABC (internal)
```

## Public API

Only these are exported in `__init__.py`:
- `OpenHouseDataLoader` - Main entry point
- `TableIdentifier` - Table reference (database, table, branch)

Internal modules (`TableTransformer`, `UDFRegistry`) can be imported directly if needed, but they expose DataFusion types.
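
For example:

```python
from openhouse.dataloader import OpenHouseDataLoader, TableIdentifier  # public API

# Internal ABCs, imported directly when needed (these expose DataFusion types):
from openhouse.dataloader.table_transformer import TableTransformer
from openhouse.dataloader.udf_registry import UDFRegistry
```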

## Code Style

- Uses `ruff` for linting and formatting
- Line length: 120
- Python 3.12+
- Use modern type hints (`list`, `dict`, `X | None` instead of `List`, `Dict`, `Optional`)
- Use `raise NotImplementedError` for unimplemented methods in concrete classes
- Use `pass` for abstract methods decorated with `@abstractmethod`

## Versioning

- Version is in `pyproject.toml` (single source of truth)
- `__version__` in `__init__.py` reads from package metadata at runtime
- Major.minor aligns with OpenHouse monorepo, patch is independent
38 changes: 38 additions & 0 deletions integrations/python/dataloader/Makefile
@@ -0,0 +1,38 @@
.PHONY: help sync clean lint format format-check check test all

help:
@echo "Available commands:"
@echo " make sync - Sync dependencies using uv"
@echo " make lint - Run ruff linter"
@echo " make format - Format code with ruff"
@echo " make check - Run all checks (lint + format check)"
@echo " make test - Run tests with pytest"
@echo " make all - Run all checks and tests"
@echo " make clean - Clean build artifacts"

sync:
uv sync --all-extras

lint:
uv run ruff check src/ tests/

format:
uv run ruff format src/ tests/

format-check:
uv run ruff format --check src/ tests/

check: lint format-check

test:
uv run pytest

all: check test

clean:
rm -rf build/
rm -rf dist/
rm -rf *.egg-info
rm -rf .venv/
find . -type d -name __pycache__ -exec rm -rf {} +
find . -type f -name "*.pyc" -delete
34 changes: 34 additions & 0 deletions integrations/python/dataloader/README.md
@@ -0,0 +1,34 @@
# OpenHouse DataLoader

A Python library for distributed data loading of OpenHouse tables.

## Quickstart

```python
from openhouse.dataloader import OpenHouseDataLoader, TableIdentifier


splits = OpenHouseDataLoader().create_splits(
table=TableIdentifier("db", "tbl"),
columns=["colA", "colB"], # omit for SELECT *
)

# Loading can be distributed to compute engines like Ray
for split in splits:
print(split.table_properties) # available on every split
for batch in split:
process(batch)
```
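
One way the distribution might look with Ray (a sketch; `row_count` is a hypothetical task, and splits must be picklable for Ray to ship them to workers):

```python
import ray


@ray.remote
def row_count(split):
    # Iterate the split on the worker; each batch is a pyarrow RecordBatch
    return sum(batch.num_rows for batch in split)


counts = ray.get([row_count.remote(split) for split in splits])
```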

## Development

```bash
# Set up environment
make sync

# Run tests
make test

# See all available commands
make help
```
20 changes: 20 additions & 0 deletions integrations/python/dataloader/pyproject.toml
@@ -0,0 +1,20 @@
[project]
name = "openhouse-dataloader"
version = "0.0.1"
description = "A Python library for distributed data loading of OpenHouse tables"
readme = "README.md"
requires-python = ">=3.12"
dependencies = ["datafusion==51.0.0", "pyiceberg==0.10.0"]

[project.optional-dependencies]
dev = ["ruff>=0.9.0", "pytest>=8.0.0"]

[tool.ruff]
line-length = 120
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]

[tool.ruff.format]
quote-style = "double"
1 change: 1 addition & 0 deletions integrations/python/dataloader/src/openhouse/__init__.py
@@ -0,0 +1 @@
# Declare "openhouse" as a pkgutil-style namespace package so other
# distributions can contribute openhouse.* subpackages
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
7 changes: 7 additions & 0 deletions integrations/python/dataloader/src/openhouse/dataloader/__init__.py
@@ -0,0 +1,7 @@
from importlib.metadata import version

from openhouse.dataloader.data_loader import OpenHouseDataLoader
from openhouse.dataloader.table_identifier import TableIdentifier

__version__ = version("openhouse-dataloader")
__all__ = ["OpenHouseDataLoader", "TableIdentifier"]
41 changes: 41 additions & 0 deletions integrations/python/dataloader/src/openhouse/dataloader/data_loader.py
@@ -0,0 +1,41 @@
from collections.abc import Iterable, Mapping, Sequence

from openhouse.dataloader.data_loader_split import DataLoaderSplit
from openhouse.dataloader.table_identifier import TableIdentifier
from openhouse.dataloader.table_transformer import TableTransformer
from openhouse.dataloader.udf_registry import UDFRegistry


class OpenHouseDataLoader:
"""An API for distributed data loading of OpenHouse tables"""

def __init__(
self,
        table_transformer: TableTransformer | None = None,
        udf_registry: UDFRegistry | None = None,
):
"""
Args:
table_transformer: A prerequisite transformation to apply to the table before loading the data
udf_registry: UDFs required for the prerequisite table transformation
"""
self._table_transformer = table_transformer
self._udf_registry = udf_registry

def create_splits(
self,
table: TableIdentifier,
columns: Sequence[str] | None = None,
context: Mapping[str, str] | None = None,
) -> Iterable[DataLoaderSplit]:
"""Create data splits for distributed data loading of the table

Args:
table: Identifier for the table to load
columns: Column names to load, or None to load all columns (SELECT *)
context: Dictionary of context information (e.g. tenant, environment, etc.)

Returns:
Iterable of DataLoaderSplit, each containing table_properties
"""
raise NotImplementedError
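
A hedged usage sketch (`MyTransformer` and `MyUDFRegistry` are hypothetical implementations of the ABCs below; note that `create_splits` is still a stub in this PR):

```python
loader = OpenHouseDataLoader(
    table_transformer=MyTransformer(),  # hypothetical TableTransformer subclass
    udf_registry=MyUDFRegistry(),  # hypothetical UDFRegistry subclass
)
splits = loader.create_splits(
    table=TableIdentifier("db", "tbl"),
    columns=["colA", "colB"],  # omit for SELECT *
    context={"tenant": "acme"},  # hypothetical context key
)
```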
37 changes: 37 additions & 0 deletions integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py
@@ -0,0 +1,37 @@
from collections.abc import Iterator, Mapping

from datafusion.plan import LogicalPlan
from pyarrow import RecordBatch
from pyiceberg.io import FileScanTask

from openhouse.dataloader.udf_registry import UDFRegistry


class DataLoaderSplit:
"""A single data split"""

def __init__(
self,
plan: LogicalPlan,
file_scan_task: FileScanTask,
udf_registry: UDFRegistry,
table_properties: Mapping[str, str],
):
self._plan = plan
self._file_scan_task = file_scan_task
self._udf_registry = udf_registry
self._table_properties = table_properties

@property
def table_properties(self) -> Mapping[str, str]:
"""Properties of the table being loaded"""
return self._table_properties

def __iter__(self) -> Iterator[RecordBatch]:
"""Loads the split data after applying, including applying a prerequisite
table transformation if provided

Returns:
An iterator for batches of data in the split
"""
raise NotImplementedError
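
Once `__iter__` is implemented, a worker could materialize a split with stock pyarrow (a sketch; assumes the split is non-empty and fits in memory):

```python
import pyarrow as pa

table = pa.Table.from_batches(list(split))  # the split yields RecordBatch objects
print(split.table_properties, table.num_rows)
```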
21 changes: 21 additions & 0 deletions integrations/python/dataloader/src/openhouse/dataloader/table_identifier.py
@@ -0,0 +1,21 @@
from dataclasses import dataclass


@dataclass
class TableIdentifier:
"""Identifier for a table in OpenHouse

Args:
database: Database name
table: Table name
branch: Optional branch name
"""

database: str
table: str
branch: str | None = None

def __str__(self) -> str:
"""Return the fully qualified table name."""
base = f"{self.database}.{self.table}"
return f"{base}.{self.branch}" if self.branch else base
29 changes: 29 additions & 0 deletions integrations/python/dataloader/src/openhouse/dataloader/table_transformer.py
@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from collections.abc import Mapping

from datafusion.context import SessionContext
from datafusion.dataframe import DataFrame

from openhouse.dataloader.table_identifier import TableIdentifier


class TableTransformer(ABC):
"""Interface for applying additional transformation logic to the data
being loaded (e.g. column masking, row filtering)
"""

@abstractmethod
def transform(
self, session_context: SessionContext, table: TableIdentifier, context: Mapping[str, str]
) -> DataFrame | None:
"""Applies transformation logic to the base table that is being loaded.

        Args:
            session_context: The DataFusion session context used to build the transformation
            table: Identifier for the table
            context: Dictionary of context information (e.g. tenant, environment, etc.)

Returns:
The DataFrame representing the transformation. This is expected to read from the exact
base table identifier passed in as input. If no transformation is required, None is returned.
"""
pass
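
A minimal sketch of a concrete transformer (the `tenant` column and context key are hypothetical; illustration only, not hardened against SQL injection):

```python
from collections.abc import Mapping

from datafusion.context import SessionContext
from datafusion.dataframe import DataFrame

from openhouse.dataloader.table_identifier import TableIdentifier
from openhouse.dataloader.table_transformer import TableTransformer


class TenantRowFilter(TableTransformer):
    """Filters rows to the caller's tenant before loading"""

    def transform(
        self, session_context: SessionContext, table: TableIdentifier, context: Mapping[str, str]
    ) -> DataFrame | None:
        tenant = context.get("tenant")
        if tenant is None:
            return None  # no transformation required
        # Read from the exact base table identifier passed in, per the contract above
        return session_context.sql(f"SELECT * FROM {table} WHERE tenant = '{tenant}'")
```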
16 changes: 16 additions & 0 deletions integrations/python/dataloader/src/openhouse/dataloader/udf_registry.py
@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod

from datafusion.context import SessionContext


class UDFRegistry(ABC):
"""Used to register DataFusion UDFs"""

@abstractmethod
def register_udfs(self, session_context: SessionContext) -> None:
"""Registers UDFs with DataFusion

Args:
session_context: The session context to register the UDFs in
"""
pass
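
A sketch of a concrete registry, assuming the `datafusion` package's top-level `udf` helper and pyarrow compute (the `mask` UDF itself is hypothetical):

```python
import pyarrow as pa
import pyarrow.compute as pc
from datafusion import udf
from datafusion.context import SessionContext

from openhouse.dataloader.udf_registry import UDFRegistry


def _mask(arr: pa.Array) -> pa.Array:
    # Replace every character with "*"
    return pc.replace_substring_regex(arr, pattern=".", replacement="*")


class MaskingUDFRegistry(UDFRegistry):
    """Registers a string-masking scalar UDF"""

    def register_udfs(self, session_context: SessionContext) -> None:
        mask = udf(_mask, [pa.string()], pa.string(), "immutable", name="mask")
        session_context.register_udf(mask)
```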
Empty file.
3 changes: 3 additions & 0 deletions integrations/python/dataloader/tests/test_data_loader.py
@@ -0,0 +1,3 @@
def test_data_loader():
"""Test placeholder until real tests are added"""
pass