diff --git a/docs/getting-started/architecture/feature-transformation.md b/docs/getting-started/architecture/feature-transformation.md index 419f6172746..2f1f3e5e752 100644 --- a/docs/getting-started/architecture/feature-transformation.md +++ b/docs/getting-started/architecture/feature-transformation.md @@ -115,4 +115,104 @@ feature_view = FeatureView( ... ) ``` -The underlying implementation of the join is an inner join by default, and join key is the entity id. \ No newline at end of file +The underlying implementation of the join is an inner join by default, and join key is the entity id. + +### Tiling + +Tiling is a transformation pattern designed for efficient streaming feature engineering, inspired by Chronon's tiled architecture. It divides time-series data into manageable chunks (tiles) for processing temporal aggregations and derived features efficiently. This is particularly useful for: + +- Temporal aggregations over sliding windows +- Chaining features across different time horizons +- Handling late-arriving data in streaming scenarios +- Memory-efficient processing of large time-series data + +Examples include: + +```python +from feast.transformation import tiled_transformation +from feast import Field, Aggregation +from feast.types import Float64, Int64 +from datetime import timedelta + +@tiled_transformation( + sources=["transaction_source_fv"], # Source feature views or data sources + schema=[ # Output feature schema + Field(name="rolling_avg", dtype=Float64), + Field(name="cumulative_amount", dtype=Float64), + Field(name="transaction_count", dtype=Int64), + ], + tile_size=timedelta(hours=1), # Process data in 1-hour tiles + window_size=timedelta(minutes=30), # Window size for aggregations within tiles + overlap=timedelta(minutes=5), # 5-minute overlap between tiles + aggregations=[ # Feast Aggregation objects + Aggregation(column="transaction_amount", function="sum", time_window=timedelta(minutes=30)), + Aggregation(column="transaction_amount", 
function="mean", time_window=timedelta(minutes=30)), + ], + aggregation_functions=[ # Custom aggregation functions + lambda df: df.groupby('entity_id').agg({ + 'transaction_amount': ['count'] + }).reset_index() + ] +) +def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame: + """Transform transaction data within each hour tile.""" + return df.assign( + rolling_avg=df['transaction_amount'].rolling(window=10).mean(), + cumulative_amount=df['transaction_amount'].cumsum(), + transaction_count=df.groupby('entity_id').cumcount() + 1 + ) + +# Usage in StreamFeatureView - mode specified here, not in transformation +stream_fv = StreamFeatureView( + name="transaction_features", + feature_transformation=hourly_transaction_features, + source=kafka_source, + mode="spark", # Transformation mode specified at StreamFeatureView level + entities=["customer_id"] +) +``` + +**Chaining Features Example:** +```python +@tiled_transformation( + sources=["transaction_hourly_fv"], # Source feature views + schema=[ # Output feature schema + Field(name="local_cumsum", dtype=Float64), + Field(name="global_cumsum", dtype=Float64), + ], + tile_size=timedelta(hours=1), + window_size=timedelta(minutes=30), # Window size for aggregations + chaining_functions=[ + # Chain cumulative sums across tiles for continuity + lambda prev_df, curr_df: curr_df.assign( + global_cumsum=curr_df['amount'] + (prev_df['global_cumsum'].max() if not prev_df.empty else 0) + ) + ] +) +def chained_cumulative_features(df: pd.DataFrame) -> pd.DataFrame: + return df.assign( + local_cumsum=df['amount'].cumsum() + ) + +# Use with StreamFeatureView (mode specified at view level) +stream_fv = StreamFeatureView( + name="chained_features", + feature_transformation=chained_cumulative_features, + source=kafka_source, + mode="spark" # ComputeEngine mode specified here +) +``` + +**Configuration Options:** +- `sources`: List of source feature views or data sources that this transformation depends on +- `schema`: List of 
Field definitions specifying output feature names and data types +- `aggregations`: List of Feast Aggregation objects for window-based aggregations +- `tile_size`: Duration of each time tile (e.g., `timedelta(hours=1)`) +- `window_size`: Window size for aggregations within tiles (defaults to tile_size) +- `overlap`: Optional overlap between tiles for continuity +- `max_tiles_in_memory`: Maximum number of tiles to keep in memory (default: 10) +- `enable_late_data_handling`: Whether to handle late-arriving data (default: True) +- `aggregation_functions`: Custom functions to apply within each tile +- `chaining_functions`: Functions to chain results across tiles for derived features + +**Note**: The transformation mode (e.g., "spark", "pandas") is specified at the StreamFeatureView level, not within the transformation itself. This allows the same tiled transformation to work with different ComputeEngines. \ No newline at end of file diff --git a/examples/tiled_streaming_features/README.md b/examples/tiled_streaming_features/README.md new file mode 100644 index 00000000000..91504461e12 --- /dev/null +++ b/examples/tiled_streaming_features/README.md @@ -0,0 +1,53 @@ +# Tiled Streaming Features Example + +This example demonstrates how to use Feast's tiling transformation engine for efficient streaming feature engineering. 
+ +## Overview + +Tiling in Feast is inspired by Chronon's tiled architecture and provides: +- Time-based data partitioning into manageable tiles +- Efficient temporal aggregations over sliding windows +- Chaining features across different time horizons +- Memory-efficient processing of streaming data +- Late-arriving data handling + +## Examples + +See the example files: +- `basic_tiling.py` - Basic tiled transformation usage +- `advanced_tiling.py` - Advanced features like chaining and complex aggregations + +For production integration examples, see: +- `sdk/python/feast/templates/local/feature_repo/example_repo.py` - Template example with tiled transformations +- `sdk/python/tests/unit/transformation/test_tiled_transformation_integration.py` - Integration tests with StreamFeatureView + +## Running the Examples + +```bash +# Basic tiling example +python basic_tiling.py + +# Advanced tiling with chaining +python advanced_tiling.py +``` + +## Key Concepts + +### Tile Configuration +- **tile_size**: Duration of each time tile (e.g., `timedelta(hours=1)`) +- **window_size**: Window size for aggregations within tiles (defaults to tile_size) +- **overlap**: Optional overlap between tiles for continuity +- **max_tiles_in_memory**: Maximum number of tiles to keep in memory +- **enable_late_data_handling**: Whether to handle late-arriving data + +### Aggregation Functions +Functions that operate within each tile to compute aggregated features. + +### Chaining Functions +Functions that chain results across tiles for derived features that require continuity across time boundaries. 
+ +### ComputeEngine Integration +Tiled transformations work with Feast's ComputeEngine architecture: +- Mode specified at StreamFeatureView level (not in transformation) +- Supports Spark, Ray, and other distributed engines +- Integrates with Feast Aggregation objects \ No newline at end of file diff --git a/examples/tiled_streaming_features/advanced_tiling.py b/examples/tiled_streaming_features/advanced_tiling.py new file mode 100644 index 00000000000..4b4a50fd70e --- /dev/null +++ b/examples/tiled_streaming_features/advanced_tiling.py @@ -0,0 +1,253 @@ +""" +Advanced Tiled Transformation Example + +This example demonstrates advanced tiling features including: +- Chaining functions for cross-tile continuity +- Complex aggregation patterns +- Late data handling +- Memory management +""" + +import pandas as pd +from datetime import datetime, timedelta +import random + + +def tiled_transformation(tile_size, mode="pandas", overlap=None, max_tiles_in_memory=10, + aggregation_functions=None, chaining_functions=None): + """Simplified decorator for demonstration purposes.""" + def decorator(func): + func.tile_size = tile_size + func.mode = mode + func.overlap = overlap or timedelta(seconds=0) + func.aggregation_functions = aggregation_functions or [] + func.chaining_functions = chaining_functions or [] + + def transform(data, timestamp_column='timestamp'): + """Simplified tiled transformation with chaining logic.""" + # Simulate tile processing + tiles = [] + + # Sort data by timestamp + data_sorted = data.sort_values(timestamp_column) + + # Create mock tiles (in real implementation, this would be time-based) + tile_size_minutes = int(tile_size.total_seconds() / 60) + unique_hours = data_sorted[timestamp_column].dt.floor(f'{tile_size_minutes}min').unique() + + for hour in unique_hours: + tile_data = data_sorted[ + data_sorted[timestamp_column].dt.floor(f'{tile_size_minutes}min') == hour + ].copy() + + if not tile_data.empty: + # Apply base transformation + tile_result = 
func(tile_data) + + # Apply aggregation functions + for agg_func in func.aggregation_functions: + tile_result = agg_func(tile_result) + + tiles.append(tile_result) + + # Chain tiles if chaining functions are provided + if func.chaining_functions and len(tiles) > 1: + chained_result = tiles[0] + for i in range(1, len(tiles)): + for chain_func in func.chaining_functions: + chained_result = chain_func(chained_result, tiles[i]) + return chained_result + elif tiles: + return pd.concat(tiles, ignore_index=True) + else: + return pd.DataFrame() + + func.transform = transform + return func + + return decorator + + +# Advanced tiled transformation with chaining +@tiled_transformation( + tile_size=timedelta(hours=1), + mode="pandas", + overlap=timedelta(minutes=10), + max_tiles_in_memory=3, + aggregation_functions=[ + # Complex multi-level aggregation + lambda df: df.groupby(['customer_id', 'merchant_category']).agg({ + 'amount': ['sum', 'mean', 'std', 'count'], + 'timestamp': ['min', 'max'], + 'high_value_tx': 'sum' + }).reset_index(), + ], + chaining_functions=[ + # Chain running totals across tiles + lambda prev_df, curr_df: chain_running_totals(prev_df, curr_df), + # Chain session continuity + lambda prev_df, curr_df: chain_session_features(prev_df, curr_df) + ] +) +def advanced_customer_features(df: pd.DataFrame) -> pd.DataFrame: + """ + Advanced transformation with complex feature engineering. 
+ + Args: + df: DataFrame with transaction data + + Returns: + DataFrame with advanced derived features + """ + df = df.copy() + + # Feature engineering within tile + df['hour_of_day'] = df['timestamp'].dt.hour + df['day_of_week'] = df['timestamp'].dt.dayofweek + df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int) + + # Transaction patterns + df['high_value_tx'] = (df['amount'] > df['amount'].quantile(0.8)).astype(int) + df['velocity'] = df.groupby('customer_id')['timestamp'].diff().dt.total_seconds() / 60 # minutes + + # Rolling features within tile (with minimum periods for robustness) + df = df.sort_values(['customer_id', 'timestamp']) + df['rolling_velocity_avg'] = df.groupby('customer_id')['velocity'].rolling( + window=3, min_periods=1 + ).mean().reset_index(drop=True) + + # Customer behavior segmentation within tile + customer_stats = df.groupby('customer_id').agg({ + 'amount': ['mean', 'std'], + 'velocity': 'mean' + }).reset_index() + customer_stats.columns = ['customer_id', 'avg_amount', 'std_amount', 'avg_velocity'] + customer_stats['customer_segment'] = 'regular' + customer_stats.loc[customer_stats['avg_amount'] > customer_stats['avg_amount'].quantile(0.8), 'customer_segment'] = 'high_value' + customer_stats.loc[customer_stats['avg_velocity'] < 10, 'customer_segment'] = 'slow_spender' + + # Merge back to main dataframe + df = df.merge(customer_stats[['customer_id', 'customer_segment']], on='customer_id', how='left') + + return df + + +def chain_running_totals(prev_df: pd.DataFrame, curr_df: pd.DataFrame) -> pd.DataFrame: + """Chain running totals across tiles.""" + if prev_df.empty: + return curr_df + + curr_df = curr_df.copy() + + # Get the last running total for each customer from previous tile + if 'amount' in prev_df.columns and 'sum' in prev_df.columns: + prev_totals = prev_df.groupby('customer_id')['amount']['sum'].last().to_dict() + + # Add to current tile's totals + for customer_id in curr_df['customer_id'].unique(): + if customer_id in 
prev_totals: + mask = curr_df['customer_id'] == customer_id + if 'amount' in curr_df.columns and 'sum' in curr_df.columns: + curr_df.loc[mask, ('amount', 'sum')] += prev_totals[customer_id] + + return curr_df + + +def chain_session_features(prev_df: pd.DataFrame, curr_df: pd.DataFrame) -> pd.DataFrame: + """Chain session-based features across tiles.""" + if prev_df.empty: + return curr_df + + curr_df = curr_df.copy() + + # Add session continuity indicator (simplified) + curr_df['session_continued'] = 1 # Would be more complex in real implementation + + return curr_df + + +def simulate_late_arriving_data(df: pd.DataFrame, late_probability: float = 0.1) -> pd.DataFrame: + """Simulate late-arriving data by randomly delaying some records.""" + df = df.copy() + + # Randomly select some records to be "late" + late_mask = pd.Series([random.random() < late_probability for _ in range(len(df))], index=df.index) + + # Add delay to late records + df.loc[late_mask, 'timestamp'] = df.loc[late_mask, 'timestamp'] + timedelta(minutes=random.randint(5, 30)) + + # Re-sort by original order but keep the delayed timestamps + return df + + +def main(): + """Demonstrate advanced tiling features.""" + + # Create more complex sample data + base_time = datetime(2024, 1, 1, 10, 0, 0) + + transactions = [] + customers = [f'customer_{i}' for i in range(8)] + merchant_categories = ['grocery', 'restaurant', 'gas', 'retail', 'entertainment', 'healthcare'] + + for i in range(120): + # More realistic transaction patterns + customer = random.choice(customers) + category = random.choice(merchant_categories) + + # Vary amounts based on category + category_multipliers = { + 'grocery': 1.0, 'restaurant': 1.5, 'gas': 0.8, + 'retail': 2.0, 'entertainment': 1.8, 'healthcare': 3.0 + } + base_amount = random.uniform(20, 200) * category_multipliers[category] + + transactions.append({ + 'timestamp': base_time + timedelta(minutes=i*5 + random.randint(-2, 2)), + 'customer_id': customer, + 'amount': 
round(base_amount, 2), + 'merchant_category': category + }) + + df = pd.DataFrame(transactions) + + # Simulate late-arriving data + df = simulate_late_arriving_data(df, late_probability=0.15) + + print(f"Created {len(df)} sample transactions with realistic patterns") + print(f"Time range: {df['timestamp'].min()} to {df['timestamp'].max()}") + print(f"Customers: {df['customer_id'].nunique()}") + print(f"Categories: {df['merchant_category'].nunique()}") + + # Apply the advanced tiled transformation + print("\nApplying advanced tiled transformation...") + result = advanced_customer_features.transform(df, timestamp_column='timestamp') + + print(f"\nResult shape: {result.shape}") + print(f"Result columns: {list(result.columns)}") + + if not result.empty: + print("\nSample of advanced features:") + display_cols = [] + for col in ['customer_id', 'amount', 'customer_segment', 'high_value_tx']: + if col in result.columns: + display_cols.append(col) + elif isinstance(result.columns, pd.MultiIndex): + # Handle MultiIndex columns from aggregation + matching_cols = [c for c in result.columns if col in str(c)] + display_cols.extend(matching_cols[:2]) # Take first 2 matching + + if display_cols: + print(result[display_cols].head(10)) + + # Show tiling configuration + print(f"\nAdvanced tiling configuration:") + print(f"- Tile size: {advanced_customer_features.tile_size}") + print(f"- Mode: {advanced_customer_features.mode}") + print(f"- Overlap: {advanced_customer_features.overlap}") + print(f"- Aggregation functions: {len(advanced_customer_features.aggregation_functions)}") + print(f"- Chaining functions: {len(advanced_customer_features.chaining_functions)}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/tiled_streaming_features/basic_tiling.py b/examples/tiled_streaming_features/basic_tiling.py new file mode 100644 index 00000000000..d0194a36685 --- /dev/null +++ b/examples/tiled_streaming_features/basic_tiling.py @@ -0,0 +1,120 @@ 
+""" +Basic Tiled Transformation Example + +This example shows how to use tiled transformations for processing +streaming transaction data in time-based chunks. +""" + +import pandas as pd +from datetime import datetime, timedelta + +# Note: In a real Feast installation, you would import like this: +# from feast.transformation import tiled_transformation + +# For this example, we'll define a simplified version +def tiled_transformation(tile_size, mode="pandas", overlap=None, max_tiles_in_memory=10, aggregation_functions=None): + """Simplified decorator for demonstration purposes.""" + def decorator(func): + func.tile_size = tile_size + func.mode = mode + func.overlap = overlap or timedelta(seconds=0) + func.aggregation_functions = aggregation_functions or [] + + def transform(data, timestamp_column='timestamp'): + """Simplified tiled transformation logic.""" + # In the real implementation, this would partition data into tiles + # and apply the transformation to each tile + result = func(data) + + # Apply aggregation functions + for agg_func in func.aggregation_functions: + result = agg_func(result) + + return result + + func.transform = transform + return func + + return decorator + + +# Define a tiled transformation for transaction features +@tiled_transformation( + tile_size=timedelta(hours=1), # Process data in 1-hour tiles + mode="pandas", # Use pandas processing mode + overlap=timedelta(minutes=5), # 5-minute overlap for continuity + max_tiles_in_memory=5, # Keep max 5 tiles in memory + aggregation_functions=[ + # Aggregate by customer within each tile + lambda df: df.groupby('customer_id').agg({ + 'amount': ['sum', 'mean', 'count'], + 'timestamp': 'max' + }).reset_index() + ] +) +def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame: + """ + Transform transaction data within each hour tile. 
+ + Args: + df: DataFrame with columns: customer_id, amount, timestamp, merchant_category + + Returns: + DataFrame with derived features + """ + # Basic transformations within the tile + df = df.copy() + + # Calculate rolling statistics (within tile) + df = df.sort_values(['customer_id', 'timestamp']) + df['rolling_avg_3'] = df.groupby('customer_id')['amount'].rolling(window=3, min_periods=1).mean().reset_index(drop=True) + df['rolling_sum_2'] = df.groupby('customer_id')['amount'].rolling(window=2, min_periods=1).sum().reset_index(drop=True) + + # Calculate cumulative features (within tile) + df['cumulative_amount'] = df.groupby('customer_id')['amount'].cumsum() + df['transaction_count'] = df.groupby('customer_id').cumcount() + 1 + + # Merchant category features + df['high_risk_merchant'] = df['merchant_category'].isin(['gambling', 'adult', 'alcohol']).astype(int) + + return df + + +def main(): + """Example usage of the tiled transformation.""" + + # Create sample streaming transaction data + base_time = datetime(2024, 1, 1, 10, 0, 0) + + transactions = [] + for i in range(50): + transactions.append({ + 'timestamp': base_time + timedelta(minutes=i*3), # Every 3 minutes + 'customer_id': f'customer_{i % 5}', # 5 different customers + 'amount': 50 + (i % 7) * 20, # Varying amounts + 'merchant_category': ['grocery', 'restaurant', 'gas', 'retail'][i % 4] + }) + + df = pd.DataFrame(transactions) + print(f"Created {len(df)} sample transactions") + print(f"Time range: {df['timestamp'].min()} to {df['timestamp'].max()}") + + # Apply the tiled transformation + print("\nApplying tiled transformation...") + result = hourly_transaction_features.transform(df, timestamp_column='timestamp') + + print(f"\nResult shape: {result.shape}") + print("\nResult columns:", list(result.columns)) + print("\nResult sample:") + print(result.head(10)) + + # Show the effect of tiling configuration + print(f"\nTile configuration:") + print(f"- Tile size: {hourly_transaction_features.tile_size}") + 
print(f"- Mode: {hourly_transaction_features.mode}") + print(f"- Overlap: {hourly_transaction_features.overlap}") + print(f"- Aggregation functions: {len(hourly_transaction_features.aggregation_functions)}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index e148000bc96..b90cabfdb5d 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -129,7 +129,47 @@ def _ingest_stream_data(self) -> StreamTable: return stream_df def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: + # Check if we have a tiled transformation + if hasattr(self.sfv, 'feature_transformation') and self.sfv.feature_transformation: + from feast.transformation.tiled_transformation import TiledTransformation + if isinstance(self.sfv.feature_transformation, TiledTransformation): + # Apply tiled transformation logic for streaming + return self._apply_tiled_transformation(df, self.sfv.feature_transformation) + + # Fallback to existing UDF approach return self.sfv.udf.__call__(df) if self.sfv.udf else df + + def _apply_tiled_transformation(self, df: StreamTable, tiled_transform: "TiledTransformation") -> StreamTable: + """ + Apply tiled transformation to streaming DataFrame. + + For streaming data, we need to implement windowing logic that works with Spark Streaming. 
+ """ + from pyspark.sql import functions as F + from pyspark.sql.window import Window + + # Get tile configuration + tile_config = tiled_transform.tile_config + + # Convert timedelta to seconds for Spark + window_duration = f"{int(tile_config.tile_size.total_seconds())} seconds" + slide_duration = f"{int(tile_config.slide_interval.total_seconds())} seconds" if tile_config.slide_interval else window_duration + + # Add watermark for late data handling if enabled + if tile_config.enable_late_data_handling: + df = df.withWatermark("timestamp", "30 seconds") # Allow 30 seconds for late data + + # Create time-based windows for tiling + df_windowed = df.withColumn( + "tile_window", + F.window(F.col("timestamp"), window_duration, slide_duration) + ) + + # Apply the transformation within each window/tile + # For now, we'll use the UDF directly - more sophisticated tiling logic can be added + result_df = tiled_transform.udf(df_windowed) + + return result_df def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery: # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. 
diff --git a/sdk/python/feast/templates/local/feature_repo/example_repo.py b/sdk/python/feast/templates/local/feature_repo/example_repo.py index e2fd0a891cf..e5f1b8a62ae 100644 --- a/sdk/python/feast/templates/local/feature_repo/example_repo.py +++ b/sdk/python/feast/templates/local/feature_repo/example_repo.py @@ -142,6 +142,65 @@ def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame: return df +# Example of tiled transformation for streaming feature engineering +# This demonstrates Chronon-style tiling for efficient temporal processing +try: + from feast.transformation import tiled_transformation + from feast.aggregation import Aggregation + + @tiled_transformation( + sources=[driver_stats_fv], # Source feature views for DAG construction + schema=[ # Output schema for UI rendering + Field(name="rolling_avg_trips", dtype=Float64), + Field(name="cumulative_trips", dtype=Int64), + Field(name="trip_velocity", dtype=Float64), + ], + tile_size=timedelta(hours=1), # Process data in 1-hour tiles + window_size=timedelta(minutes=30), # Window size for aggregations within tiles + overlap=timedelta(minutes=5), # 5-minute overlap between tiles + aggregations=[ # Feast Aggregation objects + Aggregation(column="avg_daily_trips", function="sum", time_window=timedelta(minutes=30)), + Aggregation(column="avg_daily_trips", function="mean", time_window=timedelta(minutes=30)), + ], + max_tiles_in_memory=10, # Memory management + enable_late_data_handling=True, # Handle late-arriving data + ) + def driver_tiled_features(df: pd.DataFrame) -> pd.DataFrame: + """ + Tiled transformation for efficient streaming driver feature processing. + + This transformation processes driver statistics in temporal tiles, + enabling efficient computation of rolling averages and cumulative metrics + for streaming scenarios. 
+ """ + # Calculate rolling features within each tile + df = df.sort_values('event_timestamp') + df['rolling_avg_trips'] = df['avg_daily_trips'].rolling(window=3, min_periods=1).mean() + + # Calculate cumulative features within tile + df['cumulative_trips'] = df['avg_daily_trips'].cumsum() + + # Calculate trip velocity (trips per time unit) + time_diff = df['event_timestamp'].diff().dt.total_seconds() / 3600 # hours + df['trip_velocity'] = df['avg_daily_trips'] / (time_diff.fillna(1) + 0.1) # avoid division by zero + + return df + + # Note: In a streaming scenario, this would be used with StreamFeatureView: + # stream_fv = StreamFeatureView( + # name="driver_tiled_features", + # feature_transformation=driver_tiled_features, + # source=stream_source, # Kafka or other streaming source + # mode="spark", # ComputeEngine mode specified here + # entities=[driver], + # aggregations=driver_tiled_features.aggregations # Can reuse aggregations + # ) + +except ImportError: + # Tiled transformation not available - skip this example + driver_tiled_features = None + + driver_activity_v3 = FeatureService( name="driver_activity_v3", features=[driver_stats_fresh_fv, transformed_conv_rate_fresh], diff --git a/sdk/python/feast/transformation/__init__.py b/sdk/python/feast/transformation/__init__.py index e69de29bb2d..562d1a9bf2f 100644 --- a/sdk/python/feast/transformation/__init__.py +++ b/sdk/python/feast/transformation/__init__.py @@ -0,0 +1,22 @@ +from .base import Transformation, transformation +from .mode import TransformationMode +from .pandas_transformation import PandasTransformation +from .python_transformation import PythonTransformation +from .spark_transformation import SparkTransformation +from .sql_transformation import SQLTransformation +from .substrait_transformation import SubstraitTransformation +from .tiled_transformation import TiledTransformation, TileConfiguration, tiled_transformation + +__all__ = [ + "Transformation", + "transformation", + 
"TransformationMode", + "PandasTransformation", + "PythonTransformation", + "SparkTransformation", + "SQLTransformation", + "SubstraitTransformation", + "TiledTransformation", + "TileConfiguration", + "tiled_transformation", +] diff --git a/sdk/python/feast/transformation/factory.py b/sdk/python/feast/transformation/factory.py index 50c3c665764..ba595d4b8ed 100644 --- a/sdk/python/feast/transformation/factory.py +++ b/sdk/python/feast/transformation/factory.py @@ -7,6 +7,7 @@ "sql": "feast.transformation.sql_transformation.SQLTransformation", "spark_sql": "feast.transformation.spark_transformation.SparkTransformation", "spark": "feast.transformation.spark_transformation.SparkTransformation", + "tiling": "feast.transformation.tiled_transformation.TiledTransformation", } diff --git a/sdk/python/feast/transformation/mode.py b/sdk/python/feast/transformation/mode.py index 2b453477b3a..9b551f3c636 100644 --- a/sdk/python/feast/transformation/mode.py +++ b/sdk/python/feast/transformation/mode.py @@ -8,3 +8,4 @@ class TransformationMode(Enum): SPARK = "spark" SQL = "sql" SUBSTRAIT = "substrait" + TILING = "tiling" diff --git a/sdk/python/feast/transformation/pandas_tiled_transformation.py b/sdk/python/feast/transformation/pandas_tiled_transformation.py new file mode 100644 index 00000000000..fc0160bfd7b --- /dev/null +++ b/sdk/python/feast/transformation/pandas_tiled_transformation.py @@ -0,0 +1,280 @@ +from datetime import timedelta +from typing import Any, Callable, Dict, List, Optional + +import pandas as pd + +from feast.transformation.tiled_transformation import TiledTransformation, TileConfiguration + + +class PandasTiledTransformation(TiledTransformation): + """ + Pandas-specific implementation of tiled transformations for streaming feature engineering. + + This class implements efficient temporal windowing and aggregation for Pandas DataFrames, + following Chronon's tiled architecture patterns. 
+ """ + + def __init__( + self, + udf: Callable[[pd.DataFrame], pd.DataFrame], + udf_string: str, + tile_config: TileConfiguration, + sources: Optional[List[Any]] = None, + schema: Optional[List[Any]] = None, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: str = "", + owner: str = "", + aggregation_functions: Optional[List[Callable[[pd.DataFrame], pd.DataFrame]]] = None, + chaining_functions: Optional[List[Callable[[pd.DataFrame, pd.DataFrame], pd.DataFrame]]] = None, + ): + super().__init__( + udf=udf, + udf_string=udf_string, + tile_config=tile_config, + sources=sources, + schema=schema, + name=name, + tags=tags, + description=description, + owner=owner, + aggregation_functions=aggregation_functions, + chaining_functions=chaining_functions, + ) + + def transform(self, data: pd.DataFrame, timestamp_column: str = "timestamp") -> pd.DataFrame: + """ + Apply tiled transformation to a Pandas DataFrame. + + Args: + data: Input DataFrame with timestamp column + timestamp_column: Name of the timestamp column for tiling + + Returns: + Transformed DataFrame with tiled processing applied + """ + if timestamp_column not in data.columns: + raise ValueError(f"Timestamp column '{timestamp_column}' not found in DataFrame") + + # Ensure timestamp column is datetime + if not pd.api.types.is_datetime64_any_dtype(data[timestamp_column]): + data[timestamp_column] = pd.to_datetime(data[timestamp_column]) + + return self._apply_tiled_processing(data, timestamp_column) + + def _partition_into_tiles(self, data: pd.DataFrame, timestamp_column: str) -> Dict[str, pd.DataFrame]: + """ + Partition DataFrame into time-based tiles. 
+ + Args: + data: Input DataFrame + timestamp_column: Column to use for time-based partitioning + + Returns: + Dictionary mapping tile keys to DataFrame chunks + """ + tiles = {} + + if data.empty: + return tiles + + # Sort by timestamp to ensure proper tile processing order + data_sorted = data.sort_values(timestamp_column) + + # Find time range + min_time = data_sorted[timestamp_column].min() + max_time = data_sorted[timestamp_column].max() + + # Create tile boundaries + tile_start = min_time + tile_index = 0 + + while tile_start <= max_time: + tile_end = tile_start + self.tile_config.tile_size + + # Apply overlap if configured + if self.tile_config.overlap.total_seconds() > 0 and tile_index > 0: + tile_start_adjusted = tile_start - self.tile_config.overlap + else: + tile_start_adjusted = tile_start + + # Filter data for this tile + tile_mask = ( + (data_sorted[timestamp_column] >= tile_start_adjusted) & + (data_sorted[timestamp_column] < tile_end) + ) + + tile_data = data_sorted[tile_mask] + + if not tile_data.empty: + tile_key = f"tile_{tile_index}_{tile_start.isoformat()}" + tiles[tile_key] = tile_data.copy() + + # Move to next tile + tile_start = tile_end + tile_index += 1 + + return tiles + + def _process_single_tile(self, tile_data: pd.DataFrame, tile_key: str) -> pd.DataFrame: + """ + Process a single tile of DataFrame data. 
+ + Args: + tile_data: DataFrame for this tile + tile_key: Unique identifier for this tile + + Returns: + Processed DataFrame + """ + # Apply the main UDF to the tile + result = self.udf(tile_data) + + # Apply any tile-specific aggregations + for agg_func in self.aggregation_functions: + result = agg_func(result) + + # Cache management + if len(self._tile_cache) >= self.tile_config.max_tiles_in_memory: + # Remove oldest tile + if self._tile_timestamps: + oldest_key = self._tile_timestamps.pop(0) + self._tile_cache.pop(oldest_key, None) + + self._tile_cache[tile_key] = result + self._tile_timestamps.append(tile_key) + + return result + + def _chain_tile_results(self, processed_tiles: List[pd.DataFrame]) -> pd.DataFrame: + """ + Chain results across tiles for derived features. + + Args: + processed_tiles: List of processed tile DataFrames + + Returns: + Chained result DataFrame + """ + if not processed_tiles: + return pd.DataFrame() + + result = processed_tiles[0] + + for chain_func in self.chaining_functions: + for i in range(1, len(processed_tiles)): + result = chain_func(result, processed_tiles[i]) + + return result + + def _combine_tiles(self, processed_tiles: List[pd.DataFrame]) -> pd.DataFrame: + """ + Combine processed tiles into final result DataFrame. + + Args: + processed_tiles: List of processed tile DataFrames + + Returns: + Combined result DataFrame + """ + if not processed_tiles: + return pd.DataFrame() + + # Concatenate all tiles + combined = pd.concat(processed_tiles, ignore_index=True) + + # Remove duplicates if overlap was used (keep most recent) + if self.tile_config.overlap.total_seconds() > 0 and 'timestamp' in combined.columns: + combined = combined.drop_duplicates( + subset=['timestamp'] + [col for col in combined.columns if col.startswith('entity_')], + keep='last' + ) + + return combined + + def infer_features(self, random_input: Dict[str, List[Any]], *args, **kwargs) -> List[Any]: + """ + Infer feature schema from tiled transformation. 
+ + Args: + random_input: Sample input data for schema inference + + Returns: + Inferred feature list + """ + # Create sample DataFrame + df = pd.DataFrame.from_dict(random_input) + + # Add timestamp if not present + if 'timestamp' not in df.columns: + df['timestamp'] = pd.Timestamp.now() + + # Apply transformation to sample + sample_tile = self._process_single_tile(df, "sample_tile") + + # Use pandas transformation logic for feature inference + from feast.transformation.pandas_transformation import PandasTransformation + temp_pandas_transform = PandasTransformation( + udf=lambda x: sample_tile, + udf_string="lambda x: sample_tile" + ) + + return temp_pandas_transform.infer_features(random_input, *args, **kwargs) + + +def pandas_tiled_transformation( + tile_size: timedelta, + overlap: Optional[timedelta] = None, + max_tiles_in_memory: int = 10, + enable_late_data_handling: bool = True, + aggregation_functions: Optional[List[Callable[[pd.DataFrame], pd.DataFrame]]] = None, + chaining_functions: Optional[List[Callable[[pd.DataFrame, pd.DataFrame], pd.DataFrame]]] = None, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: str = "", + owner: str = "", +): + """ + Decorator for creating Pandas-specific tiled transformations. 
+ + Example: + @pandas_tiled_transformation( + tile_size=timedelta(hours=1), + overlap=timedelta(minutes=5), + aggregation_functions=[ + lambda df: df.groupby('entity_id').agg({ + 'value': ['sum', 'mean', 'count'] + }).reset_index() + ] + ) + def hourly_aggregated_features(df: pd.DataFrame) -> pd.DataFrame: + # Apply transformations within each 1-hour tile + return df.assign( + rolling_avg=df['value'].rolling(window=10).mean(), + cumsum=df['value'].cumsum() + ) + """ + def decorator(user_function): + import dill + + udf_string = dill.source.getsource(user_function) + tile_config = TileConfiguration( + tile_size=tile_size, + overlap=overlap, + max_tiles_in_memory=max_tiles_in_memory, + enable_late_data_handling=enable_late_data_handling, + ) + + return PandasTiledTransformation( + udf=user_function, + udf_string=udf_string, + tile_config=tile_config, + name=name or user_function.__name__, + tags=tags, + description=description, + owner=owner, + aggregation_functions=aggregation_functions, + chaining_functions=chaining_functions, + ) + + return decorator \ No newline at end of file diff --git a/sdk/python/feast/transformation/tiled_transformation.py b/sdk/python/feast/transformation/tiled_transformation.py new file mode 100644 index 00000000000..aab810cdacd --- /dev/null +++ b/sdk/python/feast/transformation/tiled_transformation.py @@ -0,0 +1,326 @@ +from datetime import timedelta +from typing import Any, Callable, Dict, List, Optional, Union, TYPE_CHECKING + +from feast.transformation.base import Transformation +from feast.transformation.mode import TransformationMode + +if TYPE_CHECKING: + from feast.feature_view import FeatureView + from feast.data_source import DataSource + from feast.field import Field + from feast.aggregation import Aggregation + + +class TileConfiguration: + """ + Configuration for tiling in streaming transformations. 
class TileConfiguration:
    """Configuration for tiling in streaming transformations.

    Attributes:
        tile_size: The size of each time tile (e.g., 1 hour)
        window_size: The window size for aggregations within tiles
        overlap: Optional overlap between tiles for continuity
        max_tiles_in_memory: Maximum number of tiles to keep in memory
        enable_late_data_handling: Whether to handle late-arriving data
    """

    def __init__(
        self,
        tile_size: timedelta,
        window_size: Optional[timedelta] = None,
        overlap: Optional[timedelta] = None,
        max_tiles_in_memory: int = 10,
        enable_late_data_handling: bool = True,
    ):
        self.tile_size = tile_size
        # One aggregation window per tile unless the caller narrows it.
        self.window_size = window_size or tile_size
        self.overlap = overlap or timedelta(seconds=0)
        self.max_tiles_in_memory = max_tiles_in_memory
        self.enable_late_data_handling = enable_late_data_handling


class TiledTransformation(Transformation):
    """A transformation that operates on tiled data for efficient streaming processing.

    Based on Chronon's tiled architecture approach, this transformation divides
    time into manageable chunks (tiles) for processing streaming aggregations
    and derived features efficiently. Execution is delegated to a ComputeEngine.

    This is particularly useful for:
    - Temporal aggregations over sliding windows
    - Chaining features across different time horizons
    - Handling late-arriving data in streaming scenarios
    - Memory-efficient processing of large time-series data

    Attributes:
        tile_config: Configuration for tiling behavior
        sources: Source feature views or data sources for DAG construction
        schema: Output feature schema for UI rendering
        aggregations: List of Aggregation objects for window-based aggregations
        aggregation_functions: Custom functions applied within each tile
        chaining_functions: Functions for chaining results across tiles
    """

    def __init__(
        self,
        mode: Union[TransformationMode, str],
        udf: Callable[[Any], Any],
        udf_string: str,
        tile_config: TileConfiguration,
        sources: Optional[List[Union[str, "FeatureView", "DataSource"]]] = None,
        schema: Optional[List["Field"]] = None,
        aggregations: Optional[List["Aggregation"]] = None,
        name: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        description: str = "",
        owner: str = "",
        aggregation_functions: Optional[List[Callable]] = None,
        chaining_functions: Optional[List[Callable]] = None,
    ):
        super().__init__(
            mode=mode,
            udf=udf,
            udf_string=udf_string,
            name=name,
            tags=tags,
            description=description,
            owner=owner,
        )
        self.tile_config = tile_config
        self.sources = sources or []
        self.schema = schema or []
        # Feast Aggregation objects are supported alongside custom callables.
        self.aggregations = aggregations or []
        self.aggregation_functions = aggregation_functions or []
        self.chaining_functions = chaining_functions or []

        # In-memory state tracking which tiles are currently cached.
        self._tile_cache: Dict[str, Any] = {}
        self._tile_timestamps: List[str] = []

    def transform(self, data: Any, timestamp_column: str = "timestamp") -> Any:
        """Apply tiled transformation to streaming data.

        Intended to be executed by a ComputeEngine for proper distributed
        processing.

        Args:
            data: Input streaming data (DataFrame, etc.)
            timestamp_column: Name of the timestamp column for tiling

        Returns:
            Transformed data with tiled processing applied
        """
        return self._apply_tiled_processing(data, timestamp_column)

    def _apply_tiled_processing(self, data: Any, timestamp_column: str) -> Any:
        """Core tiling pipeline: partition, process, then chain or combine.

        Specific compute engines can override this for optimized execution.
        """
        # Partition data into tiles based on timestamp, then process each.
        tiles = self._partition_into_tiles(data, timestamp_column)
        processed_tiles = [
            self._process_single_tile(tile_data, tile_key)
            for tile_key, tile_data in tiles.items()
        ]

        # Chain results across tiles when chaining is configured; otherwise
        # simply combine the tiles into the final result.
        if self.chaining_functions:
            return self._chain_tile_results(processed_tiles)
        return self._combine_tiles(processed_tiles)

    def _partition_into_tiles(self, data: Any, timestamp_column: str) -> Dict[str, Any]:
        """Partition input data into time-based tiles.

        Base implementation — specific compute engines should override this
        for optimized partitioning (e.g., using Spark windowing). Here the
        whole input is treated as one tile.
        """
        return {"tile_0": data}

    def _process_single_tile(self, tile_data: Any, tile_key: str) -> Any:
        """Process one tile: aggregations, UDF, custom functions, caching."""
        # Feast Aggregation objects run first, then the main UDF.
        result = tile_data
        for aggregation in self.aggregations:
            result = self._apply_aggregation(result, aggregation)

        result = self.udf(result)

        for agg_func in self.aggregation_functions:
            result = agg_func(result)

        # FIFO cache eviction once the configured cap is reached.
        if len(self._tile_cache) >= self.tile_config.max_tiles_in_memory:
            if self._tile_timestamps:
                evicted = self._tile_timestamps.pop(0)
                self._tile_cache.pop(evicted, None)

        self._tile_cache[tile_key] = result
        self._tile_timestamps.append(tile_key)

        return result

    def _apply_aggregation(self, data: Any, aggregation: "Aggregation") -> Any:
        """Apply a Feast Aggregation object to tile data.

        Placeholder — specific engines (Spark, Ray, etc.) should override
        this to actually execute the aggregation; the base returns the data
        unchanged.
        """
        return data

    def _chain_tile_results(self, processed_tiles: List[Any]) -> Any:
        """Fold each chaining function left-to-right across the tiles."""
        result = processed_tiles[0] if processed_tiles else None

        for chain_func in self.chaining_functions:
            for later_tile in processed_tiles[1:]:
                result = chain_func(result, later_tile)

        return result

    def _combine_tiles(self, processed_tiles: List[Any]) -> Any:
        """Combine processed tiles into the final result.

        Placeholder — the base keeps only the most recent tile; concrete
        implementations depend on the data format.
        """
        if not processed_tiles:
            return None
        return processed_tiles[-1]
+ """ + # If schema is explicitly provided, use it + if self.schema: + return self.schema + + # Otherwise delegate to parent implementation + return super().infer_features(*args, **kwargs) + + +# Factory function for creating tiled transformations +def tiled_transformation( + tile_size: timedelta, + sources: Optional[List[Union[str, "FeatureView", "DataSource"]]] = None, + schema: Optional[List["Field"]] = None, + aggregations: Optional[List["Aggregation"]] = None, + window_size: Optional[timedelta] = None, + overlap: Optional[timedelta] = None, + max_tiles_in_memory: int = 10, + enable_late_data_handling: bool = True, + aggregation_functions: Optional[List[Callable]] = None, + chaining_functions: Optional[List[Callable]] = None, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: str = "", + owner: str = "", +): + """ + Decorator for creating tiled transformations that work with ComputeEngine. + + The mode is not specified here - it will be determined by the StreamFeatureView + or FeatureView that uses this transformation, allowing it to work with different + compute engines (Spark, Ray, etc.). 
+ + Args: + tile_size: The size of each time tile (e.g., timedelta(hours=1)) + sources: List of source feature views or data sources for DAG construction + schema: List of Field definitions specifying output feature names and data types + aggregations: List of Feast Aggregation objects for window-based aggregations + window_size: The window size for aggregations within tiles + overlap: Optional overlap between tiles for continuity + max_tiles_in_memory: Maximum number of tiles to keep in memory + enable_late_data_handling: Whether to handle late-arriving data + aggregation_functions: Custom functions to apply within each tile + chaining_functions: Functions for chaining results across tiles + name: Optional name for the transformation + tags: Optional metadata tags + description: Optional description + owner: Optional owner + + Example: + @tiled_transformation( + tile_size=timedelta(hours=1), + sources=["transaction_source_fv"], + schema=[ + Field(name="rolling_avg", dtype=Float64), + Field(name="cumulative_amount", dtype=Float64), + ], + aggregations=[ + Aggregation(column="amount", function="sum", time_window=timedelta(minutes=30)) + ], + window_size=timedelta(minutes=30), + overlap=timedelta(minutes=5), + ) + def my_tiled_feature(df): + return df.assign( + rolling_avg=df['value'].rolling(window=10).mean(), + cumulative_amount=df['value'].cumsum() + ) + + # Use with StreamFeatureView (mode determined by StreamFeatureView) + stream_fv = StreamFeatureView( + name="transaction_features", + feature_transformation=my_tiled_feature, + source=kafka_source, + mode="spark", # Mode specified here, not in the transformation + entities=["customer_id"] + ) + """ + def decorator(user_function): + import dill + + def mainify(obj): + # Needed to allow dill to properly serialize the udf + if obj.__module__ != "__main__": + obj.__module__ = "__main__" + + mainify(user_function) + udf_string = dill.source.getsource(user_function) + tile_config = TileConfiguration( + 
tile_size=tile_size, + window_size=window_size, + overlap=overlap, + max_tiles_in_memory=max_tiles_in_memory, + enable_late_data_handling=enable_late_data_handling, + ) + + # Create a tiled transformation that can work with any mode + # The actual mode will be determined when used in StreamFeatureView/FeatureView + return TiledTransformation( + mode=TransformationMode.TILING, # This is our new tiling mode + udf=user_function, + udf_string=udf_string, + tile_config=tile_config, + sources=sources, + schema=schema, + aggregations=aggregations, + name=name or user_function.__name__, + tags=tags, + description=description, + owner=owner, + aggregation_functions=aggregation_functions, + chaining_functions=chaining_functions, + ) + + return decorator \ No newline at end of file diff --git a/sdk/python/tests/unit/transformation/test_tiled_transformation.py b/sdk/python/tests/unit/transformation/test_tiled_transformation.py new file mode 100644 index 00000000000..a353ae335fa --- /dev/null +++ b/sdk/python/tests/unit/transformation/test_tiled_transformation.py @@ -0,0 +1,212 @@ +from datetime import datetime, timedelta + +import pandas as pd +import pytest + +from feast.transformation.tiled_transformation import ( + TiledTransformation, + TileConfiguration, + tiled_transformation, +) + + +def test_tile_configuration(): + """Test TileConfiguration creation and validation.""" + config = TileConfiguration( + tile_size=timedelta(hours=1), + overlap=timedelta(minutes=5), + max_tiles_in_memory=5, + enable_late_data_handling=True, + ) + + assert config.tile_size == timedelta(hours=1) + assert config.overlap == timedelta(minutes=5) + assert config.max_tiles_in_memory == 5 + assert config.enable_late_data_handling is True + + +def test_pandas_tiled_transformation_basic(): + """Test basic pandas tiled transformation functionality.""" + + def simple_transform(df: pd.DataFrame) -> pd.DataFrame: + return df.assign(doubled_value=df['value'] * 2) + + config = 
def test_pandas_tiled_transformation_with_aggregation():
    """Test pandas tiled transformation with aggregation functions."""

    def base_transform(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(processed=df['value'] + 1)

    def aggregate_func(df: pd.DataFrame) -> pd.DataFrame:
        return df.groupby('entity_id').agg({
            'value': 'sum',
            'processed': 'mean'
        }).reset_index()

    config = TileConfiguration(tile_size=timedelta(hours=1))

    from feast.transformation.pandas_tiled_transformation import PandasTiledTransformation
    transformation = PandasTiledTransformation(
        udf=base_transform,
        udf_string="lambda df: df.assign(processed=df['value'] + 1)",
        tile_config=config,
        name="test_aggregation",
        aggregation_functions=[aggregate_func],
    )

    # Create test data spanning multiple hours
    data = pd.DataFrame({
        'timestamp': pd.date_range('2023-01-01 00:00:00', periods=6, freq='30min'),
        'entity_id': ['user1', 'user2'] * 3,
        'value': [10, 20, 15, 25, 12, 18]
    })

    result = transformation.transform(data, 'timestamp')

    # Should have aggregated results
    assert 'processed' in result.columns
    assert len(result) <= len(data)  # Should be aggregated


def test_pandas_tiled_transformation_with_chaining():
    """Test pandas tiled transformation with chaining functions."""

    def base_transform(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(running_sum=df['value'].cumsum())

    def chain_func(prev_df: pd.DataFrame, curr_df: pd.DataFrame) -> pd.DataFrame:
        # Add previous max to current running sum for continuity
        if not prev_df.empty and not curr_df.empty:
            max_prev = prev_df['running_sum'].max()
            curr_df = curr_df.copy()
            curr_df['running_sum'] = curr_df['running_sum'] + max_prev
        return curr_df

    config = TileConfiguration(
        tile_size=timedelta(hours=1),
        overlap=timedelta(minutes=10)
    )

    from feast.transformation.pandas_tiled_transformation import PandasTiledTransformation
    transformation = PandasTiledTransformation(
        udf=base_transform,
        udf_string="lambda df: df.assign(running_sum=df['value'].cumsum())",
        tile_config=config,
        name="test_chaining",
        chaining_functions=[chain_func],
    )

    # Create test data spanning multiple hours
    data = pd.DataFrame({
        'timestamp': pd.date_range('2023-01-01 00:00:00', periods=6, freq='30min'),
        'entity_id': ['user1'] * 6,
        'value': [10, 20, 15, 25, 12, 18]
    })

    result = transformation.transform(data, 'timestamp')

    # Should have chained running sum
    assert 'running_sum' in result.columns


def test_tiled_transformation_decorator():
    """Test the decorator syntax for tiled transformations."""

    # NOTE: the transformation mode is specified at the StreamFeatureView
    # level, not on the decorator, so no `mode` kwarg is passed here.
    @tiled_transformation(
        tile_size=timedelta(hours=1),
        overlap=timedelta(minutes=5),
        aggregation_functions=[
            lambda df: df.groupby('entity_id').agg({'value': 'mean'}).reset_index()
        ]
    )
    def my_tiled_feature(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(scaled_value=df['value'] * 0.5)

    # Create test data
    data = pd.DataFrame({
        'timestamp': pd.date_range('2023-01-01 00:00:00', periods=4, freq='30min'),
        'entity_id': ['user1', 'user2', 'user1', 'user2'],
        'value': [100, 200, 150, 250]
    })

    result = my_tiled_feature.transform(data, 'timestamp')

    # Should have both original transformation and aggregation
    assert 'value' in result.columns


def test_pandas_tiled_transformation_empty_data():
    """Test tiled transformation with empty DataFrame."""

    def simple_transform(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(new_col=1) if not df.empty else df

    config = TileConfiguration(tile_size=timedelta(hours=1))

    from feast.transformation.pandas_tiled_transformation import PandasTiledTransformation
    transformation = PandasTiledTransformation(
        udf=simple_transform,
        udf_string="lambda df: df.assign(new_col=1) if not df.empty else df",
        tile_config=config,
        name="test_empty",
    )

    # Empty DataFrame
    data = pd.DataFrame(columns=['timestamp', 'entity_id', 'value'])

    result = transformation.transform(data, 'timestamp')

    # Should handle empty data gracefully
    assert result.empty


def test_pandas_tiled_transformation_missing_timestamp():
    """Test error handling when timestamp column is missing."""

    def simple_transform(df: pd.DataFrame) -> pd.DataFrame:
        return df

    config = TileConfiguration(tile_size=timedelta(hours=1))

    from feast.transformation.pandas_tiled_transformation import PandasTiledTransformation
    transformation = PandasTiledTransformation(
        udf=simple_transform,
        udf_string="lambda df: df",
        tile_config=config,
        name="test_missing_timestamp",
    )

    # Data without timestamp column
    data = pd.DataFrame({
        'entity_id': ['user1', 'user2'],
        'value': [10, 20]
    })

    with pytest.raises(ValueError, match="Timestamp column 'timestamp' not found"):
        transformation.transform(data, 'timestamp')
"""
Integration tests for tiled transformations with StreamFeatureView.

This test module validates the integration between tiled transformations
and Feast's StreamFeatureView for real-time feature engineering.
"""

import pytest
from datetime import datetime, timedelta
from unittest.mock import Mock

from feast.transformation.tiled_transformation import tiled_transformation, TileConfiguration
from feast.aggregation import Aggregation
from feast.field import Field
from feast.types import Float64, Int64


def test_tiled_transformation_creation():
    """Test basic creation of tiled transformation."""

    @tiled_transformation(
        tile_size=timedelta(hours=1),
        window_size=timedelta(minutes=30),
        overlap=timedelta(minutes=5),
        sources=["transaction_source_fv"],
        schema=[
            Field(name="rolling_avg", dtype=Float64),
            Field(name="cumulative_amount", dtype=Float64),
        ],
        aggregations=[
            Aggregation(column="amount", function="sum", time_window=timedelta(minutes=30))
        ]
    )
    def test_transformation(df):
        return df.assign(rolling_avg=df['amount'].rolling(window=3).mean())

    # Tiling configuration should reflect the decorator arguments.
    assert hasattr(test_transformation, 'tile_config')
    cfg = test_transformation.tile_config
    assert cfg.tile_size == timedelta(hours=1)
    assert cfg.window_size == timedelta(minutes=30)
    assert cfg.overlap == timedelta(minutes=5)

    # Sources and output schema should be stored as given.
    assert test_transformation.sources == ["transaction_source_fv"]
    assert len(test_transformation.schema) == 2
    assert test_transformation.schema[0].name == "rolling_avg"

    # Aggregations should be passed through unchanged.
    assert len(test_transformation.aggregations) == 1
    first_agg = test_transformation.aggregations[0]
    assert first_agg.column == "amount"
    assert first_agg.function == "sum"


def test_tiled_transformation_with_chaining():
    """Test tiled transformation with chaining functions."""

    def chain_func(prev_df, curr_df):
        return curr_df.assign(chained_value=1)

    @tiled_transformation(
        tile_size=timedelta(hours=1),
        sources=["transaction_fv"],
        schema=[Field(name="local_sum", dtype=Float64)],
        chaining_functions=[chain_func]
    )
    def chained_transformation(df):
        return df.assign(local_sum=df['amount'].sum())

    # The chaining callables should be stored verbatim.
    assert len(chained_transformation.chaining_functions) == 1
    assert chained_transformation.chaining_functions[0] == chain_func


def test_tiled_transformation_aggregation_support():
    """Test that tiled transformation supports both Feast Aggregation objects and custom functions."""

    aggregation = Aggregation(
        column="transaction_amount",
        function="mean",
        time_window=timedelta(minutes=30)
    )

    custom_agg = lambda df: df.groupby('customer_id').sum()

    @tiled_transformation(
        tile_size=timedelta(hours=1),
        aggregations=[aggregation],
        aggregation_functions=[custom_agg]
    )
    def mixed_aggregation_transform(df):
        return df.assign(processed=1)

    # Both aggregation styles should coexist on the transformation.
    assert len(mixed_aggregation_transform.aggregations) == 1
    assert mixed_aggregation_transform.aggregations[0] == aggregation

    assert len(mixed_aggregation_transform.aggregation_functions) == 1
    assert mixed_aggregation_transform.aggregation_functions[0] == custom_agg


def test_tiled_transformation_memory_management():
    """Test memory management configuration for tiled transformations."""

    @tiled_transformation(
        tile_size=timedelta(hours=1),
        max_tiles_in_memory=5,
        enable_late_data_handling=False
    )
    def memory_managed_transform(df):
        return df

    # Memory-management knobs should round-trip through the decorator.
    cfg = memory_managed_transform.tile_config
    assert cfg.max_tiles_in_memory == 5
    assert cfg.enable_late_data_handling is False


def test_tile_configuration():
    """Test TileConfiguration class directly."""

    cfg = TileConfiguration(
        tile_size=timedelta(hours=2),
        window_size=timedelta(minutes=45),
        overlap=timedelta(minutes=10),
        max_tiles_in_memory=8,
        enable_late_data_handling=True
    )

    assert cfg.tile_size == timedelta(hours=2)
    assert cfg.window_size == timedelta(minutes=45)
    assert cfg.overlap == timedelta(minutes=10)
    assert cfg.max_tiles_in_memory == 8
    assert cfg.enable_late_data_handling is True


def test_tile_configuration_defaults():
    """Test TileConfiguration with default values."""

    cfg = TileConfiguration(tile_size=timedelta(hours=1))

    # Defaults: window_size falls back to tile_size, overlap to zero.
    assert cfg.tile_size == timedelta(hours=1)
    assert cfg.window_size == timedelta(hours=1)
    assert cfg.overlap == timedelta(seconds=0)
    assert cfg.max_tiles_in_memory == 10
    assert cfg.enable_late_data_handling is True


class TestStreamFeatureViewIntegration:
    """Test integration with StreamFeatureView (using mocks)."""

    def test_streamfeatureview_compatibility(self):
        """Test that tiled transformations work with StreamFeatureView pattern."""

        @tiled_transformation(
            tile_size=timedelta(hours=1),
            sources=["kafka_source"],
            schema=[Field(name="processed_amount", dtype=Float64)]
        )
        def process_transactions(df):
            return df.assign(processed_amount=df['amount'] * 1.1)

        # Mock StreamFeatureView usage pattern
        mock_kafka_source = Mock()
        mock_kafka_source.name = "transaction_stream"

        # Simulate how this would be used in StreamFeatureView
        # (In real usage: StreamFeatureView(feature_transformation=process_transactions, ...))
        feature_view_config = {
            "name": "transaction_features",
            "feature_transformation": process_transactions,
            "source": mock_kafka_source,
            "mode": "spark",  # Mode specified at view level, not transformation level
            "entities": ["customer_id"]
        }

        # The transformation should slot into this pattern unchanged.
        assert feature_view_config["feature_transformation"] == process_transactions
        assert hasattr(feature_view_config["feature_transformation"], "tile_config")
        assert feature_view_config["mode"] == "spark"  # Mode at view level

    def test_compute_engine_compatibility(self):
        """Test that tiled transformations are compatible with ComputeEngine execution."""

        @tiled_transformation(
            tile_size=timedelta(hours=1),
            aggregations=[
                Aggregation(column="amount", function="sum", time_window=timedelta(minutes=30))
            ]
        )
        def compute_engine_transform(df):
            return df.assign(computed_feature=df['amount'].rolling(window=5).mean())

        # Must be a Transformation so a ComputeEngine can execute it.
        from feast.transformation.base import Transformation
        assert isinstance(compute_engine_transform, Transformation)

        # Attributes a ComputeEngine relies on must be present.
        for attr in ('udf', 'mode', 'aggregations'):
            assert hasattr(compute_engine_transform, attr)

        # Aggregations should be configured as declared.
        assert len(compute_engine_transform.aggregations) == 1
        assert compute_engine_transform.aggregations[0].column == "amount"


if __name__ == "__main__":
    pytest.main([__file__])