Skip to content

Commit 1fc1a2c

Browse files
CopilotMte90
andauthored
Add EmbeddingClient to openai.py with detailed logging and retries (#10)
Co-authored-by: Mte90 <[email protected]> Co-authored-by: copilot-swe-agent[bot] <[email protected]>
1 parent 9cb1905 commit 1fc1a2c

File tree

3 files changed

+205
-23
lines changed

3 files changed

+205
-23
lines changed

ai/analyzer.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
search_vectors as _search_vectors,
1919
get_chunk_text as _get_chunk_text,
2020
)
21-
from .openai import get_embedding_for_text, call_coding_api
21+
from .openai import call_coding_api, EmbeddingClient
2222
from llama_index.core import Document
2323
from utils.logger import get_logger
2424
from utils import compute_file_hash, chunk_text, norm, cosine
@@ -59,15 +59,18 @@
5959

6060
logger = get_logger(__name__)
6161

62+
# Initialize EmbeddingClient for structured logging and retry logic
63+
_embedding_client = EmbeddingClient()
6264

63-
def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, model: Optional[str] = None):
65+
66+
def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, file_path: str = "<unknown>", chunk_index: int = 0, model: Optional[str] = None):
6467
"""
6568
Wrapper to acquire semaphore inside executor task to avoid deadlock.
6669
The semaphore is acquired in the worker thread, not the main thread.
6770
"""
6871
semaphore.acquire()
6972
try:
70-
return get_embedding_for_text(text, model)
73+
return _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)
7174
finally:
7275
semaphore.release()
7376

@@ -192,7 +195,7 @@ def _process_file_sync(
192195
for idx, chunk_doc in batch:
193196
# Submit task to executor; semaphore will be acquired inside the worker
194197
embedding_start_time = time.time()
195-
future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, embedding_model)
198+
future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model)
196199
embedding_futures.append((idx, chunk_doc, future, embedding_start_time))
197200

198201
# Wait for batch to complete and store results
@@ -434,7 +437,7 @@ def search_semantic(query: str, database_path: str, top_k: int = 5):
434437
Uses sqlite-vector's vector_full_scan to retrieve best-matching chunks and returns
435438
a list of {file_id, path, chunk_index, score}.
436439
"""
437-
q_emb = get_embedding_for_text(query)
440+
q_emb = _embedding_client.embed_text(query, file_path="<query>", chunk_index=0)
438441
if not q_emb:
439442
return []
440443

ai/llama_integration.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
from typing import List
55
from llama_index.core import Document
66

7-
from .openai import get_embedding_for_text
7+
from .openai import EmbeddingClient
88
from utils.logger import get_logger
99

1010
logger = get_logger(__name__)
1111

12+
# Create a module-level embedding client instance
13+
_embedding_client = EmbeddingClient()
14+
1215

1316
def llama_index_retrieve_documents(query: str, database_path: str, top_k: int = 5,
1417
search_func=None, get_chunk_func=None) -> List[Document]:
@@ -28,7 +31,7 @@ def llama_index_retrieve_documents(query: str, database_path: str, top_k: int =
2831
if search_func is None or get_chunk_func is None:
2932
raise ValueError("search_func and get_chunk_func must be provided")
3033

31-
q_emb = get_embedding_for_text(query)
34+
q_emb = _embedding_client.embed_text(query, file_path="<query>", chunk_index=0)
3235
if not q_emb:
3336
return []
3437

ai/openai.py

Lines changed: 192 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
from typing import Optional
1+
from typing import Optional, List, Dict, Any
22
import os
33
import time
4+
import uuid
5+
import json
6+
import logging
7+
import traceback
48
import threading
59
from openai import OpenAI
10+
import requests
611

712
from utils.config import CFG
813

@@ -13,6 +18,9 @@
1318
DEFAULT_EMBEDDING_MODEL = CFG.get("embedding_model")
1419
DEFAULT_CODING_MODEL = CFG.get("coding_model")
1520

21+
# Embedding client logger
22+
_embedding_logger = logging.getLogger("ai.analyzer.embedding")
23+
1624
# Rate limiting configuration
1725
_RATE_LIMIT_CALLS = 100 # max calls per minute
1826
_RATE_LIMIT_WINDOW = 60.0 # seconds
@@ -100,24 +108,192 @@ def _retry_with_backoff(func, *args, **kwargs):
100108
time.sleep(delay)
101109

102110

103-
def get_embedding_for_text(text: str, model: Optional[str] = None):
111+
class EmbeddingError(Exception):
112+
"""Custom exception for embedding failures"""
113+
pass
114+
115+
116+
class EmbeddingClient:
104117
"""
105-
Return embedding vector (list[float]) using the new OpenAI client.
106-
Includes rate limiting, retry logic with exponential backoff, and circuit breaker.
107-
model: optional model id; if not provided, uses DEFAULT_EMBEDDING_MODEL from CFG.
118+
Embedding client with detailed logging, retry logic, and configurable timeouts.
119+
Provides better debugging for embedding API failures.
108120
"""
109-
model_to_use = model or DEFAULT_EMBEDDING_MODEL
110-
if not model_to_use:
111-
raise RuntimeError("No embedding model configured. Set EMBEDDING_MODEL in .env or pass model argument.")
121+
def __init__(self,
122+
api_url: Optional[str] = None,
123+
api_key: Optional[str] = None,
124+
model: Optional[str] = None,
125+
timeout: float = 30.0,
126+
max_retries: int = 2,
127+
backoff: float = 1.5):
128+
self.api_url = api_url or CFG.get("api_url")
129+
self.api_key = api_key or CFG.get("api_key")
130+
self.model = model or DEFAULT_EMBEDDING_MODEL or "text-embedding-3-small"
131+
self.timeout = timeout
132+
self.max_retries = max_retries
133+
self.backoff = backoff
134+
self.session = requests.Session()
135+
if self.api_key:
136+
self.session.headers.update({"Authorization": f"Bearer {self.api_key}"})
137+
self.session.headers.update({"Content-Type": "application/json"})
112138

113-
def _get_embedding():
114-
resp = _client.embeddings.create(model=model_to_use, input=text)
115-
return resp.data[0].embedding
116-
117-
try:
118-
return _retry_with_backoff(_get_embedding)
119-
except Exception as e:
120-
raise RuntimeError(f"Failed to obtain embedding from OpenAI client: {e}") from e
139+
def _log_request_start(self, request_id: str, file_path: str, chunk_index: int, chunk_len: int):
140+
_embedding_logger.debug(
141+
"Embedding request START",
142+
extra={
143+
"request_id": request_id,
144+
"file": file_path,
145+
"chunk_index": chunk_index,
146+
"chunk_length": chunk_len,
147+
"model": self.model,
148+
"api_url": self.api_url,
149+
"timeout": self.timeout,
150+
},
151+
)
152+
153+
def _log_request_end(self, request_id: str, elapsed: float, status: Optional[int], response_body_preview: str):
154+
_embedding_logger.debug(
155+
"Embedding request END",
156+
extra={
157+
"request_id": request_id,
158+
"elapsed_s": elapsed,
159+
"status": status,
160+
"response_preview": response_body_preview,
161+
},
162+
)
163+
164+
def embed_text(self, text: str, file_path: str = "<unknown>", chunk_index: int = 0) -> List[float]:
165+
"""
166+
Embed a single chunk of text. Returns the embedding vector.
167+
Raises EmbeddingError on failure.
168+
"""
169+
request_id = str(uuid.uuid4())
170+
chunk_len = len(text)
171+
self._log_request_start(request_id, file_path, chunk_index, chunk_len)
172+
173+
payload = {
174+
"model": self.model,
175+
"input": text,
176+
}
177+
178+
attempt = 0
179+
err_msg = ""
180+
while True:
181+
attempt += 1
182+
start = time.perf_counter()
183+
try:
184+
resp = self.session.post(
185+
self.api_url,
186+
data=json.dumps(payload),
187+
timeout=self.timeout,
188+
)
189+
elapsed = time.perf_counter() - start
190+
191+
# Try to parse JSON safely
192+
try:
193+
resp_json = resp.json()
194+
except Exception:
195+
resp_json = None
196+
197+
preview = ""
198+
if resp_json is not None:
199+
preview = json.dumps(resp_json)[:1000]
200+
else:
201+
preview = (resp.text or "")[:1000]
202+
203+
self._log_request_end(request_id, elapsed, resp.status_code, preview)
204+
205+
if resp.status_code >= 200 and resp.status_code < 300:
206+
# expected format: {"data": [{"embedding": [...]}], ...}
207+
if not resp_json:
208+
raise EmbeddingError(f"Empty JSON response (status={resp.status_code})")
209+
try:
210+
# tolerant extraction
211+
data = resp_json.get("data") if isinstance(resp_json, dict) else None
212+
if data and isinstance(data, list) and len(data) > 0:
213+
emb = data[0].get("embedding")
214+
if emb and isinstance(emb, list):
215+
_embedding_logger.info(
216+
"Embedding succeeded",
217+
extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index},
218+
)
219+
return emb
220+
# Fallback: maybe top-level "embedding" key
221+
if isinstance(resp_json, dict) and "embedding" in resp_json:
222+
emb = resp_json["embedding"]
223+
if isinstance(emb, list):
224+
return emb
225+
raise EmbeddingError(f"Unexpected embedding response shape: {resp_json}")
226+
except KeyError as e:
227+
raise EmbeddingError(f"Missing keys in embedding response: {e}")
228+
else:
229+
# Non-2xx
230+
_embedding_logger.warning(
231+
"Embedding API returned non-2xx",
232+
extra={
233+
"request_id": request_id,
234+
"status_code": resp.status_code,
235+
"file": file_path,
236+
"chunk_index": chunk_index,
237+
"attempt": attempt,
238+
"body_preview": preview,
239+
},
240+
)
241+
# fall through to retry logic
242+
err_msg = f"Status {resp.status_code}: {preview}"
243+
244+
except requests.Timeout as e:
245+
elapsed = time.perf_counter() - start
246+
err_msg = f"Timeout after {elapsed:.2f}s: {e}"
247+
_embedding_logger.error("Embedding API Timeout", extra={"request_id": request_id, "error": str(e)})
248+
except requests.RequestException as e:
249+
elapsed = time.perf_counter() - start
250+
err_msg = f"RequestException after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
251+
_embedding_logger.error("Embedding request exception", extra={"request_id": request_id, "error": err_msg})
252+
except Exception as e:
253+
elapsed = time.perf_counter() - start
254+
err_msg = f"Unexpected error after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
255+
_embedding_logger.exception("Unexpected embedding exception", extra={"request_id": request_id})
256+
257+
# Retry logic
258+
if attempt > self.max_retries:
259+
_embedding_logger.error(
260+
"Max retries exceeded for embedding request",
261+
extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index, "attempts": attempt},
262+
)
263+
raise EmbeddingError(f"Failed to get embedding after {attempt} attempts. Last error: {err_msg}")
264+
265+
# Backoff and retry
266+
sleep_for = self.backoff * (2 ** (attempt - 1))
267+
_embedding_logger.info(
268+
"Retrying embedding request",
269+
extra={
270+
"request_id": request_id,
271+
"file": file_path,
272+
"chunk_index": chunk_index,
273+
"attempt": attempt,
274+
"sleep_s": sleep_for,
275+
},
276+
)
277+
time.sleep(sleep_for)
278+
279+
def embed_multiple(self, chunks: List[str], file_path: str = "<unknown>") -> List[Dict[str, Any]]:
280+
"""
281+
Embed a list of text chunks. Returns list of dicts: {"chunk_index": i, "embedding": [...]}.
282+
This method logs progress and errors for each chunk.
283+
"""
284+
results = []
285+
for i, chunk in enumerate(chunks):
286+
try:
287+
emb = self.embed_text(chunk, file_path=file_path, chunk_index=i)
288+
results.append({"chunk_index": i, "embedding": emb})
289+
except EmbeddingError as e:
290+
_embedding_logger.error(
291+
"Failed to embed chunk",
292+
extra={"file": file_path, "chunk_index": i, "error": str(e)},
293+
)
294+
# append a failure marker or skip depending on desired behavior
295+
results.append({"chunk_index": i, "embedding": None, "error": str(e)})
296+
return results
121297

122298

123299
def call_coding_api(prompt: str, model: Optional[str] = None, max_tokens: int = 1024):

0 commit comments

Comments
 (0)