11import dataclasses
2+ import logging
23import threading
34import uuid
45import weakref
1920 VectorTypeSchema ,
2021 TableType ,
2122)
22- from ..index import IndexOptions
23+ from ..index import VectorIndexDef , IndexOptions , VectorSimilarityMetric
24+
25+ _logger = logging .getLogger (__name__ )
26+
27+ _LANCEDB_VECTOR_METRIC : dict [VectorSimilarityMetric , str ] = {
28+ VectorSimilarityMetric .COSINE_SIMILARITY : "cosine" ,
29+ VectorSimilarityMetric .L2_DISTANCE : "l2" ,
30+ VectorSimilarityMetric .INNER_PRODUCT : "dot" ,
31+ }
2332
2433
25- @dataclasses .dataclass
2634class DatabaseOptions :
2735 storage_options : dict [str , Any ] | None = None
2836
@@ -33,11 +41,18 @@ class LanceDB(op.TargetSpec):
3341 db_options : DatabaseOptions | None = None
3442
3543
44+ @dataclasses .dataclass
45+ class _VectorIndex :
46+ name : str
47+ field_name : str
48+ metric : VectorSimilarityMetric
49+
50+
3651@dataclasses .dataclass
3752class _State :
3853 key_field_schema : FieldSchema
3954 value_fields_schema : list [FieldSchema ]
40- index_options : IndexOptions
55+ vector_indexes : list [ _VectorIndex ] | None = None
4156 db_options : DatabaseOptions | None = None
4257
4358
@@ -233,6 +248,37 @@ class _MutateContext:
233248 pa_schema : pa .Schema
234249
235250
251+ # Not used for now, because of https://github.com/lancedb/lance/issues/3443
252+ #
253+ # async def _update_table_schema(
254+ # table: lancedb.AsyncTable,
255+ # expected_schema: pa.Schema,
256+ # ) -> None:
257+ # existing_schema = await table.schema()
258+ # unseen_existing_field_names = {field.name: field for field in existing_schema}
259+ # new_columns = []
260+ # updated_columns = []
261+ # for field in expected_schema:
262+ # existing_field = unseen_existing_field_names.pop(field.name, None)
263+ # if existing_field is None:
264+ # new_columns.append(field)
265+ # else:
266+ # if field.type != existing_field.type:
267+ # updated_columns.append(
268+ # {
269+ # "path": field.name,
270+ # "data_type": field.type,
271+ # "nullable": field.nullable,
272+ # }
273+ # )
274+ # if new_columns:
275+ # table.add_columns(new_columns)
276+ # if updated_columns:
277+ # table.alter_columns(*updated_columns)
278+ # if unseen_existing_field_names:
279+ # table.drop_columns(unseen_existing_field_names.keys())
280+
281+
236282@op .target_connector (
237283 spec_cls = LanceDB , persistent_key_type = _TableKey , setup_state_cls = _State
238284)
@@ -254,7 +300,18 @@ def get_setup_state(
254300 key_field_schema = key_fields_schema [0 ],
255301 value_fields_schema = value_fields_schema ,
256302 db_options = spec .db_options ,
257- index_options = index_options ,
303+ vector_indexes = (
304+ [
305+ _VectorIndex (
306+ name = f"__{ index .field_name } __{ _LANCEDB_VECTOR_METRIC [index .metric ]} __idx" ,
307+ field_name = index .field_name ,
308+ metric = index .metric ,
309+ )
310+ for index in index_options .vector_indexes
311+ ]
312+ if index_options .vector_indexes is not None
313+ else None
314+ ),
258315 )
259316
260317 @staticmethod
@@ -292,17 +349,62 @@ async def apply_setup_change(
292349 if not reuse_table :
293350 await db_conn .drop_table (key .table_name , ignore_missing = True )
294351
295- if current is not None :
296- if not reuse_table :
297- await db_conn .create_table (
352+ if current is None :
353+ return
354+
355+ table : lancedb .AsyncTable | None = None
356+ if reuse_table :
357+ try :
358+ table = await db_conn .open_table (key .table_name )
359+ except Exception as e : # pylint: disable=broad-exception-caught
360+ _logger .warning (
361+ "Exception in opening table %s, creating it" ,
298362 key .table_name ,
299- schema = make_pa_schema (
300- current .key_field_schema , current .value_fields_schema
301- ),
302- exist_ok = True ,
363+ exc_info = e ,
303364 )
365+ table = None
366+
367+ if table is None :
368+ table = await db_conn .create_table (
369+ key .table_name ,
370+ schema = make_pa_schema (
371+ current .key_field_schema , current .value_fields_schema
372+ ),
373+ mode = "overwrite" ,
374+ )
375+ await table .create_index (
376+ current .key_field_schema .name , config = lancedb .index .BTree ()
377+ )
304378
305- # TODO: deal with the index options
379+ unseen_prev_vector_indexes = {
380+ index .name for index in (previous and previous .vector_indexes ) or []
381+ }
382+ existing_vector_indexes = {index .name for index in await table .list_indices ()}
383+
384+ for index in current .vector_indexes or []:
385+ if index .name in unseen_prev_vector_indexes :
386+ unseen_prev_vector_indexes .remove (index .name )
387+ else :
388+ try :
389+ await table .create_index (
390+ index .field_name ,
391+ name = index .name ,
392+ config = lancedb .index .HnswPq (
393+ distance_type = _LANCEDB_VECTOR_METRIC [index .metric ]
394+ ),
395+ )
396+ except Exception as e : # pylint: disable=broad-exception-caught
397+ raise RuntimeError (
398+ f"Exception in creating index on field { index .field_name } . "
399+ f"This may be caused by a limitation of LanceDB, "
400+ f"which requires data existing in the table to train the index. "
401+ f"See: https://github.com/lancedb/lance/issues/4034" ,
402+ index .name ,
403+ ) from e
404+
405+ for vector_index_name in unseen_prev_vector_indexes :
406+ if vector_index_name in existing_vector_indexes :
407+ await table .drop_index (vector_index_name )
306408
307409 @staticmethod
308410 async def prepare (
0 commit comments