feature_view = FeatureView(
...
)
```
The underlying implementation of the join is an inner join by default, and the join key is the entity ID.
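
The join semantics can be illustrated with plain pandas (a conceptual sketch of the behavior, not the actual implementation; the entity and column names are hypothetical):

```python
import pandas as pd

# Two feature tables keyed by the entity id "driver_id" (hypothetical columns).
trips = pd.DataFrame({"driver_id": [1, 2, 3], "trip_count": [10, 5, 7]})
ratings = pd.DataFrame({"driver_id": [2, 3, 4], "avg_rating": [4.8, 4.2, 3.9]})

# Inner join on the entity id: only entities present on both sides survive.
joined = trips.merge(ratings, on="driver_id", how="inner")
print(joined["driver_id"].tolist())  # entities 2 and 3 only
```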

### Tiling

Tiling is a transformation pattern designed for efficient streaming feature engineering, inspired by Chronon's tiled architecture. It divides time-series data into manageable chunks (tiles) for processing temporal aggregations and derived features efficiently. This is particularly useful for:

- Temporal aggregations over sliding windows
- Chaining features across different time horizons
- Handling late-arriving data in streaming scenarios
- Memory-efficient processing of large time-series data

Examples include:

```python
from datetime import timedelta

import pandas as pd

from feast import Aggregation, Field, StreamFeatureView
from feast.transformation import tiled_transformation
from feast.types import Float64, Int64


@tiled_transformation(
    sources=["transaction_source_fv"],  # Source feature views or data sources
    schema=[  # Output feature schema
        Field(name="rolling_avg", dtype=Float64),
        Field(name="cumulative_amount", dtype=Float64),
        Field(name="transaction_count", dtype=Int64),
    ],
    tile_size=timedelta(hours=1),  # Process data in 1-hour tiles
    window_size=timedelta(minutes=30),  # Window size for aggregations within tiles
    overlap=timedelta(minutes=5),  # 5-minute overlap between tiles
    aggregations=[  # Feast Aggregation objects
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(minutes=30)),
        Aggregation(column="transaction_amount", function="mean", time_window=timedelta(minutes=30)),
    ],
    aggregation_functions=[  # Custom aggregation functions
        lambda df: df.groupby("entity_id").agg({"transaction_amount": ["count"]}).reset_index()
    ],
)
def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame:
    """Transform transaction data within each hour tile."""
    return df.assign(
        rolling_avg=df["transaction_amount"].rolling(window=10).mean(),
        cumulative_amount=df["transaction_amount"].cumsum(),
        transaction_count=df.groupby("entity_id").cumcount() + 1,
    )

# Usage in StreamFeatureView - mode specified here, not in transformation
stream_fv = StreamFeatureView(
    name="transaction_features",
    feature_transformation=hourly_transaction_features,
    source=kafka_source,
    mode="spark",  # Transformation mode specified at StreamFeatureView level
    entities=["customer_id"],
)
```
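
The `tile_size`/`overlap` interplay can be sketched in plain Python. This is a conceptual illustration of how an event near a tile boundary can be replayed into the previous tile so that windows spanning the boundary stay continuous; `assign_tiles` is a hypothetical helper, not part of the Feast API:

```python
from datetime import datetime, timedelta

def assign_tiles(ts, tile_size, overlap):
    """Return the tile start times whose range covers the event at ts.

    Conceptual sketch only: an event in the first `overlap` of a tile is
    also attributed to the previous tile for boundary continuity.
    """
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) // tile_size       # which tile the event falls into
    own_tile = epoch + offset * tile_size
    tiles = [own_tile]
    if ts - own_tile < overlap:              # within the overlap region
        tiles.append(own_tile - tile_size)   # replay into the previous tile
    return tiles

# 2 minutes past the 13:00 boundary, with 1-hour tiles and 5-minute overlap:
print(assign_tiles(datetime(2024, 1, 1, 13, 2), timedelta(hours=1), timedelta(minutes=5)))
```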

**Chaining Features Example:**
```python
@tiled_transformation(
    sources=["transaction_hourly_fv"],  # Source feature views
    schema=[  # Output feature schema
        Field(name="local_cumsum", dtype=Float64),
        Field(name="global_cumsum", dtype=Float64),
    ],
    tile_size=timedelta(hours=1),
    window_size=timedelta(minutes=30),  # Window size for aggregations
    chaining_functions=[
        # Chain cumulative sums across tiles for continuity
        lambda prev_df, curr_df: curr_df.assign(
            global_cumsum=curr_df["amount"]
            + (prev_df["global_cumsum"].max() if not prev_df.empty else 0)
        )
    ],
)
def chained_cumulative_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(local_cumsum=df["amount"].cumsum())


# Use with StreamFeatureView (mode specified at view level)
stream_fv = StreamFeatureView(
    name="chained_features",
    feature_transformation=chained_cumulative_features,
    source=kafka_source,
    mode="spark",  # ComputeEngine mode specified at view level
)
```
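
The effect of a chaining function can be seen by folding it over tiles in time order. The driver loop below is a hypothetical sketch of what the runtime does conceptually (Feast's internals may differ), using a slight variant that continues the *local* cumulative sum across boundaries:

```python
import pandas as pd

# Per-tile transform: local cumulative sum (mirrors the example above).
def transform(df):
    return df.assign(local_cumsum=df["amount"].cumsum())

# Chaining function: carry the running total across tile boundaries.
chain = lambda prev_df, curr_df: curr_df.assign(
    global_cumsum=curr_df["local_cumsum"]
    + (prev_df["global_cumsum"].max() if not prev_df.empty else 0)
)

# Hypothetical driver: fold the chaining function over tiles in time order.
tiles = [pd.DataFrame({"amount": [1.0, 2.0]}), pd.DataFrame({"amount": [3.0, 4.0]})]
prev = pd.DataFrame()
results = []
for tile in tiles:
    curr = chain(prev, transform(tile))
    results.append(curr)
    prev = curr

print(results[1]["global_cumsum"].tolist())  # [6.0, 10.0] — continues from tile 1
```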

**Configuration Options:**
- `sources`: List of source feature views or data sources that this transformation depends on
- `schema`: List of Field definitions specifying output feature names and data types
- `aggregations`: List of Feast Aggregation objects for window-based aggregations
- `tile_size`: Duration of each time tile (e.g., `timedelta(hours=1)`)
- `window_size`: Window size for aggregations within tiles (defaults to tile_size)
- `overlap`: Optional overlap between tiles for continuity
- `max_tiles_in_memory`: Maximum number of tiles to keep in memory (default: 10)
- `enable_late_data_handling`: Whether to handle late-arriving data (default: True)
- `aggregation_functions`: Custom functions to apply within each tile
- `chaining_functions`: Functions to chain results across tiles for derived features
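
A bounded tile buffer illustrates what `max_tiles_in_memory` implies. The eviction policy below (drop the oldest tile once the limit is exceeded) is an assumption for illustration, and `TileBuffer` is a hypothetical class, not Feast's actual data structure:

```python
from collections import OrderedDict

class TileBuffer:
    """Sketch of max_tiles_in_memory: keep only the N most recent tiles."""

    def __init__(self, max_tiles=10):
        self.max_tiles = max_tiles
        self.tiles = OrderedDict()  # tile_start -> list of events

    def add(self, tile_start, event):
        self.tiles.setdefault(tile_start, []).append(event)
        while len(self.tiles) > self.max_tiles:
            self.tiles.popitem(last=False)  # evict the oldest tile

buf = TileBuffer(max_tiles=2)
for hour in range(4):
    buf.add(hour, f"event@{hour}")
print(list(buf.tiles))  # only the two most recent tile keys remain
```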

**Note**: The transformation mode (e.g., "spark", "pandas") is specified at the StreamFeatureView level, not within the transformation itself. This allows the same tiled transformation to work with different ComputeEngines.
---

`examples/tiled_streaming_features/README.md`:
# Tiled Streaming Features Example

This example demonstrates how to use Feast's tiling transformation engine for efficient streaming feature engineering.

## Overview

Tiling in Feast is inspired by Chronon's tiled architecture and provides:
- Time-based data partitioning into manageable tiles
- Efficient temporal aggregations over sliding windows
- Chaining features across different time horizons
- Memory-efficient processing of streaming data
- Late-arriving data handling

## Examples

See the example files:
- `basic_tiling.py` - Basic tiled transformation usage
- `advanced_tiling.py` - Advanced features like chaining and complex aggregations

For production integration examples, see:
- `sdk/python/feast/templates/local/feature_repo/example_repo.py` - Template example with tiled transformations
- `sdk/python/tests/unit/transformation/test_tiled_transformation_integration.py` - Integration tests with StreamFeatureView

## Running the Examples

```bash
# Basic tiling example
python basic_tiling.py

# Advanced tiling with chaining
python advanced_tiling.py
```

## Key Concepts

### Tile Configuration
- **tile_size**: Duration of each time tile (e.g., `timedelta(hours=1)`)
- **window_size**: Window size for aggregations within tiles (defaults to tile_size)
- **overlap**: Optional overlap between tiles for continuity
- **max_tiles_in_memory**: Maximum number of tiles to keep in memory
- **enable_late_data_handling**: Whether to handle late-arriving data

### Aggregation Functions
Functions that operate within each tile to compute aggregated features.

### Chaining Functions
Functions that chain results across tiles for derived features that require continuity across time boundaries.
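
The distinction between the two function kinds comes down to their signatures. A minimal sketch (the exact signatures are an assumption for illustration): aggregation functions see one tile at a time, while chaining functions also see the previous tile's result:

```python
import pandas as pd

# Aggregation function: one tile in, one result out (df -> df).
count_per_entity = lambda df: df.groupby("entity_id").size().reset_index(name="n")

# Chaining function: previous result and current result in ((prev_df, curr_df) -> df).
carry_total = lambda prev_df, curr_df: curr_df.assign(
    running_n=curr_df["n"] + (prev_df["running_n"].max() if not prev_df.empty else 0)
)

tile = pd.DataFrame({"entity_id": ["a", "a", "b"]})
agg = count_per_entity(tile)                # per-tile aggregate
chained = carry_total(pd.DataFrame(), agg)  # first tile: nothing to carry
print(chained["running_n"].tolist())
```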

### ComputeEngine Integration
Tiled transformations work with Feast's ComputeEngine architecture:
- Mode specified at StreamFeatureView level (not in transformation)
- Supports Spark, Ray, and other distributed engines
- Integrates with Feast Aggregation objects