Skip to content

Commit 56469f7

Browse files
nquinn408nickquinn408devin-ai-integration[bot]
authored
feat: Adding optional name to Aggregation (#5994) (#6083)
* feat: Adding optional name to Aggregation (#5994) Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * Fix lint-python test Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * Adding name to documentation Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * Devin feedback Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * Update sdk/python/feast/aggregation/__init__.py Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> --------- Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> Co-authored-by: Nick Quinn <nicholas_quinn@apple.com> Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 76917b7 commit 56469f7

File tree

11 files changed

+102
-29
lines changed

11 files changed

+102
-29
lines changed

docs/getting-started/concepts/batch-feature-view.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ driver_fv = BatchFeatureView(
7070
Field(name="conv_rate", dtype=Float32),
7171
],
7272
aggregations=[
73-
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)),
73+
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1), name="total_conv_rate_1d"),
7474
],
7575
source=source,
7676
)
@@ -145,6 +145,7 @@ See:
145145
- `source` is optional; if omitted (`None`), the feature view has no associated batch data source.
146146
- Schema fields must be consistent with `sink_source`, `batch_source.field_mapping` if field mappings exist.
147147
- Aggregation logic must reference columns present in the raw source or transformed inputs.
148+
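- The output feature name for an aggregation defaults to `{function}_{column}` (e.g., `sum_conv_rate`), with `_{seconds}s` appended when the aggregation is computed over a time window (e.g., `sum_conv_rate_86400s` for a 1-day window). Use the `name` parameter to override it (e.g., `name="total_conv_rate_1d"`); a custom name is used as-is, with no suffix added.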
148149

149150
---
150151

docs/getting-started/concepts/tiling.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,9 @@ customer_features = StreamFeatureView(
206206
batch_source=file_source, # For historical data
207207
),
208208
aggregations=[
209-
Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)),
210-
Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)),
211-
Aggregation(column="amount", function="std", time_window=timedelta(hours=1)),
209+
Aggregation(column="amount", function="sum", time_window=timedelta(hours=1), name="sum_amount_1h"),
210+
Aggregation(column="amount", function="avg", time_window=timedelta(hours=1), name="avg_amount_1h"),
211+
Aggregation(column="amount", function="std", time_window=timedelta(hours=1), name="std_amount_1h"),
212212
],
213213
timestamp_field="event_timestamp",
214214
online=True,
@@ -229,7 +229,12 @@ customer_features = StreamFeatureView(
229229

230230
### Key Parameters
231231

232-
- `aggregations`: List of time-windowed aggregations to compute
232+
- `aggregations`: List of time-windowed aggregations to compute. Each `Aggregation` accepts:
233+
- `column`: source column to aggregate
234+
- `function`: aggregation function (`sum`, `avg`, `mean`, `min`, `max`, `count`, `std`)
235+
- `time_window`: duration of the aggregation window
236+
- `slide_interval`: hop/slide size (defaults to `time_window`)
237+
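- `name` *(optional)*: output feature name. Defaults to `{function}_{column}`, with `_{seconds}s` appended when a time window is applied (e.g., `sum_amount_3600s` for a 1-hour window). Set this to use a custom name (e.g., `name="sum_amount_1h"`), which is used as-is.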
233238
- `timestamp_field`: Column name for timestamps (required when aggregations are specified)
234239
- `enable_tiling`: Enable tiling optimization (default: `False`)
235240
- Set to `True` for **streaming scenarios**

protos/feast/core/Aggregation.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ message Aggregation {
1212
string function = 2;
1313
google.protobuf.Duration time_window = 3;
1414
google.protobuf.Duration slide_interval = 4;
15+
string name = 5;
1516
}

sdk/python/feast/aggregation/__init__.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,22 @@ class Aggregation:
2121
function: str # Provided built-in aggregations: sum, max, min, count, mean
2222
time_window: timedelta # The time window for this aggregation.
2323
slide_interval: timedelta # The sliding window for these aggregations
24+
name: str # Optional override for the output feature name (defaults to {function}_{column}, plus a _{seconds}s suffix when a time window is applied)
2425
"""
2526

2627
column: str
2728
function: str
2829
time_window: Optional[timedelta]
2930
slide_interval: Optional[timedelta]
31+
name: str
3032

3133
def __init__(
3234
self,
3335
column: Optional[str] = "",
3436
function: Optional[str] = "",
3537
time_window: Optional[timedelta] = None,
3638
slide_interval: Optional[timedelta] = None,
39+
name: Optional[str] = None,
3740
):
3841
self.column = column or ""
3942
self.function = function or ""
@@ -42,6 +45,7 @@ def __init__(
4245
self.slide_interval = self.time_window
4346
else:
4447
self.slide_interval = slide_interval
48+
self.name = name or ""
4549

4650
def to_proto(self) -> AggregationProto:
4751
window_duration = None
@@ -59,6 +63,7 @@ def to_proto(self) -> AggregationProto:
5963
function=self.function,
6064
time_window=window_duration,
6165
slide_interval=slide_interval_duration,
66+
name=self.name,
6267
)
6368

6469
@classmethod
@@ -79,6 +84,7 @@ def from_proto(cls, agg_proto: AggregationProto):
7984
function=agg_proto.function,
8085
time_window=time_window,
8186
slide_interval=slide_interval,
87+
name=agg_proto.name or None,
8288
)
8389
return aggregation
8490

@@ -91,11 +97,26 @@ def __eq__(self, other):
9197
or self.function != other.function
9298
or self.time_window != other.time_window
9399
or self.slide_interval != other.slide_interval
100+
or self.name != other.name
94101
):
95102
return False
96103

97104
return True
98105

106+
def resolved_name(self, time_window: Optional[timedelta] = None) -> str:
107+
"""Return the output feature name for this aggregation.
108+
109+
If ``name`` is set it is returned as-is. Otherwise the name is
110+
derived as ``{function}_{column}``, with ``_{seconds}s`` appended
111+
when *time_window* is provided.
112+
"""
113+
if self.name:
114+
return self.name
115+
base = f"{self.function}_{self.column}"
116+
if time_window is not None and time_window.total_seconds() > 0:
117+
return f"{base}_{int(time_window.total_seconds())}s"
118+
return base
119+
99120

100121
def aggregation_specs_to_agg_ops(
101122
agg_specs: Iterable[Any],
@@ -106,7 +127,7 @@ def aggregation_specs_to_agg_ops(
106127
for agg in agg_specs:
107128
if getattr(agg, "time_window", None) is not None:
108129
raise ValueError(time_window_unsupported_error_message)
109-
alias = f"{agg.function}_{agg.column}"
130+
alias = getattr(agg, "name", None) or f"{agg.function}_{agg.column}"
110131
agg_ops[alias] = (agg.function, agg.column)
111132
return agg_ops
112133

sdk/python/feast/aggregation/tiling/orchestrator.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ def apply_sawtooth_window_tiling(
6161
ir_metadata_dict = {}
6262

6363
for agg in aggregations:
64-
feature_name = (
65-
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
66-
)
64+
feature_name = agg.resolved_name(window_size)
6765
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)
6866
ir_metadata_dict[feature_name] = metadata
6967

@@ -161,9 +159,7 @@ def apply_sawtooth_window_tiling(
161159

162160
# Step 5: Compute final feature values from IRs (for algebraic aggs, just rename)
163161
for agg in aggregations:
164-
feature_name = (
165-
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
166-
)
162+
feature_name = agg.resolved_name(window_size)
167163
metadata = ir_metadata_dict[feature_name]
168164

169165
if metadata.type == "algebraic":

sdk/python/feast/aggregation/tiling/tile_subtraction.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,7 @@ def convert_cumulative_to_windowed(
7676

7777
# Subtract previous tile values from current tile for each aggregation
7878
for agg in aggregations:
79-
feature_name = (
80-
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
81-
)
79+
feature_name = agg.resolved_name(window_size)
8280
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)
8381

8482
if metadata.type == "algebraic":

sdk/python/feast/infra/compute_engines/ray/nodes.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -461,9 +461,7 @@ def _execute_standard_aggregation(self, dataset: Dataset) -> DAGValue:
461461
# Convert aggregations to Ray's groupby format
462462
agg_dict = {}
463463
for agg in self.aggregations:
464-
feature_name = f"{agg.function}_{agg.column}"
465-
if agg.time_window:
466-
feature_name += f"_{int(agg.time_window.total_seconds())}s"
464+
feature_name = agg.resolved_name(agg.time_window)
467465

468466
if agg.function == "count":
469467
agg_dict[feature_name] = (agg.column, "count")

sdk/python/feast/infra/compute_engines/spark/nodes.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ def _execute_tiled_aggregation(self, input_df: DataFrame) -> DAGValue:
311311
expected_columns = entity_keys + [self.timestamp_col]
312312
for time_window, window_aggs in aggs_by_window.items():
313313
for agg in window_aggs:
314-
feature_name = f"{agg.function}_{agg.column}_{int(time_window.total_seconds())}s"
314+
feature_name = agg.resolved_name(time_window)
315315
if feature_name not in expected_columns:
316316
expected_columns.append(feature_name)
317317

@@ -372,11 +372,7 @@ def _execute_standard_aggregation(self, input_df: DataFrame) -> DAGValue:
372372
agg_exprs = []
373373
for agg in self.aggregations:
374374
func = getattr(F, agg.function)
375-
expr = func(agg.column).alias(
376-
f"{agg.function}_{agg.column}_{int(agg.time_window.total_seconds())}s"
377-
if agg.time_window
378-
else f"{agg.function}_{agg.column}"
379-
)
375+
expr = func(agg.column).alias(agg.resolved_name(agg.time_window))
380376
agg_exprs.append(expr)
381377

382378
if any(agg.time_window for agg in self.aggregations):

sdk/python/feast/protos/feast/core/Aggregation_pb2.py

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ class Aggregation(google.protobuf.message.Message):
2222
FUNCTION_FIELD_NUMBER: builtins.int
2323
TIME_WINDOW_FIELD_NUMBER: builtins.int
2424
SLIDE_INTERVAL_FIELD_NUMBER: builtins.int
25+
NAME_FIELD_NUMBER: builtins.int
2526
column: builtins.str
2627
function: builtins.str
28+
name: builtins.str
2729
@property
2830
def time_window(self) -> google.protobuf.duration_pb2.Duration: ...
2931
@property
@@ -35,8 +37,9 @@ class Aggregation(google.protobuf.message.Message):
3537
function: builtins.str = ...,
3638
time_window: google.protobuf.duration_pb2.Duration | None = ...,
3739
slide_interval: google.protobuf.duration_pb2.Duration | None = ...,
40+
name: builtins.str = ...,
3841
) -> None: ...
3942
def HasField(self, field_name: typing_extensions.Literal["slide_interval", b"slide_interval", "time_window", b"time_window"]) -> builtins.bool: ...
40-
def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ...
43+
def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "name", b"name", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ...
4144

4245
global___Aggregation = Aggregation

0 commit comments

Comments
 (0)