diff --git a/.secrets.baseline b/.secrets.baseline index 9d27a7b000d..260d37dfee8 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1460,14 +1460,14 @@ "filename": "sdk/python/tests/universal/feature_repos/repo_configuration.py", "hashed_secret": "d90e76ef629fb00c95f4e84fec29fbda111e2392", "is_verified": false, - "line_number": 459 + "line_number": 486 }, { "type": "Secret Keyword", "filename": "sdk/python/tests/universal/feature_repos/repo_configuration.py", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 461 + "line_number": 488 } ], "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py": [ @@ -1539,5 +1539,5 @@ } ] }, - "generated_at": "2026-03-18T08:09:25Z" + "generated_at": "2026-03-20T20:55:36Z" } diff --git a/design-notes/CASEY_SESSION_NOTES.md b/design-notes/CASEY_SESSION_NOTES.md new file mode 100644 index 00000000000..7a0b6f158f8 --- /dev/null +++ b/design-notes/CASEY_SESSION_NOTES.md @@ -0,0 +1,109 @@ +# MongoDB Feast Integration β€” Session Notes +_Last updated: 2026-03-16. 
Resume here after OS upgrade._ + +--- + +## Status at a Glance + +| Component | Branch | Status | +|---|---|---| +| **Online Store** | `INTPYTHON-297-MongoDB-Feast-Integration` | βœ… **Merged to upstream/master** | +| **Offline Store** | `FEAST-OfflineStore-INTPYTHON-297` | πŸ”§ In progress β€” next focus | + +--- + +## Online Store β€” COMPLETE βœ… + +### What was done +- Implemented `MongoDBOnlineStore` with full sync + async API +- Refactored write path: extracted `_build_write_ops` static method to eliminate code + duplication between `online_write_batch` and `online_write_batch_async` +- Added Feast driver metadata to MongoDB client instantiations +- Registered MongoDB in the feast-operator (kubebuilder enums, `ValidOnlineStoreDBStorePersistenceTypes`, operator YAMLs) +- Updated online store status from `alpha` β†’ `preview` in docs +- All 5 unit tests pass (including Docker-based testcontainers integration test) + +### Key files +- `sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py` β€” main implementation +- `sdk/python/tests/unit/online_store/test_mongodb_online_retrieval.py` β€” test suite +- `sdk/python/tests/universal/feature_repos/universal/online_store/mongodb.py` β€” universal test repo config + +### Git history cleanup (this session) +The PR had two merge commits (`632e103a6`, `26ce79b37`) that blocked squash-and-merge. +Resolution: +1. `git fetch --all` +2. Created clean branch `FEAST-OnlineStore-INTPYTHON-297` from `upstream/master` +3. Cherry-picked all 47 commits (oldest β†’ newest), skipping the two merge commits +4. Resolved conflicts: directory rename (`tests/integration/` β†’ `tests/universal/`), + `pixi.lock` auto-resolved, `detect-secrets` false positives got `# pragma: allowlist secret` +5. Force-pushed to `INTPYTHON-297-MongoDB-Feast-Integration` β€” maintainer squash-merged βœ… + +### Versioning +Version is derived dynamically via `setuptools_scm` from git tags (no hardcoded version). 
+Latest tag at time of merge: **`v0.60.0`**. Feature ships in the next release after that. +Update JIRA with the next release tag once the maintainers cut it. + +--- + +## Offline Store β€” IN PROGRESS πŸ”§ + +### Branch +``` +FEAST-OfflineStore-INTPYTHON-297 +``` + +### Commits on branch (not yet in upstream/master) +``` +cd3eef677 Started work on full Mongo/MQL implementation. Kept MongoDBOfflineStoreIbis and MongoDBOfflineStoreNative +71469f69a feat: restore test-python-universal-mongodb-online Makefile target +904505244 fix: pass onerror to pkgutil.walk_packages +946d84e4c fix: broaden import exception handling in doctest runner +55de0e9b5 fix: catch FeastExtrasDependencyImportError in doctest runner +157a71d77 refactor: improve MongoDB offline store code quality +67632af2f feat: Add MongoDB offline store (ibis-based PIT join, v1 alpha) +``` + +### Key files +- `sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb.py` + - Contains **two prototype implementations**: + - `MongoDBOfflineStoreIbis` β€” uses Ibis for point-in-time joins (delegates to `get_historical_features_ibis`) + - `MongoDBOfflineStoreNative` β€” native MQL implementation (started in `cd3eef677`, in progress) +- `sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_source.py` β€” `MongoDBSource` data source + +### Architecture: Ibis vs Native +- **Ibis approach**: delegates PIT join to `feast.infra.offline_stores.ibis` helpers. + Pro: less code, consistency with other ibis-backed stores. + Con: requires ibis-mongodb connector; PIT correctness depends on ibis translation. +- **Native approach**: implements PIT join directly in MQL (MongoDB aggregation pipeline). + Pro: no extra dependency, full control. + Con: more complex; MQL aggregation pipelines can be verbose. +- Decision pending benchmarking / correctness validation between the two. + +### Next steps for offline store +1. 
Finish `MongoDBOfflineStoreNative` MQL implementation (started in latest commit) +2. Validate PIT correctness for both implementations against the Feast universal test suite +3. Run: `make test-python-universal-mongodb-offline` (target may need creating β€” see `71469f69a`) +4. Choose Ibis vs Native based on results; remove the other +5. Add to operator (same pattern as online store: kubebuilder enums, install.yaml) +6. Open PR β€” follow same DCO + linear history discipline as online store + +--- + +## Environment Notes + +- **Python env**: always use `uv run pytest ...` (uses `.venv` in repo root, Python 3.11) +- **Do NOT use**: system Python (`/Library/Frameworks/Python.framework/...`) or conda envs +- **Docker**: must be running for the testcontainers integration test +- **Stale container**: `72d14b345b6a` (mongo:latest, port 57120) β€” leftover from testing, safe to stop +- **DCO**: all commits must be signed: `git commit -s` +- **No push/merge without explicit user approval** + +--- + +## Git Workflow Reminder +To keep history clean (lesson from online store PR): +- Always branch from `upstream/master` (after `git fetch --all`) +- Never merge upstream into a feature branch β€” rebase or cherry-pick instead +- Before opening a PR, verify with: `git log --merges ^upstream/master --oneline` + (must return empty) + diff --git a/design-notes/design-hybrid-with-batches.md b/design-notes/design-hybrid-with-batches.md new file mode 100644 index 00000000000..080986579ef --- /dev/null +++ b/design-notes/design-hybrid-with-batches.md @@ -0,0 +1,239 @@ +Native MongoDB Offline Store (Hybrid Design) + +Design Document + +Overview + +This document describes the design of the Native MongoDB Offline Store for Feast using a hybrid execution model. The system combines MongoDB’s strengths in indexed data retrieval with Python’s strengths in relational and temporal joins. 
+ +The implementation uses a single-collection schema in MongoDB to store feature data across all FeatureViews and performs point-in-time (PIT) joins using a β€œfetch + pandas join” strategy. This replaces an earlier fully in-database $lookup approach that proved unscalable for large workloads. + +The result is a design that is performant, scalable, and aligned with Feast’s semantics. + +βΈ» + +Data Model + +All FeatureViews share a single MongoDB collection (feature_history). Each document represents an observation of a FeatureView for a given entity at a specific timestamp. + +Each document contains: + β€’ A serialized entity identifier (entity_id) + β€’ A FeatureView identifier (feature_view) + β€’ A subdocument of feature values (features) + β€’ An event timestamp (event_timestamp) + β€’ An ingestion timestamp (created_at) + +This schema supports: + β€’ Sparse feature storage (not all features present in every document) + β€’ Flexible schema evolution over time + β€’ Efficient indexing across FeatureViews + +A compound index is maintained on: + β€’ (entity_id, feature_view, event_timestamp DESC) + +This index supports efficient filtering by entity, FeatureView, and time range. + +βΈ» + +Execution Model + +High-Level Strategy + +The system implements historical feature retrieval in three stages: + 1. Preprocessing (Python) + β€’ Normalize timestamps to UTC + β€’ Serialize entity keys into entity_id + β€’ Partition the input entity_df into manageable chunks + 2. Data Fetching (MongoDB) + β€’ Query MongoDB using $in on entity IDs + β€’ Filter by FeatureView and time bounds + β€’ Retrieve matching feature documents in batches + 3. Point-in-Time Join (Python) + β€’ Convert MongoDB results into pandas DataFrames + β€’ Perform per-FeatureView joins using merge_asof + β€’ Apply TTL constraints and feature selection + +This design avoids per-row database joins and instead performs a small number of efficient indexed scans. 
+ +βΈ» + +Chunking and Batching + +To ensure scalability, the system separates concerns between: + β€’ Chunk size (entity_df) +Controls memory usage in Python +Default: ~5,000 rows + β€’ Batch size (MongoDB queries) +Controls query size and index efficiency +Default: ~1,000 entity IDs per query + +Each chunk of entity_df is processed independently: + β€’ Entity IDs are extracted and deduplicated + β€’ Feature data is fetched in batches + β€’ Results are joined and accumulated + +This ensures: + β€’ Bounded memory usage + β€’ Predictable query performance + β€’ Compatibility with large workloads + +βΈ» + +Point-in-Time Join Semantics + +For each FeatureView: + β€’ Feature data is sorted by (entity_id, event_timestamp) + β€’ The entity dataframe is similarly sorted + β€’ A backward merge_asof is performed + +This ensures: + β€’ Only feature values with timestamps ≀ entity timestamp are used + β€’ The most recent valid feature value is selected + +TTL constraints are applied after the join: + β€’ If the matched feature timestamp is older than the allowed TTL window, the value is set to NULL + +βΈ» + +Key Improvements in Current Design + +1. Projection (Reduced Data Transfer) + +The system now explicitly limits fields retrieved from MongoDB to only those required: + β€’ entity_id + β€’ feature_view + β€’ event_timestamp + β€’ Requested feature fields within features + +This reduces: + β€’ Network overhead + β€’ BSON decoding cost + β€’ Memory usage in pandas + +This is especially important for wide FeatureViews or large documents. + +βΈ» + +2. 
Bounded Time Filtering + +Queries now include both: + β€’ An upper bound (<= max_ts) + β€’ A lower bound (>= min_ts) + +This significantly reduces the amount of historical data scanned when: + β€’ The entity dataframe spans a narrow time window + β€’ The feature store contains deep history + +This optimization improves: + β€’ Query latency + β€’ Index selectivity + β€’ Memory footprint of retrieved data + +Future enhancements may incorporate TTL-aware lower bounds. + +βΈ» + +3. Correct Sorting for Temporal Joins + +The system ensures proper sorting before merge_asof: + β€’ Both dataframes are sorted by (entity_id, timestamp) + +This is critical for correctness when: + β€’ Multiple entities are processed in a single batch + β€’ Data is interleaved across entities + +Without this, joins may silently produce incorrect results. + +βΈ» + +Tradeoffs + +Advantages + β€’ Scalability: Avoids O(n Γ— m) behavior of correlated joins + β€’ Flexibility: Supports sparse and evolving schemas + β€’ Performance: Leverages MongoDB indexes efficiently + β€’ Simplicity: Uses well-understood pandas join semantics + +Limitations + β€’ Memory-bound joins: Requires chunking for large workloads + β€’ Multiple passes: Each FeatureView requires a separate join + β€’ No server-side joins: MongoDB is used only for filtering, not relational logic + +βΈ» + +Comparison to Alternative Designs + +Full MongoDB Join ($lookup) + +Rejected due to: + β€’ Poor scaling with large entity sets + β€’ Repeated execution of correlated subqueries + β€’ High latency (orders of magnitude slower) + +βΈ» + +Ibis-Based Design + β€’ Uses one collection per FeatureView + β€’ Loads data into memory and performs joins in Python + +Comparison: + β€’ Similar performance after hybrid redesign + β€’ Simpler query model + β€’ Less flexible schema + +The Native design trades simplicity for: + β€’ Unified storage + β€’ Better alignment with document-based ingestion + β€’ More flexible feature evolution + +βΈ» + +Operational 
Considerations + +Index Management + +Indexes are created lazily at runtime: + β€’ Ensures correctness without manual setup + β€’ Avoids placing responsibility on users + +Future improvements may include: + β€’ Optional strict index validation + β€’ Configuration-driven index management + +βΈ» + +MongoDB Client Usage + +Each chunk currently uses a separate MongoDB client instance. + +This is acceptable for moderate workloads but may be optimized in the future by: + β€’ Reusing a shared client per retrieval job + β€’ Leveraging connection pooling more explicitly + +βΈ» + +Future Work + +Several enhancements are possible: + 1. Streaming Joins + β€’ Avoid materializing all feature data in memory + β€’ Process data incrementally + 2. Adaptive Chunking + β€’ Dynamically adjust chunk size based on memory pressure + 3. TTL Pushdown + β€’ Incorporate TTL constraints into MongoDB queries + 4. Parallel Execution + β€’ Process chunks concurrently for large workloads + +βΈ» + +Conclusion + +The hybrid MongoDB + pandas design represents a significant improvement over the initial fully in-database approach. It aligns system responsibilities with the strengths of each component: + β€’ MongoDB handles indexed filtering and retrieval + β€’ Python handles temporal join logic + +With the addition of projection, bounded time filtering, and correct sorting, the system is now both performant and correct for large-scale historical feature retrieval. + +This design provides a strong foundation for further optimization and production use. 
+ diff --git a/design-notes/native_implementation_notes.md b/design-notes/native_implementation_notes.md new file mode 100644 index 00000000000..891751e56c2 --- /dev/null +++ b/design-notes/native_implementation_notes.md @@ -0,0 +1,191 @@ +# Native MongoDB Offline Store Implementation Review + +## Overview + +This document reviews the native MongoDB offline store implementation (`mongodb_native.py`) in the context of Feast idioms, the MongoDB online store implementation, and best practices. + +--- + +## Schema Alignment: Online ↔ Offline + +### Online Store Schema (mongodb_online_store/mongodb.py) +```javascript +{ + "_id": bytes, // serialized entity key + "features": { + "": { + "": value + } + }, + "event_timestamps": { "": datetime }, + "created_timestamp": datetime +} +``` + +### Offline Store Schema (Native) +```javascript +{ + "_id": ObjectId(), + "entity_id": bytes, // serialized entity key (same format as online _id) + "feature_view": "driver_stats", // discriminator + "features": { "": value }, + "event_timestamp": datetime, + "created_at": datetime +} +``` + +### βœ… Alignment Strengths +1. **Entity key serialization**: Both use `serialize_entity_key()` from `key_encoding_utils.py` +2. **Nested features**: Both use `features: { ... }` subdocument pattern +3. **Timestamps**: Both track event and created timestamps + +### ⚠️ Alignment Concerns +1. **`_id` usage**: Online uses `_id` = entity_id; Offline uses `_id` = ObjectId() with separate `entity_id` field + - **Recommendation**: Consider using `_id` = `{entity_id, feature_view, event_timestamp}` compound key for offline, eliminating ObjectId overhead + +2. **Feature nesting depth**: Online nests by feature_view then feature; Offline nests only by feature (feature_view is top-level) + - This is intentional (offline is one doc per event; online is one doc per entity with all FVs) + +--- + +## Feast Idioms Compliance + +### βœ… Correctly Followed +1. 
**RetrievalJob pattern**: Returns `MongoDBNativeRetrievalJob` wrapping a `query_fn` closure +2. **Arrow output**: `_to_arrow_internal()` returns `pyarrow.Table` (hard requirement) +3. **Warnings for preview**: Uses `warnings.warn()` with `RuntimeWarning` +4. **Config inheritance**: `MongoDBOfflineStoreNativeConfig` extends `FeastConfigBaseModel` +5. **DataSource pattern**: `MongoDBSourceNative` extends `DataSource` with `from_proto`/`_to_proto_impl` + +### ⚠️ Missing or Incomplete +1. **`offline_write_batch`**: Not implemented (raises `NotImplementedError` in persist) + - Required for push sources and `feast materialize` reverse path + - Should accept `pyarrow.Table` and insert into `feature_history` collection + +2. **`write_logged_features`**: Not implemented + - Lower priority but needed for feature logging + +3. **`persist()` on RetrievalJob**: Not implemented + - Should write results to a new collection for saved datasets + +--- + +## MQL Pipeline Quality + +### βœ… Well Implemented +1. **`pull_all_from_table_or_query`**: Clean range scan with `$project` flattening features server-side +2. **`pull_latest_from_table_or_query`**: Proper `$sort` β†’ `$group` β†’ `$project` pattern +3. **`get_historical_features`**: Uses `$lookup` with correlated subpipeline for server-side PIT join +4. **Per-FV TTL via `$switch`**: Elegant solution for different TTLs per feature view + +### ⚠️ Potential Improvements +1. **Index usage in `$lookup`**: The `$expr` in `$match` may not use indexes efficiently + - MongoDB 5.0+ has better support for `$expr` index usage + - Consider adding `hint` option if performance is critical + +2. **Temp collection cleanup**: Currently uses `try/finally` but could benefit from context manager pattern + +3. **Connection pooling**: Each method creates a new `MongoClient`. 
The online store caches `_client` and `_collection` + - **Recommendation**: Add `_client` caching to the offline store class or use connection pooling + +--- + +## Comparison with Online Store Patterns + +| Aspect | Online Store | Offline Store (Native) | +|--------|--------------|------------------------| +| Client caching | `_client`, `_collection` instance vars | New client per operation | +| Async support | Yes (`AsyncMongoClient`) | No | +| Batch operations | `bulk_write` with `UpdateOne` | `insert_many` | +| Error handling | Raises `RuntimeError` for config mismatch | Raises `ValueError` | +| DriverInfo | βœ… Yes | βœ… Yes | + +### Recommendations +1. **Add client caching** to avoid connection overhead per query +2. **Consider async support** for large entity_df scenarios +3. **Standardize error types** (use `RuntimeError` or `FeastError` subclasses) + +--- + +## Missing Features for Production Readiness + +### High Priority +1. **`offline_write_batch`**: Insert Arrow table into feature_history + ```python + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pyarrow.Table, + progress: Optional[Callable[[int], Any]], + ): + # Convert Arrow β†’ docs with schema: + # { entity_id, feature_view, features: {...}, event_timestamp, created_at } + # Then insert_many() + ``` + +2. **Index creation helper**: Document or auto-create the compound index + ```javascript + db.feature_history.createIndex({ + entity_id: 1, + feature_view: 1, + event_timestamp: -1 + }) + ``` + +3. **Connection pooling / client reuse** + +### Medium Priority +4. **`persist()` for saved datasets**: Write retrieval results to a collection +5. **`write_logged_features`**: For feature logging support +6. **Async operations**: Mirror online store's async pattern + +### Lower Priority +7. **Streaming cursor support**: For very large result sets +8. 
**Explain plan logging**: Debug mode to show MQL execution plan + +--- + +## Code Quality Observations + +### βœ… Good +- Clear docstrings explaining schema and index requirements +- Type hints throughout +- Helper functions extracted (`_ttl_to_ms`, `_build_ttl_gte_expr`, `_serialize_entity_key_from_row`) +- Proper cleanup of temp collections in `finally` block + +### ⚠️ Could Improve +- Some duplication in timestamp timezone handling (could extract helper) +- Magic strings like `"event_timestamp"`, `"created_at"` could be constants +- The `_run()` closures are large β€” consider extracting to separate methods + +--- + +## Test Coverage Assessment + +Current tests cover: +- βœ… `pull_latest_from_table_or_query` +- βœ… `pull_all_from_table_or_query` +- βœ… `get_historical_features` (PIT join) +- βœ… TTL filtering +- βœ… Multiple feature views +- βœ… Compound join keys + +Missing tests: +- ❌ `offline_write_batch` (not implemented) +- ❌ Empty result handling edge cases +- ❌ Very large entity_df (performance/memory) +- ❌ Concurrent access to temp collections +- ❌ Index usage verification (explain plans) + +--- + +## Summary + +The native implementation is a solid foundation with proper use of MQL aggregation pipelines. Key next steps: + +1. **Implement `offline_write_batch`** β€” Required for push sources +2. **Add client caching** β€” Match online store pattern +3. **Document/automate index creation** β€” Critical for performance +4. **Consider `_id` schema optimization** β€” Use compound `_id` instead of ObjectId + entity_id + diff --git a/design-notes/offline_store_design.md b/design-notes/offline_store_design.md new file mode 100644 index 00000000000..fbe7120a3c1 --- /dev/null +++ b/design-notes/offline_store_design.md @@ -0,0 +1,98 @@ +# Corrected MongoDB OfflineStore Design + +## What the interface actually requires + +`RetrievalJob._to_arrow_internal` must return a `pyarrow.Table`. 
This is non-negotiable +because the compute engines call `retrieval_job.to_arrow()` directly: + +```python +# sdk/python/feast/infra/compute_engines/local/nodes.py +retrieval_job = create_offline_store_retrieval_job(...) +arrow_table = retrieval_job.to_arrow() # ← hard requirement +``` + +The compute engine then converts Arrow β†’ proto tuples itself before calling +`OnlineStore.online_write_batch(data: List[Tuple[EntityKeyProto, ...]])`. +The offline store never sees the proto tuple format. + +`OfflineStore.offline_write_batch` (the push-source write path) takes a `pyarrow.Table` +β€” so Arrow is also the *input* format for writes. + +## The right approach β€” native aggregation, then Arrow + +The Couchbase offline store is the correct reference. It: +1. Expresses computation natively in the database (SQL++ window functions). +2. Iterates the cursor in Python. +3. Converts directly: `pa.Table.from_pylist(processed_rows)` β€” **no pandas intermediate**. + +MongoDB should follow the same pattern using its aggregation pipeline. + +## pull_latest_from_table_or_query + +The `$group` + `$sort` aggregation is the natural MongoDB equivalent of +`ROW_NUMBER() OVER(PARTITION BY entity ORDER BY timestamp DESC) = 1`: + +```python +pipeline = [ + {"$match": { + timestamp_field: {"$gte": start_date, "$lte": end_date} + }}, + {"$sort": {timestamp_field: -1, created_timestamp_column: -1}}, + {"$group": { + "_id": {k: f"${k}" for k in join_key_columns}, + **{f: {"$first": f"${f}"} for f in feature_name_columns}, + timestamp_field: {"$first": f"${timestamp_field}"}, + }}, +] +# cursor β†’ pa.Table.from_pylist([doc for doc in collection.aggregate(pipeline)]) +``` + +No pandas. No Feast join utilities. The database does the work. + +## get_historical_features + +This is harder. The point-in-time join requires: for each (entity, entity_timestamp) row, +find the feature row with the latest `event_timestamp <= entity_timestamp`. 
+ +MongoDB has no SQL window functions, but the aggregation pipeline can express this: + +``` +For each feature view: + $match: entity_ids in entity_df AND event_timestamp <= max(entity_timestamps) + $sort: entity_id, event_timestamp DESC + $lookup or unwind against entity_df rows + $match: event_timestamp <= entity_row.entity_timestamp (and TTL if set) + $group by (entity_id, entity_row_id): $first of features +``` + +This is complex but keeps computation in MongoDB and avoids loading the full history +into Python memory. The result cursor is then converted via `pa.Table.from_pylist()`. + +For an initial implementation it is acceptable to pull the filtered documents into +memory and do the join in Python (like the Dask store) β€” but this should be noted +as a known limitation, not the target design. + +## offline_write_batch + +Receives a `pyarrow.Table` from Feast (push-source path). Convert with +`table.to_pylist()` and `insert_many()` into the collection. + +## What changes from the previous design + +| Previous (incorrect) | Corrected | +|---------------------------------------------|---------------------------------------------| +| Pull docs into pandas, use offline_utils | Use MongoDB aggregation pipeline | +| pandas is the intermediate format | MongoDB cursor β†’ `pa.Table.from_pylist()` | +| Arrow is an afterthought | Arrow is the required output of the job | +| Claimed online_write_batch takes Arrow | It takes proto tuples; compute engine converts | + +## Implementation order (unchanged) + +1. `MongoDBSource` β€” DataSource subclass (connection_string, database, collection, timestamp_field). +2. `MongoDBOfflineStoreConfig` β€” pydantic config. +3. `MongoDBRetrievalJob` β€” wraps aggregation pipeline, implements `_to_arrow_internal`. +4. `offline_write_batch` β€” `pyarrow.Table` β†’ `insert_many`. +5. `pull_latest_from_table_or_query` β€” `$sort` + `$group` aggregation. +6. `pull_all_from_table_or_query` β€” `$match` time-range scan. +7. 
`get_historical_features` β€” aggregation pipeline PIT join (or in-memory fallback). + diff --git a/design-notes/prompt-mdb-fetch-pandas-join-with-batches.md b/design-notes/prompt-mdb-fetch-pandas-join-with-batches.md new file mode 100644 index 00000000000..9bd8fb437c3 --- /dev/null +++ b/design-notes/prompt-mdb-fetch-pandas-join-with-batches.md @@ -0,0 +1,108 @@ +Enhance MongoDBOfflineStoreNative.get_historical_features to support chunked execution for large entity_df, while preserving the existing fetch + pandas PIT join logic. + +Goals + β€’ Prevent memory blowups for large entity_df + β€’ Reuse the current implementation as much as possible + β€’ Keep the code clean and idiomatic to Feast + +βΈ» + +Requirements + +1. Add chunking based on entity_df size + β€’ Introduce a constant: +``` python +CHUNK_SIZE = 5000 # make configurable configurable +``` + β€’ If len(entity_df) <= CHUNK_SIZE: + β€’ Run the existing _run() logic unchanged + β€’ Else: + β€’ Split entity_df into chunks of size CHUNK_SIZE + +βΈ» + +2. Extract existing logic into reusable function +Refactor the current _run() implementation into a helper: +``` python +def _run_single(entity_subset_df: pd.DataFrame) -> pd.DataFrame: + ... +``` +This function should: + β€’ Perform: + β€’ entity_id serialization + β€’ MongoDB fetch ($in query) + β€’ pandas normalization + β€’ per-feature-view merge_asof + β€’ Return a pandas DataFrame (not Arrow) +3. Implement chunked execution +In _run(): +``` python +if len(entity_df) <= CHUNK_SIZE: + df = _run_single(entity_df) +else: + dfs = [] + for chunk in chunk_dataframe(entity_df, CHUNK_SIZE): + dfs.append(_run_single(chunk)) + df = pd.concat(dfs, ignore_index=True) +``` +4. Implement chunk helper +Add: +``` +def chunk_dataframe(df: pd.DataFrame, size: int): + for i in range(0, len(df), size): + yield df.iloc[i:i+size] +``` +5. Preserve ordering + β€’ Ensure final DataFrame preserves original row order + β€’ Use a _row_idx column if necessary +6. 
Handle edge cases +Ensure chunked version correctly handles: + β€’ Empty MongoDB results + β€’ Missing feature_views + β€’ Missing features inside documents + β€’ TTL filtering (already implemented in pandas) + +βΈ» + +7. Return Arrow table +Final _run() must still return: +``` +pyarrow.Table.from_pandas(df, preserve_index=False) +``` +Constraints + β€’ Do NOT reintroduce $lookup + β€’ Do NOT use temp collections + β€’ Do NOT duplicate large blocks of logic + β€’ Keep code readable and maintainable + +βΈ» + +Optional (nice-to-have) + β€’ Add logging or debug print: + β€’ number of chunks processed + β€’ rows per chunk + +βΈ» + +Outcome + β€’ Small workloads behave exactly as before + β€’ Large workloads are processed safely in chunks + β€’ Performance remains close to Ibis for moderate sizes + β€’ Memory usage is bounded + +βΈ» + +🧠 Why this design is the right one + +This keeps your system: + +βœ… Fast + β€’ still uses vectorized joins + +βœ… Scalable + β€’ bounded memory + +βœ… Clean + β€’ no duplication + β€’ no branching chaos + diff --git a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/README.md b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/README.md new file mode 100644 index 00000000000..db6318ee17d --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/README.md @@ -0,0 +1,161 @@ +# MongoDB Offline Store + +Two MongoDB offline store implementations optimized for different use cases. 
+ +## Overview + +| Aspect | `MongoDBOfflineStoreMany` | `MongoDBOfflineStoreOne` | +|--------|---------------------------|--------------------------| +| Collections | One per FeatureView | Single shared collection | +| Schema | Flat documents | Nested `features` subdoc | +| Entity ID | Separate columns | Serialized bytes | +| Best for | Small-medium feature stores | Large feature stores | + +## MongoDBOfflineStoreMany (mongodb_many.py) + +**One collection per FeatureView** β€” each FeatureView maps to its own MongoDB collection. + +### Schema + +```javascript +// Collection: driver_stats +{ + "driver_id": 1001, + "event_timestamp": ISODate("2024-01-15T10:00:00Z"), + "created_at": ISODate("2024-01-15T10:00:01Z"), // Optional tie-breaker + "trips_today": 5, + "rating": 4.8 +} +``` + +Ties (same `event_timestamp`) are broken by `created_timestamp_column` if configured. + +### Configuration + +```yaml +offline_store: + type: feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_many.MongoDBOfflineStoreMany + connection_string: mongodb://localhost:27017 + database: feast +``` + +### When to Use + +βœ… **Small to medium feature stores** β€” loads entire collection into memory +βœ… **Fast PIT joins** β€” Ibis memtables are highly optimized +βœ… **Simple schema** β€” flat documents, easy to query directly +βœ… **Per-collection indexes** β€” each FV can have tailored indexes + +⚠️ **Caution**: Loads ALL documents from each collection. May OOM on very large collections. + +## MongoDBOfflineStoreOne (mongodb_one.py) + +**Single shared collection** β€” all FeatureViews store data in one collection with a discriminator field. 
+ +### Schema + +```javascript +// Collection: feature_history (shared by all FVs) +{ + "entity_id": Binary("..."), // Serialized entity key + "feature_view": "driver_stats", // Discriminator + "features": { // Nested subdocument + "trips_today": 5, + "rating": 4.8 + }, + "event_timestamp": ISODate("2024-01-15T10:00:00Z"), + "created_at": ISODate("2024-01-15T10:00:01Z") +} +``` + +### Configuration + +```yaml +offline_store: + type: feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_one.MongoDBOfflineStoreOne + connection_string: mongodb://localhost:27017 + database: feast + collection: feature_history +``` + +### When to Use + +βœ… **Large feature stores** β€” filters by entity_id, doesn't load entire collection +βœ… **Memory-safe** β€” processes in chunks, bounded memory usage +βœ… **Schema consistency** β€” matches online store pattern +βœ… **Efficient materialization** β€” MQL aggregation pipeline + +⚠️ **Trade-off**: Slightly slower than Many for small workloads due to serialization overhead. + +## Performance Comparison + +Benchmarks with 10 features, 3 historical rows per entity: + +| Entity Rows | Many (time) | One (time) | Winner | +|-------------|-------------|------------|--------| +| 1,000 | 0.30s | 0.06s | One | +| 10,000 | 0.20s | 0.31s | Many | +| 100,000 | 1.51s | 5.22s | Many | +| 1,000,000 | 16.08s | 212s | Many | + +### Memory Behavior + +| Scenario | Many | One | +|----------|------|-----| +| Large feature collection, small entity_df | ❌ Loads all | βœ… Filters | +| Small feature collection, large entity_df | βœ… Fast | ⚠️ Slower | + +## Choosing an Implementation + +``` + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Is your feature collection β”‚ + β”‚ larger than available RAM? 
β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β–Ό β–Ό + YES NO + β”‚ β”‚ + β–Ό β–Ό + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Use ONE β”‚ β”‚ Use MANY β”‚ + β”‚ (memory-safe) β”‚ β”‚ (faster) β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +## Index Recommendations + +### Many (per-collection) + +Each collection should have an index on the join keys + timestamp: + +```javascript +// For a FeatureView with join key "driver_id" +db.driver_stats.createIndex({ + "driver_id": 1, // Join key(s) + "event_timestamp": -1 +}) + +// For a FeatureView with compound join keys +db.order_stats.createIndex({ + "customer_id": 1, + "order_id": 1, + "event_timestamp": -1 +}) +``` + +**Note**: The Many implementation auto-creates indexes during `pull_latest_from_table_or_query` (materialization). + +### One (shared collection) + +```javascript +db.feature_history.createIndex({ + "entity_id": 1, + "feature_view": 1, + "event_timestamp": -1 +}) +``` + +The One implementation creates this index automatically on first use. 
+ diff --git a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/__init__.py new file mode 100644 index 00000000000..535583bc38d --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/__init__.py @@ -0,0 +1,8 @@ +import feast.version + +try: + from pymongo.driver_info import DriverInfo + + DRIVER_METADATA = DriverInfo(name="Feast", version=feast.version.get_version()) +except ImportError: + DRIVER_METADATA = None # type: ignore[assignment] diff --git a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_many.py b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_many.py new file mode 100644 index 00000000000..b1112552f39 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_many.py @@ -0,0 +1,678 @@ +# Copyright 2026 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +MongoDB Offline Store Implementation (Many Collections). + +This module implements a MongoDB offline store using a many-collection schema +where each FeatureView maps to its own dedicated MongoDB collection. It uses +Ibis for point-in-time joins, loading collection data into in-memory tables. 
+Collection Structure:
+    Each FeatureView has its own collection named after the source:
+    - driver_stats FeatureView → db.driver_stats collection
+    - vehicle_stats FeatureView → db.vehicle_stats collection
+
+Collection Index (auto-created during materialization):
+    db.<collection>.createIndex({
+        "<join_key>": 1,
+        "<second_join_key>": 1, // if compound key
+        "event_timestamp": -1,
+        "created_at": -1 // if created_timestamp_column is set
+    })
+ This supports: + - Adding new features without backfilling historical data + - Partial writes or sparse feature computation + + However, it also implies: + - Newly added features will be NULL for older events + - Partially populated documents may lead to NULL values even + when older data contained those features + + Users should ensure that feature computation pipelines write complete + feature sets when consistent availability is required. + +Notes: + - Entity keys are stored as native MongoDB types (not serialized), + which differs from the "One" implementation. + - Point-in-time correctness is enforced per FeatureView. + - TTL (time-to-live) constraints are applied per FeatureView during + historical retrieval. + +Point-in-Time Join Strategy: + 1. Load entire collection into an Ibis memtable + 2. Load entity_df into an Ibis memtable + 3. Use Ibis/pandas merge_asof for point-in-time correctness + 4. Apply TTL filtering per FeatureView + +Performance Characteristics: + - Fast for small to medium collections (fits in memory) + - Optimized Ibis memtable operations for joins + - ⚠️ Loads ENTIRE collection into memory - may OOM on large collections + +When to Use: + βœ… Small to medium feature stores where collections fit in memory + βœ… When query performance is the priority + βœ… When you want simple, flat document schemas + βœ… When each FeatureView has independent scaling needs + + ❌ Avoid when collections are very large (use MongoDBOfflineStoreOne instead) + ❌ Avoid in memory-constrained environments + +Comparison with MongoDBOfflineStoreOne: + | Aspect | Many (this module) | One | + |-----------------|----------------------|------------------------| + | Collections | N (one per FV) | 1 (shared) | + | Schema | Flat top-level | Nested features{} | + | Memory | Loads all docs | Filters by entity | + | Performance | Faster at scale | Memory-efficient | + | Entity ID | Native columns | Serialized bytes | +""" + +import json +import warnings +from datetime import 
datetime +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast + +import ibis +import pandas as pd +from ibis.expr.types import Table +from pydantic import StrictStr + +try: + from pymongo import MongoClient +except ImportError: + MongoClient = None # type: ignore[assignment,misc] + +from feast.data_source import DataSource +from feast.errors import ( + DataSourceNoNameException, + FeastExtrasDependencyImportError, + SavedDatasetLocationAlreadyExists, +) +from feast.feature_view import FeatureView +from feast.infra.offline_stores.contrib.mongodb_offline_store import DRIVER_METADATA +from feast.infra.offline_stores.ibis import ( + get_historical_features_ibis, + pull_all_from_table_or_query_ibis, + pull_latest_from_table_or_query_ibis, +) +from feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, +) +from feast.infra.registry.base_registry import BaseRegistry +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.SavedDataset_pb2 import ( + SavedDatasetStorage as SavedDatasetStorageProto, +) +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.saved_dataset import SavedDatasetStorage +from feast.type_map import mongodb_to_feast_value_type +from feast.value_type import ValueType + +# --------------------------------------------------------------------------- +# Helper functions +# --------------------------------------------------------------------------- + + +def _infer_python_type_str(value: Any) -> Optional[str]: + """Infer a Feast-compatible type string from a Python value returned by pymongo.""" + if value is None: + return None + if isinstance(value, bool): + return "bool" + if isinstance(value, int): + return "int" + if isinstance(value, float): + return "float" + if isinstance(value, str): + return "str" + if isinstance(value, bytes): + return "bytes" + if isinstance(value, datetime): + return "datetime" + if 
isinstance(value, list): + if not value: + return "list[str]" + elem_type = _infer_python_type_str(value[0]) + if elem_type: + return f"list[{elem_type}]" + return "list[str]" + return None + + +# --------------------------------------------------------------------------- +# MongoDBSourceMany and related classes (one collection per FeatureView) +# --------------------------------------------------------------------------- + + +class MongoDBOptionsMany: + """Options for a MongoDB data source (database + collection).""" + + def __init__(self, database: str, collection: str): + self._database = database + self._collection = collection + + def to_proto(self) -> DataSourceProto.CustomSourceOptions: + """Serialize database and collection names as JSON into a CustomSourceOptions proto.""" + return DataSourceProto.CustomSourceOptions( + configuration=json.dumps( + {"database": self._database, "collection": self._collection} + ).encode() + ) + + @classmethod + def from_proto( + cls, options_proto: DataSourceProto.CustomSourceOptions + ) -> "MongoDBOptionsMany": + """Deserialize a CustomSourceOptions proto back into a MongoDBOptionsMany instance.""" + config = json.loads(options_proto.configuration.decode("utf8")) + return cls(database=config["database"], collection=config["collection"]) + + +class MongoDBSourceMany(DataSource): + """A MongoDB collection used as a Feast offline data source (one collection per FeatureView). + + ``name`` is the logical Feast name for this source. If omitted, it defaults + to the value of ``collection``. At least one of ``name`` or ``collection`` + must be supplied. + + ``database`` is the MongoDB database that contains the collection. When + omitted it falls back to ``MongoDBOfflineStoreManyConfig.database`` at query + time, so a single store-level default can be shared across many sources. 
+ + ``schema_sample_size`` controls how many documents are randomly sampled + when Feast infers the collection schema (used by ``feast apply`` and + ``get_table_column_names_and_types``). Increase it for collections with + highly variable document shapes; decrease it to speed up ``feast apply`` + at the cost of schema coverage. + """ + + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.CUSTOM_SOURCE + + def __init__( + self, + name: Optional[str] = None, + database: Optional[str] = None, + collection: Optional[str] = None, + timestamp_field: Optional[str] = "", + created_timestamp_column: Optional[str] = "", + field_mapping: Optional[Dict[str, str]] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", + schema_sample_size: int = 100, + ): + if name is None and collection is None: + raise DataSourceNoNameException() + # At least one of name / collection is non-None; cast to satisfy the type checker. + name = cast(str, name or collection) + + self._mongodb_options = MongoDBOptionsMany( + database=database or "", + collection=collection or name, + ) + self._schema_sample_size = schema_sample_size + + super().__init__( + name=name, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping, + description=description, + tags=tags, + owner=owner, + ) + + def __hash__(self): + return super().__hash__() + + def __eq__(self, other): + if not isinstance(other, MongoDBSourceMany): + raise TypeError( + "Comparisons should only involve MongoDBSourceMany class objects." 
+ ) + return ( + super().__eq__(other) + and self._mongodb_options._database == other._mongodb_options._database + and self._mongodb_options._collection == other._mongodb_options._collection + and self.timestamp_field == other.timestamp_field + and self.created_timestamp_column == other.created_timestamp_column + and self.field_mapping == other.field_mapping + ) + + @property + def database(self) -> str: + return self._mongodb_options._database + + @property + def collection(self) -> str: + return self._mongodb_options._collection + + @staticmethod + def from_proto(data_source: DataSourceProto) -> "MongoDBSourceMany": + assert data_source.HasField("custom_options") + options = json.loads(data_source.custom_options.configuration) + return MongoDBSourceMany( + name=data_source.name, + database=options["database"], + collection=options["collection"], + field_mapping=dict(data_source.field_mapping), + timestamp_field=data_source.timestamp_field, + created_timestamp_column=data_source.created_timestamp_column, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) + + def _to_proto_impl(self) -> DataSourceProto: + data_source_proto = DataSourceProto( + name=self.name, + type=DataSourceProto.CUSTOM_SOURCE, + data_source_class_type="feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_many.MongoDBSourceMany", + field_mapping=self.field_mapping, + custom_options=self._mongodb_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, + ) + data_source_proto.timestamp_field = self.timestamp_field + data_source_proto.created_timestamp_column = self.created_timestamp_column + return data_source_proto + + def validate(self, config: RepoConfig): + # No upfront schema validation is required for MongoDB; the connection + # is exercised lazily when features are actually retrieved. 
+ pass + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + return mongodb_to_feast_value_type + + def get_table_query_string(self) -> str: + return f"{self._mongodb_options._database}.{self._mongodb_options._collection}" + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + """Sample documents from the collection to infer field names and their Feast type strings. + + Uses ``$sample`` to fetch up to ``schema_sample_size`` documents, then + picks the most-frequent Python type observed per field. The ``_id`` + field is always excluded. + """ + if MongoClient is None: + raise FeastExtrasDependencyImportError( + "mongodb", "pymongo is not installed." + ) + connection_string = config.offline_store.connection_string + db_name = self.database or config.offline_store.database + client: Any = MongoClient(connection_string, tz_aware=True) + try: + docs = list( + client[db_name][self.collection].aggregate( + [{"$sample": {"size": self._schema_sample_size}}] + ) + ) + finally: + client.close() + + field_type_counts: Dict[str, Dict[str, int]] = {} + for doc in docs: + for field, value in doc.items(): + if field == "_id": + continue + type_str = _infer_python_type_str(value) + if type_str is None: + continue + field_type_counts.setdefault(field, {}) + field_type_counts[field][type_str] = ( + field_type_counts[field].get(type_str, 0) + 1 + ) + + return [ + (field, max(counts, key=lambda t: counts[t])) + for field, counts in field_type_counts.items() + ] + + +class SavedDatasetMongoDBStorageMany(SavedDatasetStorage): + """Persists a Feast SavedDataset into a MongoDB collection (many-collection schema).""" + + _proto_attr_name = "custom_storage" + + mongodb_options: MongoDBOptionsMany + + def __init__(self, database: str, collection: str): + self.mongodb_options = MongoDBOptionsMany( + database=database, + collection=collection, + ) + + @staticmethod + def from_proto( + storage_proto: 
SavedDatasetStorageProto, + ) -> "SavedDatasetMongoDBStorageMany": + options = json.loads(storage_proto.custom_storage.configuration) + return SavedDatasetMongoDBStorageMany( + database=options["database"], + collection=options["collection"], + ) + + def to_proto(self) -> SavedDatasetStorageProto: + return SavedDatasetStorageProto(custom_storage=self.mongodb_options.to_proto()) + + def to_data_source(self) -> DataSource: + return MongoDBSourceMany( + database=self.mongodb_options._database, + collection=self.mongodb_options._collection, + ) + + +# --------------------------------------------------------------------------- +# Offline store configuration and implementation +# --------------------------------------------------------------------------- + + +class MongoDBOfflineStoreManyConfig(FeastConfigBaseModel): + """Configuration for the MongoDB offline store (one collection per FeatureView).""" + + type: StrictStr = "feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_many.MongoDBOfflineStoreMany" + """Offline store type selector""" + + connection_string: StrictStr = "mongodb://localhost:27017" + """MongoDB connection URI""" + + database: StrictStr = "feast" + """Default MongoDB database name""" + + +class MongoDBOfflineStoreMany(OfflineStore): + """Offline store backed by MongoDB (one collection per FeatureView). + + Uses Ibis memtables for point-in-time joins. Each FeatureView's data is stored + in a separate MongoDB collection, with the collection name matching the source name. 
+ """ + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + if not isinstance(data_source, MongoDBSourceMany): + raise ValueError( + f"MongoDBOfflineStoreMany expected a MongoDBSourceMany, " + f"got {type(data_source).__name__!r}." + ) + warnings.warn( + "MongoDB offline store is in preview. API may change without notice.", + RuntimeWarning, + ) + + # Ensure index exists for efficient queries + if MongoClient is not None: + connection_string = config.offline_store.connection_string + db_name = data_source.database or config.offline_store.database + client: Any = MongoClient( + connection_string, driver=DRIVER_METADATA, tz_aware=True + ) + try: + _ensure_index_many( + client=client, + db_name=db_name, + collection_name=data_source.collection, + join_keys=join_key_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + ) + finally: + client.close() + + return pull_latest_from_table_or_query_ibis( + config=config, + data_source=data_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + data_source_reader=_build_data_source_reader(config), + data_source_writer=_build_data_source_writer(config), # type: ignore[arg-type] + ) + + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> RetrievalJob: + warnings.warn( + "MongoDB offline store is in preview. 
API may change without notice.", + RuntimeWarning, + ) + return get_historical_features_ibis( + config=config, + feature_views=feature_views, + feature_refs=feature_refs, + entity_df=entity_df, + registry=registry, + project=project, + full_feature_names=full_feature_names, + data_source_reader=_build_data_source_reader(config), + data_source_writer=_build_data_source_writer(config), # type: ignore[arg-type] + ) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + if not isinstance(data_source, MongoDBSourceMany): + raise ValueError( + f"MongoDBOfflineStoreMany expected a MongoDBSourceMany, " + f"got {type(data_source).__name__!r}." + ) + warnings.warn( + "MongoDB offline store is in preview. API may change without notice.", + RuntimeWarning, + ) + return pull_all_from_table_or_query_ibis( + config=config, + data_source=data_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + data_source_reader=_build_data_source_reader(config), + data_source_writer=_build_data_source_writer(config), # type: ignore[arg-type] + ) + + +def _build_data_source_reader(config: RepoConfig) -> Callable[[DataSource, str], Table]: + """Return a closure that fetches a MongoDB collection as an ibis in-memory table.""" + + def reader(data_source: DataSource, repo_path: str) -> Table: + if MongoClient is None: + raise FeastExtrasDependencyImportError( + "mongodb", "pymongo is not installed." 
+ ) + if not isinstance(data_source, MongoDBSourceMany): + raise ValueError( + f"MongoDBOfflineStoreMany reader expected a MongoDBSourceMany, " + f"got {type(data_source).__name__!r}." + ) + connection_string = config.offline_store.connection_string + db_name = data_source.database or config.offline_store.database + client: Any = MongoClient(connection_string, driver=DRIVER_METADATA) + try: + docs = list(client[db_name][data_source.collection].find({}, {"_id": 0})) + finally: + client.close() + + df = pd.DataFrame(docs) + if df.empty: + return ibis.memtable(df) + + # Localize naive datetime columns to UTC. MongoDB stores all dates as UTC, + # and with tz_aware=False (default), pymongo returns naive datetime objects. + # We convert them to timezone-aware UTC timestamps for pyarrow compatibility. + for col in df.columns: + if df[col].dtype == object and len(df[col].dropna()) > 0: + sample = df[col].dropna().iloc[0] + if isinstance(sample, datetime): + try: + df[col] = pd.to_datetime(df[col], utc=True) + except (ValueError, TypeError): + # Skip columns that can't be converted (e.g., mixed types) + pass + + return ibis.memtable(df) + + return reader + + +# Track which collections have had indexes ensured (module-level cache) +_indexes_ensured: set = set() + + +def _ensure_index_many( + client: Any, + db_name: str, + collection_name: str, + join_keys: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, +) -> None: + """Create recommended index on a Many-schema collection. + + Index is on: join_keys (ascending) + timestamp (descending) + created_at (descending). + Uses a module-level cache to avoid redundant index creation checks. 
+ """ + cache_key = f"{db_name}.{collection_name}" + if cache_key in _indexes_ensured: + return + + coll = client[db_name][collection_name] + + # Build index key: join_keys (asc) + timestamp (desc) + created_at (desc) + index_keys = [(k, 1) for k in join_keys] + index_keys.append((timestamp_field, -1)) + if created_timestamp_column: + index_keys.append((created_timestamp_column, -1)) + + # Check if equivalent index already exists + existing_indexes = coll.index_information() + for idx_info in existing_indexes.values(): + if idx_info.get("key") == index_keys: + _indexes_ensured.add(cache_key) + return + + # Create the index + coll.create_index(index_keys, background=True) + _indexes_ensured.add(cache_key) + + +def _build_data_source_writer( + config: RepoConfig, +) -> Callable[[Table, DataSource, str, str, bool], None]: + """Return a closure that writes an ibis table to a MongoDB collection.""" + + def writer( + table: Table, + data_source: DataSource, + repo_path: str, + mode: str = "append", + allow_overwrite: bool = False, + ) -> None: + if MongoClient is None: + raise FeastExtrasDependencyImportError( + "mongodb", "pymongo is not installed." + ) + if not isinstance(data_source, MongoDBSourceMany): + raise ValueError( + f"MongoDBOfflineStoreMany writer expected a MongoDBSourceMany, " + f"got {type(data_source).__name__!r}." 
+ ) + connection_string = config.offline_store.connection_string + db_name = data_source.database or config.offline_store.database + location = f"{db_name}.{data_source.collection}" + client: Any = MongoClient( + connection_string, driver=DRIVER_METADATA, tz_aware=True + ) + try: + coll = client[db_name][data_source.collection] + if mode == "overwrite": + if not allow_overwrite and coll.estimated_document_count() > 0: + raise SavedDatasetLocationAlreadyExists(location=location) + coll.drop() + records = table.to_pyarrow().to_pylist() + if records: + coll.insert_many(records) + finally: + client.close() + + return writer diff --git a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_one.py b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_one.py new file mode 100644 index 00000000000..3c1aeaf7081 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_one.py @@ -0,0 +1,818 @@ +# Copyright 2026 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Native MongoDB Offline Store Implementation. + +This module implements a MongoDB offline store using native MQL aggregation +pipelines. It uses a single-collection schema where all feature views share +one collection. It is event-based: each document represents an observation +of a FeatureView at a specific point in time. 
+        "entity_id": Binary("<serialized entity key>"),
+ +Notes: + - Entity keys are serialized to ensure consistency with Feast’s + online store and to avoid type ambiguity. + - Point-in-time correctness is enforced per FeatureView. + - TTL (time-to-live) constraints are applied per FeatureView during + historical retrieval. +""" + +import json +import warnings +from datetime import datetime, timezone +from typing import ( + Any, + Callable, + Dict, + Generator, + Iterable, + List, + Optional, + Tuple, + Union, +) + +import pandas as pd +import pyarrow + +try: + from pymongo import MongoClient +except ImportError: + MongoClient = None # type: ignore[assignment,misc] + +from pydantic import StrictStr + +from feast.data_source import DataSource +from feast.errors import DataSourceNoNameException, FeastExtrasDependencyImportError +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.offline_stores.contrib.mongodb_offline_store import DRIVER_METADATA +from feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, + RetrievalMetadata, +) +from feast.infra.offline_stores.offline_utils import ( + infer_event_timestamp_from_entity_df, +) +from feast.infra.registry.base_registry import BaseRegistry +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.type_map import mongodb_to_feast_value_type +from feast.value_type import ValueType + + +class MongoDBOfflineStoreOneConfig(FeastConfigBaseModel): + """Configuration for the MongoDB offline store (single shared collection).""" + + type: StrictStr = "feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_one.MongoDBOfflineStoreOne" + """Offline store type selector""" + + connection_string: StrictStr = "mongodb://localhost:27017" 
+ """MongoDB connection URI""" + + database: StrictStr = "feast" + """MongoDB database name""" + + collection: StrictStr = "feature_history" + """Single collection name for all feature views""" + + +class MongoDBSourceOne(DataSource): + """A MongoDB data source for the single-collection offline store. + + Unlike MongoDBSourceMany, this source does not map each FeatureView to + its own collection. Instead, all FeatureViews share a single MongoDB + collection (configured at the store level). + + Each document in that collection includes a ``feature_view`` field that + identifies which FeatureView it belongs to. The ``name`` of this data + source corresponds to that value and is used to filter documents during + queries. + """ + + def __init__( + self, + name: Optional[str] = None, + timestamp_field: str = "event_timestamp", + created_timestamp_column: str = "created_at", + field_mapping: Optional[Dict[str, str]] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", + ): + if name is None: + raise DataSourceNoNameException() + + super().__init__( + name=name, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping, + description=description, + tags=tags, + owner=owner, + ) + + def __hash__(self): + return super().__hash__() + + def __eq__(self, other): + if not isinstance(other, MongoDBSourceOne): + raise TypeError( + "Comparisons should only involve MongoDBSourceOne class objects." 
+ ) + return ( + super().__eq__(other) + and self.timestamp_field == other.timestamp_field + and self.created_timestamp_column == other.created_timestamp_column + and self.field_mapping == other.field_mapping + ) + + @property + def feature_view_name(self) -> str: + """The feature_view discriminator value (same as source name).""" + return self.name + + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.CUSTOM_SOURCE + + @staticmethod + def from_proto(data_source: DataSourceProto) -> "MongoDBSourceOne": + assert data_source.HasField("custom_options") + return MongoDBSourceOne( + name=data_source.name, + timestamp_field=data_source.timestamp_field, + created_timestamp_column=data_source.created_timestamp_column, + field_mapping=dict(data_source.field_mapping), + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) + + def _to_proto_impl(self) -> DataSourceProto: + return DataSourceProto( + name=self.name, + type=DataSourceProto.CUSTOM_SOURCE, + data_source_class_type="feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_one.MongoDBSourceOne", + field_mapping=self.field_mapping, + custom_options=DataSourceProto.CustomSourceOptions( + configuration=json.dumps({"feature_view": self.name}).encode() + ), + description=self.description, + tags=self.tags, + owner=self.owner, + timestamp_field=self.timestamp_field, + created_timestamp_column=self.created_timestamp_column, + ) + + def validate(self, config: RepoConfig): + pass + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + return mongodb_to_feast_value_type + + def get_table_query_string(self) -> str: + return f"feature_history[feature_view={self.name}]" + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + """Sample documents to infer feature names and types. 
+ + Queries documents matching this source's feature_view name and + inspects the ``features`` subdocument to determine schema. + """ + if MongoClient is None: + raise FeastExtrasDependencyImportError( + "mongodb", "pymongo is not installed." + ) + connection_string = config.offline_store.connection_string + db_name = config.offline_store.database + collection_name = config.offline_store.collection + client: Any = MongoClient(connection_string, driver=DRIVER_METADATA) + try: + pipeline = [ + {"$match": {"feature_view": self.name}}, + {"$sample": {"size": 100}}, + ] + docs = list(client[db_name][collection_name].aggregate(pipeline)) + finally: + client.close() + + field_type_counts: Dict[str, Dict[str, int]] = {} + for doc in docs: + features = doc.get("features", {}) + for field, value in features.items(): + type_str = _infer_python_type_str(value) + if type_str is None: + continue + field_type_counts.setdefault(field, {}) + field_type_counts[field][type_str] = ( + field_type_counts[field].get(type_str, 0) + 1 + ) + + return [ + (field, max(counts, key=lambda t: counts[t])) + for field, counts in field_type_counts.items() + ] + + +def _infer_python_type_str(value: Any) -> Optional[str]: + """Infer a Feast-compatible type string from a Python value.""" + if value is None: + return None + if isinstance(value, bool): + return "bool" + if isinstance(value, int): + return "int" + if isinstance(value, float): + return "float" + if isinstance(value, str): + return "str" + if isinstance(value, bytes): + return "bytes" + if isinstance(value, datetime): + return "datetime" + if isinstance(value, list): + if not value: + return "list[str]" + elem_type = _infer_python_type_str(value[0]) + if elem_type: + return f"list[{elem_type}]" + return "list[str]" + return None + + +def _fetch_documents( + client: Any, + database: str, + collection: str, + pipeline: List[Dict], +) -> List[Dict]: + """Execute an aggregation pipeline and return documents.""" + return 
list(client[database][collection].aggregate(pipeline)) + + +class MongoDBOneRetrievalJob(RetrievalJob): + """Retrieval job for native MongoDB offline store queries.""" + + def __init__( + self, + query_fn: Callable[[], pyarrow.Table], + full_feature_names: bool, + on_demand_feature_views: Optional[List[Any]] = None, + metadata: Optional[RetrievalMetadata] = None, + ): + self._query_fn = query_fn + self._full_feature_names = full_feature_names + self._on_demand_feature_views = on_demand_feature_views or [] + self._metadata = metadata + + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> List[Any]: + return self._on_demand_feature_views + + def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: + return self._to_arrow_internal(timeout).to_pandas() + + def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: + return self._query_fn() + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + return self._metadata + + def persist( + self, + storage: Any, + allow_overwrite: bool = False, + timeout: Optional[int] = None, + ) -> None: + # TODO: Implement persist for native store + raise NotImplementedError("persist() not yet implemented for native store") + + +def _serialize_entity_key_from_row( + row: pd.Series, join_keys: List[str], entity_key_serialization_version: int +) -> bytes: + """Serialize entity key from a DataFrame row.""" + entity_key = EntityKeyProto() + for key in sorted(join_keys): + entity_key.join_keys.append(key) + value = row[key] + val = ValueProto() + if isinstance(value, int): + val.int64_val = value + elif isinstance(value, str): + val.string_val = value + elif isinstance(value, float): + val.double_val = value + else: + val.string_val = str(value) + entity_key.entity_values.append(val) + return serialize_entity_key(entity_key, entity_key_serialization_version) + + +class MongoDBOfflineStoreOne(OfflineStore): + 
"""Native MongoDB offline store using single-collection schema. + + All feature views share one collection (``feature_history``), with documents + containing: + - ``entity_id``: serialized entity key (bytes) + - ``feature_view``: field matching FeatureView name + - ``features``: subdocument with feature name/value pairs + - ``event_timestamp``: event time + - ``created_at``: ingestion time + """ + + _index_initialized: bool = False + + @staticmethod + def _ensure_indexes(client: Any, db_name: str, collection_name: str) -> None: + """Create recommended indexes on the feature_history collection. + + Uses create_index with background=True. If index already exists + (with same or different name), this is a no-op. + """ + collection = client[db_name][collection_name] + # Check if an equivalent index already exists + existing_indexes = collection.index_information() + target_key = [("entity_id", 1), ("feature_view", 1), ("event_timestamp", -1)] + + for idx_info in existing_indexes.values(): + if idx_info.get("key") == target_key: + return # Index already exists + + collection.create_index( + target_key, + name="entity_fv_ts_idx", + background=True, + ) + + @classmethod + def _get_client_and_ensure_indexes(cls, config: RepoConfig) -> Any: + """Get a MongoClient and ensure indexes exist (once per process).""" + if MongoClient is None: + raise FeastExtrasDependencyImportError( + "mongodb", "pymongo is not installed." 
+ ) + client: Any = MongoClient( + config.offline_store.connection_string, driver=DRIVER_METADATA + ) + + if not cls._index_initialized: + cls._ensure_indexes( + client, + config.offline_store.database, + config.offline_store.collection, + ) + cls._index_initialized = True + + return client + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + if not isinstance(data_source, MongoDBSourceOne): + raise ValueError( + f"MongoDBOfflineStoreOne expected MongoDBSourceOne, " + f"got {type(data_source).__name__!r}." + ) + warnings.warn( + "MongoDB offline store (native) is in preview. API may change without notice.", + RuntimeWarning, + ) + + db_name = config.offline_store.database + collection = config.offline_store.collection + feature_view_name = data_source.feature_view_name + + start_utc = start_date.astimezone(tz=timezone.utc) + end_utc = end_date.astimezone(tz=timezone.utc) + + # Build projection to flatten features subdoc to top-level fields + project_stage: Dict[str, Any] = { + "_id": 0, + "entity_id": "$doc.entity_id", + "event_timestamp": "$doc.event_timestamp", + } + if created_timestamp_column: + project_stage["created_at"] = "$doc.created_at" + for feat in feature_name_columns: + project_stage[feat] = f"$doc.features.{feat}" + + # Build aggregation pipeline + pipeline: List[Dict[str, Any]] = [ + { + "$match": { + "feature_view": feature_view_name, + "event_timestamp": {"$gte": start_utc, "$lte": end_utc}, + } + }, + {"$sort": {"entity_id": 1, "event_timestamp": -1, "created_at": -1}}, + { + "$group": { + "_id": "$entity_id", + "doc": {"$first": "$$ROOT"}, + } + }, + {"$project": project_stage}, + ] + + def _run() -> pyarrow.Table: + client = MongoDBOfflineStoreOne._get_client_and_ensure_indexes(config) + try: + 
docs = _fetch_documents(client, db_name, collection, pipeline) + if not docs: + return pyarrow.Table.from_pydict({}) + + df = pd.DataFrame(docs) + if not df.empty and "event_timestamp" in df.columns: + if df["event_timestamp"].dt.tz is None: + df["event_timestamp"] = pd.to_datetime( + df["event_timestamp"], utc=True + ) + return pyarrow.Table.from_pandas(df, preserve_index=False) + finally: + client.close() + + return MongoDBOneRetrievalJob(query_fn=_run, full_feature_names=False) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + if not isinstance(data_source, MongoDBSourceOne): + raise ValueError( + f"MongoDBOfflineStoreOne expected MongoDBSourceOne, " + f"got {type(data_source).__name__!r}." + ) + warnings.warn( + "MongoDB offline store (native) is in preview. 
API may change without notice.", + RuntimeWarning, + ) + + db_name = config.offline_store.database + collection = config.offline_store.collection + feature_view_name = data_source.feature_view_name + + # Build match filter: feature_view + optional time range + match_filter: Dict[str, Any] = {"feature_view": feature_view_name} + if start_date or end_date: + ts_filter: Dict[str, Any] = {} + if start_date: + ts_filter["$gte"] = start_date.astimezone(tz=timezone.utc) + if end_date: + ts_filter["$lte"] = end_date.astimezone(tz=timezone.utc) + match_filter["event_timestamp"] = ts_filter + + # Build projection: flatten features subdoc to top-level fields + # This uses $getField to extract each feature from the features subdoc + project_stage: Dict[str, Any] = { + "_id": 0, + "entity_id": 1, + "event_timestamp": 1, + } + if created_timestamp_column: + project_stage["created_at"] = 1 + for feat in feature_name_columns: + project_stage[feat] = f"$features.{feat}" + + # Simple range scan pipeline - no sorting for efficiency + pipeline: List[Dict[str, Any]] = [ + {"$match": match_filter}, + {"$project": project_stage}, + ] + + def _run() -> pyarrow.Table: + client = MongoDBOfflineStoreOne._get_client_and_ensure_indexes(config) + try: + docs = _fetch_documents(client, db_name, collection, pipeline) + if not docs: + return pyarrow.Table.from_pydict({}) + + df = pd.DataFrame(docs) + if not df.empty and "event_timestamp" in df.columns: + if df["event_timestamp"].dt.tz is None: + df["event_timestamp"] = pd.to_datetime( + df["event_timestamp"], utc=True + ) + return pyarrow.Table.from_pandas(df, preserve_index=False) + finally: + client.close() + + return MongoDBOneRetrievalJob(query_fn=_run, full_feature_names=False) + + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> 
RetrievalJob: + """Fetch historical features using a "fetch + pandas join" strategy. + + Instead of using $lookup (which scales poorly), this: + 1. Extracts unique entity_ids and computes timestamp bounds + 2. Fetches all matching feature data in batched queries + 3. Uses pd.merge_asof for efficient point-in-time joins in Python + + For large entity_df, processing is chunked to bound memory usage. + """ + if isinstance(entity_df, str): + raise ValueError( + "MongoDBOfflineStoreOne does not support SQL entity_df strings. " + "Pass a pandas DataFrame instead." + ) + warnings.warn( + "MongoDB offline store (native) is in preview. API may change without notice.", + RuntimeWarning, + ) + + db_name = config.offline_store.database + feature_collection = config.offline_store.collection + entity_key_version = config.entity_key_serialization_version + + entity_schema = dict(zip(entity_df.columns, entity_df.dtypes)) + event_timestamp_col = infer_event_timestamp_from_entity_df(entity_schema) + + # Map "feature_view:feature" refs β†’ {fv_name: [feature, ...]} + fv_to_features: Dict[str, List[str]] = {} + for ref in feature_refs: + fv_name, feat_name = ref.split(":", 1) + fv_to_features.setdefault(fv_name, []).append(feat_name) + + fv_names = list(fv_to_features.keys()) + fv_by_name = {fv.name: fv for fv in feature_views} + + # Chunk size for entity_df processing (bounds memory usage) + CHUNK_SIZE = 50_000 + # Batch size for MongoDB $in queries + MONGO_BATCH_SIZE = 10_000 + + def _chunk_dataframe( + df: pd.DataFrame, size: int + ) -> Generator[pd.DataFrame, None, None]: + """Yield successive chunks of a DataFrame.""" + for i in range(0, len(df), size): + yield df.iloc[i : i + size] + + def _run_single(entity_subset_df: pd.DataFrame, coll: Any) -> pd.DataFrame: + """Process a single chunk of entity_df and return joined features. 
+ + Args: + entity_subset_df: Chunk of entity DataFrame to process + coll: MongoDB collection object (reused across chunks) + """ + # Prepare entity_df: ensure timestamps are UTC + result = entity_subset_df.copy() + # Convert timestamp column to datetime if needed + if not pd.api.types.is_datetime64_any_dtype(result[event_timestamp_col]): + result[event_timestamp_col] = pd.to_datetime( + result[event_timestamp_col], utc=True + ) + elif result[event_timestamp_col].dt.tz is None: + result[event_timestamp_col] = pd.to_datetime( + result[event_timestamp_col], utc=True + ) + + # Get join keys (all columns except event_timestamp and internal columns) + entity_columns = [ + c + for c in result.columns + if c != event_timestamp_col and not c.startswith("_") + ] + + # Serialize entity keys to bytes (same format as online store) + result["_entity_id"] = result.apply( + lambda row: _serialize_entity_key_from_row( + row, entity_columns, entity_key_version + ), + axis=1, + ) + + # Extract unique entity_ids and timestamp bounds for this chunk + unique_entity_ids = result["_entity_id"].unique().tolist() + max_ts = result[event_timestamp_col].max() + + # Fetch feature data in batches + all_feature_docs: List[Dict] = [] + + for i in range(0, len(unique_entity_ids), MONGO_BATCH_SIZE): + batch_ids = unique_entity_ids[i : i + MONGO_BATCH_SIZE] + + query = { + "entity_id": {"$in": batch_ids}, + "feature_view": {"$in": fv_names}, + "event_timestamp": {"$lte": max_ts}, + } + docs = list(coll.find(query, {"_id": 0})) + all_feature_docs.extend(docs) + + # Handle empty result + if not all_feature_docs: + for fv_name, features in fv_to_features.items(): + for feat in features: + col_name = f"{fv_name}__{feat}" if full_feature_names else feat + result[col_name] = None + return result.drop(columns=["_entity_id"]) + + # Convert to DataFrame and flatten features subdoc + feature_df = pd.DataFrame(all_feature_docs) + feature_df = feature_df.rename(columns={"entity_id": "_entity_id"}) + + if 
"features" in feature_df.columns: + features_expanded = pd.json_normalize(feature_df["features"]) + feature_df = pd.concat( + [feature_df.drop(columns=["features"]), features_expanded], axis=1 + ) + + if feature_df["event_timestamp"].dt.tz is None: + feature_df["event_timestamp"] = pd.to_datetime( + feature_df["event_timestamp"], utc=True + ) + + # Sort result for merge_asof + result = result.sort_values(event_timestamp_col).reset_index(drop=True) + + # Perform PIT join for each feature view + for fv_name, features in fv_to_features.items(): + fv = fv_by_name.get(fv_name) + fv_df = feature_df[feature_df["feature_view"] == fv_name].copy() + + if fv_df.empty: + for feat in features: + col_name = f"{fv_name}__{feat}" if full_feature_names else feat + result[col_name] = None + continue + + fv_df = fv_df.sort_values("event_timestamp").reset_index(drop=True) + + merge_cols = ["_entity_id", "event_timestamp"] + [ + f for f in features if f in fv_df.columns + ] + fv_df_subset = fv_df[ + [c for c in merge_cols if c in fv_df.columns] + ].copy() + fv_df_subset = fv_df_subset.rename( + columns={"event_timestamp": "_fv_ts"} + ) + + result = pd.merge_asof( + result, + fv_df_subset, + left_on=event_timestamp_col, + right_on="_fv_ts", + by="_entity_id", + direction="backward", + ) + + # Apply TTL + if fv and fv.ttl: + cutoff = result[event_timestamp_col] - fv.ttl + stale_mask = result["_fv_ts"] < cutoff + for feat in features: + if feat in result.columns: + result.loc[stale_mask, feat] = None + + # Rename features if full_feature_names + for feat in features: + if feat in result.columns and full_feature_names: + result = result.rename(columns={feat: f"{fv_name}__{feat}"}) + elif feat not in result.columns: + col_name = f"{fv_name}__{feat}" if full_feature_names else feat + result[col_name] = None + + result = result.drop(columns=["_fv_ts"], errors="ignore") + + return result.drop(columns=["_entity_id"], errors="ignore") + + def _run() -> pyarrow.Table: + # Add row index to 
preserve original ordering + working_df = entity_df.copy() + working_df["_row_idx"] = range(len(working_df)) + + # Create client once for all chunks + client = MongoDBOfflineStoreOne._get_client_and_ensure_indexes(config) + try: + coll = client[db_name][feature_collection] + + if len(working_df) <= CHUNK_SIZE: + # Small workload: process in single pass + result_df = _run_single(working_df, coll) + else: + # Large workload: process in chunks + chunk_results = [] + for chunk in _chunk_dataframe(working_df, CHUNK_SIZE): + chunk_results.append(_run_single(chunk, coll)) + + result_df = pd.concat(chunk_results, ignore_index=True) + finally: + client.close() + + # Restore original ordering and remove index column + result_df = result_df.sort_values("_row_idx").reset_index(drop=True) + result_df = result_df.drop(columns=["_row_idx"], errors="ignore") + + # Ensure timestamp column is tz-aware + if not result_df.empty and event_timestamp_col in result_df.columns: + if result_df[event_timestamp_col].dt.tz is None: + result_df[event_timestamp_col] = pd.to_datetime( + result_df[event_timestamp_col], utc=True + ) + + return pyarrow.Table.from_pandas(result_df, preserve_index=False) + + return MongoDBOneRetrievalJob( + query_fn=_run, + full_feature_names=full_feature_names, + ) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 5e77f532c9c..b383963c0c9 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -1762,6 +1762,30 @@ def cb_columnar_type_to_feast_value_type(type_str: str) -> ValueType: return value +def mongodb_to_feast_value_type(type_str: str) -> ValueType: + """Map a Python type string (as inferred from pymongo documents) to a Feast ValueType. + + The type strings are produced by + ``feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_source._infer_python_type_str``. + Unrecognised strings are mapped to ``ValueType.UNKNOWN``. 
+ """ + type_map: Dict[str, ValueType] = { + "str": ValueType.STRING, + "int": ValueType.INT64, + "float": ValueType.DOUBLE, + "bool": ValueType.BOOL, + "bytes": ValueType.BYTES, + "datetime": ValueType.UNIX_TIMESTAMP, + "list[str]": ValueType.STRING_LIST, + "list[int]": ValueType.INT64_LIST, + "list[float]": ValueType.DOUBLE_LIST, + "list[bool]": ValueType.BOOL_LIST, + "list[bytes]": ValueType.BYTES_LIST, + "list[datetime]": ValueType.UNIX_TIMESTAMP_LIST, + } + return type_map.get(type_str, ValueType.UNKNOWN) + + def convert_scalar_column( series: pd.Series, value_type: ValueType, target_pandas_type: str ) -> pd.Series: diff --git a/sdk/python/pytest.ini b/sdk/python/pytest.ini index 1ad76b978e4..d5ad19660b7 100644 --- a/sdk/python/pytest.ini +++ b/sdk/python/pytest.ini @@ -21,6 +21,7 @@ markers = cloud: Tests requiring cloud credentials local_only: Tests that run entirely locally xdist_group: Group tests to run in the same xdist worker + mongodb: Tests requiring MongoDB timeout = 300 timeout_method = thread diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/__init__.py b/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/benchmark.py b/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/benchmark.py new file mode 100644 index 00000000000..fa7f99e06bf --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/benchmark.py @@ -0,0 +1,838 @@ +""" +Performance benchmarks comparing MongoDB offline store implementations: Many vs One. + +- Many: One collection per FeatureView (MongoDBOfflineStoreMany) +- One: Single shared collection for all FeatureViews (MongoDBOfflineStoreOne) + +These tests measure performance across different scaling dimensions: +1. Row count scaling (entity_df size) +2. 
Feature width scaling (features per FeatureView) +3. Entity distribution (unique vs skewed/repeated entity_ids) + +Metrics captured: +- Runtime (wall clock) +- Memory (peak Python memory via tracemalloc) +- MongoDB server metrics (opcounters, execution stats) + +Run with: pytest benchmark.py -v -s +Skip slow tests: pytest benchmark.py -v -s -m "not slow" +""" + +import time +import tracemalloc +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any, Dict, Generator, Optional + +import pandas as pd +import pytest +import pytz + +pytest.importorskip("pymongo") + +from unittest.mock import MagicMock + +from pymongo import MongoClient +from testcontainers.mongodb import MongoDbContainer + +from feast import Entity, FeatureView, Field +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_many import ( + MongoDBOfflineStoreMany, + MongoDBOfflineStoreManyConfig, + MongoDBSourceMany, +) +from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_one import ( + MongoDBOfflineStoreOne, + MongoDBOfflineStoreOneConfig, + MongoDBSourceOne, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig +from feast.types import Float64, Int64 +from feast.value_type import ValueType + +# Check if Docker is available +docker_available = False +try: + import docker + + try: + client = docker.from_env() + client.ping() + docker_available = True + except Exception: + pass +except ImportError: + pass + +_requires_docker = pytest.mark.skipif( + not docker_available, + reason="Docker is not available or not running.", +) + +ENTITY_KEY_VERSION = 3 + + +@dataclass +class BenchmarkResult: + """Container for benchmark results.""" + + implementation: str + test_name: str + dimension: str + value: int + duration_seconds: 
float + rows_per_second: float + peak_memory_mb: float = 0.0 + mongo_docs_examined: int = 0 + mongo_keys_examined: int = 0 + mongo_execution_time_ms: int = 0 + + +@dataclass +class MongoMetrics: + """MongoDB server metrics captured before/after a query.""" + + opcounters: Dict[str, int] = field(default_factory=dict) + docs_examined: int = 0 + keys_examined: int = 0 + + @staticmethod + def capture(client: Any) -> "MongoMetrics": + """Capture current MongoDB server metrics.""" + status = client.admin.command("serverStatus") + return MongoMetrics( + opcounters=dict(status.get("opcounters", {})), + ) + + def delta(self, after: "MongoMetrics") -> Dict[str, int]: + """Calculate delta between two metric snapshots.""" + return { + k: after.opcounters.get(k, 0) - self.opcounters.get(k, 0) + for k in after.opcounters + } + + +def _make_entity_id(driver_id: int) -> bytes: + """Create serialized entity key.""" + entity_key = EntityKeyProto() + entity_key.join_keys.append("driver_id") + val = ValueProto() + val.int64_val = driver_id + entity_key.entity_values.append(val) + return serialize_entity_key(entity_key, ENTITY_KEY_VERSION) + + +@pytest.fixture(scope="module") +def mongodb_container() -> Generator[MongoDbContainer, None, None]: + """Start a MongoDB container for benchmarks.""" + container = MongoDbContainer( + "mongo:latest", + username="test", + password="test", # pragma: allowlist secret + ).with_exposed_ports(27017) + container.start() + yield container + container.stop() + + +@pytest.fixture +def mongodb_connection_string(mongodb_container: MongoDbContainer) -> str: + """Get MongoDB connection string.""" + exposed_port = mongodb_container.get_exposed_port(27017) + return f"mongodb://test:test@localhost:{exposed_port}" # pragma: allowlist secret + + +@pytest.fixture +def many_config(mongodb_connection_string: str) -> RepoConfig: + """RepoConfig for Many implementation (one collection per FeatureView).""" + return RepoConfig( + project="benchmark", + 
registry="memory://", + provider="local", + offline_store=MongoDBOfflineStoreManyConfig( + connection_string=mongodb_connection_string, + database="benchmark_db", + ), + online_store={"type": "sqlite"}, + entity_key_serialization_version=ENTITY_KEY_VERSION, + ) + + +@pytest.fixture +def one_config(mongodb_connection_string: str) -> RepoConfig: + """RepoConfig for One implementation (single shared collection).""" + return RepoConfig( + project="benchmark", + registry="memory://", + provider="local", + offline_store=MongoDBOfflineStoreOneConfig( + connection_string=mongodb_connection_string, + database="benchmark_db", + collection="feature_history", + ), + online_store={"type": "sqlite"}, + entity_key_serialization_version=ENTITY_KEY_VERSION, + ) + + +def _generate_many_data( + client: MongoClient, + db_name: str, + collection_name: str, + num_entities: int, + num_features: int, + rows_per_entity: int = 5, +) -> datetime: + """Generate test data for Many (one collection per FV, flat schema).""" + collection = client[db_name][collection_name] + collection.drop() + + now = datetime.now(tz=pytz.UTC) + docs = [] + + for entity_id in range(num_entities): + for row in range(rows_per_entity): + doc = { + "driver_id": entity_id, + "event_timestamp": now - timedelta(hours=row), + } + for f in range(num_features): + doc[f"feature_{f}"] = float(entity_id * 100 + f + row * 0.1) + docs.append(doc) + + collection.insert_many(docs) + return now + + +def _generate_one_data( + client: MongoClient, + db_name: str, + collection_name: str, + feature_view_name: str, + num_entities: int, + num_features: int, + rows_per_entity: int = 5, +) -> datetime: + """Generate test data for One (single collection, nested features).""" + collection = client[db_name][collection_name] + # Don't drop - may have multiple FVs in same collection + + now = datetime.now(tz=pytz.UTC) + docs = [] + + for entity_id in range(num_entities): + for row in range(rows_per_entity): + features = {} + for f in 
range(num_features): + features[f"feature_{f}"] = float(entity_id * 100 + f + row * 0.1) + + doc = { + "entity_id": _make_entity_id(entity_id), + "feature_view": feature_view_name, + "features": features, + "event_timestamp": now - timedelta(hours=row), + "created_at": now - timedelta(hours=row), + } + docs.append(doc) + + collection.insert_many(docs) + return now + + +def _create_many_fv(num_features: int) -> tuple: + """Create Many source and FeatureView.""" + source = MongoDBSourceMany( + name="driver_benchmark", + database="benchmark_db", + collection="driver_benchmark", + timestamp_field="event_timestamp", + ) + entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + + schema = [Field(name="driver_id", dtype=Int64)] + for f in range(num_features): + schema.append(Field(name=f"feature_{f}", dtype=Float64)) + + fv = FeatureView( + name="driver_benchmark", + entities=[entity], + schema=schema, + source=source, + ttl=timedelta(days=1), + ) + return source, fv + + +def _create_one_fv(num_features: int) -> tuple: + """Create One source and FeatureView.""" + source = MongoDBSourceOne( + name="driver_benchmark", + timestamp_field="event_timestamp", + ) + entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + + schema = [Field(name="driver_id", dtype=Int64)] + for f in range(num_features): + schema.append(Field(name=f"feature_{f}", dtype=Float64)) + + fv = FeatureView( + name="driver_benchmark", + entities=[entity], + schema=schema, + source=source, + ttl=timedelta(days=1), + ) + return source, fv + + +def _run_benchmark(func, name: str) -> float: + """Run a function and return elapsed time.""" + start = time.perf_counter() + func() # Execute the function + elapsed = time.perf_counter() - start + return elapsed + + +@dataclass +class FullBenchmarkResult: + """Full benchmark results with all metrics.""" + + elapsed_seconds: float + peak_memory_mb: float + mongo_opcounters_delta: Dict[str, 
int] + + +def _run_benchmark_full( + func, + mongo_client: Optional[Any] = None, +) -> FullBenchmarkResult: + """Run a benchmark capturing runtime, memory, and MongoDB metrics.""" + # Capture MongoDB metrics before + mongo_before = None + if mongo_client: + mongo_before = MongoMetrics.capture(mongo_client) + + # Start memory tracking + tracemalloc.start() + + # Run the benchmark + start = time.perf_counter() + func() + elapsed = time.perf_counter() - start + + # Capture peak memory + _, peak_memory = tracemalloc.get_traced_memory() + tracemalloc.stop() + peak_memory_mb = peak_memory / (1024 * 1024) + + # Capture MongoDB metrics after + mongo_delta = {} + if mongo_client and mongo_before: + mongo_after = MongoMetrics.capture(mongo_client) + mongo_delta = mongo_before.delta(mongo_after) + + return FullBenchmarkResult( + elapsed_seconds=elapsed, + peak_memory_mb=peak_memory_mb, + mongo_opcounters_delta=mongo_delta, + ) + + +def _print_benchmark_result( + impl: str, + dimension_name: str, + dimension_value: int, + result: FullBenchmarkResult, + num_rows: Optional[int] = None, +) -> None: + """Pretty print benchmark results.""" + print(f"\n[{impl}] {dimension_name}: {dimension_value:,}") + print(f" Time: {result.elapsed_seconds:.3f}s") + print(f" Memory: {result.peak_memory_mb:.1f} MB") + if num_rows: + rate = num_rows / result.elapsed_seconds if result.elapsed_seconds > 0 else 0 + print(f" Rate: {rate:,.0f} rows/s") + if result.mongo_opcounters_delta: + print(f" Mongo ops: {result.mongo_opcounters_delta}") + + +# ============================================================================= +# Test 1: Scale Rows (entity_df size) +# ============================================================================= + +ROW_COUNTS = [ + 1000, + 5000, + 10000, +] # Reduced for CI; use [10000, 50000, 100000, 500000] for full benchmark + + +@_requires_docker +@pytest.mark.parametrize("num_rows", ROW_COUNTS) +def test_scale_rows_many( + mongodb_connection_string: str, many_config: 
RepoConfig, num_rows: int +) -> None: + """Benchmark Many implementation with varying entity_df sizes. + + Measures: runtime, peak memory, MongoDB opcounters. + """ + num_features = 10 + num_entities = num_rows # One row per entity for simplicity + + client = MongoClient(mongodb_connection_string) + try: + now = _generate_many_data( + client, + "benchmark_db", + "driver_benchmark", + num_entities=num_entities, + num_features=num_features, + rows_per_entity=3, + ) + + _, fv = _create_many_fv(num_features) + + entity_df = pd.DataFrame( + { + "driver_id": list(range(num_entities)), + "event_timestamp": [now] * num_entities, + } + ) + + feature_refs = [f"driver_benchmark:feature_{i}" for i in range(num_features)] + + def run_query(): + job = MongoDBOfflineStoreMany.get_historical_features( + config=many_config, + feature_views=[fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + result = _run_benchmark_full(run_query, mongo_client=client) + _print_benchmark_result("IBIS", "Rows", num_rows, result, num_rows=num_rows) + + finally: + client.close() + + +@_requires_docker +@pytest.mark.parametrize("num_rows", ROW_COUNTS) +def test_scale_rows_one( + mongodb_connection_string: str, one_config: RepoConfig, num_rows: int +) -> None: + """Benchmark One implementation with varying entity_df sizes. + + Measures: runtime, peak memory, MongoDB opcounters. 
+ """ + num_features = 10 + num_entities = num_rows + + client = MongoClient(mongodb_connection_string) + try: + client["benchmark_db"]["feature_history"].drop() + now = _generate_one_data( + client, + "benchmark_db", + "feature_history", + "driver_benchmark", + num_entities=num_entities, + num_features=num_features, + rows_per_entity=3, + ) + + _, fv = _create_one_fv(num_features) + + entity_df = pd.DataFrame( + { + "driver_id": list(range(num_entities)), + "event_timestamp": [now] * num_entities, + } + ) + + feature_refs = [f"driver_benchmark:feature_{i}" for i in range(num_features)] + + def run_query(): + job = MongoDBOfflineStoreOne.get_historical_features( + config=one_config, + feature_views=[fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + result = _run_benchmark_full(run_query, mongo_client=client) + _print_benchmark_result("NATIVE", "Rows", num_rows, result, num_rows=num_rows) + + finally: + client.close() + + +# ============================================================================= +# Test 2: Wide Feature Views (features per FV) +# ============================================================================= + +FEATURE_COUNTS = [10, 50, 100] # Use [50, 100, 150, 200] for full benchmark + + +@_requires_docker +@pytest.mark.parametrize("num_features", FEATURE_COUNTS) +def test_wide_features_many( + mongodb_connection_string: str, many_config: RepoConfig, num_features: int +) -> None: + """Benchmark Many with varying feature width.""" + num_entities = 1000 + + client = MongoClient(mongodb_connection_string) + try: + now = _generate_many_data( + client, + "benchmark_db", + "driver_benchmark", + num_entities=num_entities, + num_features=num_features, + rows_per_entity=3, + ) + + _, fv = _create_many_fv(num_features) + + entity_df = pd.DataFrame( + { + "driver_id": list(range(num_entities)), + "event_timestamp": [now] * num_entities, + } + ) + 
+ feature_refs = [f"driver_benchmark:feature_{i}" for i in range(num_features)] + + def run_query(): + job = MongoDBOfflineStoreMany.get_historical_features( + config=many_config, + feature_views=[fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + result = _run_benchmark_full(run_query, mongo_client=client) + _print_benchmark_result( + "IBIS", "Features", num_features, result, num_rows=num_entities + ) + + finally: + client.close() + + +@_requires_docker +@pytest.mark.parametrize("num_features", FEATURE_COUNTS) +def test_wide_features_one( + mongodb_connection_string: str, one_config: RepoConfig, num_features: int +) -> None: + """Benchmark One with varying feature width.""" + num_entities = 1000 + + client = MongoClient(mongodb_connection_string) + try: + client["benchmark_db"]["feature_history"].drop() + now = _generate_one_data( + client, + "benchmark_db", + "feature_history", + "driver_benchmark", + num_entities=num_entities, + num_features=num_features, + rows_per_entity=3, + ) + + _, fv = _create_one_fv(num_features) + + entity_df = pd.DataFrame( + { + "driver_id": list(range(num_entities)), + "event_timestamp": [now] * num_entities, + } + ) + + feature_refs = [f"driver_benchmark:feature_{i}" for i in range(num_features)] + + def run_query(): + job = MongoDBOfflineStoreOne.get_historical_features( + config=one_config, + feature_views=[fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + result = _run_benchmark_full(run_query, mongo_client=client) + _print_benchmark_result( + "NATIVE", "Features", num_features, result, num_rows=num_entities + ) + + finally: + client.close() + + +# ============================================================================= +# Test 3: Skewed Entity Distribution +# 
============================================================================= + + +@_requires_docker +@pytest.mark.parametrize("unique_ratio", [1.0, 0.5, 0.1]) # 100%, 50%, 10% unique +def test_entity_skew_many( + mongodb_connection_string: str, many_config: RepoConfig, unique_ratio: float +) -> None: + """Benchmark Many with varying entity uniqueness in entity_df.""" + import numpy as np + + total_rows = 5000 + num_features = 10 + num_unique_entities = int(total_rows * unique_ratio) + num_unique_entities = max(num_unique_entities, 1) + + client = MongoClient(mongodb_connection_string) + try: + now = _generate_many_data( + client, + "benchmark_db", + "driver_benchmark", + num_entities=num_unique_entities, + num_features=num_features, + rows_per_entity=5, + ) + + _, fv = _create_many_fv(num_features) + + # Create entity_df with repeated entity_ids + entity_ids = np.random.choice( + num_unique_entities, size=total_rows, replace=True + ) + entity_df = pd.DataFrame( + { + "driver_id": entity_ids, + "event_timestamp": [ + now - timedelta(minutes=i % 60) for i in range(total_rows) + ], + } + ) + + feature_refs = [f"driver_benchmark:feature_{i}" for i in range(num_features)] + + def run_query(): + job = MongoDBOfflineStoreMany.get_historical_features( + config=many_config, + feature_views=[fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + result = _run_benchmark_full(run_query, mongo_client=client) + print( + f"\n[MANY] Unique ratio: {unique_ratio:.0%} ({num_unique_entities:,} unique / {total_rows:,} rows)" + ) + print(f" Time: {result.elapsed_seconds:.3f}s") + print(f" Memory: {result.peak_memory_mb:.1f} MB") + print(f" Mongo ops: {result.mongo_opcounters_delta}") + + finally: + client.close() + + +@_requires_docker +@pytest.mark.parametrize("unique_ratio", [1.0, 0.5, 0.1]) +def test_entity_skew_one( + mongodb_connection_string: str, one_config: RepoConfig, 
unique_ratio: float +) -> None: + """Benchmark One with varying entity uniqueness in entity_df.""" + import numpy as np + + total_rows = 5000 + num_features = 10 + num_unique_entities = int(total_rows * unique_ratio) + num_unique_entities = max(num_unique_entities, 1) + + client = MongoClient(mongodb_connection_string) + try: + client["benchmark_db"]["feature_history"].drop() + now = _generate_one_data( + client, + "benchmark_db", + "feature_history", + "driver_benchmark", + num_entities=num_unique_entities, + num_features=num_features, + rows_per_entity=5, + ) + + _, fv = _create_one_fv(num_features) + + entity_ids = np.random.choice( + num_unique_entities, size=total_rows, replace=True + ) + entity_df = pd.DataFrame( + { + "driver_id": entity_ids, + "event_timestamp": [ + now - timedelta(minutes=i % 60) for i in range(total_rows) + ], + } + ) + + feature_refs = [f"driver_benchmark:feature_{i}" for i in range(num_features)] + + def run_query(): + job = MongoDBOfflineStoreOne.get_historical_features( + config=one_config, + feature_views=[fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + result = _run_benchmark_full(run_query, mongo_client=client) + print( + f"\n[ONE] Unique ratio: {unique_ratio:.0%} ({num_unique_entities:,} unique / {total_rows:,} rows)" + ) + print(f" Time: {result.elapsed_seconds:.3f}s") + print(f" Memory: {result.peak_memory_mb:.1f} MB") + print(f" Mongo ops: {result.mongo_opcounters_delta}") + + finally: + client.close() + + +# ============================================================================= +# Summary comparison test +# ============================================================================= + + +@_requires_docker +def test_summary_comparison( + mongodb_connection_string: str, many_config: RepoConfig, one_config: RepoConfig +) -> None: + """Run a standard comparison and print summary with full metrics.""" + 
num_entities = 2000 + num_features = 20 + + client = MongoClient(mongodb_connection_string) + try: + # Setup Many data + now = _generate_many_data( + client, + "benchmark_db", + "driver_benchmark", + num_entities=num_entities, + num_features=num_features, + rows_per_entity=5, + ) + + # Setup One data + client["benchmark_db"]["feature_history"].drop() + _generate_one_data( + client, + "benchmark_db", + "feature_history", + "driver_benchmark", + num_entities=num_entities, + num_features=num_features, + rows_per_entity=5, + ) + + entity_df = pd.DataFrame( + { + "driver_id": list(range(num_entities)), + "event_timestamp": [now] * num_entities, + } + ) + + feature_refs = [f"driver_benchmark:feature_{i}" for i in range(num_features)] + + # Many benchmark + _, many_fv = _create_many_fv(num_features) + + def run_many(): + job = MongoDBOfflineStoreMany.get_historical_features( + config=many_config, + feature_views=[many_fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + many_result = _run_benchmark_full(run_many, mongo_client=client) + + # One benchmark + _, one_fv = _create_one_fv(num_features) + + def run_one(): + job = MongoDBOfflineStoreOne.get_historical_features( + config=one_config, + feature_views=[one_fv], + feature_refs=feature_refs, + entity_df=entity_df, + registry=MagicMock(), + project="benchmark", + full_feature_names=False, + ) + return job.to_df() + + one_result = _run_benchmark_full(run_one, mongo_client=client) + + # Print summary + print("\n" + "=" * 70) + print("SUMMARY COMPARISON: Many vs One") + print("=" * 70) + print(f"Entities: {num_entities:,} | Features: {num_features}") + print("-" * 70) + print(f"{'Metric':<20} {'Many':>20} {'One':>20}") + print("-" * 70) + print( + f"{'Time (s)':<20} {many_result.elapsed_seconds:>20.3f} {one_result.elapsed_seconds:>20.3f}" + ) + print( + f"{'Memory (MB)':<20} {many_result.peak_memory_mb:>20.1f} 
{one_result.peak_memory_mb:>20.1f}" + ) + print( + f"{'Rows/sec':<20} {num_entities / many_result.elapsed_seconds:>20,.0f} {num_entities / one_result.elapsed_seconds:>20,.0f}" + ) + print("-" * 70) + + if one_result.elapsed_seconds > 0: + ratio = one_result.elapsed_seconds / many_result.elapsed_seconds + faster = "Many" if ratio > 1 else "One" + print(f"{faster} is {max(ratio, 1 / ratio):.1f}x faster") + print("=" * 70) + + finally: + client.close() diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/test_many.py b/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/test_many.py new file mode 100644 index 00000000000..cbd43ea8d18 --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/test_many.py @@ -0,0 +1,646 @@ +""" +Unit tests for MongoDB offline store (Ibis-based implementation). + +Docker-dependent tests are marked with ``@_requires_docker`` and are skipped when +Docker is unavailable. +""" + +from datetime import datetime, timedelta +from typing import Generator +from unittest.mock import MagicMock + +import pandas as pd +import pytest +import pytz + +pytest.importorskip("pymongo") + +from pymongo import MongoClient +from testcontainers.mongodb import MongoDbContainer + +from feast import Entity, FeatureView, Field +from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_many import ( + MongoDBOfflineStoreMany, + MongoDBOfflineStoreManyConfig, + MongoDBSourceMany, +) +from feast.repo_config import RepoConfig +from feast.types import Float64, Int64, String +from feast.value_type import ValueType + +# Check if Docker is available +docker_available = False +try: + import docker + + try: + client = docker.from_env() + client.ping() + docker_available = True + except Exception: + pass +except ImportError: + pass + +_requires_docker = pytest.mark.skipif( + not docker_available, + reason="Docker is not available or not running.", +) + + 
+@pytest.fixture(scope="module") +def mongodb_container() -> Generator[MongoDbContainer, None, None]: + """Start a MongoDB container for testing.""" + container = MongoDbContainer( + "mongo:latest", + username="test", + password="test", # pragma: allowlist secret + ).with_exposed_ports(27017) + container.start() + yield container + container.stop() + + +@pytest.fixture +def mongodb_connection_string(mongodb_container: MongoDbContainer) -> str: + """Get MongoDB connection string from the container.""" + exposed_port = mongodb_container.get_exposed_port(27017) + return f"mongodb://test:test@localhost:{exposed_port}" # pragma: allowlist secret + + +@pytest.fixture +def repo_config(mongodb_connection_string: str) -> RepoConfig: + """Create a RepoConfig with MongoDB offline store.""" + return RepoConfig( + project="test_project", + registry="memory://", + provider="local", + offline_store=MongoDBOfflineStoreManyConfig( + connection_string=mongodb_connection_string, + database="feast_test", + ), + online_store={"type": "sqlite"}, + entity_key_serialization_version=3, + ) + + +@pytest.fixture +def sample_data(mongodb_connection_string: str) -> datetime: + """Insert sample driver stats data into MongoDB. + + Returns the 'now' timestamp used as the latest event_timestamp. + + Note: The collection name 'driver_stats' is defined in the MongoDBSource + (see driver_source fixture), not in the RepoConfig. RepoConfig provides + connection_string and database; the source defines the collection. 
+ """ + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + collection = db["driver_stats"] + collection.drop() + + now = datetime.now(tz=pytz.UTC) + docs = [ + { + "driver_id": 1, + "conv_rate": 0.5, + "acc_rate": 0.9, + "event_timestamp": now - timedelta(hours=2), + }, + { + "driver_id": 1, + "conv_rate": 0.6, + "acc_rate": 0.85, + "event_timestamp": now - timedelta(hours=1), + }, + {"driver_id": 1, "conv_rate": 0.7, "acc_rate": 0.8, "event_timestamp": now}, + { + "driver_id": 2, + "conv_rate": 0.3, + "acc_rate": 0.95, + "event_timestamp": now - timedelta(hours=2), + }, + # Driver 2 has no "now" timestamp - only data from 2 hours ago + # This tests that pull_latest correctly handles entities with different latest timestamps + ] + collection.insert_many(docs) + client.close() + return now + + +@pytest.fixture +def driver_source() -> MongoDBSourceMany: + """Create a MongoDBSourceMany for driver stats.""" + return MongoDBSourceMany( + name="driver_stats", + database="feast_test", + collection="driver_stats", + timestamp_field="event_timestamp", + ) + + +@pytest.fixture +def driver_fv(driver_source: MongoDBSourceMany) -> FeatureView: + """Create a FeatureView for driver stats. + + The ttl (time-to-live) parameter defines how far back in time Feast will look + for feature values during point-in-time joins. If a feature's event_timestamp + is older than (entity_timestamp - ttl), that feature value is considered stale + and will be returned as NULL. + + This is different from MongoDB TTL indexes which automatically delete documents + after a period of time. Feast TTL is a query-time filter, not a storage policy. 
+ """ + driver_entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + return FeatureView( + name="driver_stats", + entities=[driver_entity], + schema=[ + # Include entity column in schema so entity_columns is populated + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float64), + Field(name="acc_rate", dtype=Float64), + ], + source=driver_source, + ttl=timedelta(days=1), + ) + + +@_requires_docker +def test_pull_latest_from_table_or_query( + repo_config: RepoConfig, sample_data: datetime, driver_source: MongoDBSourceMany +) -> None: + """Test pulling latest features per entity from MongoDB. + + This test verifies that pull_latest returns only the most recent feature + values for each entity (driver_id), even when entities have different + latest timestamps. Driver 1 has data at now, but driver 2's latest data + is from 2 hours ago. + """ + now = sample_data + job = MongoDBOfflineStoreMany.pull_latest_from_table_or_query( + config=repo_config, + data_source=driver_source, + join_key_columns=["driver_id"], + feature_name_columns=["conv_rate", "acc_rate"], + timestamp_field="event_timestamp", + created_timestamp_column=None, + start_date=now - timedelta(days=1), + end_date=now + timedelta(hours=1), + ) + + df = job.to_df() + + # Validate DataFrame structure + assert isinstance(df, pd.DataFrame) + assert set(df.columns) == {"driver_id", "conv_rate", "acc_rate", "event_timestamp"} + assert len(df) == 2 # Two unique drivers + + # Extract rows for each driver + driver1_rows = df[df["driver_id"] == 1] + driver2_rows = df[df["driver_id"] == 2] + + # Each driver should have exactly one row (the latest) + assert len(driver1_rows) == 1 + assert len(driver2_rows) == 1 + + driver1 = driver1_rows.iloc[0] + driver2 = driver2_rows.iloc[0] + + # Validate types + assert isinstance(driver1["conv_rate"], float) + assert isinstance(driver1["acc_rate"], float) + + # Driver 1's latest values (from "now") + assert 
driver1["conv_rate"] == pytest.approx(0.7) + assert driver1["acc_rate"] == pytest.approx(0.8) + + # Driver 2's latest values (from 2 hours ago - driver 2 has no "now" data) + # This demonstrates that pull_latest correctly handles entities with + # different "latest" timestamps + assert driver2["conv_rate"] == pytest.approx(0.3) + assert driver2["acc_rate"] == pytest.approx(0.95) + + +@_requires_docker +def test_get_historical_features_pit_join( + repo_config: RepoConfig, sample_data: datetime, driver_fv: FeatureView +) -> None: + """Test point-in-time join retrieves correct feature values. + + Point-in-time (PIT) join ensures that for each entity row, we get the + feature values that were valid AT THAT POINT IN TIME - not future data + that would cause data leakage in ML training. + """ + now = sample_data + + # Entity dataframe: request features at specific timestamps + # Each row says "give me driver X's features as they were at time T" + entity_df = pd.DataFrame( + { + "driver_id": [1, 1, 2], + "event_timestamp": [ + now + - timedelta( + hours=1, minutes=30 + ), # Should get conv_rate=0.5 (before 0.6 was written) + now + - timedelta( + minutes=30 + ), # Should get conv_rate=0.6 (before 0.7 was written) + now + - timedelta(hours=1), # Should get conv_rate=0.3 (only data available) + ], + } + ) + + job = MongoDBOfflineStoreMany.get_historical_features( + config=repo_config, + feature_views=[driver_fv], + feature_refs=["driver_stats:conv_rate", "driver_stats:acc_rate"], + entity_df=entity_df, + registry=MagicMock(), + project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df() + assert isinstance(result_df, pd.DataFrame) + assert len(result_df) == 3 + + # Sort by driver_id and event_timestamp for predictable assertions + result_df = result_df.sort_values(["driver_id", "event_timestamp"]).reset_index( + drop=True + ) + + # Driver 1, first request (1.5 hours ago) β†’ should get value from 2 hours ago + assert result_df.loc[0, "conv_rate"] 
== pytest.approx(0.5) + + # Driver 1, second request (30 min ago) β†’ should get value from 1 hour ago + assert result_df.loc[1, "conv_rate"] == pytest.approx(0.6) + + # Driver 2, request (1 hour ago) β†’ should get value from 2 hours ago + assert result_df.loc[2, "conv_rate"] == pytest.approx(0.3) + + +@_requires_docker +def test_pull_all_from_table_or_query( + repo_config: RepoConfig, sample_data: datetime, driver_source: MongoDBSourceMany +) -> None: + """Test pulling all features within a time range (no deduplication).""" + now = sample_data + job = MongoDBOfflineStoreMany.pull_all_from_table_or_query( + config=repo_config, + data_source=driver_source, + join_key_columns=["driver_id"], + feature_name_columns=["conv_rate", "acc_rate"], + timestamp_field="event_timestamp", + created_timestamp_column=None, + start_date=now - timedelta(hours=1, minutes=30), + end_date=now + timedelta(hours=1), + ) + + df = job.to_df() + assert isinstance(df, pd.DataFrame) + # Should get 2 rows: driver 1 (1hr ago, now) + # Excludes: driver 1 row from 2 hours ago (before start_date) + # driver 2 row from 2 hours ago (before start_date) + assert len(df) == 2 + + +@_requires_docker +def test_ttl_excludes_stale_features( + repo_config: RepoConfig, + mongodb_connection_string: str, + driver_source: MongoDBSourceMany, +) -> None: + """Test that TTL causes stale feature values to be returned as NULL. + + Feast TTL (time-to-live) is a query-time filter: if a feature's event_timestamp + is older than (entity_timestamp - ttl), that feature is considered stale. + This is different from MongoDB TTL indexes which delete documents. 
+ """ + # Insert data with a very old timestamp + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + collection = db["driver_stats_ttl_test"] + collection.drop() + + now = datetime.now(tz=pytz.UTC) + docs = [ + # Fresh data (within TTL) + {"driver_id": 1, "conv_rate": 0.9, "event_timestamp": now - timedelta(hours=1)}, + # Stale data (outside 1-day TTL when queried from "now") + {"driver_id": 2, "conv_rate": 0.5, "event_timestamp": now - timedelta(days=2)}, + ] + collection.insert_many(docs) + client.close() + + # Create source and feature view with 1-day TTL + ttl_source = MongoDBSourceMany( + name="driver_stats_ttl_test", + database="feast_test", + collection="driver_stats_ttl_test", + timestamp_field="event_timestamp", + ) + driver_entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + ttl_fv = FeatureView( + name="driver_stats_ttl_test", + entities=[driver_entity], + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float64), + ], + source=ttl_source, + ttl=timedelta(days=1), # Features older than 1 day are stale + ) + + # Request features "as of now" for both drivers + entity_df = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": [now, now], + } + ) + + job = MongoDBOfflineStoreMany.get_historical_features( + config=repo_config, + feature_views=[ttl_fv], + feature_refs=["driver_stats_ttl_test:conv_rate"], + entity_df=entity_df, + registry=MagicMock(), + project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df().sort_values("driver_id").reset_index(drop=True) + + # Driver 1: fresh data within TTL β†’ should have value + assert result_df.loc[0, "conv_rate"] == pytest.approx(0.9) + + # Driver 2: stale data outside TTL β†’ should be NULL + assert pd.isna(result_df.loc[1, "conv_rate"]) + + +@_requires_docker +def test_multiple_feature_views( + repo_config: RepoConfig, mongodb_connection_string: str +) -> None: + 
"""Test joining features from multiple MongoDB collections/FeatureViews. + + This simulates a real-world scenario where features come from different + data sources (e.g., driver stats from one collection, vehicle stats from another). + """ + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + + # Collection 1: Driver stats + driver_collection = db["driver_stats_multi"] + driver_collection.drop() + now = datetime.now(tz=pytz.UTC) + driver_docs = [ + {"driver_id": 1, "rating": 4.8, "event_timestamp": now - timedelta(hours=1)}, + {"driver_id": 2, "rating": 4.5, "event_timestamp": now - timedelta(hours=1)}, + ] + driver_collection.insert_many(driver_docs) + + # Collection 2: Vehicle stats (same driver_id, different features) + vehicle_collection = db["vehicle_stats_multi"] + vehicle_collection.drop() + vehicle_docs = [ + { + "driver_id": 1, + "vehicle_age": 2, + "mileage": 50000, + "event_timestamp": now - timedelta(hours=1), + }, + { + "driver_id": 2, + "vehicle_age": 5, + "mileage": 120000, + "event_timestamp": now - timedelta(hours=1), + }, + ] + vehicle_collection.insert_many(vehicle_docs) + client.close() + + # Create sources for each collection + driver_source = MongoDBSourceMany( + name="driver_stats_multi", + database="feast_test", + collection="driver_stats_multi", + timestamp_field="event_timestamp", + ) + vehicle_source = MongoDBSourceMany( + name="vehicle_stats_multi", + database="feast_test", + collection="vehicle_stats_multi", + timestamp_field="event_timestamp", + ) + + # Create entities and feature views + driver_entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + + driver_fv = FeatureView( + name="driver_stats_multi", + entities=[driver_entity], + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="rating", dtype=Float64), + ], + source=driver_source, + ttl=timedelta(days=1), + ) + + vehicle_fv = FeatureView( + name="vehicle_stats_multi", + entities=[ + 
driver_entity + ], # todo these two FeatureViews have the same entities list [driver_entity] + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="vehicle_age", dtype=Int64), + Field(name="mileage", dtype=Int64), + ], + source=vehicle_source, + ttl=timedelta(days=1), + ) + + # Entity dataframe requesting features for both drivers + entity_df = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": [now, now], + } + ) + + # Request features from BOTH feature views + job = MongoDBOfflineStoreMany.get_historical_features( + config=repo_config, + feature_views=[driver_fv, vehicle_fv], + feature_refs=[ + "driver_stats_multi:rating", + "vehicle_stats_multi:vehicle_age", + "vehicle_stats_multi:mileage", + ], + entity_df=entity_df, + registry=MagicMock(), + project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df().sort_values("driver_id").reset_index(drop=True) + + # Verify we got features from both collections joined correctly + assert len(result_df) == 2 + assert set(result_df.columns) >= {"driver_id", "rating", "vehicle_age", "mileage"} + + # Driver 1 + assert result_df.loc[0, "rating"] == pytest.approx(4.8) + assert result_df.loc[0, "vehicle_age"] == 2 + assert result_df.loc[0, "mileage"] == 50000 + + # Driver 2 + assert result_df.loc[1, "rating"] == pytest.approx(4.5) + assert result_df.loc[1, "vehicle_age"] == 5 + assert result_df.loc[1, "mileage"] == 120000 + + +@_requires_docker +def test_compound_join_keys( + repo_config: RepoConfig, mongodb_connection_string: str +) -> None: + """Test with compound/composite join keys (multiple entity columns). + + This tests scenarios where entities are identified by multiple keys, + e.g., (user_id, device_id) or (store_id, product_id). 
+ """ + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + + # Create collection with compound key (user_id + device_id) + collection = db["user_device_features"] + collection.drop() + now = datetime.now(tz=pytz.UTC) + + # Same user_id can have different device_ids with different features + docs = [ + { + "user_id": 1, + "device_id": "mobile", + "app_opens": 50, + "event_timestamp": now - timedelta(hours=2), + }, + { + "user_id": 1, + "device_id": "mobile", + "app_opens": 55, + "event_timestamp": now - timedelta(hours=1), + }, + { + "user_id": 1, + "device_id": "desktop", + "app_opens": 10, + "event_timestamp": now - timedelta(hours=1), + }, + { + "user_id": 2, + "device_id": "mobile", + "app_opens": 100, + "event_timestamp": now - timedelta(hours=1), + }, + { + "user_id": 2, + "device_id": "tablet", + "app_opens": 25, + "event_timestamp": now - timedelta(hours=1), + }, + ] + collection.insert_many(docs) + client.close() + + # Create source + source = MongoDBSourceMany( + name="user_device_features", + database="feast_test", + collection="user_device_features", + timestamp_field="event_timestamp", + ) + + # Create entities with compound keys + user_entity = Entity( + name="user_id", join_keys=["user_id"], value_type=ValueType.INT64 + ) + device_entity = Entity( + name="device_id", join_keys=["device_id"], value_type=ValueType.STRING + ) + + fv = FeatureView( + name="user_device_features", + entities=[user_entity, device_entity], + schema=[ + Field(name="user_id", dtype=Int64), + Field(name="device_id", dtype=String), + Field(name="app_opens", dtype=Int64), + ], + source=source, + ttl=timedelta(days=1), + ) + + # Test pull_latest: should get one row per unique (user_id, device_id) combination + job = MongoDBOfflineStoreMany.pull_latest_from_table_or_query( + config=repo_config, + data_source=source, + join_key_columns=["user_id", "device_id"], + feature_name_columns=["app_opens"], + timestamp_field="event_timestamp", + 
created_timestamp_column=None, + start_date=now - timedelta(days=1), + end_date=now + timedelta(hours=1), + ) + + df = job.to_df() + assert len(df) == 4 # 4 unique (user_id, device_id) combinations + + # Verify user 1, mobile got the LATEST value (55, not 50) + user1_mobile = df[(df["user_id"] == 1) & (df["device_id"] == "mobile")] + assert len(user1_mobile) == 1 + assert user1_mobile.iloc[0]["app_opens"] == 55 + + # Test get_historical_features with compound keys + entity_df = pd.DataFrame( + { + "user_id": [1, 1, 2], + "device_id": ["mobile", "desktop", "tablet"], + "event_timestamp": [now, now, now], + } + ) + + job = MongoDBOfflineStoreMany.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["user_device_features:app_opens"], + entity_df=entity_df, + registry=MagicMock(), + project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df() + assert len(result_df) == 3 + + # Sort for predictable assertions + result_df = result_df.sort_values(["user_id", "device_id"]).reset_index(drop=True) + + # user 1, desktop + assert result_df.loc[0, "app_opens"] == 10 + # user 1, mobile (latest value) + assert result_df.loc[1, "app_opens"] == 55 + # user 2, tablet + assert result_df.loc[2, "app_opens"] == 25 diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/test_one.py b/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/test_one.py new file mode 100644 index 00000000000..689fef915ee --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/mongodb_offline_store/test_one.py @@ -0,0 +1,609 @@ +""" +Unit tests for MongoDB Native offline store implementation. + +This tests the single-collection schema where all feature views share one +collection (``feature_history``), discriminated by ``feature_view`` field. + +Schema: + { + "entity_id": bytes, # serialized entity key + "feature_view": str, + "features": { "feat1": val, ... 
}, + "event_timestamp": datetime, + "created_at": datetime + } + +Docker-dependent tests are marked with ``@_requires_docker`` and are skipped +when Docker is unavailable. +""" + +from datetime import datetime, timedelta +from typing import Generator +from unittest.mock import MagicMock + +import pandas as pd +import pytest +import pytz + +pytest.importorskip("pymongo") + +from pymongo import MongoClient +from testcontainers.mongodb import MongoDbContainer + +from feast import Entity, FeatureView, Field +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_one import ( + MongoDBOfflineStoreOne, + MongoDBOfflineStoreOneConfig, + MongoDBSourceOne, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig +from feast.types import Float64, Int64, String +from feast.value_type import ValueType + +# Check if Docker is available +docker_available = False +try: + import docker + + try: + client = docker.from_env() + client.ping() + docker_available = True + except Exception: + pass +except ImportError: + pass + +_requires_docker = pytest.mark.skipif( + not docker_available, + reason="Docker is not available or not running.", +) + +ENTITY_KEY_VERSION = 3 + + +def _make_entity_id(join_keys: dict) -> bytes: + """Create serialized entity key from join key dict.""" + entity_key = EntityKeyProto() + for key in sorted(join_keys.keys()): + entity_key.join_keys.append(key) + val = ValueProto() + value = join_keys[key] + if isinstance(value, int): + val.int64_val = value + elif isinstance(value, str): + val.string_val = value + else: + val.string_val = str(value) + entity_key.entity_values.append(val) + return serialize_entity_key(entity_key, ENTITY_KEY_VERSION) + + +@pytest.fixture(scope="module") +def mongodb_container() -> Generator[MongoDbContainer, None, None]: + 
"""Start a MongoDB container for testing.""" + container = MongoDbContainer( + "mongo:latest", + username="test", + password="test", # pragma: allowlist secret + ).with_exposed_ports(27017) + container.start() + yield container + container.stop() + + +@pytest.fixture +def mongodb_connection_string(mongodb_container: MongoDbContainer) -> str: + """Get MongoDB connection string from the container.""" + exposed_port = mongodb_container.get_exposed_port(27017) + return f"mongodb://test:test@localhost:{exposed_port}" # pragma: allowlist secret + + +@pytest.fixture +def repo_config(mongodb_connection_string: str) -> RepoConfig: + """Create a RepoConfig with MongoDB Native offline store.""" + return RepoConfig( + project="test_project", + registry="memory://", + provider="local", + offline_store=MongoDBOfflineStoreOneConfig( + connection_string=mongodb_connection_string, + database="feast_test", + collection="feature_history", + ), + online_store={"type": "sqlite"}, + entity_key_serialization_version=ENTITY_KEY_VERSION, + ) + + +@pytest.fixture +def sample_data(mongodb_connection_string: str) -> datetime: + """Insert sample data using the single-collection schema. + + Creates documents for 'driver_stats' feature view with entity_id, + feature_view discriminator, and nested features subdocument. 
+ """ + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + collection = db["feature_history"] + collection.drop() + + now = datetime.now(tz=pytz.UTC) + + # Create documents using the native schema + docs = [ + { + "entity_id": _make_entity_id({"driver_id": 1}), + "feature_view": "driver_stats", + "features": {"conv_rate": 0.5, "acc_rate": 0.9}, + "event_timestamp": now - timedelta(hours=2), + "created_at": now - timedelta(hours=2), + }, + { + "entity_id": _make_entity_id({"driver_id": 1}), + "feature_view": "driver_stats", + "features": {"conv_rate": 0.6, "acc_rate": 0.85}, + "event_timestamp": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + }, + { + "entity_id": _make_entity_id({"driver_id": 1}), + "feature_view": "driver_stats", + "features": {"conv_rate": 0.7, "acc_rate": 0.8}, + "event_timestamp": now, + "created_at": now, + }, + { + "entity_id": _make_entity_id({"driver_id": 2}), + "feature_view": "driver_stats", + "features": {"conv_rate": 0.3, "acc_rate": 0.95}, + "event_timestamp": now - timedelta(hours=2), + "created_at": now - timedelta(hours=2), + }, + ] + collection.insert_many(docs) + client.close() + return now + + +@pytest.fixture +def driver_source() -> MongoDBSourceOne: + """Create a MongoDBSourceOne for driver stats.""" + return MongoDBSourceOne( + name="driver_stats", + timestamp_field="event_timestamp", + created_timestamp_column="created_at", + ) + + +@pytest.fixture +def driver_fv(driver_source: MongoDBSourceOne) -> FeatureView: + """Create a FeatureView for driver stats.""" + driver_entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + return FeatureView( + name="driver_stats", + entities=[driver_entity], + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float64), + Field(name="acc_rate", dtype=Float64), + ], + source=driver_source, + ttl=timedelta(days=1), + ) + + +@_requires_docker +def 
test_pull_latest_from_table_or_query( + repo_config: RepoConfig, sample_data: datetime, driver_source: MongoDBSourceOne +) -> None: + """Test pulling latest features per entity from the single collection.""" + now = sample_data + job = MongoDBOfflineStoreOne.pull_latest_from_table_or_query( + config=repo_config, + data_source=driver_source, + join_key_columns=["driver_id"], + feature_name_columns=["conv_rate", "acc_rate"], + timestamp_field="event_timestamp", + created_timestamp_column="created_at", + start_date=now - timedelta(days=1), + end_date=now + timedelta(hours=1), + ) + + df = job.to_df() + + assert isinstance(df, pd.DataFrame) + assert len(df) == 2 # Two unique entity_ids + + # Sort by entity_id for predictable assertions + # Note: entity_id is bytes, so we check features directly + conv_rates = sorted(df["conv_rate"].tolist()) + assert conv_rates[0] == pytest.approx(0.3) # Driver 2's only value + assert conv_rates[1] == pytest.approx(0.7) # Driver 1's latest value + + +@_requires_docker +def test_get_historical_features_pit_join( + repo_config: RepoConfig, sample_data: datetime, driver_fv: FeatureView +) -> None: + """Test point-in-time join retrieves correct feature values.""" + now = sample_data + + # Entity dataframe with driver_id column (must match join keys) + entity_df = pd.DataFrame( + { + "driver_id": [1, 1, 2], + "event_timestamp": [ + now - timedelta(hours=1, minutes=30), # Should get conv_rate=0.5 + now - timedelta(minutes=30), # Should get conv_rate=0.6 + now - timedelta(hours=1), # Should get conv_rate=0.3 + ], + } + ) + + job = MongoDBOfflineStoreOne.get_historical_features( + config=repo_config, + feature_views=[driver_fv], + feature_refs=["driver_stats:conv_rate", "driver_stats:acc_rate"], + entity_df=entity_df, + registry=MagicMock(), + project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df() + assert isinstance(result_df, pd.DataFrame) + assert len(result_df) == 3 + + # Sort by driver_id and 
event_timestamp for predictable assertions + result_df = result_df.sort_values(["driver_id", "event_timestamp"]).reset_index( + drop=True + ) + + # Driver 1, first request (1.5 hours ago) β†’ should get value from 2 hours ago + assert result_df.loc[0, "conv_rate"] == pytest.approx(0.5) + + # Driver 1, second request (30 min ago) β†’ should get value from 1 hour ago + assert result_df.loc[1, "conv_rate"] == pytest.approx(0.6) + + # Driver 2, request (1 hour ago) β†’ should get value from 2 hours ago + assert result_df.loc[2, "conv_rate"] == pytest.approx(0.3) + + +@_requires_docker +def test_pull_all_from_table_or_query( + repo_config: RepoConfig, sample_data: datetime, driver_source: MongoDBSourceOne +) -> None: + """Test pulling all features within a time range (no deduplication).""" + now = sample_data + job = MongoDBOfflineStoreOne.pull_all_from_table_or_query( + config=repo_config, + data_source=driver_source, + join_key_columns=["driver_id"], + feature_name_columns=["conv_rate", "acc_rate"], + timestamp_field="event_timestamp", + created_timestamp_column="created_at", + start_date=now - timedelta(hours=1, minutes=30), + end_date=now + timedelta(hours=1), + ) + + df = job.to_df() + assert isinstance(df, pd.DataFrame) + # Should get 2 rows: driver 1 (1hr ago, now) + # Excludes: driver 1 from 2 hours ago, driver 2 from 2 hours ago + assert len(df) == 2 + + +@_requires_docker +def test_ttl_excludes_stale_features( + repo_config: RepoConfig, mongodb_connection_string: str +) -> None: + """Test that TTL causes stale feature values to be returned as NULL.""" + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + collection = db["feature_history"] + + now = datetime.now(tz=pytz.UTC) + + # Insert docs with different ages + ttl_docs = [ + { + "entity_id": _make_entity_id({"driver_id": 1}), + "feature_view": "driver_stats_ttl", + "features": {"conv_rate": 0.9}, + "event_timestamp": now - timedelta(hours=1), + "created_at": now - 
timedelta(hours=1), + }, + { + "entity_id": _make_entity_id({"driver_id": 2}), + "feature_view": "driver_stats_ttl", + "features": {"conv_rate": 0.5}, + "event_timestamp": now - timedelta(days=2), # Stale + "created_at": now - timedelta(days=2), + }, + ] + collection.insert_many(ttl_docs) + client.close() + + ttl_source = MongoDBSourceOne( + name="driver_stats_ttl", + timestamp_field="event_timestamp", + ) + driver_entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + ttl_fv = FeatureView( + name="driver_stats_ttl", + entities=[driver_entity], + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float64), + ], + source=ttl_source, + ttl=timedelta(days=1), + ) + + entity_df = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": [now, now], + } + ) + + job = MongoDBOfflineStoreOne.get_historical_features( + config=repo_config, + feature_views=[ttl_fv], + feature_refs=["driver_stats_ttl:conv_rate"], + entity_df=entity_df, + registry=MagicMock(), + project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df().sort_values("driver_id").reset_index(drop=True) + + # Driver 1: fresh β†’ has value + assert result_df.loc[0, "conv_rate"] == pytest.approx(0.9) + + # Driver 2: stale β†’ NULL + assert pd.isna(result_df.loc[1, "conv_rate"]) + + +@_requires_docker +def test_multiple_feature_views( + repo_config: RepoConfig, mongodb_connection_string: str +) -> None: + """Test joining features from multiple feature views in the same collection.""" + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + collection = db["feature_history"] + + now = datetime.now(tz=pytz.UTC) + + # Insert documents for two different feature views + multi_docs = [ + # driver_stats_multi + { + "entity_id": _make_entity_id({"driver_id": 1}), + "feature_view": "driver_stats_multi", + "features": {"rating": 4.8}, + "event_timestamp": now - timedelta(hours=1), + 
"created_at": now - timedelta(hours=1), + }, + { + "entity_id": _make_entity_id({"driver_id": 2}), + "feature_view": "driver_stats_multi", + "features": {"rating": 4.5}, + "event_timestamp": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + }, + # vehicle_stats_multi + { + "entity_id": _make_entity_id({"driver_id": 1}), + "feature_view": "vehicle_stats_multi", + "features": {"vehicle_age": 2, "mileage": 50000}, + "event_timestamp": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + }, + { + "entity_id": _make_entity_id({"driver_id": 2}), + "feature_view": "vehicle_stats_multi", + "features": {"vehicle_age": 5, "mileage": 120000}, + "event_timestamp": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + }, + ] + collection.insert_many(multi_docs) + client.close() + + # Create sources and feature views + driver_source = MongoDBSourceOne(name="driver_stats_multi") + vehicle_source = MongoDBSourceOne(name="vehicle_stats_multi") + + driver_entity = Entity( + name="driver_id", join_keys=["driver_id"], value_type=ValueType.INT64 + ) + + driver_fv = FeatureView( + name="driver_stats_multi", + entities=[driver_entity], + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="rating", dtype=Float64), + ], + source=driver_source, + ttl=timedelta(days=1), + ) + + vehicle_fv = FeatureView( + name="vehicle_stats_multi", + entities=[driver_entity], + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="vehicle_age", dtype=Int64), + Field(name="mileage", dtype=Int64), + ], + source=vehicle_source, + ttl=timedelta(days=1), + ) + + entity_df = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": [now, now], + } + ) + + job = MongoDBOfflineStoreOne.get_historical_features( + config=repo_config, + feature_views=[driver_fv, vehicle_fv], + feature_refs=[ + "driver_stats_multi:rating", + "vehicle_stats_multi:vehicle_age", + "vehicle_stats_multi:mileage", + ], + entity_df=entity_df, + registry=MagicMock(), 
+ project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df().sort_values("driver_id").reset_index(drop=True) + + assert len(result_df) == 2 + assert set(result_df.columns) >= {"driver_id", "rating", "vehicle_age", "mileage"} + + # Driver 1 + assert result_df.loc[0, "rating"] == pytest.approx(4.8) + assert result_df.loc[0, "vehicle_age"] == 2 + assert result_df.loc[0, "mileage"] == 50000 + + # Driver 2 + assert result_df.loc[1, "rating"] == pytest.approx(4.5) + assert result_df.loc[1, "vehicle_age"] == 5 + assert result_df.loc[1, "mileage"] == 120000 + + +@_requires_docker +def test_compound_join_keys( + repo_config: RepoConfig, mongodb_connection_string: str +) -> None: + """Test with compound/composite join keys (multiple entity columns).""" + client: MongoClient = MongoClient(mongodb_connection_string) + db = client["feast_test"] + collection = db["feature_history"] + + now = datetime.now(tz=pytz.UTC) + + # Insert documents with compound keys (user_id + device_id) + compound_docs = [ + { + "entity_id": _make_entity_id({"user_id": 1, "device_id": "mobile"}), + "feature_view": "user_device_features", + "features": {"app_opens": 50}, + "event_timestamp": now - timedelta(hours=2), + "created_at": now - timedelta(hours=2), + }, + { + "entity_id": _make_entity_id({"user_id": 1, "device_id": "mobile"}), + "feature_view": "user_device_features", + "features": {"app_opens": 55}, # Latest for this entity + "event_timestamp": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + }, + { + "entity_id": _make_entity_id({"user_id": 1, "device_id": "desktop"}), + "feature_view": "user_device_features", + "features": {"app_opens": 10}, + "event_timestamp": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + }, + { + "entity_id": _make_entity_id({"user_id": 2, "device_id": "tablet"}), + "feature_view": "user_device_features", + "features": {"app_opens": 25}, + "event_timestamp": now - timedelta(hours=1), + 
"created_at": now - timedelta(hours=1), + }, + ] + collection.insert_many(compound_docs) + client.close() + + source = MongoDBSourceOne(name="user_device_features") + + user_entity = Entity( + name="user_id", join_keys=["user_id"], value_type=ValueType.INT64 + ) + device_entity = Entity( + name="device_id", join_keys=["device_id"], value_type=ValueType.STRING + ) + + fv = FeatureView( + name="user_device_features", + entities=[user_entity, device_entity], + schema=[ + Field(name="user_id", dtype=Int64), + Field(name="device_id", dtype=String), + Field(name="app_opens", dtype=Int64), + ], + source=source, + ttl=timedelta(days=1), + ) + + # Test pull_latest: should get one row per unique (user_id, device_id) + job = MongoDBOfflineStoreOne.pull_latest_from_table_or_query( + config=repo_config, + data_source=source, + join_key_columns=["user_id", "device_id"], + feature_name_columns=["app_opens"], + timestamp_field="event_timestamp", + created_timestamp_column="created_at", + start_date=now - timedelta(days=1), + end_date=now + timedelta(hours=1), + ) + + df = job.to_df() + assert len(df) == 3 # 3 unique (user_id, device_id) combinations + + # Verify we got the latest value (55) for user 1, mobile + app_opens_values = sorted(df["app_opens"].tolist()) + assert 55 in app_opens_values # Latest for user 1, mobile + assert 10 in app_opens_values # user 1, desktop + assert 25 in app_opens_values # user 2, tablet + + # Test get_historical_features with compound keys + entity_df = pd.DataFrame( + { + "user_id": [1, 1, 2], + "device_id": ["mobile", "desktop", "tablet"], + "event_timestamp": [now, now, now], + } + ) + + job = MongoDBOfflineStoreOne.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["user_device_features:app_opens"], + entity_df=entity_df, + registry=MagicMock(), + project=repo_config.project, + full_feature_names=False, + ) + + result_df = job.to_df() + assert len(result_df) == 3 + + # Sort for predictable assertions + result_df 
= result_df.sort_values(["user_id", "device_id"]).reset_index(drop=True) + + # user 1, desktop + assert result_df.loc[0, "app_opens"] == 10 + # user 1, mobile (latest value) + assert result_df.loc[1, "app_opens"] == 55 + # user 2, tablet + assert result_df.loc[2, "app_opens"] == 25 diff --git a/sdk/python/tests/universal/feature_repos/repo_configuration.py b/sdk/python/tests/universal/feature_repos/repo_configuration.py index ddd952f71dc..2033d416032 100644 --- a/sdk/python/tests/universal/feature_repos/repo_configuration.py +++ b/sdk/python/tests/universal/feature_repos/repo_configuration.py @@ -108,6 +108,33 @@ ] ) +# MongoDB offline stores (require testcontainers and pymongo) +if os.getenv("FEAST_LOCAL_ONLINE_CONTAINER", "False") == "True": + try: + from tests.universal.feature_repos.universal.data_sources.mongodb import ( + MongoDBManyDataSourceCreator, + # MongoDBOneDataSourceCreator, # TODO: Not registered - see TODO in mongodb.py + ) + + AVAILABLE_OFFLINE_STORES.extend( + [ + ("local", MongoDBManyDataSourceCreator), + # TODO: MongoDBOneDataSourceCreator requires DataSourceCreator interface + # changes to pass entity/join key info. See mongodb.py for details. 
+ # ("local", MongoDBOneDataSourceCreator), + ] + ) + OFFLINE_STORE_TO_PROVIDER_CONFIG["mongodb_many"] = ( + "local", + MongoDBManyDataSourceCreator, + ) + # OFFLINE_STORE_TO_PROVIDER_CONFIG["mongodb_one"] = ( + # "local", + # MongoDBOneDataSourceCreator, + # ) + except ImportError: + pass # pymongo or testcontainers not installed + AVAILABLE_ONLINE_STORES: Dict[ str, Tuple[Union[str, Dict[Any, Any]], Optional[Type[OnlineStoreCreator]]] ] = {"sqlite": ({"type": "sqlite"}, None)} diff --git a/sdk/python/tests/universal/feature_repos/universal/data_sources/mongodb.py b/sdk/python/tests/universal/feature_repos/universal/data_sources/mongodb.py new file mode 100644 index 00000000000..8eedc3b6957 --- /dev/null +++ b/sdk/python/tests/universal/feature_repos/universal/data_sources/mongodb.py @@ -0,0 +1,316 @@ +""" +MongoDB DataSourceCreator implementations for universal Feast tests. + +Provides two implementations matching the two offline store schemas: +- MongoDBManyDataSourceCreator: One collection per FeatureView (Many) +- MongoDBOneDataSourceCreator: Single shared collection (One) +""" + +from typing import Any, Dict, Optional + +import pandas as pd +import pytest +from testcontainers.mongodb import MongoDbContainer + +from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_many import ( + MongoDBOfflineStoreManyConfig, + MongoDBSourceMany, + SavedDatasetMongoDBStorageMany, +) +from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb_one import ( + MongoDBOfflineStoreOneConfig, + MongoDBSourceOne, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.saved_dataset import SavedDatasetStorage +from 
tests.universal.feature_repos.universal.data_source_creator import ( + DataSourceCreator, +) + +# Import pymongo - will be available since we're testing MongoDB +try: + from pymongo import MongoClient +except ImportError: + MongoClient = None # type: ignore + + +class MongoDBManyDataSourceCreator(DataSourceCreator): + """DataSourceCreator for MongoDBOfflineStoreMany (one collection per FeatureView).""" + + def __init__(self, project_name: str, *args, **kwargs): + super().__init__(project_name) + self.container = MongoDbContainer( + "mongo:7.0", + username="test", + password="test", # pragma: allowlist secret + ).with_exposed_ports(27017) + self.container.start() + self.port = self.container.get_exposed_port(27017) + self.connection_string = ( + f"mongodb://test:test@localhost:{self.port}" # pragma: allowlist secret + ) + self.database = f"feast_test_{project_name}" + self.collections_created: list[str] = [] + + def create_data_source( + self, + df: pd.DataFrame, + destination_name: str, + created_timestamp_column: str = "created_ts", + field_mapping: Optional[Dict[str, str]] = None, + timestamp_field: Optional[str] = "ts", + ) -> DataSource: + """Create a MongoDB data source by inserting df into a collection.""" + collection_name = self.get_prefixed_table_name(destination_name) + + # Insert data into MongoDB + client: Any = MongoClient(self.connection_string, tz_aware=True) + try: + coll = client[self.database][collection_name] + coll.drop() # Clean slate + records = df.to_dict("records") + if records: + coll.insert_many(records) + self.collections_created.append(collection_name) + finally: + client.close() + + return MongoDBSourceMany( + name=destination_name, + database=self.database, + collection=collection_name, + timestamp_field=timestamp_field or "ts", + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping, + ) + + def get_prefixed_table_name(self, suffix: str) -> str: + return f"{self.project_name}_{suffix}" + + def 
create_offline_store_config(self) -> FeastConfigBaseModel: + return MongoDBOfflineStoreManyConfig( + connection_string=self.connection_string, + database=self.database, + ) + + def create_saved_dataset_destination(self) -> SavedDatasetStorage: + return SavedDatasetMongoDBStorageMany( + database=self.database, + collection=f"{self.project_name}_saved_dataset", + ) + + def create_logged_features_destination(self) -> LoggingDestination: + # MongoDB doesn't have a native LoggingDestination yet + # Return None or raise NotImplementedError for now + raise NotImplementedError( + "MongoDB LoggingDestination not implemented. " + "Tests requiring logging features will be skipped." + ) + + def teardown(self): + """Clean up: drop collections and stop container.""" + try: + client: Any = MongoClient(self.connection_string, tz_aware=True) + try: + db = client[self.database] + for coll_name in self.collections_created: + db[coll_name].drop() + finally: + client.close() + except Exception: + pass # Container may already be stopped + self.container.stop() + + @staticmethod + def test_markers() -> list: + """Mark tests as requiring MongoDB.""" + return [pytest.mark.mongodb] + + +class MongoDBOneDataSourceCreator(DataSourceCreator): + """DataSourceCreator for MongoDBOfflineStoreOne (single shared collection). + + This implementation uses the nested features schema where all FeatureViews + share a single collection with a discriminator field. + + TODO: This DataSourceCreator has a fundamental limitation. The One schema + requires knowing which columns are join keys vs features to properly + serialize entity_id and nest features. However, create_data_source() only + receives a DataFrame and column names - it doesn't have access to Entity + definitions that specify join keys. + + Current workaround uses heuristics (columns ending in '_id' with int/string + dtype), which is fragile. 
A proper fix would require modifying the
+    DataSourceCreator interface to pass entity/join key information to
+    create_data_source(), which is a Feast core change.
+
+    For now, universal tests may fail for FeatureViews where the heuristic
+    doesn't correctly identify join keys. Use unit tests in
+    tests/unit/infra/offline_stores/contrib/mongodb_offline_store/test_one.py
+    for comprehensive testing of the One implementation.
+    """
+
+    ENTITY_KEY_VERSION = 3
+
+    def __init__(self, project_name: str, *args, **kwargs):
+        super().__init__(project_name)
+        self.container = MongoDbContainer(
+            "mongo:7.0",
+            username="test",
+            password="test",  # pragma: allowlist secret
+        ).with_exposed_ports(27017)
+        self.container.start()
+        self.port = self.container.get_exposed_port(27017)
+        self.connection_string = (
+            f"mongodb://test:test@localhost:{self.port}"  # pragma: allowlist secret
+        )
+        self.database = f"feast_test_{project_name}"
+        self.collection = "feature_history"
+        self.feature_views_created: list[str] = []
+        # Track entity key columns per feature view for serialization
+        self._entity_key_columns: Dict[str, list[str]] = {}
+
+    def _serialize_entity_key(self, row: pd.Series, join_keys: list[str]) -> bytes:
+        """Serialize entity key columns to bytes.
+
+        Values read from a pandas row are frequently numpy scalars
+        (``np.int64``, ``np.bool_``, ...) that fail ``isinstance`` checks
+        against the builtin types, so they are unwrapped with ``.item()``
+        before dispatching on type.
+        """
+        entity_key = EntityKeyProto()
+        for key in join_keys:
+            entity_key.join_keys.append(key)
+            val = ValueProto()
+            raw = row[key]
+            # Unwrap numpy scalars to native Python values; without this an
+            # int64 join key falls through to the str() fallback below and
+            # the entity key is serialized as a string instead of an int64.
+            value = raw.item() if hasattr(raw, "item") else raw
+            # bool must be tested before int: isinstance(True, int) is True
+            # in Python, so a bool-after-int ordering makes the bool branch
+            # unreachable.
+            if isinstance(value, bool):
+                val.bool_val = value
+            elif isinstance(value, int):
+                val.int64_val = value
+            elif isinstance(value, str):
+                val.string_val = value
+            elif isinstance(value, float):
+                val.double_val = value
+            else:
+                val.string_val = str(value)
+            entity_key.entity_values.append(val)
+        return serialize_entity_key(entity_key, self.ENTITY_KEY_VERSION)
+
+    def create_data_source(
+        self,
+        df: pd.DataFrame,
+        destination_name: str,
+        created_timestamp_column: str = "created_ts",
+        field_mapping: Optional[Dict[str, str]] = None,
+        timestamp_field: Optional[str] = "ts",
+    ) -> 
DataSource:
+        """Create a MongoDB data source by inserting df into the shared collection.
+
+        The data is transformed into the One schema:
+        - entity_id: serialized entity key
+        - feature_view: destination_name
+        - features: nested dict of feature values
+        - event_timestamp: from timestamp_field
+        - created_at: from created_timestamp_column
+        """
+        # Determine which columns are join keys vs features
+        # Join keys must be integer or string types (serializable as entity keys)
+        timestamp_cols = {timestamp_field, created_timestamp_column}
+        all_cols = set(df.columns) - timestamp_cols - {None}
+
+        # Heuristic: identify join keys
+        # 1. Must end with "_id" or be a known key name
+        # 2. Must be integer or string type (not float)
+        join_keys = []
+        for c in all_cols:
+            if c.endswith("_id") or c in {"driver", "customer", "entity"}:
+                dtype = df[c].dtype
+                # Only integer or string types can be join keys
+                if dtype in ("int64", "int32", "object") or str(dtype).startswith(
+                    "int"
+                ):
+                    join_keys.append(c)
+
+        if not join_keys:
+            # Fallback: first integer column
+            for c in all_cols:
+                if df[c].dtype in ("int64", "int32") or str(df[c].dtype).startswith(
+                    "int"
+                ):
+                    join_keys = [c]
+                    break
+
+        feature_cols = [c for c in all_cols if c not in join_keys]
+
+        # Store for later use
+        self._entity_key_columns[destination_name] = join_keys
+
+        # Transform to One schema
+        docs = []
+        for _, row in df.iterrows():
+            entity_id = self._serialize_entity_key(row, join_keys)
+            # Unwrap numpy scalars with .item(): BSON cannot encode numpy
+            # types, so inserting raw np.int64/np.float64 values would raise
+            # bson.errors.InvalidDocument in pymongo.
+            features = {
+                col: (row[col].item() if hasattr(row[col], "item") else row[col])
+                for col in feature_cols
+                if pd.notna(row.get(col))
+            }
+
+            doc = {
+                "entity_id": entity_id,
+                "feature_view": destination_name,
+                "features": features,
+            }
+            if timestamp_field and timestamp_field in row:
+                doc["event_timestamp"] = row[timestamp_field]
+            if created_timestamp_column and created_timestamp_column in row:
+                doc["created_at"] = row[created_timestamp_column]
+
+            docs.append(doc)
+
+        # Insert into MongoDB
+        client: Any = MongoClient(self.connection_string, tz_aware=True)
+        try:
+            
coll = client[self.database][self.collection]
+            if docs:
+                coll.insert_many(docs)
+            self.feature_views_created.append(destination_name)
+        finally:
+            client.close()
+
+        # The returned source carries no database/collection of its own: in
+        # the One schema every feature view lives in the single shared
+        # collection configured on the offline store (see
+        # create_offline_store_config below).
+        return MongoDBSourceOne(
+            name=destination_name,
+            timestamp_field="event_timestamp",
+            created_timestamp_column="created_at" if created_timestamp_column else None,
+            field_mapping=field_mapping,
+        )
+
+    def get_prefixed_table_name(self, suffix: str) -> str:
+        """Return a project-scoped logical name for *suffix* (naming only —
+        it does not affect physical storage in the shared collection)."""
+        return f"{self.project_name}_{suffix}"
+
+    def create_offline_store_config(self) -> FeastConfigBaseModel:
+        """Build the One offline-store config pointing at this container."""
+        return MongoDBOfflineStoreOneConfig(
+            connection_string=self.connection_string,
+            database=self.database,
+            collection=self.collection,
+        )
+
+    def create_saved_dataset_destination(self) -> SavedDatasetStorage:
+        # One implementation doesn't have SavedDatasetStorage yet
+        raise NotImplementedError(
+            "MongoDBOfflineStoreOne SavedDatasetStorage not implemented."
+        )
+
+    def create_logged_features_destination(self) -> LoggingDestination:
+        raise NotImplementedError("MongoDB LoggingDestination not implemented.")
+
+    def teardown(self):
+        """Clean up: drop the collection and stop container."""
+        try:
+            client: Any = MongoClient(self.connection_string, tz_aware=True)
+            try:
+                client[self.database][self.collection].drop()
+            finally:
+                client.close()
+        except Exception:
+            # Best-effort cleanup: the container may already be stopped, in
+            # which case the connection fails and there is nothing to drop.
+            pass
+        self.container.stop()
+
+    @staticmethod
+    def test_markers() -> list:
+        """Mark tests as requiring MongoDB."""
+        return [pytest.mark.mongodb]