@@ -175,13 +175,9 @@ def setUp(self):
175
175
start_date = end_date - timedelta (days = 1 )
176
176
177
177
driver_entities = [1001 ]
178
- driver_df = create_driver_hourly_stats_df (
179
- driver_entities , start_date , end_date
180
- )
178
+ driver_df = create_driver_hourly_stats_df (driver_entities , start_date , end_date )
181
179
driver_stats_path = os .path .join (self .data_dir , "driver_stats.parquet" )
182
- driver_df .to_parquet (
183
- path = driver_stats_path , allow_truncated_timestamps = True
184
- )
180
+ driver_df .to_parquet (path = driver_stats_path , allow_truncated_timestamps = True )
185
181
186
182
driver = Entity (name = "driver" , join_keys = ["driver_id" ])
187
183
@@ -209,90 +205,107 @@ def setUp(self):
209
205
210
206
def tearDown (self ):
211
207
import shutil
208
+
212
209
shutil .rmtree (self .data_dir )
213
210
214
211
def test_empty_dataframe_raises_error (self ):
215
212
"""Test that completely empty dataframe raises ValueError"""
216
213
empty_df = pd .DataFrame ()
217
-
214
+
218
215
with self .assertRaises (ValueError ) as context :
219
216
self .store .write_to_online_store (
220
217
feature_view_name = "driver_hourly_stats" , df = empty_df
221
218
)
222
-
223
- self .assertIn ("Cannot write empty dataframe to online store" , str (context .exception ))
219
+
220
+ self .assertIn (
221
+ "Cannot write empty dataframe to online store" , str (context .exception )
222
+ )
224
223
225
224
def test_empty_dataframe_async_raises_error (self ):
226
225
"""Test that completely empty dataframe raises ValueError in async version"""
227
226
import asyncio
228
-
227
+
229
228
async def test_async_empty ():
230
229
empty_df = pd .DataFrame ()
231
-
230
+
232
231
with self .assertRaises (ValueError ) as context :
233
232
await self .store .write_to_online_store_async (
234
233
feature_view_name = "driver_hourly_stats" , df = empty_df
235
234
)
236
-
237
- self .assertIn ("Cannot write empty dataframe to online store" , str (context .exception ))
238
-
235
+
236
+ self .assertIn (
237
+ "Cannot write empty dataframe to online store" , str (context .exception )
238
+ )
239
+
239
240
asyncio .run (test_async_empty ())
240
241
241
242
def test_dataframe_with_empty_feature_columns_raises_error (self ):
242
243
"""Test that dataframe with entity data but empty feature columns raises ValueError"""
243
244
current_time = pd .Timestamp .now ()
244
- df_with_entity_only = pd .DataFrame ({
245
- "driver_id" : [1001 , 1002 , 1003 ],
246
- "event_timestamp" : [current_time ] * 3 ,
247
- "created" : [current_time ] * 3 ,
248
- "conv_rate" : [None , None , None ], # All nulls
249
- "acc_rate" : [None , None , None ], # All nulls
250
- "avg_daily_trips" : [None , None , None ] # All nulls
251
- })
252
-
245
+ df_with_entity_only = pd .DataFrame (
246
+ {
247
+ "driver_id" : [1001 , 1002 , 1003 ],
248
+ "event_timestamp" : [current_time ] * 3 ,
249
+ "created" : [current_time ] * 3 ,
250
+ "conv_rate" : [None , None , None ], # All nulls
251
+ "acc_rate" : [None , None , None ], # All nulls
252
+ "avg_daily_trips" : [None , None , None ], # All nulls
253
+ }
254
+ )
255
+
253
256
with self .assertRaises (ValueError ) as context :
254
257
self .store .write_to_online_store (
255
258
feature_view_name = "driver_hourly_stats" , df = df_with_entity_only
256
259
)
257
-
258
- self .assertIn ("Cannot write dataframe with empty feature columns to online store" , str (context .exception ))
260
+
261
+ self .assertIn (
262
+ "Cannot write dataframe with empty feature columns to online store" ,
263
+ str (context .exception ),
264
+ )
259
265
260
266
def test_dataframe_with_empty_feature_columns_async_raises_error (self ):
261
267
"""Test that dataframe with entity data but empty feature columns raises ValueError in async version"""
262
268
import asyncio
263
-
269
+
264
270
async def test_async_empty_features ():
265
271
current_time = pd .Timestamp .now ()
266
- df_with_entity_only = pd .DataFrame ({
267
- "driver_id" : [1001 , 1002 , 1003 ],
268
- "event_timestamp" : [current_time ] * 3 ,
269
- "created" : [current_time ] * 3 ,
270
- "conv_rate" : [None , None , None ],
271
- "acc_rate" : [None , None , None ],
272
- "avg_daily_trips" : [None , None , None ]
273
- })
274
-
272
+ df_with_entity_only = pd .DataFrame (
273
+ {
274
+ "driver_id" : [1001 , 1002 , 1003 ],
275
+ "event_timestamp" : [current_time ] * 3 ,
276
+ "created" : [current_time ] * 3 ,
277
+ "conv_rate" : [None , None , None ],
278
+ "acc_rate" : [None , None , None ],
279
+ "avg_daily_trips" : [None , None , None ],
280
+ }
281
+ )
282
+
275
283
with self .assertRaises (ValueError ) as context :
276
284
await self .store .write_to_online_store_async (
277
285
feature_view_name = "driver_hourly_stats" , df = df_with_entity_only
278
286
)
279
-
280
- self .assertIn ("Cannot write dataframe with empty feature columns to online store" , str (context .exception ))
281
-
287
+
288
+ self .assertIn (
289
+ "Cannot write dataframe with empty feature columns to online store" ,
290
+ str (context .exception ),
291
+ )
292
+
282
293
asyncio .run (test_async_empty_features ())
283
294
284
295
def test_valid_dataframe (self ):
285
296
"""Test that valid dataframe with feature data succeeds"""
286
297
current_time = pd .Timestamp .now ()
287
- valid_df = pd .DataFrame ({
288
- "driver_id" : [1001 , 1002 ],
289
- "event_timestamp" : [current_time ] * 2 ,
290
- "created" : [current_time ] * 2 ,
291
- "conv_rate" : [0.5 , 0.7 ],
292
- "acc_rate" : [0.8 , 0.9 ],
293
- "avg_daily_trips" : [10 , 12 ]
294
- })
295
-
298
+ valid_df = pd .DataFrame (
299
+ {
300
+ "driver_id" : [1001 , 1002 ],
301
+ "event_timestamp" : [current_time ] * 2 ,
302
+ "created" : [current_time ] * 2 ,
303
+ "conv_rate" : [0.5 , 0.7 ],
304
+ "acc_rate" : [0.8 , 0.9 ],
305
+ "avg_daily_trips" : [10 , 12 ],
306
+ }
307
+ )
308
+
296
309
# This should not raise an exception
297
310
self .store .write_to_online_store (
298
311
feature_view_name = "driver_hourly_stats" , df = valid_df
@@ -301,40 +314,45 @@ def test_valid_dataframe(self):
301
314
def test_valid_dataframe_async (self ):
302
315
"""Test that valid dataframe with feature data succeeds in async version"""
303
316
import asyncio
317
+
304
318
import pytest
305
-
319
+
306
320
pytest .skip ("Feature not implemented yet" )
307
-
321
+
308
322
async def test_async_valid ():
309
323
current_time = pd .Timestamp .now ()
310
- valid_df = pd .DataFrame ({
311
- "driver_id" : [1001 , 1002 ],
312
- "event_timestamp" : [current_time ] * 2 ,
313
- "created" : [current_time ] * 2 ,
314
- "conv_rate" : [0.5 , 0.7 ],
315
- "acc_rate" : [0.8 , 0.9 ],
316
- "avg_daily_trips" : [10 , 12 ]
317
- })
318
-
324
+ valid_df = pd .DataFrame (
325
+ {
326
+ "driver_id" : [1001 , 1002 ],
327
+ "event_timestamp" : [current_time ] * 2 ,
328
+ "created" : [current_time ] * 2 ,
329
+ "conv_rate" : [0.5 , 0.7 ],
330
+ "acc_rate" : [0.8 , 0.9 ],
331
+ "avg_daily_trips" : [10 , 12 ],
332
+ }
333
+ )
334
+
319
335
# This should not raise an exception
320
336
await self .store .write_to_online_store_async (
321
337
feature_view_name = "driver_hourly_stats" , df = valid_df
322
338
)
323
-
339
+
324
340
asyncio .run (test_async_valid ())
325
341
326
342
def test_mixed_dataframe_with_some_valid_features (self ):
327
343
"""Test that dataframe with some valid feature values succeeds"""
328
344
current_time = pd .Timestamp .now ()
329
- mixed_df = pd .DataFrame ({
330
- "driver_id" : [1001 , 1002 , 1003 ],
331
- "event_timestamp" : [current_time ] * 3 ,
332
- "created" : [current_time ] * 3 ,
333
- "conv_rate" : [0.5 , None , 0.7 ], # Mixed values
334
- "acc_rate" : [0.8 , 0.9 , None ], # Mixed values
335
- "avg_daily_trips" : [10 , 12 , 15 ] # All valid
336
- })
337
-
345
+ mixed_df = pd .DataFrame (
346
+ {
347
+ "driver_id" : [1001 , 1002 , 1003 ],
348
+ "event_timestamp" : [current_time ] * 3 ,
349
+ "created" : [current_time ] * 3 ,
350
+ "conv_rate" : [0.5 , None , 0.7 ], # Mixed values
351
+ "acc_rate" : [0.8 , 0.9 , None ], # Mixed values
352
+ "avg_daily_trips" : [10 , 12 , 15 ], # All valid
353
+ }
354
+ )
355
+
338
356
# This should not raise an exception because not all feature values are null
339
357
self .store .write_to_online_store (
340
358
feature_view_name = "driver_hourly_stats" , df = mixed_df
@@ -346,15 +364,17 @@ def test_empty_inputs_dict_raises_error(self):
346
364
"driver_id" : [],
347
365
"conv_rate" : [],
348
366
"acc_rate" : [],
349
- "avg_daily_trips" : []
367
+ "avg_daily_trips" : [],
350
368
}
351
-
369
+
352
370
with self .assertRaises (ValueError ) as context :
353
371
self .store .write_to_online_store (
354
372
feature_view_name = "driver_hourly_stats" , inputs = empty_inputs
355
373
)
356
-
357
- self .assertIn ("Cannot write empty dataframe to online store" , str (context .exception ))
374
+
375
+ self .assertIn (
376
+ "Cannot write empty dataframe to online store" , str (context .exception )
377
+ )
358
378
359
379
def test_inputs_dict_with_empty_features_raises_error (self ):
360
380
"""Test that inputs dict with empty feature values raises ValueError"""
@@ -365,15 +385,18 @@ def test_inputs_dict_with_empty_features_raises_error(self):
365
385
"created" : [current_time ] * 3 ,
366
386
"conv_rate" : [None , None , None ],
367
387
"acc_rate" : [None , None , None ],
368
- "avg_daily_trips" : [None , None , None ]
388
+ "avg_daily_trips" : [None , None , None ],
369
389
}
370
-
390
+
371
391
with self .assertRaises (ValueError ) as context :
372
392
self .store .write_to_online_store (
373
393
feature_view_name = "driver_hourly_stats" , inputs = empty_feature_inputs
374
394
)
375
-
376
- self .assertIn ("Cannot write dataframe with empty feature columns to online store" , str (context .exception ))
395
+
396
+ self .assertIn (
397
+ "Cannot write dataframe with empty feature columns to online store" ,
398
+ str (context .exception ),
399
+ )
377
400
378
401
379
402
class TestOnlineWritesWithTransform (unittest .TestCase ):
0 commit comments