-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Implement Chronon-style tiling transformation engine for streaming features with ComputeEngine, Aggregation support, and comprehensive testing #5644
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
f7ccce3
7f80220
aec46b5
fefd284
915896a
1932fff
29ee76d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -115,4 +115,72 @@ feature_view = FeatureView( | |
... | ||
) | ||
``` | ||
The underlying implementation of the join is an inner join by default, and join key is the entity id. | ||
The underlying implementation of the join is an inner join by default, and join key is the entity id. | ||
|
||
### Tiling | ||
|
||
Tiling is a transformation pattern designed for efficient streaming feature engineering, inspired by Chronon's tiled architecture. It divides time-series data into manageable chunks (tiles) for processing temporal aggregations and derived features efficiently. This is particularly useful for: | ||
|
||
- Temporal aggregations over sliding windows | ||
- Chaining features across different time horizons | ||
- Handling late-arriving data in streaming scenarios | ||
- Memory-efficient processing of large time-series data | ||
|
||
Examples include: | ||
|
||
```python | ||
from feast.transformation import tiled_transformation | ||
from datetime import timedelta | ||
|
||
@tiled_transformation( | ||
tile_size=timedelta(hours=1), # Process data in 1-hour tiles | ||
mode="pandas", # Use pandas processing mode | ||
overlap=timedelta(minutes=5), # 5-minute overlap between tiles | ||
aggregation_functions=[ | ||
lambda df: df.groupby('entity_id').agg({ | ||
'transaction_amount': ['sum', 'mean', 'count'] | ||
}).reset_index() | ||
] | ||
) | ||
def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame: | ||
"""Transform transaction data within each hour tile.""" | ||
return df.assign( | ||
rolling_avg=df['transaction_amount'].rolling(window=10).mean(), | ||
cumulative_amount=df['transaction_amount'].cumsum() | ||
) | ||
|
||
# Usage in StreamFeatureView | ||
stream_fv = StreamFeatureView( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't make sense to pass in a pandas data frame transformation into the Stream Feature View, does it? I suppose it may, we support the "spark" mode today. But this isn't compatible with it as the "mode" is passed into the tiled_transformation vs the StreamFeatureView, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're absolutely right! I've refactored the architecture to fix this inconsistency. The mode is now specified at the @tiled_transformation(
tile_size=timedelta(hours=1),
# mode parameter removed from here
)
def my_transformation(df): ...
# Use with StreamFeatureView - mode specified here
stream_fv = StreamFeatureView(
feature_transformation=my_transformation,
mode="spark", # ComputeEngine mode specified at view level
source=kafka_source
) This makes it consistent with Feast's architecture where transformations are engine-agnostic and the execution mode is determined by the feature view. Commit: 1932fff |
||
name="transaction_features", | ||
feature_transformation=hourly_transaction_features, | ||
source=kafka_source, | ||
... | ||
) | ||
``` | ||
|
||
**Chaining Features Example:** | ||
```python | ||
@tiled_transformation( | ||
tile_size=timedelta(hours=1), | ||
mode="pandas", | ||
chaining_functions=[ | ||
# Chain cumulative sums across tiles for continuity | ||
lambda prev_df, curr_df: curr_df.assign( | ||
global_cumsum=curr_df['amount'] + (prev_df['global_cumsum'].max() if not prev_df.empty else 0) | ||
) | ||
] | ||
) | ||
def chained_cumulative_features(df: pd.DataFrame) -> pd.DataFrame: | ||
return df.assign( | ||
local_cumsum=df['amount'].cumsum() | ||
) | ||
``` | ||
|
||
**Configuration Options:** | ||
- `tile_size`: Duration of each time tile (e.g., `timedelta(hours=1)`) | ||
- `mode`: Processing mode - currently supports `"pandas"` | ||
- `overlap`: Optional overlap between tiles for continuity | ||
- `max_tiles_in_memory`: Maximum number of tiles to keep in memory (default: 10) | ||
- `enable_late_data_handling`: Whether to handle late-arriving data (default: True) | ||
- `aggregation_functions`: Functions to apply within each tile | ||
- `chaining_functions`: Functions to chain results across tiles for derived features |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Tiled Streaming Features Example | ||
|
||
This example demonstrates how to use Feast's tiling transformation engine for efficient streaming feature engineering. | ||
|
||
## Overview | ||
|
||
Tiling in Feast is inspired by Chronon's tiled architecture and provides: | ||
- Time-based data partitioning into manageable tiles | ||
- Efficient temporal aggregations over sliding windows | ||
- Chaining features across different time horizons | ||
- Memory-efficient processing of streaming data | ||
- Late-arriving data handling | ||
|
||
## Examples | ||
|
||
See the example files: | ||
- `basic_tiling.py` - Basic tiled transformation usage | ||
- `advanced_tiling.py` - Advanced features like chaining and complex aggregations | ||
- `streaming_integration.py` - Integration with StreamFeatureView | ||
|
||
## Running the Examples | ||
|
||
```bash | ||
# Basic tiling example | ||
python basic_tiling.py | ||
|
||
# Advanced tiling with chaining | ||
python advanced_tiling.py | ||
|
||
# Streaming integration (requires Kafka setup) | ||
python streaming_integration.py | ||
``` | ||
|
||
## Key Concepts | ||
|
||
### Tile Configuration | ||
- **tile_size**: Duration of each time tile (e.g., `timedelta(hours=1)`) | ||
- **overlap**: Optional overlap between tiles for continuity | ||
- **max_tiles_in_memory**: Maximum number of tiles to keep in memory | ||
- **enable_late_data_handling**: Whether to handle late-arriving data | ||
|
||
### Aggregation Functions | ||
Functions that operate within each tile to compute aggregated features. | ||
|
||
### Chaining Functions | ||
Functions that chain results across tiles for derived features that require continuity across time boundaries. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
""" | ||
Advanced Tiled Transformation Example | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @copilot rather than add this documented example, let's add to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a comprehensive tiled transformation example to @tiled_transformation(
sources=[driver_stats_fv],
schema=[Field(name="rolling_avg_trips", dtype=Float64), ...],
tile_size=timedelta(hours=1),
window_size=timedelta(minutes=30),
aggregations=[Aggregation(column="avg_daily_trips", function="sum", ...)],
)
def driver_tiled_features(df): ... This provides a production-ready example that will be available in all new Feast projects created from templates, with proper ComputeEngine integration patterns and comprehensive documentation. Commit: 29ee76d |
||
This example demonstrates advanced tiling features including: | ||
- Chaining functions for cross-tile continuity | ||
- Complex aggregation patterns | ||
- Late data handling | ||
- Memory management | ||
""" | ||
|
||
import pandas as pd | ||
from datetime import datetime, timedelta | ||
import random | ||
|
||
|
||
def tiled_transformation(tile_size, mode="pandas", overlap=None, max_tiles_in_memory=10, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be based on the Transformations should be executed on a We also need to support a window_size in the class as well as the max tiles. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @copilot please adjust. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactored the implementation to address all these architectural requirements:
Enhanced API: @tiled_transformation(
aggregations=[ # Feast Aggregation objects
Aggregation(column="amount", function="sum", time_window=timedelta(minutes=30))
],
tile_size=timedelta(hours=1),
window_size=timedelta(minutes=30), # New parameter
max_tiles_in_memory=10, # New parameter
)
def my_transformation(df): ... The transformation now works with ComputeEngine for distributed execution and follows Feast's established transformation patterns. Commit: 1932fff |
||
aggregation_functions=None, chaining_functions=None): | ||
"""Simplified decorator for demonstration purposes.""" | ||
def decorator(func): | ||
func.tile_size = tile_size | ||
func.mode = mode | ||
func.overlap = overlap or timedelta(seconds=0) | ||
func.aggregation_functions = aggregation_functions or [] | ||
func.chaining_functions = chaining_functions or [] | ||
|
||
def transform(data, timestamp_column='timestamp'): | ||
"""Simplified tiled transformation with chaining logic.""" | ||
# Simulate tile processing | ||
tiles = [] | ||
|
||
# Sort data by timestamp | ||
data_sorted = data.sort_values(timestamp_column) | ||
|
||
# Create mock tiles (in real implementation, this would be time-based) | ||
tile_size_minutes = int(tile_size.total_seconds() / 60) | ||
unique_hours = data_sorted[timestamp_column].dt.floor(f'{tile_size_minutes}min').unique() | ||
|
||
for hour in unique_hours: | ||
tile_data = data_sorted[ | ||
data_sorted[timestamp_column].dt.floor(f'{tile_size_minutes}min') == hour | ||
].copy() | ||
|
||
if not tile_data.empty: | ||
# Apply base transformation | ||
tile_result = func(tile_data) | ||
|
||
# Apply aggregation functions | ||
for agg_func in func.aggregation_functions: | ||
tile_result = agg_func(tile_result) | ||
|
||
tiles.append(tile_result) | ||
|
||
# Chain tiles if chaining functions are provided | ||
if func.chaining_functions and len(tiles) > 1: | ||
chained_result = tiles[0] | ||
for i in range(1, len(tiles)): | ||
for chain_func in func.chaining_functions: | ||
chained_result = chain_func(chained_result, tiles[i]) | ||
return chained_result | ||
elif tiles: | ||
return pd.concat(tiles, ignore_index=True) | ||
else: | ||
return pd.DataFrame() | ||
|
||
func.transform = transform | ||
return func | ||
|
||
return decorator | ||
|
||
|
||
# Advanced tiled transformation with chaining | ||
@tiled_transformation( | ||
tile_size=timedelta(hours=1), | ||
mode="pandas", | ||
overlap=timedelta(minutes=10), | ||
max_tiles_in_memory=3, | ||
aggregation_functions=[ | ||
# Complex multi-level aggregation | ||
lambda df: df.groupby(['customer_id', 'merchant_category']).agg({ | ||
'amount': ['sum', 'mean', 'std', 'count'], | ||
'timestamp': ['min', 'max'], | ||
'high_value_tx': 'sum' | ||
}).reset_index(), | ||
], | ||
chaining_functions=[ | ||
# Chain running totals across tiles | ||
lambda prev_df, curr_df: chain_running_totals(prev_df, curr_df), | ||
# Chain session continuity | ||
lambda prev_df, curr_df: chain_session_features(prev_df, curr_df) | ||
] | ||
) | ||
def advanced_customer_features(df: pd.DataFrame) -> pd.DataFrame: | ||
""" | ||
Advanced transformation with complex feature engineering. | ||
Args: | ||
df: DataFrame with transaction data | ||
Returns: | ||
DataFrame with advanced derived features | ||
""" | ||
df = df.copy() | ||
|
||
# Feature engineering within tile | ||
df['hour_of_day'] = df['timestamp'].dt.hour | ||
df['day_of_week'] = df['timestamp'].dt.dayofweek | ||
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int) | ||
|
||
# Transaction patterns | ||
df['high_value_tx'] = (df['amount'] > df['amount'].quantile(0.8)).astype(int) | ||
df['velocity'] = df.groupby('customer_id')['timestamp'].diff().dt.total_seconds() / 60 # minutes | ||
|
||
# Rolling features within tile (with minimum periods for robustness) | ||
df = df.sort_values(['customer_id', 'timestamp']) | ||
df['rolling_velocity_avg'] = df.groupby('customer_id')['velocity'].rolling( | ||
window=3, min_periods=1 | ||
).mean().reset_index(drop=True) | ||
|
||
# Customer behavior segmentation within tile | ||
customer_stats = df.groupby('customer_id').agg({ | ||
'amount': ['mean', 'std'], | ||
'velocity': 'mean' | ||
}).reset_index() | ||
customer_stats.columns = ['customer_id', 'avg_amount', 'std_amount', 'avg_velocity'] | ||
customer_stats['customer_segment'] = 'regular' | ||
customer_stats.loc[customer_stats['avg_amount'] > customer_stats['avg_amount'].quantile(0.8), 'customer_segment'] = 'high_value' | ||
customer_stats.loc[customer_stats['avg_velocity'] < 10, 'customer_segment'] = 'slow_spender' | ||
|
||
# Merge back to main dataframe | ||
df = df.merge(customer_stats[['customer_id', 'customer_segment']], on='customer_id', how='left') | ||
|
||
return df | ||
|
||
|
||
def chain_running_totals(prev_df: pd.DataFrame, curr_df: pd.DataFrame) -> pd.DataFrame: | ||
"""Chain running totals across tiles.""" | ||
if prev_df.empty: | ||
return curr_df | ||
|
||
curr_df = curr_df.copy() | ||
|
||
# Get the last running total for each customer from previous tile | ||
if 'amount' in prev_df.columns and 'sum' in prev_df.columns: | ||
prev_totals = prev_df.groupby('customer_id')['amount']['sum'].last().to_dict() | ||
|
||
# Add to current tile's totals | ||
for customer_id in curr_df['customer_id'].unique(): | ||
if customer_id in prev_totals: | ||
mask = curr_df['customer_id'] == customer_id | ||
if 'amount' in curr_df.columns and 'sum' in curr_df.columns: | ||
curr_df.loc[mask, ('amount', 'sum')] += prev_totals[customer_id] | ||
|
||
return curr_df | ||
|
||
|
||
def chain_session_features(prev_df: pd.DataFrame, curr_df: pd.DataFrame) -> pd.DataFrame: | ||
"""Chain session-based features across tiles.""" | ||
if prev_df.empty: | ||
return curr_df | ||
|
||
curr_df = curr_df.copy() | ||
|
||
# Add session continuity indicator (simplified) | ||
curr_df['session_continued'] = 1 # Would be more complex in real implementation | ||
|
||
return curr_df | ||
|
||
|
||
def simulate_late_arriving_data(df: pd.DataFrame, late_probability: float = 0.1) -> pd.DataFrame: | ||
"""Simulate late-arriving data by randomly delaying some records.""" | ||
df = df.copy() | ||
|
||
# Randomly select some records to be "late" | ||
late_mask = pd.Series([random.random() < late_probability for _ in range(len(df))], index=df.index) | ||
|
||
# Add delay to late records | ||
df.loc[late_mask, 'timestamp'] = df.loc[late_mask, 'timestamp'] + timedelta(minutes=random.randint(5, 30)) | ||
|
||
# Re-sort by original order but keep the delayed timestamps | ||
return df | ||
|
||
|
||
def main(): | ||
"""Demonstrate advanced tiling features.""" | ||
|
||
# Create more complex sample data | ||
base_time = datetime(2024, 1, 1, 10, 0, 0) | ||
|
||
transactions = [] | ||
customers = [f'customer_{i}' for i in range(8)] | ||
merchant_categories = ['grocery', 'restaurant', 'gas', 'retail', 'entertainment', 'healthcare'] | ||
|
||
for i in range(120): | ||
# More realistic transaction patterns | ||
customer = random.choice(customers) | ||
category = random.choice(merchant_categories) | ||
|
||
# Vary amounts based on category | ||
category_multipliers = { | ||
'grocery': 1.0, 'restaurant': 1.5, 'gas': 0.8, | ||
'retail': 2.0, 'entertainment': 1.8, 'healthcare': 3.0 | ||
} | ||
base_amount = random.uniform(20, 200) * category_multipliers[category] | ||
|
||
transactions.append({ | ||
'timestamp': base_time + timedelta(minutes=i*5 + random.randint(-2, 2)), | ||
'customer_id': customer, | ||
'amount': round(base_amount, 2), | ||
'merchant_category': category | ||
}) | ||
|
||
df = pd.DataFrame(transactions) | ||
|
||
# Simulate late-arriving data | ||
df = simulate_late_arriving_data(df, late_probability=0.15) | ||
|
||
print(f"Created {len(df)} sample transactions with realistic patterns") | ||
print(f"Time range: {df['timestamp'].min()} to {df['timestamp'].max()}") | ||
print(f"Customers: {df['customer_id'].nunique()}") | ||
print(f"Categories: {df['merchant_category'].nunique()}") | ||
|
||
# Apply the advanced tiled transformation | ||
print("\nApplying advanced tiled transformation...") | ||
result = advanced_customer_features.transform(df, timestamp_column='timestamp') | ||
|
||
print(f"\nResult shape: {result.shape}") | ||
print(f"Result columns: {list(result.columns)}") | ||
|
||
if not result.empty: | ||
print("\nSample of advanced features:") | ||
display_cols = [] | ||
for col in ['customer_id', 'amount', 'customer_segment', 'high_value_tx']: | ||
if col in result.columns: | ||
display_cols.append(col) | ||
elif isinstance(result.columns, pd.MultiIndex): | ||
# Handle MultiIndex columns from aggregation | ||
matching_cols = [c for c in result.columns if col in str(c)] | ||
display_cols.extend(matching_cols[:2]) # Take first 2 matching | ||
|
||
if display_cols: | ||
print(result[display_cols].head(10)) | ||
|
||
# Show tiling configuration | ||
print(f"\nAdvanced tiling configuration:") | ||
print(f"- Tile size: {advanced_customer_features.tile_size}") | ||
print(f"- Mode: {advanced_customer_features.mode}") | ||
print(f"- Overlap: {advanced_customer_features.overlap}") | ||
print(f"- Aggregation functions: {len(advanced_customer_features.aggregation_functions)}") | ||
print(f"- Chaining functions: {len(advanced_customer_features.chaining_functions)}") | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot this omits the schema and variable names, those are important primitives to retain so that we can accurately construct the DAG, as well as render the information in the UI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the missing
sources
andschema
parameters to thetiled_transformation
decorator following theon_demand_feature_view
pattern.The updated API now includes:
sources
: List of source feature views/data sources for DAG constructionschema
: List of Field definitions with names and types for UI renderingUpdated documentation examples show the complete pattern:
These primitives enable proper DAG construction and feature metadata rendering in the UI. Commit: 915896a
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now this is inconsistent with the StreamFeatureView example below. These need to b e aligned.