@@ -62,17 +62,39 @@
 # Initialize EmbeddingClient for structured logging and retry logic
 _embedding_client = EmbeddingClient()
 
+# Thread-local storage to track execution state inside futures
+_thread_state = threading.local()
+
 
 def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, file_path: str = "<unknown>", chunk_index: int = 0, model: Optional[str] = None):
     """
     Wrapper to acquire semaphore inside executor task to avoid deadlock.
     The semaphore is acquired in the worker thread, not the main thread.
+    Tracks execution state for debugging timeout issues.
     """
+    # Initialize thread state tracking
+    _thread_state.stage = "acquiring_semaphore"
+    _thread_state.file_path = file_path
+    _thread_state.chunk_index = chunk_index
+    _thread_state.start_time = time.time()
+
     semaphore.acquire()
     try:
-        return _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)
+        _thread_state.stage = "calling_embed_text"
+        logger.debug(f"Worker thread starting embed_text for {file_path} chunk {chunk_index}")
+        result = _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)
+        _thread_state.stage = "completed"
+        logger.debug(f"Worker thread completed embed_text for {file_path} chunk {chunk_index}")
+        return result
+    except Exception as e:
+        _thread_state.stage = f"exception: {type(e).__name__}"
+        _thread_state.exception = str(e)
+        logger.error(f"Worker thread exception in embed_text for {file_path} chunk {chunk_index}: {e}")
+        raise
     finally:
+        _thread_state.stage = "releasing_semaphore"
         semaphore.release()
+        _thread_state.stage = "finished"
 
 
 def detect_language(path: str):
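To make the deadlock note in the docstring concrete: the caller is expected to submit this wrapper to an executor and only then block on the futures, so the semaphore is acquired and released entirely inside worker threads. Below is a minimal caller-side sketch, assuming a `ThreadPoolExecutor`; the function `embed_chunks`, the `MAX_CONCURRENT_EMBEDDINGS` cap, and the example arguments are illustrative and not part of this change (`EMBEDDING_TIMEOUT` is the constant used in the timeout handler further down).

```python
# Illustrative caller-side sketch (not part of this PR): submit the wrapper so the
# semaphore is taken in the worker thread, then wait on each future with a timeout.
import concurrent.futures
import threading

MAX_CONCURRENT_EMBEDDINGS = 4   # illustrative cap, not defined in this diff
EMBEDDING_TIMEOUT = 60          # seconds; mirrors the constant used in the handler below


def embed_chunks(chunks):
    semaphore = threading.Semaphore(MAX_CONCURRENT_EMBEDDINGS)
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [
            executor.submit(_get_embedding_with_semaphore, semaphore, chunk,
                            file_path="example.py", chunk_index=idx)
            for idx, chunk in enumerate(chunks)
        ]
        for future in futures:
            try:
                results.append(future.result(timeout=EMBEDDING_TIMEOUT))
            except concurrent.futures.TimeoutError:
                # The chunk is skipped, but the worker thread is not interrupted;
                # leaving the `with` block still waits for stragglers to finish.
                results.append(None)
    return results
```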
@@ -210,7 +232,61 @@ def _process_file_sync(
                     if embedding_duration > 5.0:
                         logger.warning(f"Slow embedding API response for {rel_path} chunk {idx}: {embedding_duration:.2f}s total")
                 except concurrent.futures.TimeoutError:
-                    logger.error(f"Embedding API timeout ({EMBEDDING_TIMEOUT}s) for {rel_path} chunk {idx}")
+                    elapsed = time.time() - embedding_start_time
+
+                    # Try to get exception info from the future if available
+                    future_exception = None
+                    try:
+                        future_exception = future.exception(timeout=0.1)
+                    except concurrent.futures.TimeoutError:
+                        future_exception = None  # Still running
+                    except Exception as e:
+                        future_exception = e
+
+                    # Build diagnostic information
+                    diagnostic_info = [
+                        f"Future timeout ({EMBEDDING_TIMEOUT}s) for {rel_path} chunk {idx}:",
+                        f" - Elapsed time: {elapsed:.2f}s",
+                        f" - Future state: {getattr(future, '_state', 'unknown')}",
+                    ]
+
+                    if future_exception:
+                        diagnostic_info.append(f" - Future exception: {type(future_exception).__name__}: {future_exception}")
+                    else:
+                        diagnostic_info.append(" - Future exception: None (still running or completed without error)")
+
+                    # Add information about running status
+                    if future.running():
+                        diagnostic_info.append(" - Future.running(): True - worker thread is still executing")
+                    elif future.done():
+                        diagnostic_info.append(" - Future.done(): True - worker thread completed after the result() timeout expired")
+                    else:
+                        diagnostic_info.append(" - Future status: Pending/Unknown")
+
+                    # Generate curl command for debugging
+                    try:
+                        payload = {
+                            "model": _embedding_client.model,
+                            "input": chunk_doc.text,
+                        }
+                        curl_command = _embedding_client._generate_curl_command(
+                            _embedding_client.api_url,
+                            dict(_embedding_client.session.headers),
+                            payload,
+                        )
+                    except Exception as e:
+                        curl_command = f"Failed to generate curl command: {e}"
+
+                    diagnostic_info.extend([
+                        f" - The future.result() call timed out after {EMBEDDING_TIMEOUT}s",
+                        " - Embedding API state:",
+                        f" - API timeout: {_embedding_client.timeout}s",
+                        f" - Max retries: {_embedding_client.max_retries}",
+                        " - Curl command to reproduce:",
+                        curl_command,
+                    ])
+
+                    logger.error("\n".join(diagnostic_info))
                     emb = None
                     failed_count += 1
                 except Exception as e:
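`_generate_curl_command` is internal to `EmbeddingClient` and not visible in this diff. Purely to illustrate the idea behind the reproduction command, a standalone helper could look roughly like the sketch below; the name `build_curl_command`, the header redaction, and the endpoint in the usage example are assumptions, not the client's actual behavior.

```python
# Illustrative sketch only: build a copy-pasteable curl command from a request's
# URL, headers, and JSON payload. This is NOT EmbeddingClient._generate_curl_command;
# it only shows the general shape such a helper could take.
import json
import shlex


def build_curl_command(url: str, headers: dict, payload: dict) -> str:
    parts = ["curl", "-sS", "-X", "POST", shlex.quote(url)]
    for name, value in headers.items():
        if name.lower() == "authorization":
            value = "Bearer ***"  # redact secrets before they end up in logs
        parts += ["-H", shlex.quote(f"{name}: {value}")]
    parts += ["--data", shlex.quote(json.dumps(payload))]
    return " ".join(parts)


# Example usage with made-up values:
print(build_curl_command(
    "https://api.example.com/v1/embeddings",
    {"Authorization": "Bearer sk-secret", "Content-Type": "application/json"},
    {"model": "text-embedding-model", "input": "some chunk text"},
))
```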
@@ -241,10 +317,13 @@ def _process_file_sync(
                             print(err_content)
                         except Exception:
                             logger.exception("Failed to write chunk-insert error to disk for %s chunk %d", rel_path, idx)
+                else:
+                    logger.debug(f"Skipping chunk {idx} for {rel_path} - no embedding vector available")
 
             # Log batch completion with timing and status
             batch_duration = time.time() - batch_start_time
-            logger.info(f"Completed batch {batch_num}/{num_batches} for {rel_path}: {saved_count} saved, {failed_count} failed, {batch_duration:.2f}s elapsed")
+            batch_status = "FAILED" if failed_count > 0 and saved_count == 0 else ("PARTIAL" if failed_count > 0 else "SUCCESS")
+            logger.info(f"Batch {batch_num}/{num_batches} for {rel_path} - Status: {batch_status} - {saved_count} saved, {failed_count} failed, {batch_duration:.2f}s elapsed")
 
         return {"stored": True, "embedded": embedded_any, "skipped": False}
     except Exception:
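The SUCCESS/PARTIAL/FAILED classification added above packs two conditions into one nested conditional expression; the same logic written out as a small, hypothetical helper with a few sanity checks (not part of this PR) reads like this:

```python
# Hypothetical helper mirroring the batch-status conditional above, with sanity checks.
def classify_batch(saved_count: int, failed_count: int) -> str:
    if failed_count > 0 and saved_count == 0:
        return "FAILED"
    if failed_count > 0:
        return "PARTIAL"
    return "SUCCESS"


assert classify_batch(saved_count=10, failed_count=0) == "SUCCESS"
assert classify_batch(saved_count=7, failed_count=3) == "PARTIAL"
assert classify_batch(saved_count=0, failed_count=5) == "FAILED"
assert classify_batch(saved_count=0, failed_count=0) == "SUCCESS"  # an empty batch counts as success here
```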