@@ -154,14 +154,25 @@ def _create_connection_pool(self) -> ConnectionPool:
154154 # Add statement timeout for long-running vector index operations
155155 conninfo += " options='-c statement_timeout=600s'"
156156
157+ # Configure each connection with vector support and search parameters
158+ def configure_connection (conn ):
159+ register_vector (conn )
160+ # Set vector_search_beam_size on every connection for index usage
161+ if self .case_config is not None :
162+ search_param = self .case_config .search_param ()
163+ beam_size = search_param .get ("vector_search_beam_size" , 32 )
164+ with conn .cursor () as cur :
165+ cur .execute (f"SET vector_search_beam_size = { beam_size } " )
166+ conn .commit ()
167+
157168 return ConnectionPool (
158169 conninfo = conninfo ,
159170 min_size = self .pool_size ,
160171 max_size = self .pool_size + self .max_overflow ,
161172 max_lifetime = self .pool_recycle ,
162173 max_idle = 300 ,
163174 reconnect_timeout = 10.0 ,
164- configure = lambda conn : register_vector ( conn ) ,
175+ configure = configure_connection ,
165176 )
166177
167178 @contextmanager
@@ -307,38 +318,105 @@ def _create_index(self):
307318 def optimize (self , data_size : int | None = None ):
308319 """Post-insert optimization: create index if needed.
309320
310- Note: Uses connection pool instead of creating new connection to avoid
311- subprocess timeout issues in CockroachDB.
321+ Note: On multi-node clusters, CockroachDB v25.4 may close connections
322+ at 30s during CREATE VECTOR INDEX from subprocess contexts. The index
323+ creation continues in background. We handle this gracefully by checking
324+ if the index was created successfully after timeout.
312325 """
313326 log .info (f"{ self .name } post-insert optimization" )
314327 if self .case_config is not None and self .case_config .create_index_after_load :
315- # Use existing pool connection instead of creating new one
316- with self .pool .connection () as conn :
317- register_vector (conn )
318- conn .autocommit = True
319- cursor = conn .cursor ()
320-
321- try :
322- # Build CREATE INDEX SQL (SKIP DROP to avoid timeouts)
323- index_param = self .case_config .index_param ()
324- options_list = []
325- for option in index_param ["index_creation_with_options" ]:
326- if option ["val" ] is not None :
327- options_list .append (f"{ option ['option_name' ]} = { option ['val' ]} " )
328-
329- with_clause = f" WITH ({ ', ' .join (options_list )} )" if options_list else ""
330- sql_str = (
331- f"CREATE VECTOR INDEX IF NOT EXISTS { self ._index_name } "
332- f"ON { self .table_name } ({ self ._vector_field } { index_param ['metric' ]} )"
333- f"{ with_clause } "
334- )
335-
336- log .info (f"{ self .name } creating vector index: { self ._index_name } " )
337- log .info (f"Index SQL: { sql_str } " )
338- cursor .execute (sql_str )
339-
340- finally :
341- cursor .close ()
328+ import time
329+
330+ # Build CREATE INDEX SQL
331+ index_param = self .case_config .index_param ()
332+ options_list = []
333+ for option in index_param ["index_creation_with_options" ]:
334+ if option ["val" ] is not None :
335+ options_list .append (f"{ option ['option_name' ]} = { option ['val' ]} " )
336+
337+ with_clause = f" WITH ({ ', ' .join (options_list )} )" if options_list else ""
338+ sql_str = (
339+ f"CREATE VECTOR INDEX IF NOT EXISTS { self ._index_name } "
340+ f"ON { self .table_name } ({ self ._vector_field } { index_param ['metric' ]} )"
341+ f"{ with_clause } "
342+ )
343+
344+ log .info (f"{ self .name } creating vector index: { self ._index_name } " )
345+ log .info (f"Index SQL: { sql_str } " )
346+
347+ start_time = time .time ()
348+ connection_closed = False
349+
350+ # Try to create index
351+ try :
352+ with self .pool .connection () as conn :
353+ register_vector (conn )
354+ conn .autocommit = True
355+ cursor = conn .cursor ()
356+ try :
357+ cursor .execute (sql_str )
358+ elapsed = time .time () - start_time
359+ log .info (f"{ self .name } index created successfully in { elapsed :.1f} s" )
360+ return # Success!
361+ finally :
362+ cursor .close ()
363+ except Exception as e :
364+ elapsed = time .time () - start_time
365+ # Check if this is the expected 30s timeout on multi-node clusters
366+ if "server closed the connection" in str (e ) or "connection" in str (e ).lower ():
367+ log .warning (f"Connection closed after { elapsed :.1f} s during index creation: { e } " )
368+ log .info (f"This is expected on multi-node clusters - checking if index was created..." )
369+ connection_closed = True
370+ else :
371+ # Unexpected error, re-raise
372+ raise
373+
374+ # Connection closed - wait for background index creation to complete
375+ if connection_closed :
376+ max_wait = 300 # 5 minutes max
377+ poll_interval = 5
378+ waited = 0
379+
380+ while waited < max_wait :
381+ time .sleep (poll_interval )
382+ waited += poll_interval
383+
384+ # Create fresh connection to check status
385+ try :
386+ check_conn = psycopg .connect (** self .connect_config )
387+ check_cursor = check_conn .cursor ()
388+ try :
389+ # Check if index exists
390+ check_cursor .execute (
391+ "SELECT 1 FROM pg_indexes WHERE tablename = %s AND indexname = %s" ,
392+ (self .table_name , self ._index_name ),
393+ )
394+ if check_cursor .fetchone ():
395+ # Index exists! Verify it's usable by doing a quick test query
396+ try :
397+ check_cursor .execute (f"SELECT 1 FROM { self .table_name } LIMIT 1" )
398+ check_cursor .fetchone ()
399+ total_time = time .time () - start_time
400+ log .info (
401+ f"✅ Index { self ._index_name } created successfully (total time: { total_time :.1f} s)"
402+ )
403+ return
404+ except Exception as query_error :
405+ # Index not yet usable
406+ log .info (
407+ f"Index exists but not yet usable... ({ waited } s elapsed, error: { query_error } )"
408+ )
409+ finally :
410+ check_cursor .close ()
411+ check_conn .close ()
412+ except Exception as check_error :
413+ log .warning (f"Error checking index status: { check_error } " )
414+ # Continue waiting
415+
416+ # Timeout waiting for index
417+ msg = f"Timeout waiting for index { self ._index_name } after { waited } s"
418+ log .error (msg )
419+ raise RuntimeError (msg )
342420
343421 @db_retry (max_attempts = 3 , initial_delay = 0.5 , backoff_factor = 2.0 )
344422 def insert_embeddings (
0 commit comments