- 
                Notifications
    You must be signed in to change notification settings 
- Fork 1.2k
Implement Chronon-style tiling transformation engine for streaming features with ComputeEngine, Aggregation support, and comprehensive testing #5644
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Co-authored-by: franciscojavierarceo <[email protected]>
…examples Co-authored-by: franciscojavierarceo <[email protected]>
| from feast.transformation.mode import TransformationMode | ||
|  | ||
|  | ||
| class TileConfiguration: | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to on_demand_feature_view we should just support a mode parameter for the tiled transformation and pandas should be one of them, rather than create an entire new decorator for each (i.e., requiring the usage of from feast.transformation import pandas_tiled_transformation). Instead, we should have something like:
from feast.transformation import tiled_transformation
from datetime import timedelta
@tiled_transformation(
    mode="pandas",
    tile_size=timedelta(hours=1),
    overlap=timedelta(minutes=5),
    aggregation_functions=[
        lambda df: df.groupby('customer_id').agg({
            'amount': ['sum', 'mean', 'count']
        }).reset_index()
    ]
)
def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(
        rolling_avg=df['amount'].rolling(window=10).mean(),
        cumulative_sum=df['amount'].cumsum()
    )There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot please review and revise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored the API to follow the on_demand_feature_view pattern as requested. The tiled_transformation decorator now accepts a mode parameter instead of having separate decorators for each processing mode.
New API:
from feast.transformation import tiled_transformation
@tiled_transformation(
    tile_size=timedelta(hours=1),
    mode="pandas",  # Mode parameter added
    overlap=timedelta(minutes=5),
    aggregation_functions=[...]
)
def hourly_transaction_features(df): ...Changes implemented in commit fefd284:
- Updated tiled_transformationdecorator withmode="pandas"parameter
- Removed separate pandas_tiled_transformationdecorator
- Updated all documentation, examples, and tests to use unified API
- Maintains all existing functionality while following established Feast patterns
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a new class/decorator? I think it could be just an Aggregation attribute.
…nd_feature_view pattern Co-authored-by: franciscojavierarceo <[email protected]>
| tile_size=timedelta(hours=1), # Process data in 1-hour tiles | ||
| mode="pandas", # Use pandas processing mode | ||
| overlap=timedelta(minutes=5), # 5-minute overlap between tiles | ||
| aggregation_functions=[ | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot this omits the schema and variable names, those are important primitives to retain so that we can accurately construct the DAG, as well as render the information in the UI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the missing sources and schema parameters to the tiled_transformation decorator following the on_demand_feature_view pattern.
The updated API now includes:
- sources: List of source feature views/data sources for DAG construction
- schema: List of Field definitions with names and types for UI rendering
Updated documentation examples show the complete pattern:
@tiled_transformation(
    sources=["transaction_source_fv"],
    schema=[
        Field(name="rolling_avg", dtype=Float64),
        Field(name="cumulative_amount", dtype=Float64),
    ],
    tile_size=timedelta(hours=1),
    mode="pandas",
    # ...
)These primitives enable proper DAG construction and feature metadata rendering in the UI. Commit: 915896a
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now this is inconsistent with the StreamFeatureView example below. These need to b e aligned.
| ) | ||
|  | ||
| # Usage in StreamFeatureView | ||
| stream_fv = StreamFeatureView( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't make sense to pass in a pandas data frame transformation into the Stream Feature View, does it? I suppose it may, we support the "spark" mode today. But this isn't compatible with it as the "mode" is passed into the tiled_transformation vs the StreamFeatureView, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're absolutely right! I've refactored the architecture to fix this inconsistency.
The mode is now specified at the StreamFeatureView level (where it belongs), not within the transformation itself:
@tiled_transformation(
    tile_size=timedelta(hours=1),
    # mode parameter removed from here
)
def my_transformation(df): ...
# Use with StreamFeatureView - mode specified here
stream_fv = StreamFeatureView(
    feature_transformation=my_transformation,
    mode="spark",  # ComputeEngine mode specified at view level
    source=kafka_source
)This makes it consistent with Feast's architecture where transformations are engine-agnostic and the execution mode is determined by the feature view. Commit: 1932fff
…n_demand_feature_view pattern Co-authored-by: franciscojavierarceo <[email protected]>
| import random | ||
|  | ||
|  | ||
| def tiled_transformation(tile_size, mode="pandas", overlap=None, max_tiles_in_memory=10, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be based on the Transformation class and use the ComputeEngine class as well.
Transformations should be executed on a ComputeEngine. The Tiling should allow usage of an Aggregation as well.
We also need to support a window_size in the class as well as the max tiles.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot please adjust.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored the implementation to address all these architectural requirements:
- Based on Transformationclass:TiledTransformationnow properly inherits from the baseTransformationclass
- ComputeEngine integration: Transformations are designed to be executed by ComputeEngine (Spark, Ray, etc.)
- Aggregation support: Added support for Feast Aggregationobjects alongside custom aggregation functions
- Added window_sizeparameter: Separate fromtile_sizefor aggregations within tiles
- Added max_tilesparameter: For memory management
Enhanced API:
@tiled_transformation(
    aggregations=[  # Feast Aggregation objects
        Aggregation(column="amount", function="sum", time_window=timedelta(minutes=30))
    ],
    tile_size=timedelta(hours=1),
    window_size=timedelta(minutes=30),  # New parameter
    max_tiles_in_memory=10,  # New parameter
)
def my_transformation(df): ...The transformation now works with ComputeEngine for distributed execution and follows Feast's established transformation patterns. Commit: 1932fff
| @@ -0,0 +1,246 @@ | |||
| """ | |||
| Streaming Integration Example | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test should be under sdk/tests/unit/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved the streaming integration test to the proper location. Created sdk/python/tests/unit/transformation/test_tiled_transformation_integration.py with comprehensive integration tests for:
- Tiled transformation creation and configuration
- StreamFeatureView compatibility
- ComputeEngine integration
- Aggregation support (both Feast objects and custom functions)
- Memory management
Removed the example-based streaming integration file. Commit: 29ee76d
| @@ -0,0 +1,253 @@ | |||
| """ | |||
| Advanced Tiled Transformation Example | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot  rather than add this documented example, let's add to the sdk/python/feast/templates//local/feature_repo/example_repo.py instead and update the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comprehensive tiled transformation example to sdk/python/feast/templates/local/feature_repo/example_repo.py. The example demonstrates:
@tiled_transformation(
    sources=[driver_stats_fv],
    schema=[Field(name="rolling_avg_trips", dtype=Float64), ...],
    tile_size=timedelta(hours=1),
    window_size=timedelta(minutes=30),
    aggregations=[Aggregation(column="avg_daily_trips", function="sum", ...)],
)
def driver_tiled_features(df): ...This provides a production-ready example that will be available in all new Feast projects created from templates, with proper ComputeEngine integration patterns and comprehensive documentation. Commit: 29ee76d
…Aggregation objects Co-authored-by: franciscojavierarceo <[email protected]>
… to example template Co-authored-by: franciscojavierarceo <[email protected]>
This PR implements a comprehensive tiling transformation engine for Feast's streaming feature processing, inspired by Chronon's tiled architecture. The implementation enables efficient temporal windowing and aggregation for streaming data by dividing time-series into manageable chunks (tiles), with full ComputeEngine integration, Feast Aggregation support, and comprehensive testing infrastructure.
Overview
Tiling addresses key challenges in streaming feature engineering:
Key Components
Core Architecture
TileConfiguration: Configures tile size, window size, overlap, memory limits, and late data handlingTiledTransformation: Base class inheriting fromTransformationfor ComputeEngine executionTransformationMode.TILING: New transformation mode integrated with existing frameworkComputeEngine Integration
Aggregationobjects and custom functionsAdvanced Features
Usage Examples
Basic Tiling with ComputeEngine
StreamFeatureView Integration
Advanced Chaining
API Design
The implementation follows Feast's established patterns with proper ComputeEngine integration:
sourcesparameter specifies feature view dependencies for proper DAG constructionschemaparameter defines output features with names and types for UI renderingAggregationobjects and custom aggregation functionsConfiguration Options
sources: List of source feature views or data sources for DAG constructionschema: List of Field definitions specifying output feature names and data typesaggregations: List of Feast Aggregation objects for window-based aggregationstile_size: Duration of each time tile (e.g.,timedelta(hours=1))window_size: Window size for aggregations within tiles (defaults to tile_size)overlap: Optional overlap between tiles for continuitymax_tiles_in_memory: Maximum number of tiles to keep in memory (default: 10)enable_late_data_handling: Whether to handle late-arriving data (default: True)aggregation_functions: Custom functions to apply within each tilechaining_functions: Functions to chain results across tiles for derived featuresNote: The transformation mode (e.g., "spark", "pandas") is specified at the StreamFeatureView level, not within the transformation itself. This allows the same tiled transformation to work with different ComputeEngines.
Testing & Template Integration
Comprehensive Test Coverage
sdk/python/tests/unit/transformation/test_tiled_transformation.pysdk/python/tests/unit/transformation/test_tiled_transformation_integration.pysdk/python/feast/templates/local/feature_repo/example_repo.pyTemplate Example
The implementation includes a comprehensive example in the Feast project template:
Documentation & Examples
Benefits
This implementation provides:
The tiling engine enables use cases like real-time fraud detection, customer behavior analytics, and temporal aggregations that require both efficiency and temporal continuity in streaming environments, while providing full integration with Feast's feature lineage, UI systems, distributed compute infrastructure, and development workflow.
Original prompt
💬 Share your feedback on Copilot coding agent for the chance to win a $200 gift card! Click here to start the survey.