Skip to content

Commit cc565c0

Browse files
committed
feat(lancedb): automatic schema migration and collection_metadata centralization
- schema_manager: add `_get_sql_default_for_pa_type()` to map PyArrow types to LanceDB SQL default expressions.
- Replace `_validate_schema_fields` with `_ensure_schema_fields()`: add missing columns to existing tables instead of raising; use `add_columns()` with type-appropriate defaults.
- `_create_table`: when the table exists and a schema is given, call `_ensure_schema_fields` for best-effort schema evolution.
- `ensure_chunks_table` / `ensure_embeddings_table`: remove `required_fields` validation and long docstrings; rely on auto-migration.
- Add `ensure_collection_metadata_table()`; trim docstrings for `main_pointers`, `prompt_templates`, and `ingestion_runs`.
- collection_manager: use `ensure_collection_metadata_table(conn)` in the save path; remove `_ensure_metadata_table()` and the pyarrow import.
- Add tests/core/tools/core/RAG_tools/LanceDB/test_schema_migration.py covering default mapping, auto-migration, idempotency, and `_ensure_schema_fields`.

Migrated from FenixAOS fix/schema_adapter. Made-with: Cursor
1 parent 7845220 commit cc565c0

File tree

3 files changed

+187
-149
lines changed

3 files changed

+187
-149
lines changed

src/xagent/core/tools/core/RAG_tools/LanceDB/schema_manager.py

Lines changed: 73 additions & 98 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4+
from typing import Any
45

56
import pyarrow as pa # type: ignore
67
from lancedb.db import DBConnection
@@ -15,6 +16,7 @@
1516
"ensure_main_pointers_table",
1617
"ensure_prompt_templates_table",
1718
"ensure_ingestion_runs_table",
19+
"ensure_collection_metadata_table",
1820
]
1921

2022

@@ -26,18 +28,27 @@ def _table_exists(conn: DBConnection, name: str) -> bool:
2628
return False
2729

2830

29-
def _validate_schema_fields(
30-
conn: DBConnection, table_name: str, required_fields: list[str]
31-
) -> None:
32-
"""Validate that an existing table contains all required fields.
31+
def _get_sql_default_for_pa_type(pa_type: Any) -> str:
32+
"""Map PyArrow type to LanceDB SQL default value expression."""
33+
if pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type):
34+
return "''"
35+
if pa.types.is_integer(pa_type):
36+
return "0"
37+
if pa.types.is_floating(pa_type):
38+
return "0.0"
39+
if pa.types.is_boolean(pa_type):
40+
return "false"
41+
if pa.types.is_timestamp(pa_type):
42+
return "CAST(NULL AS TIMESTAMP)"
43+
return "NULL"
44+
3345

34-
Args:
35-
conn: LanceDB connection
36-
table_name: Name of the table to validate
37-
required_fields: List of required field names
46+
def _ensure_schema_fields(
47+
conn: DBConnection, table_name: str, target_schema: Any
48+
) -> None:
49+
"""Ensure an existing table matches the target schema by adding missing columns.
3850
39-
Raises:
40-
ValueError: If the table exists but is missing required fields.
51+
Only ADDS missing columns. Does not delete extra columns nor modify existing types.
4152
"""
4253
if not _table_exists(conn, table_name):
4354
return
@@ -46,34 +57,37 @@ def _validate_schema_fields(
4657
table = conn.open_table(table_name)
4758
existing_schema = table.schema
4859
existing_field_names = {field.name for field in existing_schema}
49-
50-
missing_fields = [f for f in required_fields if f not in existing_field_names]
60+
missing_fields = [
61+
f for f in target_schema if f.name not in existing_field_names
62+
]
5163

5264
if missing_fields:
53-
error_msg = (
54-
f"Table '{table_name}' exists but is missing required fields: {missing_fields}. "
55-
f"This is likely due to a schema upgrade. "
56-
f"Please delete the existing table or manually add the missing fields. "
57-
f"Note: During development, we do not provide automatic migration scripts. "
58-
f"To upgrade, you can either:\n"
59-
f"1. Delete the table (data will be lost): conn.drop_table('{table_name}')\n"
60-
f"2. Manually add the missing fields using LanceDB's schema update capabilities"
65+
logger.info(
66+
"Auto-migrating schema for table '%s'. Adding missing fields: %s",
67+
table_name,
68+
[f.name for f in missing_fields],
6169
)
62-
logger.error(error_msg)
63-
raise ValueError(error_msg)
64-
except ValueError:
65-
# Re-raise ValueError (our validation error)
66-
raise
70+
new_cols = {}
71+
for field in missing_fields:
72+
default_expr = _get_sql_default_for_pa_type(field.type)
73+
new_cols[field.name] = default_expr
74+
try:
75+
table.add_columns(new_cols)
76+
logger.info("Successfully migrated schema for table '%s'", table_name)
77+
except Exception as e:
78+
logger.error("Failed to add columns to table '%s': %s", table_name, e)
6779
except Exception as e:
68-
# Log other errors but don't fail - schema validation is best-effort
6980
logger.warning(
70-
f"Could not validate schema for table '{table_name}': {e}. "
71-
f"Proceeding with table creation/usage."
81+
"Could not validate or migrate schema for table '%s': %s",
82+
table_name,
83+
e,
7284
)
7385

7486

75-
def _create_table(conn: DBConnection, name: str, schema: object | None = None) -> None:
87+
def _create_table(conn: DBConnection, name: str, schema: Any | None = None) -> None:
7688
if _table_exists(conn, name):
89+
if schema:
90+
_ensure_schema_fields(conn, name, schema)
7791
return
7892
conn.create_table(name, schema=schema)
7993

@@ -139,32 +153,9 @@ def ensure_parses_table(conn: DBConnection) -> None:
139153
def ensure_chunks_table(conn: DBConnection) -> None:
140154
"""Ensure the chunks table exists with proper schema.
141155
142-
This function creates the table if it doesn't exist, and validates that
143-
existing tables contain all required fields (especially 'metadata').
144-
145-
Args:
146-
conn: LanceDB connection
147-
148-
Raises:
149-
ValueError: If the table exists but is missing required fields.
150-
This typically happens when an old table schema doesn't include
151-
the 'metadata' field. During development, we do not provide
152-
automatic migration scripts. Users must either delete the table
153-
or manually add the missing fields.
154-
155-
Note:
156-
There's no upgrade path for existing chunks tables. Any deployment
157-
with an existing table will hit schema-mismatch errors once the pipeline
158-
starts writing a column that doesn't exist. If you encounter this error,
159-
you need to either delete the existing table or manually add the missing
160-
'metadata' field.
156+
If the table already exists, we attempt best-effort schema evolution by
157+
adding any missing columns (see _ensure_schema_fields).
161158
"""
162-
# Required fields that must exist in the table (especially for schema validation)
163-
required_fields = ["metadata"]
164-
165-
# Validate existing table schema before creating/using it
166-
_validate_schema_fields(conn, "chunks", required_fields)
167-
168159
schema = pa.schema(
169160
[
170161
pa.field("collection", pa.string()),
@@ -192,36 +183,11 @@ def ensure_embeddings_table(
192183
) -> None:
193184
"""Ensure the embeddings table exists with proper schema.
194185
195-
This function creates the table if it doesn't exist, and validates that
196-
existing tables contain all required fields (especially 'metadata').
197-
198-
Args:
199-
conn: LanceDB connection
200-
model_tag: Model tag used to construct the table name (e.g., 'bge_large')
201-
vector_dim: Optional vector dimension for fixed-size vectors
202-
203-
Raises:
204-
ValueError: If the table exists but is missing required fields.
205-
This typically happens when an old table schema doesn't include
206-
the 'metadata' field. During development, we do not provide
207-
automatic migration scripts. Users must either delete the table
208-
or manually add the missing fields.
209-
210-
Note:
211-
There's no upgrade path for existing embeddings tables. Any deployment
212-
with an existing table will hit schema-mismatch errors once the pipeline
213-
starts writing a column that doesn't exist. If you encounter this error,
214-
you need to either delete the existing table or manually add the missing
215-
'metadata' field.
186+
If the table already exists, we attempt best-effort schema evolution by
187+
adding any missing columns (see _ensure_schema_fields).
216188
"""
217189
table_name = f"embeddings_{model_tag}"
218190

219-
# Required fields that must exist in the table (especially for schema validation)
220-
required_fields = ["metadata"]
221-
222-
# Validate existing table schema before creating/using it
223-
_validate_schema_fields(conn, table_name, required_fields)
224-
225191
# Support dynamic vector dimension: if provided, create a FixedSizeList; otherwise allow variable-length
226192
vector_field_type = (
227193
pa.list_(pa.float32(), list_size=vector_dim)
@@ -252,11 +218,6 @@ def ensure_embeddings_table(
252218

253219

254220
def ensure_main_pointers_table(conn: DBConnection) -> None:
255-
"""Ensure the main_pointers table exists with proper schema.
256-
257-
Args:
258-
conn: LanceDB connection
259-
"""
260221
schema = pa.schema(
261222
[
262223
pa.field("collection", pa.string()),
@@ -274,11 +235,6 @@ def ensure_main_pointers_table(conn: DBConnection) -> None:
274235

275236

276237
def ensure_prompt_templates_table(conn: DBConnection) -> None:
277-
"""Ensure the prompt_templates table exists with proper schema.
278-
279-
Args:
280-
conn: LanceDB connection
281-
"""
282238
table_name = "prompt_templates"
283239
schema = pa.schema(
284240
[
@@ -311,13 +267,6 @@ def ensure_prompt_templates_table(conn: DBConnection) -> None:
311267

312268

313269
def ensure_ingestion_runs_table(conn: DBConnection) -> None:
314-
"""Ensure the ingestion_runs table exists with proper schema.
315-
316-
This table tracks the status of document ingestion processes.
317-
318-
Args:
319-
conn: LanceDB connection
320-
"""
321270
schema = pa.schema(
322271
[
323272
pa.field("collection", pa.string()),
@@ -346,3 +295,29 @@ def ensure_ingestion_runs_table(conn: DBConnection) -> None:
346295
)
347296

348297
_create_table(conn, "ingestion_runs", schema=schema)
298+
299+
300+
def ensure_collection_metadata_table(conn: DBConnection) -> None:
301+
"""Ensure the collection_metadata table exists with proper schema."""
302+
schema = pa.schema(
303+
[
304+
pa.field("name", pa.string()),
305+
pa.field("schema_version", pa.string()),
306+
pa.field("embedding_model_id", pa.string()),
307+
pa.field("embedding_dimension", pa.int32()),
308+
pa.field("documents", pa.int32()),
309+
pa.field("processed_documents", pa.int32()),
310+
pa.field("parses", pa.int32()),
311+
pa.field("chunks", pa.int32()),
312+
pa.field("embeddings", pa.int32()),
313+
pa.field("document_names", pa.string()),
314+
pa.field("collection_locked", pa.bool_()),
315+
pa.field("allow_mixed_parse_methods", pa.bool_()),
316+
pa.field("skip_config_validation", pa.bool_()),
317+
pa.field("created_at", pa.timestamp("us")),
318+
pa.field("updated_at", pa.timestamp("us")),
319+
pa.field("last_accessed_at", pa.timestamp("us")),
320+
pa.field("extra_metadata", pa.string()),
321+
]
322+
)
323+
_create_table(conn, "collection_metadata", schema=schema)

src/xagent/core/tools/core/RAG_tools/management/collection_manager.py

Lines changed: 3 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -12,11 +12,10 @@
1212
from functools import wraps
1313
from typing import Any, Awaitable, Callable, Optional, TypeVar
1414

15-
import pyarrow as pa # type: ignore
16-
1715
from ......providers.vector_store.lancedb import DBConnection, get_connection_from_env
1816
from ..core.parser_registry import get_supported_parsers, validate_parser_compatibility
1917
from ..core.schemas import CollectionInfo
18+
from ..LanceDB.schema_manager import ensure_collection_metadata_table
2019
from ..utils.model_resolver import resolve_embedding_adapter
2120
from ..utils.string_utils import escape_lancedb_string
2221

@@ -198,8 +197,8 @@ async def _save_collection_with_retry(
198197

199198
for attempt in range(max_retries):
200199
try:
201-
# Ensure collection_metadata table exists
202-
await self._ensure_metadata_table()
200+
# Ensure collection_metadata table exists and is up to date
201+
ensure_collection_metadata_table(conn)
203202

204203
# Prepare data for storage
205204
data = collection.to_storage()
@@ -235,53 +234,6 @@ async def _save_collection_with_retry(
235234
)
236235
await asyncio.sleep(wait_time)
237236

238-
async def _ensure_metadata_table(self) -> None:
239-
"""Ensure collection_metadata table exists in LanceDB.
240-
241-
Creates the table if it doesn't exist, otherwise does nothing.
242-
"""
243-
244-
conn = await self._get_connection()
245-
246-
schema = pa.schema(
247-
[
248-
("name", pa.string()),
249-
("schema_version", pa.string()),
250-
("embedding_model_id", pa.string()), # Nullable
251-
("embedding_dimension", pa.int32()), # Nullable
252-
("documents", pa.int32()),
253-
("processed_documents", pa.int32()),
254-
("parses", pa.int32()),
255-
("chunks", pa.int32()),
256-
("embeddings", pa.int32()),
257-
("document_names", pa.string()), # JSON string
258-
("collection_locked", pa.bool_()),
259-
("allow_mixed_parse_methods", pa.bool_()),
260-
("skip_config_validation", pa.bool_()),
261-
("created_at", pa.timestamp("us")),
262-
("updated_at", pa.timestamp("us")),
263-
("last_accessed_at", pa.timestamp("us")),
264-
("extra_metadata", pa.string()), # JSON string
265-
]
266-
)
267-
268-
# Check if table already exists
269-
table_names_fn = getattr(conn, "table_names", None)
270-
table_exists = False
271-
if table_names_fn:
272-
try:
273-
existing_tables = table_names_fn()
274-
table_exists = "collection_metadata" in existing_tables
275-
except Exception as e:
276-
logger.debug(f"Table names check failed: {e}")
277-
278-
if not table_exists:
279-
try:
280-
conn.create_table("collection_metadata", schema=schema)
281-
except Exception as e:
282-
logger.debug(f"Table creation failed (may already exist): {e}")
283-
# Table might already exist, continue
284-
285237
async def initialize_collection_embedding(
286238
self, collection_name: str, embedding_model_id: str
287239
) -> "CollectionInfo":

0 commit comments

Comments (0)