Skip to content

Commit cffaf6d

Browse files
authored
feat(lancedb): expose a shared connection for query (#1036)
1 parent 7df8617 commit cffaf6d

File tree

1 file changed

+32
-14
lines changed

1 file changed

+32
-14
lines changed

python/cocoindex/targets/lancedb.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
import datetime
21
import dataclasses
2+
import threading
33
import uuid
4+
import weakref
5+
import datetime
6+
47
from typing import Any
58

69
import lancedb # type: ignore
@@ -21,7 +24,6 @@
2124

2225
@dataclasses.dataclass
2326
class DatabaseOptions:
24-
read_consistency_interval: datetime.timedelta | None = None
2527
storage_options: dict[str, Any] | None = None
2628

2729

@@ -45,17 +47,33 @@ class _TableKey:
4547
table_name: str
4648

4749

48-
async def _open_db(
49-
db_uri: str, db_options: DatabaseOptions | None
50-
) -> lancedb.AsyncConnection:
51-
db_options = db_options or DatabaseOptions()
50+
_DbConnectionsLock = threading.Lock()
51+
_DbConnections: weakref.WeakValueDictionary[str, lancedb.AsyncConnection] = (
52+
weakref.WeakValueDictionary()
53+
)
54+
5255

53-
# TODO: reuse cached connections
54-
return await lancedb.connect_async(
55-
db_uri,
56-
read_consistency_interval=db_options.read_consistency_interval,
57-
storage_options=db_options.storage_options,
58-
)
56+
async def connect_async(
57+
db_uri: str,
58+
*,
59+
db_options: DatabaseOptions | None = None,
60+
read_consistency_interval: datetime.timedelta | None = None,
61+
) -> lancedb.AsyncConnection:
62+
"""
63+
Helper function to connect to a LanceDB database.
64+
It will reuse the connection if it already exists.
65+
The connection will be shared with the target used by cocoindex, so it achieves strong consistency.
66+
"""
67+
with _DbConnectionsLock:
68+
conn = _DbConnections.get(db_uri)
69+
if conn is None:
70+
db_options = db_options or DatabaseOptions()
71+
_DbConnections[db_uri] = conn = await lancedb.connect_async(
72+
db_uri,
73+
storage_options=db_options.storage_options,
74+
read_consistency_interval=read_consistency_interval,
75+
)
76+
return conn
5977

6078

6179
def make_pa_schema(
@@ -262,7 +280,7 @@ async def apply_setup_change(
262280
latest_state = current or previous
263281
if not latest_state:
264282
return
265-
db_conn = await _open_db(key.db_uri, latest_state.db_options)
283+
db_conn = await connect_async(key.db_uri, db_options=latest_state.db_options)
266284

267285
reuse_table = (
268286
previous is not None
@@ -291,7 +309,7 @@ async def prepare(
291309
spec: LanceDB,
292310
setup_state: _State,
293311
) -> _MutateContext:
294-
db_conn = await _open_db(spec.db_uri, spec.db_options)
312+
db_conn = await connect_async(spec.db_uri, db_options=spec.db_options)
295313
table = await db_conn.open_table(spec.table_name)
296314
return _MutateContext(
297315
table=table,

0 commit comments

Comments
 (0)