Merged
Changes from 4 commits
15 changes: 10 additions & 5 deletions ai/analyzer.py
@@ -18,7 +18,7 @@
    search_vectors as _search_vectors,
    get_chunk_text as _get_chunk_text,
)
- from .openai import get_embedding_for_text, call_coding_api
+ from .openai import call_coding_api, EmbeddingClient
from llama_index.core import Document
from utils.logger import get_logger
from utils import compute_file_hash, chunk_text, norm, cosine
@@ -59,15 +59,20 @@

logger = get_logger(__name__)

+ # Initialize EmbeddingClient for structured logging and retry logic
+ _embedding_client = EmbeddingClient()

- def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, model: Optional[str] = None):

+ def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, file_path: str = "<unknown>", chunk_index: int = 0, model: Optional[str] = None):
"""
Wrapper to acquire semaphore inside executor task to avoid deadlock.
The semaphore is acquired in the worker thread, not the main thread.
Now uses EmbeddingClient for better logging and error handling.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot line useless

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rimosso. Commit: c9310f1

"""
    semaphore.acquire()
    try:
-         return get_embedding_for_text(text, model)
+         # Use the embedding client with enhanced logging
+         return _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)
    finally:
        semaphore.release()
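
For context, a minimal standalone sketch of the deadlock-avoidance pattern this docstring describes. Everything below is illustrative (fake_embed, worker, and the counts are not code from this PR): acquiring inside the worker keeps submit() non-blocking, so the main thread can queue every task even while the semaphore is exhausted.

import threading
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_CALLS = 4  # cap on simultaneous embedding calls

def fake_embed(text):
    return [0.0] * 8  # stand-in for the real API call

def worker(semaphore, text):
    # Acquired here, in the worker thread: the main thread never blocks
    # while holding tasks the pool has not started yet.
    semaphore.acquire()
    try:
        return fake_embed(text)
    finally:
        semaphore.release()

sem = threading.Semaphore(MAX_CONCURRENT_CALLS)
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(worker, sem, f"chunk-{i}") for i in range(16)]
    vectors = [f.result() for f in futures]
print(len(vectors))  # 16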

@@ -192,7 +197,7 @@ def _process_file_sync(
        for idx, chunk_doc in batch:
            # Submit task to executor; semaphore will be acquired inside the worker
            embedding_start_time = time.time()
-             future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, embedding_model)
+             future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model)
            embedding_futures.append((idx, chunk_doc, future, embedding_start_time))

        # Wait for batch to complete and store results
@@ -434,7 +439,7 @@ def search_semantic(query: str, database_path: str, top_k: int = 5):
    Uses sqlite-vector's vector_full_scan to retrieve best-matching chunks and returns
    a list of {file_id, path, chunk_index, score}.
    """
-     q_emb = get_embedding_for_text(query)
+     q_emb = _embedding_client.embed_text(query, file_path="<query>", chunk_index=0)
    if not q_emb:
        return []

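A hedged usage sketch of search_semantic, built only from the docstring's stated return shape; the query string and database path are hypothetical.

from ai.analyzer import search_semantic

hits = search_semantic("where is the retry logic?", "project_index.db", top_k=3)
for hit in hits:
    # Each hit carries file_id, path, chunk_index, and score per the docstring.
    print(f"{hit['path']}#{hit['chunk_index']}  score={hit['score']:.3f}")
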
211 changes: 210 additions & 1 deletion ai/openai.py
@@ -1,8 +1,13 @@
- from typing import Optional
+ from typing import Optional, List, Dict, Any
import os
import time
import uuid
import json
import logging
import traceback
import threading
from openai import OpenAI
import requests

from utils.config import CFG

@@ -13,6 +18,22 @@
DEFAULT_EMBEDDING_MODEL = CFG.get("embedding_model")
DEFAULT_CODING_MODEL = CFG.get("coding_model")

# Embedding client logger
_embedding_logger = logging.getLogger("ai.analyzer.embedding")

# Embedding client configuration (can override via environment)
EMBEDDING_API_URL = os.getenv("PICOCODE_EMBEDDING_URL", CFG.get("api_url", "https://example.com/v1/embeddings"))
EMBEDDING_API_KEY = os.getenv("PICOCODE_EMBEDDING_API_KEY", CFG.get("api_key", ""))
DEFAULT_TIMEOUT = float(os.getenv("PICOCODE_EMBEDDING_TIMEOUT", "30")) # seconds per request
MAX_RETRIES = int(os.getenv("PICOCODE_EMBEDDING_RETRIES", "2"))
BACKOFF_FACTOR = float(os.getenv("PICOCODE_EMBEDDING_BACKOFF", "1.5"))
EMBEDDING_MODEL_NAME = os.getenv("PICOCODE_EMBEDDING_MODEL", DEFAULT_EMBEDDING_MODEL or "text-embedding-3-small")
Member: @copilot These lines don't make sense; these settings already exist. Check the config files.

Contributor Author: Fixed. EmbeddingClient now uses the values from CFG directly (API_URL, API_KEY, EMBEDDING_MODEL) instead of duplicating them. Only timeout, retries, and backoff remain as environment overrides. Commit: c9310f1

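The commit referenced in the reply is not part of this diff, so as a hedged sketch only, the described shape would look roughly like this (CFG key names are assumptions inferred from the original lines above):

# Sketch of the corrected configuration described in the reply; not the
# actual commit. Relies on the CFG and os imports already in this module.
EMBEDDING_API_URL = CFG.get("api_url")
EMBEDDING_API_KEY = CFG.get("api_key")
EMBEDDING_MODEL_NAME = CFG.get("embedding_model")
# Only operational knobs keep environment overrides:
DEFAULT_TIMEOUT = float(os.getenv("PICOCODE_EMBEDDING_TIMEOUT", "30"))
MAX_RETRIES = int(os.getenv("PICOCODE_EMBEDDING_RETRIES", "2"))
BACKOFF_FACTOR = float(os.getenv("PICOCODE_EMBEDDING_BACKOFF", "1.5"))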
# Optionally enable requests debug logging by setting PICOCODE_HTTP_DEBUG=true
if os.getenv("PICOCODE_HTTP_DEBUG", "").lower() in ("1", "true", "yes"):
logging.getLogger("requests").setLevel(logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.DEBUG)

# Rate limiting configuration
_RATE_LIMIT_CALLS = 100 # max calls per minute
_RATE_LIMIT_WINDOW = 60.0 # seconds
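
The limiter that consumes these constants is collapsed out of this diff. Purely as an assumption about what such a limiter can look like (not the PR's code), a sliding-window gate over these two values might be:

_rate_lock = threading.Lock()
_call_times = []  # monotonic timestamps of recent calls

def _respect_rate_limit():
    # Hypothetical sliding-window limiter: block until fewer than
    # _RATE_LIMIT_CALLS calls fall within the last _RATE_LIMIT_WINDOW seconds.
    while True:
        with _rate_lock:
            now = time.monotonic()
            _call_times[:] = [t for t in _call_times if now - t < _RATE_LIMIT_WINDOW]
            if len(_call_times) < _RATE_LIMIT_CALLS:
                _call_times.append(now)
                return
            wait = _RATE_LIMIT_WINDOW - (now - _call_times[0])
        time.sleep(max(wait, 0.01))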
@@ -100,6 +121,194 @@ def _retry_with_backoff(func, *args, **kwargs):
        time.sleep(delay)


class EmbeddingError(Exception):
    """Custom exception for embedding failures"""
    pass


class EmbeddingClient:
    """
    Embedding client with detailed logging, retry logic, and configurable timeouts.
    Provides better debugging for embedding API failures.
    """
    def __init__(self,
                 api_url: str = EMBEDDING_API_URL,
                 api_key: str = EMBEDDING_API_KEY,
                 model: str = EMBEDDING_MODEL_NAME,
                 timeout: float = DEFAULT_TIMEOUT,
                 max_retries: int = MAX_RETRIES,
                 backoff: float = BACKOFF_FACTOR):
        self.api_url = api_url
        self.api_key = api_key
        self.model = model
        self.timeout = timeout
        self.max_retries = max_retries
        self.backoff = backoff
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"Authorization": f"Bearer {api_key}"})
        self.session.headers.update({"Content-Type": "application/json"})

    def _log_request_start(self, request_id: str, file_path: str, chunk_index: int, chunk_len: int):
        _embedding_logger.debug(
            "Embedding request START",
            extra={
                "request_id": request_id,
                "file": file_path,
                "chunk_index": chunk_index,
                "chunk_length": chunk_len,
                "model": self.model,
                "api_url": self.api_url,
                "timeout": self.timeout,
            },
        )

    def _log_request_end(self, request_id: str, elapsed: float, status: Optional[int], response_body_preview: str):
        _embedding_logger.debug(
            "Embedding request END",
            extra={
                "request_id": request_id,
                "elapsed_s": elapsed,
                "status": status,
                "response_preview": response_body_preview,
            },
        )

    def embed_text(self, text: str, file_path: str = "<unknown>", chunk_index: int = 0) -> List[float]:
        """
        Embed a single chunk of text. Returns the embedding vector.
        Raises EmbeddingError on failure.
        """
        request_id = str(uuid.uuid4())
        chunk_len = len(text)
        self._log_request_start(request_id, file_path, chunk_index, chunk_len)

        payload = {
            "model": self.model,
            "input": text,
        }

        attempt = 0
        err_msg = ""
        while True:
            attempt += 1
            start = time.perf_counter()
            try:
                resp = self.session.post(
                    self.api_url,
                    data=json.dumps(payload),
                    timeout=self.timeout,
                )
                elapsed = time.perf_counter() - start

                # Try to parse JSON safely
                try:
                    resp_json = resp.json()
                except Exception:
                    resp_json = None

                if resp_json is not None:
                    preview = json.dumps(resp_json)[:1000]
                else:
                    preview = (resp.text or "")[:1000]

                self._log_request_end(request_id, elapsed, resp.status_code, preview)

                if 200 <= resp.status_code < 300:
                    # expected format: {"data": [{"embedding": [...]}], ...}
                    if not resp_json:
                        raise EmbeddingError(f"Empty JSON response (status={resp.status_code})")
                    try:
                        # tolerant extraction
                        data = resp_json.get("data") if isinstance(resp_json, dict) else None
                        if data and isinstance(data, list) and len(data) > 0:
                            emb = data[0].get("embedding")
                            if emb and isinstance(emb, list):
                                _embedding_logger.info(
                                    "Embedding succeeded",
                                    extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index},
                                )
                                return emb
                        # Fallback: maybe top-level "embedding" key
                        if isinstance(resp_json, dict) and "embedding" in resp_json:
                            emb = resp_json["embedding"]
                            if isinstance(emb, list):
                                return emb
                        raise EmbeddingError(f"Unexpected embedding response shape: {resp_json}")
                    except KeyError as e:
                        raise EmbeddingError(f"Missing keys in embedding response: {e}")
                else:
                    # Non-2xx
                    _embedding_logger.warning(
                        "Embedding API returned non-2xx",
                        extra={
                            "request_id": request_id,
                            "status_code": resp.status_code,
                            "file": file_path,
                            "chunk_index": chunk_index,
                            "attempt": attempt,
                            "body_preview": preview,
                        },
                    )
                    # fall through to retry logic
                    err_msg = f"Status {resp.status_code}: {preview}"

            except requests.Timeout as e:
                elapsed = time.perf_counter() - start
                err_msg = f"Timeout after {elapsed:.2f}s: {e}"
                _embedding_logger.error("Embedding API Timeout", extra={"request_id": request_id, "error": str(e)})
            except requests.RequestException as e:
                elapsed = time.perf_counter() - start
                err_msg = f"RequestException after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
                _embedding_logger.error("Embedding request exception", extra={"request_id": request_id, "error": err_msg})
            except Exception as e:
                elapsed = time.perf_counter() - start
                err_msg = f"Unexpected error after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
                _embedding_logger.exception("Unexpected embedding exception", extra={"request_id": request_id})

            # Retry logic
            if attempt > self.max_retries:
                _embedding_logger.error(
                    "Max retries exceeded for embedding request",
                    extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index, "attempts": attempt},
                )
                raise EmbeddingError(f"Failed to get embedding after {attempt} attempts. Last error: {err_msg}")

            # Backoff and retry
            sleep_for = self.backoff * (2 ** (attempt - 1))
            _embedding_logger.info(
                "Retrying embedding request",
                extra={
                    "request_id": request_id,
                    "file": file_path,
                    "chunk_index": chunk_index,
                    "attempt": attempt,
                    "sleep_s": sleep_for,
                },
            )
            time.sleep(sleep_for)

    def embed_multiple(self, chunks: List[str], file_path: str = "<unknown>") -> List[Dict[str, Any]]:
        """
        Embed a list of text chunks. Returns list of dicts: {"chunk_index": i, "embedding": [...]}.
        This method logs progress and errors for each chunk.
        """
        results = []
        for i, chunk in enumerate(chunks):
            try:
                emb = self.embed_text(chunk, file_path=file_path, chunk_index=i)
                results.append({"chunk_index": i, "embedding": emb})
            except EmbeddingError as e:
                _embedding_logger.error(
                    "Failed to embed chunk",
                    extra={"file": file_path, "chunk_index": i, "error": str(e)},
                )
                # append a failure marker or skip depending on desired behavior
                results.append({"chunk_index": i, "embedding": None, "error": str(e)})
        return results


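A usage sketch for the client above, relying only on the EmbeddingClient and EmbeddingError definitions in this file and the logging import at the top of the module. The handler wiring is illustrative (this PR does not show how the "ai.analyzer.embedding" logger is configured), the formatter exists only because fields passed via extra= need something that reads them back off the LogRecord, and the file path is hypothetical.

class RequestIdFormatter(logging.Formatter):
    # Illustrative: surface the request_id that EmbeddingClient attaches via extra=.
    def format(self, record):
        base = super().format(record)
        rid = getattr(record, "request_id", None)
        return f"{base} [request_id={rid}]" if rid else base

handler = logging.StreamHandler()
handler.setFormatter(RequestIdFormatter("%(levelname)s %(name)s: %(message)s"))
emb_logger = logging.getLogger("ai.analyzer.embedding")
emb_logger.addHandler(handler)
emb_logger.setLevel(logging.DEBUG)

client = EmbeddingClient(timeout=10.0, max_retries=1)
try:
    vector = client.embed_text("def add(a, b): return a + b",
                               file_path="src/math_utils.py", chunk_index=0)
    print(f"got {len(vector)} dimensions")
except EmbeddingError as exc:
    print(f"embedding failed: {exc}")
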
def get_embedding_for_text(text: str, model: Optional[str] = None):
"""
Return embedding vector (list[float]) using the new OpenAI client.