Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ A library that extends [Apache DataFusion](https://github.com/apache/datafusion)

This crate implements a two-phase execution model for index-accelerated queries:

1. **Index Phase**: Scan one or more secondary indexes to identify row IDs matching filter predicates
2. **Fetch Phase**: Use row IDs to fetch complete records from underlying storage
1. **Index Phase**: Scan one or more secondary indexes to identify primary key values matching filter predicates
2. **Fetch Phase**: Use primary key values to fetch complete records from underlying storage

This approach reduces I/O by limiting data retrieval to rows that satisfy query predicates, particularly effective for selective queries (typically < 10% of rows).

Expand All @@ -24,20 +24,23 @@ datafusion-index-provider = "0.1.0"

### Index Trait

Implement the [`Index`](src/physical_plan/mod.rs) trait to define how your index scans for matching row IDs:
Implement the [`Index`](src/physical_plan/mod.rs) trait to define how your index scans for matching primary key values:

```rust
use datafusion_index_provider::physical_plan::Index;

impl Index for MyIndex {
fn name(&self) -> &str { "my_index" }
fn column_name(&self) -> &str { "indexed_column" }
fn index_schema(&self) -> SchemaRef { /* schema with __row_id__ column */ }
fn index_schema(&self) -> SchemaRef {
// Schema whose columns form the composite primary key
// e.g. create_index_schema([Field::new("id", DataType::UInt64, false)])
}
fn table_name(&self) -> &str { "my_table" }

fn scan(&self, filters: &[Expr], limit: Option<usize>)
-> Result<SendableRecordBatchStream> {
// Return RecordBatch stream with __row_id__ column containing matching IDs
// Return RecordBatch stream with primary key columns matching the filters
}

fn statistics(&self) -> Statistics { /* index statistics */ }
Expand All @@ -46,16 +49,16 @@ impl Index for MyIndex {

### RecordFetcher Trait

Implement [`RecordFetcher`](src/physical_plan/fetcher.rs) to define how complete records are retrieved using row IDs:
Implement [`RecordFetcher`](src/physical_plan/fetcher.rs) to define how complete records are retrieved using primary key values:

```rust
use datafusion_index_provider::physical_plan::fetcher::RecordFetcher;

#[async_trait]
impl RecordFetcher for MyTable {
async fn fetch(&self, row_ids: RecordBatch) -> Result<RecordBatch> {
// row_ids contains __row_id__ column with IDs to fetch
// Return complete records for those IDs
async fn fetch(&self, index_batch: RecordBatch) -> Result<RecordBatch> {
// index_batch contains primary key columns as defined by index_schema()
// Return complete records for those primary keys
}
}
```
Expand Down Expand Up @@ -156,50 +159,57 @@ RecordFetchExec

### AND - Index Intersection

Uses Hash or SortMerge joins to intersect row IDs:
Uses Hash or SortMerge joins to intersect primary key values:

```
RecordFetchExec
└── HashJoin(on: __row_id__)
├── IndexScanExec(age_index, [age > 25])
└── IndexScanExec(dept_index, [department = 'Engineering'])
└── Projection(PK columns)
└── HashJoin(on: PK columns)
├── IndexScanExec(age_index, [age > 25])
└── IndexScanExec(dept_index, [department = 'Engineering'])
```

**Join Selection**:
- **SortMergeJoin**: When both indexes return ordered row IDs (`Index::is_ordered() == true`)
- **SortMergeJoin**: When both indexes return ordered primary keys (`Index::is_ordered()`)
- **HashJoin**: When inputs are unordered (builds hash table from left side)

### OR - Union with Deduplication

Uses UnionExec + AggregateExec to deduplicate overlapping row IDs:
Uses UnionExec + AggregateExec to deduplicate overlapping primary key values:

```
RecordFetchExec
└── AggregateExec(GROUP BY __row_id__)
└── AggregateExec(GROUP BY PK columns)
└── UnionExec
├── IndexScanExec(age_index, [age < 25])
└── IndexScanExec(dept_index, [department = 'Sales'])
```

This prevents duplicate record fetches when row IDs appear in multiple index results.
This prevents duplicate record fetches when primary key values appear in multiple index results.

## Reference Implementation

See [`tests/common/`](tests/common/) for complete working examples:

**Single-column primary key:**
- [`age_index.rs`](tests/common/age_index.rs): BTreeMap-based index supporting range queries (Eq, Lt, Gt, LtEq, GtEq)
- [`department_index.rs`](tests/common/department_index.rs): HashMap-based index for equality queries
- [`employee_provider.rs`](tests/common/employee_provider.rs): Complete TableProvider with IndexedTableProvider integration
- [`record_fetcher.rs`](tests/common/record_fetcher.rs): RecordFetcher implementation using in-memory data

Integration tests in [`tests/integration_tests.rs`](tests/integration_tests.rs) demonstrate complex query scenarios including deeply nested AND/OR combinations.
**Composite primary key:**
- [`composite_pk_age_index.rs`](tests/common/composite_pk_age_index.rs): Index with composite PK (tenant_id, employee_id)
- [`composite_pk_department_index.rs`](tests/common/composite_pk_department_index.rs): Department index with composite PK
- [`composite_pk_provider.rs`](tests/common/composite_pk_provider.rs): Multi-tenant TableProvider with composite PK
- [`composite_pk_fetcher.rs`](tests/common/composite_pk_fetcher.rs): RecordFetcher for composite PK tables

Integration tests in [`tests/integration_tests.rs`](tests/integration_tests.rs) and [`tests/composite_pk_tests.rs`](tests/composite_pk_tests.rs) demonstrate query scenarios including deeply nested AND/OR combinations with both single and composite primary keys.

## Limitations

1. **Projection pushdown**: Not currently supported for indexed scans (fetches all columns)
2. **Remaining filters**: Non-indexed filters applied after record fetch, not during index scan
3. **Partitioning**: Index scans produce single partition output
4. **Schema compatibility**: Index schema must contain `__row_id__` column (enforced at runtime)

## Testing

Expand All @@ -217,8 +227,8 @@ cargo test --test integration_tests
```

The test suite includes:
- 22 unit tests covering execution plan generation and streaming
- 24 integration tests covering query scenarios from simple to deeply nested
- 27 unit tests covering execution plan generation and streaming
- 36 integration tests covering query scenarios from simple to deeply nested, with both single and composite primary keys

## Compatibility

Expand Down
32 changes: 17 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@
//!
//! The crate implements a two-phase execution model:
//!
//! 1. **Index Phase**: Scan one or more indexes to identify row IDs matching the query filters
//! 2. **Fetch Phase**: Use the row IDs to fetch complete records from the underlying storage
//! 1. **Index Phase**: Scan one or more indexes to identify primary key values matching the query filters
//! 2. **Fetch Phase**: Use the primary key values to fetch complete records from the underlying storage
//!
//! This approach is particularly effective for selective queries where indexes can significantly
//! reduce the number of rows that need to be fetched from primary storage.
//!
//! ## Core Components
//!
//! ### Index Management
//! - [`physical_plan::Index`]: Trait representing a physical index that can be scanned to retrieve row IDs
//! - [`physical_plan::Index`]: Trait representing a physical index that can be scanned to retrieve primary key values
//! - [`provider::IndexedTableProvider`]: Extension of DataFusion's `TableProvider` with index discovery
//! - [`types::IndexFilter`]: Enum representing filter operations that can be pushed down to indexes
//!
//! ### Execution Engine
//! - [`physical_plan::exec::fetch::RecordFetchExec`]: Top-level execution plan orchestrating the two phases
//! - [`physical_plan::exec::index::IndexScanExec`]: Execution plan for scanning a single index
//! - [`physical_plan::fetcher::RecordFetcher`]: Trait for fetching complete records using row IDs
//! - [`physical_plan::fetcher::RecordFetcher`]: Trait for fetching complete records using primary key values
//!
//! ## Query Capabilities
//!
Expand Down Expand Up @@ -105,23 +105,25 @@
//! ### IndexFilter::And - Index Intersection
//!
//! For conjunctive conditions across multiple indexes, the system builds a left-deep tree
//! of joins to intersect row IDs:
//! of joins to intersect primary key values:
//! ```text
//! RecordFetchExec
//! └── HashJoin/SortMergeJoin (INNER on row_id)
//! ├── HashJoin/SortMergeJoin (INNER on row_id)
//! │ ├── IndexScanExec (col_a_index)
//! │ └── IndexScanExec (col_b_index)
//! └── IndexScanExec (col_c_index)
//! └── Projection(PK columns)
//! └── HashJoin/SortMergeJoin (INNER on PK columns)
//! ├── Projection(PK columns)
//! │ └── HashJoin/SortMergeJoin (INNER on PK columns)
//! │ ├── IndexScanExec (col_a_index)
//! │ └── IndexScanExec (col_b_index)
//! └── IndexScanExec (col_c_index)
//! ```
//!
//! ### IndexFilter::Or - Union with Deduplication
//!
//! For disjunctive conditions, the system uses `UnionExec` followed by `AggregateExec`
//! for automatic row ID deduplication:
//! for automatic primary key deduplication:
//! ```text
//! RecordFetchExec
//! └── AggregateExec (GROUP BY row_id)
//! └── AggregateExec (GROUP BY PK columns)
//! └── UnionExec
//! ├── IndexScanExec (col_a)
//! ├── IndexScanExec (col_b)
Expand All @@ -133,8 +135,8 @@
//!
//! ## Implementation Guide
//!
//! - **Implement the Index Trait**: Create indexes that can scan and return row IDs
//! - **Implement the RecordFetcher Trait**: Define how to fetch complete records using row IDs
//! - **Implement the Index Trait**: Create indexes that can scan and return primary key values
//! - **Implement the RecordFetcher Trait**: Define how to fetch complete records using primary key values
//! - **Implement IndexedTableProvider**: Expose available indexes and filter analysis capabilities
//! - **Update TableProvider Implementation**: Integrate index-based execution into your scan method
//!
Expand All @@ -148,7 +150,7 @@
//! ### Memory Usage
//! - **Index results**: Streamed through execution pipeline to minimize memory footprint
//! - **Join operations**: Hash joins require memory proportional to smaller index result set
//! - **Deduplication**: OR operations require memory to store unique row IDs during aggregation
//! - **Deduplication**: OR operations require memory to store unique primary key values during aggregation
//!
//! ### Bounded Execution
//! - **Single partition requirement**: [`physical_plan::exec::fetch::RecordFetchExec`] requires single partition input for correct result merging
Expand Down
Loading