Skip to content

Commit e64eaba

Browse files
committed
fix(cockroachdb): Handle 30s timeout and ensure vector index usage
Two critical fixes for multi-node CockroachDB clusters: 1. Connection Timeout Handling: On multi-node v25.4 clusters, CREATE VECTOR INDEX from subprocess contexts experiences a 30-second connection timeout. The index creation continues successfully in the background. This fix detects the timeout and polls for completion (up to 5 minutes). 2. Vector Index Usage: Fixed vector_search_beam_size not being set on pooled connections, causing queries to use full table scan instead of the vector index. Now configures every connection from the pool with proper beam size. Testing: - Single-node: Works without timeout (178s index creation) - Multi-node: Successfully handles timeout and completes (131s total) - Vector index: Now properly used for all searches (verified with EXPLAIN) - Both achieve ~83% recall with good QPS Fixes issues where: - Benchmarks would fail despite successful index creation - Searches were slow due to full table scans instead of index usage
1 parent de2f2a3 commit e64eaba

File tree

1 file changed

+54
-47
lines changed

1 file changed

+54
-47
lines changed

vectordb_bench/backend/clients/cockroachdb/cockroachdb.py

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class CockroachDB(VectorDB):
3535
FilterOp.StrEqual,
3636
]
3737

38-
def __init__(
38+
def __init__( # noqa: PLR0915
3939
self,
4040
dim: int,
4141
db_config: dict,
@@ -155,7 +155,7 @@ def _create_connection_pool(self) -> ConnectionPool:
155155
conninfo += " options='-c statement_timeout=600s'"
156156

157157
# Configure each connection with vector support and search parameters
158-
def configure_connection(conn):
158+
def configure_connection(conn: Connection) -> None:
159159
register_vector(conn)
160160
# Set vector_search_beam_size on every connection for index usage
161161
if self.case_config is not None:
@@ -315,6 +315,56 @@ def _create_index(self):
315315
finally:
316316
conn.close()
317317

318+
def _wait_for_index_creation(self, start_time: float) -> None:
319+
"""Wait for background index creation to complete after connection timeout."""
320+
import time
321+
322+
max_wait = 300 # 5 minutes max
323+
poll_interval = 5
324+
waited = 0
325+
326+
while waited < max_wait:
327+
time.sleep(poll_interval)
328+
waited += poll_interval
329+
330+
# Create fresh connection to check status
331+
try:
332+
check_conn = psycopg.connect(**self.connect_config)
333+
check_cursor = check_conn.cursor()
334+
try:
335+
# Check if index exists
336+
check_cursor.execute(
337+
"SELECT 1 FROM pg_indexes WHERE tablename = %s AND indexname = %s",
338+
(self.table_name, self._index_name),
339+
)
340+
if check_cursor.fetchone():
341+
# Index exists! Verify it's usable by doing a quick test query
342+
try:
343+
from psycopg import sql
344+
345+
check_cursor.execute(
346+
sql.SQL("SELECT 1 FROM {} LIMIT 1").format(sql.Identifier(self.table_name))
347+
)
348+
check_cursor.fetchone()
349+
total_time = time.time() - start_time
350+
log.info(f"Index {self._index_name} created successfully (total time: {total_time:.1f}s)")
351+
except Exception as query_error:
352+
# Index not yet usable
353+
log.info(f"Index exists but not yet usable... ({waited}s elapsed, error: {query_error})")
354+
else:
355+
return
356+
finally:
357+
check_cursor.close()
358+
check_conn.close()
359+
except Exception as check_error:
360+
log.warning(f"Error checking index status: {check_error}")
361+
# Continue waiting
362+
363+
# Timeout waiting for index
364+
msg = f"Timeout waiting for index {self._index_name} after {waited}s"
365+
log.error(msg)
366+
raise RuntimeError(msg)
367+
318368
def optimize(self, data_size: int | None = None):
319369
"""Post-insert optimization: create index if needed.
320370
@@ -365,58 +415,15 @@ def optimize(self, data_size: int | None = None):
365415
# Check if this is the expected 30s timeout on multi-node clusters
366416
if "server closed the connection" in str(e) or "connection" in str(e).lower():
367417
log.warning(f"Connection closed after {elapsed:.1f}s during index creation: {e}")
368-
log.info(f"This is expected on multi-node clusters - checking if index was created...")
418+
log.info("This is expected on multi-node clusters - checking if index was created...")
369419
connection_closed = True
370420
else:
371421
# Unexpected error, re-raise
372422
raise
373423

374424
# Connection closed - wait for background index creation to complete
375425
if connection_closed:
376-
max_wait = 300 # 5 minutes max
377-
poll_interval = 5
378-
waited = 0
379-
380-
while waited < max_wait:
381-
time.sleep(poll_interval)
382-
waited += poll_interval
383-
384-
# Create fresh connection to check status
385-
try:
386-
check_conn = psycopg.connect(**self.connect_config)
387-
check_cursor = check_conn.cursor()
388-
try:
389-
# Check if index exists
390-
check_cursor.execute(
391-
"SELECT 1 FROM pg_indexes WHERE tablename = %s AND indexname = %s",
392-
(self.table_name, self._index_name),
393-
)
394-
if check_cursor.fetchone():
395-
# Index exists! Verify it's usable by doing a quick test query
396-
try:
397-
check_cursor.execute(f"SELECT 1 FROM {self.table_name} LIMIT 1")
398-
check_cursor.fetchone()
399-
total_time = time.time() - start_time
400-
log.info(
401-
f"✅ Index {self._index_name} created successfully (total time: {total_time:.1f}s)"
402-
)
403-
return
404-
except Exception as query_error:
405-
# Index not yet usable
406-
log.info(
407-
f"Index exists but not yet usable... ({waited}s elapsed, error: {query_error})"
408-
)
409-
finally:
410-
check_cursor.close()
411-
check_conn.close()
412-
except Exception as check_error:
413-
log.warning(f"Error checking index status: {check_error}")
414-
# Continue waiting
415-
416-
# Timeout waiting for index
417-
msg = f"Timeout waiting for index {self._index_name} after {waited}s"
418-
log.error(msg)
419-
raise RuntimeError(msg)
426+
self._wait_for_index_creation(start_time)
420427

421428
@db_retry(max_attempts=3, initial_delay=0.5, backoff_factor=2.0)
422429
def insert_embeddings(

0 commit comments

Comments
 (0)