Skip to content

Commit 4986065

Browse files
aniketpaluntkathole
authored andcommitted
Added support to handle empty dataframe
Signed-off-by: Aniket Paluskar <[email protected]>
1 parent 1189512 commit 4986065

File tree

2 files changed

+246
-0
lines changed

2 files changed

+246
-0
lines changed

sdk/python/feast/feature_store.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1884,6 +1884,9 @@ def write_to_online_store(
18841884
inputs: Optional the dictionary object to be written
18851885
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
18861886
transform_on_write (optional): Whether to transform the data before pushing.
1887+
1888+
Raises:
1889+
ValueError: If the dataframe is empty (has no rows) or if feature columns are empty.
18871890
"""
18881891

18891892
feature_view, df = self._get_feature_view_and_df_for_online_write(
@@ -1893,6 +1896,19 @@ def write_to_online_store(
18931896
allow_registry_cache=allow_registry_cache,
18941897
transform_on_write=transform_on_write,
18951898
)
1899+
1900+
# Validate that the dataframe has meaningful feature data
1901+
if df is not None:
1902+
if df.empty:
1903+
raise ValueError("Cannot write empty dataframe to online store")
1904+
1905+
# Check if feature columns are empty (entity columns may have data but feature columns are empty)
1906+
feature_column_names = [f.name for f in feature_view.features]
1907+
if feature_column_names:
1908+
feature_df = df[feature_column_names]
1909+
if feature_df.empty or feature_df.isnull().all().all():
1910+
raise ValueError("Cannot write dataframe with empty feature columns to online store")
1911+
18961912
provider = self._get_provider()
18971913
provider.ingest_df(feature_view, df)
18981914

@@ -1911,6 +1927,9 @@ async def write_to_online_store_async(
19111927
df: The dataframe to be persisted.
19121928
inputs: Optional the dictionary object to be written
19131929
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1930+
1931+
Raises:
1932+
ValueError: If the dataframe is empty (has no rows) or if feature columns are empty.
19141933
"""
19151934

19161935
feature_view, df = self._get_feature_view_and_df_for_online_write(
@@ -1919,6 +1938,19 @@ async def write_to_online_store_async(
19191938
inputs=inputs,
19201939
allow_registry_cache=allow_registry_cache,
19211940
)
1941+
1942+
# Validate that the dataframe has meaningful feature data
1943+
if df is not None:
1944+
if df.empty:
1945+
raise ValueError("Cannot write empty dataframe to online store")
1946+
1947+
# Check if feature columns are empty (entity columns may have data but feature columns are empty)
1948+
feature_column_names = [f.name for f in feature_view.features]
1949+
if feature_column_names:
1950+
feature_df = df[feature_column_names]
1951+
if feature_df.empty or feature_df.isnull().all().all():
1952+
raise ValueError("Cannot write dataframe with empty feature columns to online store")
1953+
19221954
provider = self._get_provider()
19231955
await provider.ingest_df_async(feature_view, df)
19241956

sdk/python/tests/unit/online_store/test_online_writes.py

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,220 @@ def test_online_retrieval(self):
155155
)
156156

157157

158+
class TestEmptyDataFrameValidation(unittest.TestCase):
159+
def setUp(self):
160+
with tempfile.TemporaryDirectory() as data_dir:
161+
self.store = FeatureStore(
162+
config=RepoConfig(
163+
project="test_empty_df_validation",
164+
registry=os.path.join(data_dir, "registry.db"),
165+
provider="local",
166+
entity_key_serialization_version=2,
167+
online_store=SqliteOnlineStoreConfig(
168+
path=os.path.join(data_dir, "online.db")
169+
),
170+
)
171+
)
172+
173+
# Generate test data for schema creation
174+
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
175+
start_date = end_date - timedelta(days=1)
176+
177+
driver_entities = [1001]
178+
driver_df = create_driver_hourly_stats_df(
179+
driver_entities, start_date, end_date
180+
)
181+
driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
182+
driver_df.to_parquet(
183+
path=driver_stats_path, allow_truncated_timestamps=True
184+
)
185+
186+
driver = Entity(name="driver", join_keys=["driver_id"])
187+
188+
driver_stats_source = FileSource(
189+
name="driver_hourly_stats_source",
190+
path=driver_stats_path,
191+
timestamp_field="event_timestamp",
192+
created_timestamp_column="created",
193+
)
194+
195+
driver_stats_fv = FeatureView(
196+
name="driver_hourly_stats",
197+
entities=[driver],
198+
ttl=timedelta(days=0),
199+
schema=[
200+
Field(name="conv_rate", dtype=Float32),
201+
Field(name="acc_rate", dtype=Float32),
202+
Field(name="avg_daily_trips", dtype=Int64),
203+
],
204+
online=True,
205+
source=driver_stats_source,
206+
)
207+
208+
self.store.apply([driver, driver_stats_source, driver_stats_fv])
209+
210+
def test_empty_dataframe_raises_error(self):
211+
"""Test that completely empty dataframe raises ValueError"""
212+
empty_df = pd.DataFrame()
213+
214+
with self.assertRaises(ValueError) as context:
215+
self.store.write_to_online_store(
216+
feature_view_name="driver_hourly_stats", df=empty_df
217+
)
218+
219+
self.assertIn("Cannot write empty dataframe to online store", str(context.exception))
220+
221+
def test_empty_dataframe_async_raises_error(self):
222+
"""Test that completely empty dataframe raises ValueError in async version"""
223+
import asyncio
224+
225+
async def test_async_empty():
226+
empty_df = pd.DataFrame()
227+
228+
with self.assertRaises(ValueError) as context:
229+
await self.store.write_to_online_store_async(
230+
feature_view_name="driver_hourly_stats", df=empty_df
231+
)
232+
233+
self.assertIn("Cannot write empty dataframe to online store", str(context.exception))
234+
235+
asyncio.run(test_async_empty())
236+
237+
def test_dataframe_with_empty_feature_columns_raises_error(self):
238+
"""Test that dataframe with entity data but empty feature columns raises ValueError"""
239+
current_time = pd.Timestamp.now()
240+
df_with_entity_only = pd.DataFrame({
241+
"driver_id": [1001, 1002, 1003],
242+
"event_timestamp": [current_time] * 3,
243+
"created": [current_time] * 3,
244+
"conv_rate": [None, None, None], # All nulls
245+
"acc_rate": [None, None, None], # All nulls
246+
"avg_daily_trips": [None, None, None] # All nulls
247+
})
248+
249+
with self.assertRaises(ValueError) as context:
250+
self.store.write_to_online_store(
251+
feature_view_name="driver_hourly_stats", df=df_with_entity_only
252+
)
253+
254+
self.assertIn("Cannot write dataframe with empty feature columns to online store", str(context.exception))
255+
256+
def test_dataframe_with_empty_feature_columns_async_raises_error(self):
257+
"""Test that dataframe with entity data but empty feature columns raises ValueError in async version"""
258+
import asyncio
259+
260+
async def test_async_empty_features():
261+
current_time = pd.Timestamp.now()
262+
df_with_entity_only = pd.DataFrame({
263+
"driver_id": [1001, 1002, 1003],
264+
"event_timestamp": [current_time] * 3,
265+
"created": [current_time] * 3,
266+
"conv_rate": [None, None, None],
267+
"acc_rate": [None, None, None],
268+
"avg_daily_trips": [None, None, None]
269+
})
270+
271+
with self.assertRaises(ValueError) as context:
272+
await self.store.write_to_online_store_async(
273+
feature_view_name="driver_hourly_stats", df=df_with_entity_only
274+
)
275+
276+
self.assertIn("Cannot write dataframe with empty feature columns to online store", str(context.exception))
277+
278+
asyncio.run(test_async_empty_features())
279+
280+
def test_valid_dataframe_succeeds(self):
281+
"""Test that valid dataframe with feature data succeeds"""
282+
current_time = pd.Timestamp.now()
283+
valid_df = pd.DataFrame({
284+
"driver_id": [1001, 1002],
285+
"event_timestamp": [current_time] * 2,
286+
"created": [current_time] * 2,
287+
"conv_rate": [0.5, 0.7],
288+
"acc_rate": [0.8, 0.9],
289+
"avg_daily_trips": [10, 12]
290+
})
291+
292+
# This should not raise an exception
293+
self.store.write_to_online_store(
294+
feature_view_name="driver_hourly_stats", df=valid_df
295+
)
296+
297+
def test_valid_dataframe_async_succeeds(self):
298+
"""Test that valid dataframe with feature data succeeds in async version"""
299+
import asyncio
300+
301+
async def test_async_valid():
302+
current_time = pd.Timestamp.now()
303+
valid_df = pd.DataFrame({
304+
"driver_id": [1001, 1002],
305+
"event_timestamp": [current_time] * 2,
306+
"created": [current_time] * 2,
307+
"conv_rate": [0.5, 0.7],
308+
"acc_rate": [0.8, 0.9],
309+
"avg_daily_trips": [10, 12]
310+
})
311+
312+
# This should not raise an exception
313+
await self.store.write_to_online_store_async(
314+
feature_view_name="driver_hourly_stats", df=valid_df
315+
)
316+
317+
asyncio.run(test_async_valid())
318+
319+
def test_mixed_dataframe_with_some_valid_features_succeeds(self):
320+
"""Test that dataframe with some valid feature values succeeds"""
321+
current_time = pd.Timestamp.now()
322+
mixed_df = pd.DataFrame({
323+
"driver_id": [1001, 1002, 1003],
324+
"event_timestamp": [current_time] * 3,
325+
"created": [current_time] * 3,
326+
"conv_rate": [0.5, None, 0.7], # Mixed values
327+
"acc_rate": [0.8, 0.9, None], # Mixed values
328+
"avg_daily_trips": [10, 12, 15] # All valid
329+
})
330+
331+
# This should not raise an exception because not all feature values are null
332+
self.store.write_to_online_store(
333+
feature_view_name="driver_hourly_stats", df=mixed_df
334+
)
335+
336+
def test_empty_inputs_dict_raises_error(self):
337+
"""Test that empty inputs dict raises ValueError"""
338+
empty_inputs = {
339+
"driver_id": [],
340+
"conv_rate": [],
341+
"acc_rate": [],
342+
"avg_daily_trips": []
343+
}
344+
345+
with self.assertRaises(ValueError) as context:
346+
self.store.write_to_online_store(
347+
feature_view_name="driver_hourly_stats", inputs=empty_inputs
348+
)
349+
350+
self.assertIn("Cannot write empty dataframe to online store", str(context.exception))
351+
352+
def test_inputs_dict_with_empty_features_raises_error(self):
353+
"""Test that inputs dict with empty feature values raises ValueError"""
354+
current_time = pd.Timestamp.now()
355+
empty_feature_inputs = {
356+
"driver_id": [1001, 1002, 1003],
357+
"event_timestamp": [current_time] * 3,
358+
"created": [current_time] * 3,
359+
"conv_rate": [None, None, None],
360+
"acc_rate": [None, None, None],
361+
"avg_daily_trips": [None, None, None]
362+
}
363+
364+
with self.assertRaises(ValueError) as context:
365+
self.store.write_to_online_store(
366+
feature_view_name="driver_hourly_stats", inputs=empty_feature_inputs
367+
)
368+
369+
self.assertIn("Cannot write dataframe with empty feature columns to online store", str(context.exception))
370+
371+
158372
class TestOnlineWritesWithTransform(unittest.TestCase):
159373
def test_transform_on_write_pdf(self):
160374
with tempfile.TemporaryDirectory() as data_dir:

0 commit comments

Comments
 (0)