@@ -2097,76 +2097,104 @@ def _transform_on_demand_feature_view_df(
20972097 Raises:
20982098 Exception: For unsupported OnDemandFeatureView modes
20992099 """
2100- if feature_view .mode == "python" and isinstance (
2101- feature_view .feature_transformation , PythonTransformation
2102- ):
2103- input_dict = (
2104- df .to_dict (orient = "records" )[0 ]
2105- if feature_view .singleton
2106- else df .to_dict (orient = "list" )
2100+ _should_track = False
2101+ try :
2102+ from feast .metrics import _config as _metrics_config
2103+
2104+ _should_track = _metrics_config .online_features and getattr (
2105+ feature_view , "track_metrics" , False
21072106 )
2107+ except Exception :
2108+ pass
21082109
2109- if feature_view . singleton :
2110- transformed_rows = []
2110+ if _should_track :
2111+ import time as _time
21112112
2112- for i , row in df .iterrows ():
2113- output = feature_view .feature_transformation .udf (row .to_dict ())
2114- if i == 0 :
2115- transformed_rows = output
2116- else :
2117- for k in output :
2118- if isinstance (output [k ], list ):
2119- transformed_rows [k ].extend (output [k ])
2120- else :
2121- transformed_rows [k ].append (output [k ])
2122-
2123- transformed_data = pd .DataFrame (transformed_rows )
2124- else :
2125- transformed_data = feature_view .feature_transformation .udf (input_dict )
2113+ _t0 = _time .monotonic ()
21262114
2127- if feature_view .write_to_online_store :
2128- entities = [
2129- self .get_entity (entity ) for entity in (feature_view .entities or [])
2130- ]
2131- join_keys = [entity .join_key for entity in entities if entity ]
2132- join_keys = [k for k in join_keys if k in input_dict .keys ()]
2133- transformed_df = (
2134- pd .DataFrame (transformed_data )
2135- if not isinstance (transformed_data , pd .DataFrame )
2136- else transformed_data
2137- )
2138- input_df = pd .DataFrame (
2139- [input_dict ] if feature_view .singleton else input_dict
2115+ try :
2116+ if feature_view .mode == "python" and isinstance (
2117+ feature_view .feature_transformation , PythonTransformation
2118+ ):
2119+ input_dict = (
2120+ df .to_dict (orient = "records" )[0 ]
2121+ if feature_view .singleton
2122+ else df .to_dict (orient = "list" )
21402123 )
2141- if input_df .shape [0 ] == transformed_df .shape [0 ]:
2124+
2125+ if feature_view .singleton :
2126+ transformed_rows = []
2127+
2128+ for i , row in df .iterrows ():
2129+ output = feature_view .feature_transformation .udf (row .to_dict ())
2130+ if i == 0 :
2131+ transformed_rows = output
2132+ else :
2133+ for k in output :
2134+ if isinstance (output [k ], list ):
2135+ transformed_rows [k ].extend (output [k ])
2136+ else :
2137+ transformed_rows [k ].append (output [k ])
2138+
2139+ transformed_data = pd .DataFrame (transformed_rows )
2140+ else :
2141+ transformed_data = feature_view .feature_transformation .udf (
2142+ input_dict
2143+ )
2144+
2145+ if feature_view .write_to_online_store :
2146+ entities = [
2147+ self .get_entity (entity )
2148+ for entity in (feature_view .entities or [])
2149+ ]
2150+ join_keys = [entity .join_key for entity in entities if entity ]
2151+ join_keys = [k for k in join_keys if k in input_dict .keys ()]
2152+ transformed_df = (
2153+ pd .DataFrame (transformed_data )
2154+ if not isinstance (transformed_data , pd .DataFrame )
2155+ else transformed_data
2156+ )
2157+ input_df = pd .DataFrame (
2158+ [input_dict ] if feature_view .singleton else input_dict
2159+ )
2160+ if input_df .shape [0 ] == transformed_df .shape [0 ]:
2161+ for k in input_dict :
2162+ if k not in transformed_data :
2163+ transformed_data [k ] = input_dict [k ]
2164+ transformed_df = pd .DataFrame (transformed_data )
2165+ else :
2166+ transformed_df = pd .merge (
2167+ transformed_df ,
2168+ input_df ,
2169+ how = "left" ,
2170+ on = join_keys ,
2171+ )
2172+ else :
2173+ # overwrite any transformed features and update the dictionary
21422174 for k in input_dict :
21432175 if k not in transformed_data :
21442176 transformed_data [k ] = input_dict [k ]
2145- transformed_df = pd .DataFrame (transformed_data )
2146- else :
2147- transformed_df = pd .merge (
2148- transformed_df ,
2149- input_df ,
2150- how = "left" ,
2151- on = join_keys ,
2152- )
2153- else :
2154- # overwrite any transformed features and update the dictionary
2155- for k in input_dict :
2156- if k not in transformed_data :
2157- transformed_data [k ] = input_dict [k ]
21582177
2159- return pd .DataFrame (transformed_data )
2178+ return pd .DataFrame (transformed_data )
21602179
2161- elif feature_view .mode == "pandas" and isinstance (
2162- feature_view .feature_transformation , PandasTransformation
2163- ):
2164- transformed_df = feature_view .feature_transformation .udf (df )
2165- for col in df .columns :
2166- transformed_df [col ] = df [col ]
2167- return transformed_df
2168- else :
2169- raise Exception ("Unsupported OnDemandFeatureView mode" )
2180+ elif feature_view .mode == "pandas" and isinstance (
2181+ feature_view .feature_transformation , PandasTransformation
2182+ ):
2183+ transformed_df = feature_view .feature_transformation .udf (df )
2184+ for col in df .columns :
2185+ transformed_df [col ] = df [col ]
2186+ return transformed_df
2187+ else :
2188+ raise Exception ("Unsupported OnDemandFeatureView mode" )
2189+ finally :
2190+ if _should_track :
2191+ from feast .metrics import track_write_transformation
2192+
2193+ track_write_transformation (
2194+ feature_view .name ,
2195+ feature_view .mode ,
2196+ _time .monotonic () - _t0 ,
2197+ )
21702198
21712199 def _validate_vector_features (self , feature_view , df : pd .DataFrame ) -> None :
21722200 """
0 commit comments