Skip to content

Commit 271ef74

Browse files
committed
fix: Fixed ODFV on-write transformations
Signed-off-by: ntkathole <[email protected]>
1 parent 41d4977 commit 271ef74

File tree

4 files changed

+447
-15
lines changed

4 files changed

+447
-15
lines changed

sdk/python/feast/feature_store.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@
6767
update_feature_views_with_inferred_features_and_entities,
6868
)
6969
from feast.infra.infra_object import Infra
70+
from feast.infra.offline_stores.offline_utils import (
71+
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
72+
)
7073
from feast.infra.provider import Provider, RetrievalJob, get_provider
7174
from feast.infra.registry.base_registry import BaseRegistry
7275
from feast.infra.registry.registry import Registry
@@ -1287,6 +1290,115 @@ def get_saved_dataset(self, name: str) -> SavedDataset:
12871290
)
12881291
return dataset.with_retrieval_job(retrieval_job)
12891292

1293+
def _materialize_odfv(
    self,
    feature_view: OnDemandFeatureView,
    start_date: datetime,
    end_date: datetime,
):
    """Materialize a single OnDemandFeatureView into the online store.

    The source feature views referenced by the ODFV's projections are looked
    up, the latest rows of each batch source are pulled from the offline
    store (passing ``start_date`` / ``end_date``), and an entity dataframe is
    built from the join-key and event-timestamp columns of the pulled data.
    The projected source features are then retrieved with
    ``get_historical_features``, transformed with
    ``_transform_on_demand_feature_view_df`` and written with
    ``write_to_online_store``.

    A message is printed and nothing is written when any of the following
    holds: the ODFV has no source projections, the sources expose no join
    keys, the sources disagree on (or lack) a timestamp column, no source
    data is returned, or the pulled data lacks a column needed for the
    entity dataframe.

    Args:
        feature_view: The on-demand feature view to materialize.
        start_date: Start of the range; passed through ``utils.make_tzaware``.
        end_date: End of the range; passed through ``utils.make_tzaware``.
    """
    projections = feature_view.source_feature_view_projections
    if not projections:
        print(
            f"[WARNING] ODFV {feature_view.name} materialization: No source feature views found."
        )
        return
    start_date = utils.make_tzaware(start_date)
    end_date = utils.make_tzaware(end_date)

    # A set de-duplicates feature views referenced by several projections.
    source_fvs = {self._get_feature_view(p.name) for p in projections.values()}

    all_join_keys = set()
    entity_timestamp_col_names = set()
    for source_fv in source_fvs:
        # Falsy entries (e.g. empty names) cannot be used as columns.
        all_join_keys.update(key for key in source_fv.entities if key)
        if source_fv.batch_source:
            entity_timestamp_col_names.add(source_fv.batch_source.timestamp_field)

    source_features_from_projections = [
        f"{proj.name}:{f.name}"
        for proj in projections.values()
        for f in proj.features
    ]

    if not all_join_keys:
        print(
            f"[WARNING] ODFV {feature_view.name} materialization: No join keys found in source views. Cannot create entity_df. Skipping."
        )
        return

    if len(entity_timestamp_col_names) > 1:
        print(
            f"[WARNING] ODFV {feature_view.name} materialization: Found multiple timestamp columns in sources ({entity_timestamp_col_names}). This is not supported. Skipping."
        )
        return

    if not entity_timestamp_col_names:
        print(
            f"[WARNING] ODFV {feature_view.name} materialization: No batch sources with timestamp columns found for sources. Skipping."
        )
        return

    # Exactly one timestamp column name remains at this point.
    event_timestamp_col = next(iter(entity_timestamp_col_names))
    provider = self._get_provider()

    all_source_dfs = []
    for source_fv in source_fvs:
        if not source_fv.batch_source:
            continue

        job = provider.offline_store.pull_latest_from_table_or_query(
            config=self.config,
            data_source=source_fv.batch_source,
            join_key_columns=source_fv.entities,
            feature_name_columns=[f.name for f in source_fv.features],
            timestamp_field=source_fv.batch_source.timestamp_field,
            created_timestamp_column=getattr(
                source_fv.batch_source, "created_timestamp_column", None
            ),
            start_date=start_date,
            end_date=end_date,
        )
        df = job.to_df()
        if not df.empty:
            all_source_dfs.append(df)

    if not all_source_dfs:
        print(
            f"No source data found for ODFV {feature_view.name} in the given time range. Skipping materialization."
        )
        return

    # Only non-empty frames were collected, so the concatenation is non-empty.
    all_sources_combined_df = pd.concat(all_source_dfs, ignore_index=True)

    # Sorted so the entity_df column order is stable across runs.
    entity_df_cols = sorted(all_join_keys) + [event_timestamp_col]
    missing_cols = [
        col for col in entity_df_cols if col not in all_sources_combined_df.columns
    ]
    if missing_cols:
        # Warn and skip, like the other guards, instead of raising KeyError.
        print(
            f"[WARNING] ODFV {feature_view.name} materialization: Columns {missing_cols} not found in source data. Cannot create entity_df. Skipping."
        )
        return

    entity_df = (
        all_sources_combined_df[entity_df_cols]
        .drop_duplicates()
        .reset_index(drop=True)
    )

    # get_historical_features is given the timestamp under the default name.
    if event_timestamp_col != DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL:
        entity_df = entity_df.rename(
            columns={event_timestamp_col: DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}
        )

    retrieval_job = self.get_historical_features(
        entity_df=entity_df,
        features=source_features_from_projections,
    )
    input_df = retrieval_job.to_df()
    transformed_df = self._transform_on_demand_feature_view_df(
        feature_view, input_df
    )
    self.write_to_online_store(feature_view.name, df=transformed_df)
1401+
12901402
def materialize_incremental(
12911403
self,
12921404
end_date: datetime,
@@ -1332,7 +1444,27 @@ def materialize_incremental(
13321444
# TODO paging large loads
13331445
for feature_view in feature_views_to_materialize:
13341446
if isinstance(feature_view, OnDemandFeatureView):
1447+
if feature_view.write_to_online_store:
1448+
source_fvs = {
1449+
self._get_feature_view(p.name)
1450+
for p in feature_view.source_feature_view_projections.values()
1451+
}
1452+
max_ttl = timedelta(0)
1453+
for fv in source_fvs:
1454+
if fv.ttl and fv.ttl > max_ttl:
1455+
max_ttl = fv.ttl
1456+
1457+
if max_ttl.total_seconds() > 0:
1458+
odfv_start_date = end_date - max_ttl
1459+
else:
1460+
odfv_start_date = end_date - timedelta(weeks=52)
1461+
1462+
print(
1463+
f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}:"
1464+
)
1465+
self._materialize_odfv(feature_view, odfv_start_date, end_date)
13351466
continue
1467+
13361468
start_date = feature_view.most_recent_end_time
13371469
if start_date is None:
13381470
if feature_view.ttl is None:
@@ -1428,6 +1560,13 @@ def materialize(
14281560
)
14291561
# TODO paging large loads
14301562
for feature_view in feature_views_to_materialize:
1563+
if isinstance(feature_view, OnDemandFeatureView):
1564+
if feature_view.write_to_online_store:
1565+
print(
1566+
f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}:"
1567+
)
1568+
self._materialize_odfv(feature_view, start_date, end_date)
1569+
continue
14311570
provider = self._get_provider()
14321571
print(f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}:")
14331572

sdk/python/feast/infra/provider.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
from feast.feature_view import FeatureView
2626
from feast.importer import import_class
2727
from feast.infra.infra_object import Infra
28-
from feast.infra.offline_stores.offline_store import RetrievalJob
28+
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
29+
from feast.infra.online_stores.online_store import OnlineStore
2930
from feast.infra.registry.base_registry import BaseRegistry
3031
from feast.infra.supported_async_methods import ProviderAsyncMethods
3132
from feast.on_demand_feature_view import OnDemandFeatureView
@@ -52,6 +53,10 @@ class Provider(ABC):
5253
engine. It is configured through a RepoConfig object.
5354
"""
5455

56+
repo_config: RepoConfig
57+
offline_store: OfflineStore
58+
online_store: OnlineStore
59+
5560
@abstractmethod
5661
def __init__(self, config: RepoConfig):
5762
pass

sdk/python/feast/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,11 @@ def _convert_arrow_to_proto(
257257
join_keys: Dict[str, ValueType],
258258
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
259259
# This is a workaround for isinstance(feature_view, OnDemandFeatureView), which triggers a circular import
260-
if getattr(feature_view, "source_request_sources", None):
260+
# Check for source_request_sources or source_feature_view_projections attributes to identify ODFVs
261+
if (
262+
getattr(feature_view, "source_request_sources", None) is not None
263+
or getattr(feature_view, "source_feature_view_projections", None) is not None
264+
):
261265
return _convert_arrow_odfv_to_proto(table, feature_view, join_keys) # type: ignore[arg-type]
262266
else:
263267
return _convert_arrow_fv_to_proto(table, feature_view, join_keys) # type: ignore[arg-type]

0 commit comments

Comments
 (0)