141 changes: 113 additions & 28 deletions vectordb_bench/backend/clients/cockroachdb/cockroachdb.py
@@ -35,7 +35,7 @@ class CockroachDB(VectorDB):
        FilterOp.StrEqual,
    ]

    def __init__(  # noqa: PLR0915
        self,
        dim: int,
        db_config: dict,
@@ -154,14 +154,25 @@ def _create_connection_pool(self) -> ConnectionPool:
        # Add statement timeout for long-running vector index operations
        conninfo += " options='-c statement_timeout=600s'"

        # Configure each connection with vector support and search parameters
        def configure_connection(conn: Connection) -> None:
            register_vector(conn)
            # Set vector_search_beam_size on every connection for index usage
            if self.case_config is not None:
                search_param = self.case_config.search_param()
                beam_size = search_param.get("vector_search_beam_size", 32)
                with conn.cursor() as cur:
                    cur.execute(f"SET vector_search_beam_size = {beam_size}")
                conn.commit()

        return ConnectionPool(
            conninfo=conninfo,
            min_size=self.pool_size,
            max_size=self.pool_size + self.max_overflow,
            max_lifetime=self.pool_recycle,
            max_idle=300,
            reconnect_timeout=10.0,
            configure=configure_connection,
        )

    @contextmanager
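
The per-connection configure hook is the heart of this hunk: psycopg_pool invokes it once for each new physical connection (including replacements recycled by max_lifetime), so pgvector's type adapters and the vector_search_beam_size session variable are in place before any query is issued. A minimal standalone sketch of the same pattern (the DSN, pool sizes, and beam size are placeholders, not values from this benchmark):

from psycopg import Connection
from psycopg_pool import ConnectionPool
from pgvector.psycopg import register_vector

def configure(conn: Connection) -> None:
    # Runs once per new physical connection, before the pool hands it out
    register_vector(conn)  # register pgvector type adapters
    with conn.cursor() as cur:
        cur.execute("SET vector_search_beam_size = 32")  # placeholder beam size
    conn.commit()

pool = ConnectionPool(
    conninfo="postgresql://root@localhost:26257/defaultdb",  # placeholder DSN
    min_size=1,
    max_size=4,
    configure=configure,
)

with pool.connection() as conn:
    # Every pooled connection already has the session variable applied
    print(conn.execute("SHOW vector_search_beam_size").fetchone())

Setting the variable in configure rather than per query means a recycled connection is re-configured automatically and callers never see an unconfigured one.
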
@@ -304,41 +315,115 @@ def _create_index(self):
        finally:
            conn.close()

    def _wait_for_index_creation(self, start_time: float) -> None:
        """Wait for background index creation to complete after connection timeout."""
        import time

        from psycopg import sql

        max_wait = 300  # 5 minutes max
        poll_interval = 5
        waited = 0

        while waited < max_wait:
            time.sleep(poll_interval)
            waited += poll_interval

            # Create fresh connection to check status
            try:
                check_conn = psycopg.connect(**self.connect_config)
                check_cursor = check_conn.cursor()
                try:
                    # Check if index exists
                    check_cursor.execute(
                        "SELECT 1 FROM pg_indexes WHERE tablename = %s AND indexname = %s",
                        (self.table_name, self._index_name),
                    )
                    if check_cursor.fetchone():
                        # Index exists! Verify it's usable by doing a quick test query
                        try:
                            check_cursor.execute(
                                sql.SQL("SELECT 1 FROM {} LIMIT 1").format(sql.Identifier(self.table_name))
                            )
                            check_cursor.fetchone()
                            total_time = time.time() - start_time
                            log.info(f"Index {self._index_name} created successfully (total time: {total_time:.1f}s)")
                            return
                        except Exception as query_error:
                            # Index not yet usable; keep polling
                            log.info(f"Index exists but not yet usable... ({waited}s elapsed, error: {query_error})")
                    else:
                        # Index not visible yet; keep polling
                        log.info(f"Index {self._index_name} not visible yet... ({waited}s elapsed)")
                finally:
                    check_cursor.close()
                    check_conn.close()
            except Exception as check_error:
                log.warning(f"Error checking index status: {check_error}")
                # Continue waiting

        # Timeout waiting for index
        msg = f"Timeout waiting for index {self._index_name} after {waited}s"
        log.error(msg)
        raise RuntimeError(msg)
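
Polling pg_indexes is the portable check; CockroachDB also surfaces schema-change progress via SHOW JOBS. A hedged sketch of such a probe (it assumes the standard SHOW JOBS columns status, fraction_completed, description, and created, and that the job description mentions the index name; vector-index job metadata may differ across versions):

import psycopg

def index_job_status(connect_config: dict, index_name: str):
    # Most recent schema-change job whose description mentions the index;
    # returns a (status, fraction_completed) row, or None if no job matches
    with psycopg.connect(**connect_config) as conn:
        return conn.execute(
            "SELECT status, fraction_completed FROM [SHOW JOBS] "
            "WHERE description LIKE %s ORDER BY created DESC LIMIT 1",
            (f"%{index_name}%",),
        ).fetchone()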

    def optimize(self, data_size: int | None = None):
        """Post-insert optimization: create index if needed.

        Note: On multi-node clusters, CockroachDB v25.4 may close connections
        at 30s during CREATE VECTOR INDEX from subprocess contexts. The index
        creation continues in the background. We handle this gracefully by
        checking whether the index was created successfully after the timeout.
        """
        log.info(f"{self.name} post-insert optimization")
        if self.case_config is not None and self.case_config.create_index_after_load:
            import time

            # Build CREATE INDEX SQL (skip DROP to avoid timeouts)
            index_param = self.case_config.index_param()
            options_list = []
            for option in index_param["index_creation_with_options"]:
                if option["val"] is not None:
                    options_list.append(f"{option['option_name']} = {option['val']}")

            with_clause = f" WITH ({', '.join(options_list)})" if options_list else ""
            sql_str = (
                f"CREATE VECTOR INDEX IF NOT EXISTS {self._index_name} "
                f"ON {self.table_name} ({self._vector_field} {index_param['metric']})"
                f"{with_clause}"
            )
            log.info(f"{self.name} creating vector index: {self._index_name}")
            log.info(f"Index SQL: {sql_str}")

            start_time = time.time()
            connection_closed = False

            # Try to create index
            try:
                with self.pool.connection() as conn:
                    register_vector(conn)
                    conn.autocommit = True
                    cursor = conn.cursor()
                    try:
                        cursor.execute(sql_str)
                        elapsed = time.time() - start_time
                        log.info(f"{self.name} index created successfully in {elapsed:.1f}s")
                        return  # Success!
                    finally:
                        cursor.close()
            except Exception as e:
                elapsed = time.time() - start_time
                # Check if this is the expected 30s timeout on multi-node clusters
                if "server closed the connection" in str(e) or "connection" in str(e).lower():
                    log.warning(f"Connection closed after {elapsed:.1f}s during index creation: {e}")
                    log.info("This is expected on multi-node clusters - checking if index was created...")
                    connection_closed = True
                else:
                    # Unexpected error, re-raise
                    raise

            # Connection closed - wait for background index creation to complete
            if connection_closed:
                self._wait_for_index_creation(start_time)
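
For concreteness, this is the statement shape the f-string above produces, evaluated with placeholder values (the metric string and WITH option are illustrative, not taken from the real case_config):

# Placeholder inputs mirroring the f-string in optimize() above
index_name = "ix_vec"
table_name = "vector_bench"
vector_field = "embedding"
metric = "vector_cosine_ops"  # illustrative metric/opclass string
with_clause = " WITH (build_beam_size = 8)"  # illustrative option

sql_str = (
    f"CREATE VECTOR INDEX IF NOT EXISTS {index_name} "
    f"ON {table_name} ({vector_field} {metric})"
    f"{with_clause}"
)
print(sql_str)
# CREATE VECTOR INDEX IF NOT EXISTS ix_vec ON vector_bench (embedding vector_cosine_ops) WITH (build_beam_size = 8)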

    @db_retry(max_attempts=3, initial_delay=0.5, backoff_factor=2.0)
    def insert_embeddings(
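
db_retry is defined elsewhere in the repo; as a rough sketch of what max_attempts=3, initial_delay=0.5, backoff_factor=2.0 plausibly mean (an assumed implementation, not the project's actual one), a failing call would be retried after 0.5 s and again after 1.0 s before the exception propagates:

import functools
import time

def db_retry(max_attempts: int = 3, initial_delay: float = 0.5, backoff_factor: float = 2.0):
    # Assumed semantics: retry the wrapped call on any exception,
    # sleeping initial_delay, then initial_delay * backoff_factor, ...
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the last error propagate
                    time.sleep(delay)
                    delay *= backoff_factor
        return wrapper
    return decorator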