Skip to content

Commit 55209bc

Browse files
aniketpaluntkathole
authored andcommitted
Added test case to test multiple FeatureView materilization (one empty FeatureView)
Signed-off-by: Aniket Paluskar <[email protected]>
1 parent 0710500 commit 55209bc

File tree

1 file changed

+146
-4
lines changed

1 file changed

+146
-4
lines changed

sdk/python/tests/unit/online_store/test_online_writes.py

Lines changed: 146 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -430,12 +430,154 @@ def test_inputs_dict_with_empty_features_warns(self):
430430
feature_view_name="driver_hourly_stats", inputs=empty_feature_inputs
431431
)
432432

433-
self.assertEqual(len(warning_list), 1)
434-
self.assertIn(
435-
"Cannot write dataframe with empty feature columns to online store",
436-
str(warning_list[0].message),
433+
# Check that our specific warning message is present
434+
warning_messages = [str(w.message) for w in warning_list]
435+
self.assertTrue(
436+
any(
437+
"Cannot write dataframe with empty feature columns to online store"
438+
in msg
439+
for msg in warning_messages
440+
),
441+
f"Expected warning not found. Actual warnings: {warning_messages}",
437442
)
438443

444+
def test_multiple_feature_views_materialization_with_empty_data(self):
445+
"""Test materializing multiple feature views where one has empty data - should not break materialization"""
446+
import tempfile
447+
from datetime import timedelta
448+
449+
with tempfile.TemporaryDirectory() as data_dir:
450+
# Create a new store for this test
451+
test_store = FeatureStore(
452+
config=RepoConfig(
453+
project="test_multiple_fv_materialization",
454+
registry=os.path.join(data_dir, "registry.db"),
455+
provider="local",
456+
entity_key_serialization_version=3,
457+
online_store=SqliteOnlineStoreConfig(
458+
path=os.path.join(data_dir, "online.db")
459+
),
460+
)
461+
)
462+
463+
# Create entities
464+
driver = Entity(name="driver", join_keys=["driver_id"])
465+
customer = Entity(name="customer", join_keys=["customer_id"])
466+
467+
# Create 5 feature views with data
468+
current_time = pd.Timestamp.now().replace(microsecond=0)
469+
start_date = current_time - timedelta(hours=2)
470+
end_date = current_time - timedelta(minutes=10)
471+
feature_views = []
472+
dataframes = []
473+
offline_paths = []
474+
475+
for i in range(5):
476+
# Create file path for offline data
477+
offline_path = os.path.join(data_dir, f"feature_view_{i + 1}.parquet")
478+
offline_paths.append(offline_path)
479+
480+
# Create feature view with real file source
481+
fv = FeatureView(
482+
name=f"feature_view_{i + 1}",
483+
entities=[driver if i % 2 == 0 else customer],
484+
ttl=timedelta(days=1),
485+
schema=[
486+
Field(name=f"feature_{i + 1}_rate", dtype=Float32),
487+
Field(name=f"feature_{i + 1}_count", dtype=Int64),
488+
],
489+
online=True,
490+
source=FileSource(
491+
name=f"source_{i + 1}",
492+
path=offline_path,
493+
timestamp_field="event_timestamp",
494+
created_timestamp_column="created",
495+
),
496+
)
497+
feature_views.append(fv)
498+
499+
# Create data - make 2nd feature view (index 1) empty
500+
if i == 1: # 2nd feature view gets empty data
501+
df = pd.DataFrame() # Empty dataframe
502+
else:
503+
# Create valid data for other feature views
504+
entity_key = "driver_id" if i % 2 == 0 else "customer_id"
505+
df = pd.DataFrame(
506+
{
507+
entity_key: [1000 + j for j in range(3)],
508+
"event_timestamp": [
509+
start_date + timedelta(minutes=j * 10) for j in range(3)
510+
],
511+
"created": [current_time] * 3,
512+
f"feature_{i + 1}_rate": [0.5 + j * 0.1 for j in range(3)],
513+
f"feature_{i + 1}_count": [10 + j for j in range(3)],
514+
}
515+
)
516+
517+
# Write data to offline store (parquet files) - offline store allows empty dataframes
518+
if len(df) > 0:
519+
df.to_parquet(offline_path, allow_truncated_timestamps=True)
520+
else:
521+
# Create empty parquet file with correct schema (timezone-aware timestamps)
522+
entity_key = "driver_id" if i % 2 == 0 else "customer_id"
523+
empty_schema_df = pd.DataFrame(
524+
{
525+
entity_key: pd.Series([], dtype="int64"),
526+
"event_timestamp": pd.Series(
527+
[], dtype="datetime64[ns, UTC]"
528+
), # ✅ Timezone-aware
529+
"created": pd.Series(
530+
[], dtype="datetime64[ns, UTC]"
531+
), # ✅ Timezone-aware
532+
f"feature_{i + 1}_rate": pd.Series([], dtype="float32"),
533+
f"feature_{i + 1}_count": pd.Series([], dtype="int64"),
534+
}
535+
)
536+
empty_schema_df.to_parquet(
537+
offline_path, allow_truncated_timestamps=True
538+
)
539+
540+
dataframes.append(df)
541+
542+
# Apply entities and feature views
543+
test_store.apply([driver, customer] + feature_views)
544+
545+
# Test: Use materialize() to move data from offline to online store
546+
test_store.materialize(
547+
start_date=start_date,
548+
end_date=end_date,
549+
feature_views=[fv.name for fv in feature_views],
550+
)
551+
552+
# Verify that the operation was successful by checking that non-empty feature views have data
553+
successful_materializations = 0
554+
for i, fv in enumerate(feature_views):
555+
if i != 1: # Skip the empty one (2nd feature view)
556+
entity_key = "driver_id" if i % 2 == 0 else "customer_id"
557+
entity_value = 1000 # First entity from our test data
558+
559+
# Try to retrieve features to verify they were written successfully
560+
online_response = test_store.get_online_features(
561+
entity_rows=[{entity_key: entity_value}],
562+
features=[
563+
f"{fv.name}:feature_{i + 1}_rate",
564+
f"{fv.name}:feature_{i + 1}_count",
565+
],
566+
).to_dict()
567+
568+
# Verify we got some data back (not None/null)
569+
rate_value = online_response.get(f"feature_{i + 1}_rate")
570+
count_value = online_response.get(f"feature_{i + 1}_count")
571+
572+
if rate_value is not None and count_value is not None:
573+
successful_materializations += 1
574+
575+
self.assertIsNotNone(rate_value)
576+
self.assertIsNotNone(count_value)
577+
578+
# Verify that 4 out of 4 non-empty feature views were successfully materialized
579+
self.assertEqual(successful_materializations, 4)
580+
439581

440582
class TestOnlineWritesWithTransform(unittest.TestCase):
441583
def test_transform_on_write_pdf(self):

0 commit comments

Comments
 (0)