
Commit 3258a32 (1 parent: 7ca1af1)

BREAKING CHANGE: Adding temporal fields and migration script for milvus

11 files changed (+350 −52 lines)

.hydra_config/config.yaml

Lines changed: 1 addition & 0 deletions

```diff
@@ -40,6 +40,7 @@ vectordb:
   collection_name: ${oc.env:VDB_COLLECTION_NAME, vdb_test}
   hybrid_search: ${oc.env:VDB_HYBRID_SEARCH, true}
   enable: true
+  schema_version: 1 # Increment when the collection schema changes and a migration is required

 rdb:
   host: ${oc.env:POSTGRES_HOST, rdb}
```

docs/content/docs/documentation/API.mdx

Lines changed: 16 additions & 0 deletions

```diff
@@ -78,6 +78,22 @@ Upload a new file to a specific partition for indexing.
 - `201 Created`: Returns task status URL
 - `409 Conflict`: File already exists in partition

+##### Temporal Filtering
+OpenRAG supports temporal filtering to retrieve documents from specific time periods.
+Clients can include temporal fields in the search endpoints to enable time-aware search.
+
+Four temporal fields are automatically added to the collection schema:
+
+* `datetime`: the file's primary timestamp in your system, in ISO 8601 format
+* `created_at`: when the file was created, in ISO 8601 format
+* `updated_at`: when the file was last modified, in ISO 8601 format
+* `indexed_at`: when the file was indexed in the vector database, in ISO 8601 format
+
+:::info
+`datetime`, `created_at`, and `updated_at` are provided by the client in the file metadata during upload, while `indexed_at` is set automatically by OpenRAG at indexing time.
+:::
+
 ##### Upload files while modeling relations between them

 OpenRAG supports document relationships to enable context-aware retrieval.
```
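The temporal-metadata contract above can be sketched from the client side. A minimal example of building the upload metadata (the author value is illustrative, and `indexed_at` is deliberately absent since OpenRAG sets it at indexing time):

```python
import json
from datetime import datetime

# Client-provided metadata for an upload; the temporal fields are optional.
metadata = {
    "author": "John Doe",
    "datetime": "2024-01-01T12:00:00+00:00",
    "created_at": "2023-12-30T08:15:00+00:00",
    "updated_at": "2023-12-31T09:30:00+00:00",
    # `indexed_at` is set by OpenRAG at indexing time; the client does not send it.
}

# Each temporal field must be a valid ISO 8601 string, or the upload is rejected.
for field in ("datetime", "created_at", "updated_at"):
    datetime.fromisoformat(metadata[field])

payload = json.dumps(metadata)
```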

docs/content/docs/documentation/milvus_migration.md

Lines changed: 64 additions & 20 deletions
````diff
@@ -41,24 +41,6 @@ results = client.query(
 > * `PT3H` = 3 hours
 > * `P2DT6H` = 2 days and 6 hours.

-## Current State
-
-:::info
-Temporal fields are currently stored as **strings**, not **`TIMESTAMPTZ`**. Migrating to `TIMESTAMPTZ` requires a schema and index change, and Milvus doesn't support migrations for schema and index changes, so they have to be handled manually.
-
-Until a Milvus schema & index migration strategy is defined, filtering still works via **lexicographic string comparison** on ISO 8601 strings:
-```python
-expr = "tsz != '2025-01-03T00:00:00+08:00'"  # No ISO/INTERVAL keywords
-results = client.query(
-    collection_name,
-    filter=expr,
-    output_fields=["id", "tsz"],
-    limit=10
-)
-```
-Full `TIMESTAMPTZ` support will be activated in a future release once the migration is established.
-:::
-
 ## Milvus version upgrade Steps
 :::danger[Before running Milvus Version Migration]
 These steps must be performed on a deployment running OpenRAG **prior to version 1.1.6** (Milvus 2.5.4) before switching to a newer version of OpenRAG.
````
````diff
@@ -83,7 +65,7 @@ Then restart the stack:

 ```bash
 docker compose down
-docker compose up -d
+docker compose up --build milvus -d
 ```

 Wait for all services to be healthy before continuing.
````
````diff
@@ -129,4 +111,66 @@ docker inspect milvus-standalone --format '{{ .Config.Image }}'
 # Expected: milvusdb/milvus:v2.6.11
 ```

-Now you can switch to the newest release of OpenRAG and it should work fine.
+Now you can switch to the newest release of OpenRAG and it should work fine.
+
+## Schema Migration — Add Temporal Fields
+
+:::info
+This migration adds `TIMESTAMPTZ` fields (`datetime`, `created_at`, `updated_at`, `indexed_at`) and their `STL_SORT` indexes to an existing collection.
+
+Existing documents will have `null` for these fields; new documents will have them populated at index time.
+:::
+
+:::danger[OpenRAG must be stopped]
+Stop the OpenRAG application before running this migration.
+:::
+
+### Step 1 — Start only the Milvus container
+
+```bash
+docker compose up -d milvus
+```
+
+Wait until Milvus is healthy:
+
+```bash
+docker compose ps milvus
+```
+
+### Step 2 — Dry-run (inspect, no changes)
+
+```bash
+docker compose run --no-deps --rm --build --entrypoint "" openrag \
+  uv run python scripts/migrations/milvus/1.add_temporal_fields.py --dry-run
+```
+
+Review the output to confirm which fields and indexes are missing.
+
+### Step 3 — Apply the migration
+
+```bash
+docker compose run --no-deps --rm --build --entrypoint "" openrag \
+  uv run python scripts/migrations/milvus/1.add_temporal_fields.py
+```
+
+The script will:
+1. Add any missing `TIMESTAMPTZ` fields (nullable)
+2. Create `STL_SORT` indexes for each field
+3. Stamp the collection with `schema_version=1` so OpenRAG no longer reports a migration error on startup
+
````
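The `schema_version=1` stamp from Step 3 is what OpenRAG's startup check reads back. Condensed to a standalone sketch, with the property key and fallback behavior taken from the `vectordb.py` change in this commit:

```python
SCHEMA_VERSION_PROPERTY_KEY = "openrag.schema_version"

def stored_schema_version(properties: dict) -> int:
    # Unstamped or unparseable collections count as version 0.
    raw = properties.get(SCHEMA_VERSION_PROPERTY_KEY)
    try:
        return int(raw) if raw is not None else 0
    except (ValueError, TypeError):
        return 0

# Before the migration there is no stamp, so the check fails against the expected version 1.
assert stored_schema_version({}) == 0
# After Step 3 the stamp satisfies the check and OpenRAG starts normally.
assert stored_schema_version({SCHEMA_VERSION_PROPERTY_KEY: "1"}) == 1
```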
````diff
+### Step 4 — Restart OpenRAG
+
+```bash
+docker compose up --build -d
+```
+
+### Rollback
+
+Milvus does not yet support dropping fields. The rollback only removes the indexes and resets the version stamp — the fields remain in the schema but are unused:
+
+```bash
+docker compose run --no-deps --rm --entrypoint "" openrag \
+  uv run python scripts/migrations/milvus/1.add_temporal_fields.py --downgrade
+```
+
+To fully remove the fields, you would need to recreate the collection from scratch.
````
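Once the migration has run, queries can filter on the new temporal fields. A sketch of building such a filter for recently indexed documents; the comparison-against-ISO-string form mirrors this document's earlier query examples, and the exact `TIMESTAMPTZ` expression syntax should be verified against your Milvus version:

```python
from datetime import datetime, timedelta, timezone

# Filter for documents indexed in the last 2 days.
cutoff = (datetime.now(timezone.utc) - timedelta(days=2)).isoformat()
expr = f"indexed_at >= '{cutoff}'"

# Hypothetical usage with a pymilvus client, as in the query examples above:
# results = client.query(collection_name, filter=expr, output_fields=["id", "indexed_at"], limit=10)
```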

openrag/components/indexer/vectordb/vectordb.py

Lines changed: 56 additions & 5 deletions

```diff
@@ -1,6 +1,7 @@
 import asyncio
 import time
 from abc import ABC, abstractmethod
+from datetime import UTC, datetime

 import numpy as np
 import ray
@@ -102,12 +103,8 @@ async def get_file_chunks(self, file_id: str, partition: str, include_id: bool =
     async def get_chunk_by_id(self, chunk_id: str):
         pass

-    # @abstractmethod
-    # def sample_chunk_ids(
-    #     self, partition: str, n_ids: int = 100, seed: int | None = None
-    # ):
-    #     pass

+SCHEMA_VERSION_PROPERTY_KEY = "openrag.schema_version"

 MAX_LENGTH = 65_535

@@ -140,6 +137,7 @@ def __init__(self):

         self.config = load_config()
         self.logger = get_logger()
+        self.time_fields = ["datetime", "created_at", "updated_at", "indexed_at"]

         # init milvus clients
         self.port = self.config.vectordb.get("port")
@@ -189,6 +187,7 @@ def load_collection(self):
         try:
             if self._client.has_collection(self.collection_name):
                 self.logger.warning(f"Collection `{self.collection_name}` already exists. Loading it.")
+                self._check_schema_version()
             else:
                 self.logger.info("Creating empty collection")
                 index_params = self._create_index()
@@ -212,6 +211,7 @@ def load_collection(self):
                 collection_name=self.collection_name,
                 operation="create_collection",
             )
+            self._store_schema_version()
         try:
             self._client.load_collection(self.collection_name)
             self.collection_loaded = True
@@ -283,6 +283,9 @@ def _create_schema(self):
             dim=self.embedder.embedding_dimension,
         )

+        for time_field in self.time_fields:
+            schema.add_field(field_name=time_field, datatype=DataType.TIMESTAMPTZ, nullable=True)
+
         if self.hybrid_search:
             # Add sparse field for BM25 - this will be auto-generated
             schema.add_field(
@@ -336,9 +339,54 @@ def _create_index(self):
                 "bm25_b": 0.75,
             },
         )
+        # Indexes for the TIMESTAMPTZ date fields
+        for time_field in self.time_fields:
+            index_params.add_index(
+                field_name=time_field,
+                index_type="STL_SORT",  # Index type for TIMESTAMPTZ
+                index_name=f"{time_field}_idx",
+            )

         return index_params

+    def _store_schema_version(self) -> None:
+        """Persist the configured schema_version as a collection property after collection creation."""
+        schema_version = self.config.vectordb.get("schema_version")
+        self._client.alter_collection_properties(
+            collection_name=self.collection_name,
+            properties={SCHEMA_VERSION_PROPERTY_KEY: str(schema_version)},
+        )
+        self.logger.info(f"Schema version {schema_version} stored on collection `{self.collection_name}`.")
+
+    def _check_schema_version(self) -> None:
+        """
+        Read the stored schema version from the collection properties and compare it
+        against the configured schema_version. Raises VDBSchemaMigrationRequiredError
+        if they diverge, so the application fails fast instead of silently working on a
+        stale schema.
+        """
+        expected_version = self.config.vectordb.get("schema_version")
+        desc = self._client.describe_collection(self.collection_name)
+        props = desc.get("properties", {})
+        raw = props.get(SCHEMA_VERSION_PROPERTY_KEY)
+
+        try:
+            stored_version = int(raw) if raw is not None else 0
+        except (ValueError, TypeError):
+            stored_version = 0
+
+        if stored_version != expected_version:
+            raise VDBSchemaMigrationRequiredError(
+                f"Collection `{self.collection_name}` is at schema version {stored_version} "
+                f"but the application requires version {expected_version}. "
+                "Please run the migration script.",
+                collection_name=self.collection_name,
+                stored_version=stored_version,
+                expected_version=expected_version,
+            )
+
+        self.logger.info(f"Collection `{self.collection_name}` schema version {stored_version} — OK.")
+
     async def list_collections(self) -> list[str]:
         return self._client.list_collections()

@@ -379,12 +427,14 @@ async def async_add_documents(self, chunks: list[Document], user: dict) -> None:
         entities = []
         vectors = await self.embedder.aembed_documents(chunks)
         order_metadata_l: list[dict] = _gen_chunk_order_metadata(n=len(chunks))
+        indexed_at = datetime.now(UTC).isoformat()

         for chunk, vector, order_metadata in zip(chunks, vectors, order_metadata_l):
             entities.append(
                 {
                     "text": chunk.page_content,
                     "vector": vector,
+                    "indexed_at": indexed_at,
                     **order_metadata,
                     **chunk.metadata,
                 }
@@ -396,6 +446,7 @@ async def async_add_documents(self, chunks: list[Document], user: dict) -> None:
         )

         # insert file_id and partition into partition_file_manager
+        file_metadata.update({"indexed_at": indexed_at})
         self.partition_file_manager.add_file_to_partition(
             file_id=file_id,
             partition=partition,
```

openrag/routers/extract.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -32,7 +32,7 @@
 - `partition`: Partition name
 - `page`: Page number in source document
 - `datetime`: Document date (if set)
-- `modified_at`: File modification timestamp
+- `updated_at`: File modification timestamp
 - `created_at`: File creation timestamp
 - `indexed_at`: Chunk indexing timestamp
 - Additional custom metadata
```

openrag/routers/indexer.py

Lines changed: 42 additions & 2 deletions

````diff
@@ -49,6 +49,30 @@
 # URL scheme configuration
 PREFERRED_URL_SCHEME = config.server.preferred_url_scheme

+# Temporal fields provided by the client
+TEMPORAL_FIELDS = ["datetime", "created_at", "updated_at"]
+
+
+def get_temporal_fields(metadata: dict) -> dict:
+    """Validate the client-provided temporal fields and return them normalized."""
+    temporal_fields = {}
+
+    for field in TEMPORAL_FIELDS:
+        datetime_str = metadata.get(field, None)
+        if datetime_str:
+            try:
+                # Parse the provided datetime to ensure it is valid ISO 8601
+                d = datetime.fromisoformat(datetime_str)
+                temporal_fields[field] = d.isoformat()
+            except ValueError:
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail=f"Invalid ISO 8601 datetime value ({datetime_str}) for field '{field}'.",
+                )
+
+    return temporal_fields
+

 def build_url(request: Request, route_name: str, **path_params) -> str:
     """Build a URL using the preferred scheme if configured."""
@@ -100,9 +124,14 @@ async def get_supported_types():
     "mimetype": "text/plain",
     "author": "John Doe",
     ...
+    "created_at": "2024-01-01T12:00:00Z" // Optional temporal field (ISO 8601)
 }
 ```

+**Temporal Fields:**
+- You can provide temporal fields such as `created_at`, `updated_at`, or `datetime` in the metadata for time-based queries and filtering.
+- Datetime values must be in ISO 8601 format (e.g., `2024-01-01T12:00:00Z`).
+
 **Common Mimetypes:**
 - `text/plain` - Plain text files
 - `text/markdown` - Markdown files
@@ -161,9 +190,12 @@ async def add_file(

     # Append extra metadata
     metadata["file_size"] = human_readable_size(file_stat.st_size)
-    metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat()
     metadata["file_id"] = file_id

+    # Add client-provided temporal fields to the metadata (validated as ISO 8601)
+    temporal_fields = get_temporal_fields(metadata)
+    metadata.update(temporal_fields)
+
     # Indexing the file
     task = indexer.add_file.remote(path=file_path, metadata=metadata, partition=partition, user=user)
     await task_state_manager.set_state.remote(task.task_id().hex(), "QUEUED")
@@ -224,9 +256,14 @@ async def delete_file(
     "mimetype": "text/plain",
     "author": "John Doe",
     ...
+    "created_at": "2024-01-01T12:00:00Z" // Optional temporal field (ISO 8601)
 }
 ```

+**Temporal Fields:**
+- You can provide temporal fields such as `created_at`, `updated_at`, or `datetime` in the metadata for time-based queries and filtering.
+- Datetime values must be in ISO 8601 format (e.g., `2024-01-01T12:00:00Z`).
+
 **Response:**
 Returns 202 Accepted with a task status URL for tracking indexing progress.
 """,
@@ -277,9 +314,12 @@ async def put_file(

     # Append extra metadata
     metadata["file_size"] = human_readable_size(file_stat.st_size)
-    metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat()
     metadata["file_id"] = file_id

+    # Add client-provided temporal fields to the metadata (validated as ISO 8601)
+    temporal_fields = get_temporal_fields(metadata)
+    metadata.update(temporal_fields)
+
     # Indexing the file
     task = indexer.add_file.remote(path=file_path, metadata=metadata, partition=partition, user=user)
     await task_state_manager.set_state.remote(task.task_id().hex(), "QUEUED")
````
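The `get_temporal_fields` validation above hinges on `datetime.fromisoformat`. A standalone sketch of which client-supplied values pass (the helper name is hypothetical; it mirrors the server-side check in this commit):

```python
from datetime import datetime

def is_valid_temporal(value: str) -> bool:
    # Mirrors the server-side check: datetime.fromisoformat must accept the value.
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False

assert is_valid_temporal("2024-01-01T12:00:00+00:00")  # explicit UTC offset
assert is_valid_temporal("2024-01-01")                 # date-only is valid ISO 8601
assert not is_valid_temporal("01/01/2024")             # rejected with 400 Bad Request
```

Note that `datetime.fromisoformat` only accepts the `Z` suffix shown in the docstring examples (`2024-01-01T12:00:00Z`) on Python 3.11 and later; on older interpreters an explicit offset such as `+00:00` is the safer client-side choice.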
