Skip to content

Commit f04872a

Browse files
chore: Refactor transform on write (feast-dev#5300)
1 parent 14f45ee commit f04872a

File tree

1 file changed

+152
-95
lines changed

1 file changed

+152
-95
lines changed

sdk/python/feast/feature_store.py

Lines changed: 152 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,134 +1518,191 @@ def _offline_write():
15181518

15191519
await run_in_threadpool(_offline_write)
15201520

1521-
def _get_feature_view_and_df_for_online_write(
1521+
def _validate_and_convert_input_data(
    self,
    df: Optional[pd.DataFrame],
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]],
) -> Optional[pd.DataFrame]:
    """
    Validates input parameters and converts them to a pandas DataFrame.

    At most one of ``df`` and ``inputs`` may be supplied. A dictionary or a
    list is passed to the ``pd.DataFrame`` constructor; an existing
    DataFrame is returned as-is (same object, no copy).

    Args:
        df: Optional DataFrame input. A dict or list is also accepted and
            converted. Any other value is returned unchanged.
        inputs: Optional dictionary, list or DataFrame input.

    Returns:
        The resulting pandas DataFrame, or None when neither argument is given.

    Raises:
        ValueError: If both df and inputs are provided, or if inputs is not
            a dictionary, list or pandas DataFrame.
        DataFrameSerializationError: If input data cannot be converted to DataFrame
    """
    if df is not None and inputs is not None:
        raise ValueError("Both df and inputs cannot be provided at the same time.")

    if inputs is not None:
        if isinstance(inputs, pd.DataFrame):
            return inputs
        if isinstance(inputs, (dict, list)):
            try:
                return pd.DataFrame(inputs)
            except Exception as err:
                # Chain explicitly so the pandas failure is reported as the cause.
                raise DataFrameSerializationError(inputs) from err
        raise ValueError("inputs must be a dictionary or a pandas DataFrame.")

    # Only ``df`` (or nothing) was supplied. Unlike ``inputs``, values that are
    # neither dict nor list are deliberately passed through without a type check.
    if isinstance(df, (dict, list)):
        try:
            return pd.DataFrame(df)
        except Exception as err:
            raise DataFrameSerializationError(df) from err

    return df
1562+
1563+
def _transform_on_demand_feature_view_df(
    self, feature_view: OnDemandFeatureView, df: pd.DataFrame
) -> pd.DataFrame:
    """
    Apply transformations for an OnDemandFeatureView to the input dataframe.

    In "python" mode the UDF receives a dict: one record per call when
    ``feature_view.singleton`` is set, otherwise a single dict of column
    lists. In "pandas" mode the UDF receives ``df`` itself. In both modes,
    input columns are carried over into the result.

    Args:
        feature_view: The OnDemandFeatureView containing the transformation
        df: The input dataframe to transform

    Returns:
        Transformed dataframe

    Raises:
        Exception: For unsupported OnDemandFeatureView modes
    """
    if feature_view.mode == "python" and isinstance(
        feature_view.feature_transformation, PythonTransformation
    ):
        udf = feature_view.feature_transformation.udf

        # NOTE: in singleton mode only the first record is kept here, and an
        # empty ``df`` raises IndexError on the ``[0]`` lookup.
        input_dict = (
            df.to_dict(orient="records")[0]
            if feature_view.singleton
            else df.to_dict(orient="list")
        )

        if feature_view.singleton:
            transformed_rows = []

            # Track the position explicitly: the labels yielded by iterrows()
            # come from the DataFrame index and need not start at 0.
            for position, (_, row) in enumerate(df.iterrows()):
                output = udf(row.to_dict())
                if position == 0:
                    transformed_rows = output
                    continue
                for k in output:
                    # Wrap a non-list value from the first row so that later
                    # rows can be accumulated onto it.
                    if not isinstance(transformed_rows[k], list):
                        transformed_rows[k] = [transformed_rows[k]]
                    if isinstance(output[k], list):
                        transformed_rows[k].extend(output[k])
                    else:
                        transformed_rows[k].append(output[k])

            transformed_data = pd.DataFrame(transformed_rows)
        else:
            transformed_data = udf(input_dict)

        if feature_view.write_to_online_store:
            entities = [
                self.get_entity(entity) for entity in (feature_view.entities or [])
            ]
            join_keys = [entity.join_key for entity in entities if entity]
            join_keys = [k for k in join_keys if k in input_dict.keys()]
            transformed_df = (
                pd.DataFrame(transformed_data)
                if not isinstance(transformed_data, pd.DataFrame)
                else transformed_data
            )
            # NOTE(review): in singleton mode ``input_dict`` is the first record
            # only, so ``input_df`` has a single row even for multi-row input.
            # Confirm this is intended for multi-row singleton writes.
            input_df = pd.DataFrame(
                [input_dict] if feature_view.singleton else input_dict
            )
            if input_df.shape[0] == transformed_df.shape[0]:
                # Same row count: copy over input columns the UDF did not emit.
                for k in input_dict:
                    if k not in transformed_data:
                        transformed_data[k] = input_dict[k]
                transformed_df = pd.DataFrame(transformed_data)
            else:
                # Row counts differ: attach the input columns by joining on
                # the entity join keys present in the input.
                transformed_df = pd.merge(
                    transformed_df,
                    input_df,
                    how="left",
                    on=join_keys,
                )
        else:
            # Add input columns the UDF did not emit; columns produced by the
            # transformation are left untouched.
            for k in input_dict:
                if k not in transformed_data:
                    transformed_data[k] = input_dict[k]

        # The merged ``transformed_df`` is not returned; the result is rebuilt
        # from ``transformed_data``.
        return pd.DataFrame(transformed_data)

    elif feature_view.mode == "pandas" and isinstance(
        feature_view.feature_transformation, PandasTransformation
    ):
        transformed_df = feature_view.feature_transformation.udf(df)
        # Every input column is written onto the result, replacing any
        # same-named column produced by the UDF.
        for col in df.columns:
            transformed_df[col] = df[col]
        return transformed_df
    else:
        raise Exception("Unsupported OnDemandFeatureView mode")
1650+
def _validate_vector_features(self, feature_view, df: pd.DataFrame) -> None:
    """
    Validates vector features in the DataFrame against the feature view specifications.

    Only the first feature of the feature view is inspected, and only when
    it is flagged with ``vector_index``. Each sized, non-string value in
    that feature's column is measured and compared with the feature's
    ``vector_length``; a ``vector_length`` of 0 (or None) disables the check.

    Args:
        feature_view: The feature view containing vector feature specifications
        df: The DataFrame to validate

    Raises:
        ValueError: If vector dimension constraints are violated
        KeyError: If the vector feature's column is missing from ``df``
    """
    if not (feature_view.features and feature_view.features[0].vector_index):
        return

    vector_feature = feature_view.features[0]
    fv_vector_feature_name = vector_feature.name

    # Look the column up first so a missing vector column raises KeyError
    # even when no length limit is configured.
    vector_column = df[fv_vector_feature_name]

    expected_length = vector_feature.vector_length
    if not expected_length:
        return

    # Measure the vectors themselves. ``df.shape`` is ``(n_rows, n_columns)``,
    # so it cannot be indexed by a column position to get a vector size.
    observed_lengths = [
        len(value)
        for value in vector_column
        if hasattr(value, "__len__") and not isinstance(value, (str, bytes))
    ]
    longest = max(observed_lengths, default=0)

    if longest > expected_length:
        raise ValueError(
            f"The dataframe for {fv_vector_feature_name} column has vectors of length {longest} "
            f"which is greater than expected (i.e {expected_length}) "
            f"by feature view {feature_view.name}."
        )
15671675

1676+
def _get_feature_view_and_df_for_online_write(
    self,
    feature_view_name: str,
    df: Optional[pd.DataFrame] = None,
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
    allow_registry_cache: bool = True,
    transform_on_write: bool = True,
):
    """
    Resolve a feature view by name and prepare the data to write for it.

    Args:
        feature_view_name: Name of the feature view to look up.
        df: Optional DataFrame input (mutually exclusive with ``inputs``).
        inputs: Optional dictionary or DataFrame input.
        allow_registry_cache: Passed to ``list_all_feature_views``.
        transform_on_write: When True, an OnDemandFeatureView that has
            ``write_to_online_store`` set gets its transformation applied
            to the data.

    Returns:
        A ``(feature_view, df)`` tuple. ``df`` is None when neither ``df``
        nor ``inputs`` was supplied.

    Raises:
        FeatureViewNotFoundException: If no feature view has the given name.
        ValueError: If both df and inputs are provided, or validation fails.
    """
    feature_view_dict = {
        fv_proto.name: fv_proto
        for fv_proto in self.list_all_feature_views(allow_registry_cache)
    }
    try:
        feature_view = feature_view_dict[feature_view_name]
    except KeyError:
        # A missing dict key surfaces as KeyError; translate it into the
        # domain error callers are meant to receive.
        raise FeatureViewNotFoundException(feature_view_name, self.project) from None

    # Convert inputs/df to a consistent DataFrame format
    df = self._validate_and_convert_input_data(df, inputs)

    if df is not None:
        self._validate_vector_features(feature_view, df)

    # Apply transformations if this is an OnDemandFeatureView with
    # write_to_online_store=True. Without data there is nothing to transform,
    # so the helper is only called when df is present.
    if (
        df is not None
        and isinstance(feature_view, OnDemandFeatureView)
        and feature_view.write_to_online_store
        and transform_on_write
    ):
        df = self._transform_on_demand_feature_view_df(feature_view, df)

    return feature_view, df
16511708

0 commit comments

Comments
 (0)