
Conversation


@g-k-s-03 commented Dec 26, 2025

Closes #193

Description

This PR fixes a race condition in the lazy loading of the
SentenceTransformer model inside EmbeddingService.

Under concurrent async or multi-threaded access, multiple requests
could attempt to initialize the model at the same time, leading to
duplicate GPU memory usage or crashes.

Issue

The SentenceTransformer model was lazy-loaded without sufficient
thread safety, allowing concurrent initialization under async or
multi-threaded access.

Root Cause

Multiple requests could access the model property simultaneously,
leading to duplicate model instantiation and potential GPU memory waste.

Changes Made

  • Added a class-level lock around SentenceTransformer initialization
  • Used double-checked locking to ensure the model is instantiated only once (see the sketch after this list)
  • Prevented concurrent model initialization under async/threaded load
  • No behavior changes to embedding or summarization logic
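
For reference, a minimal sketch of the double-checked locking described above. The attribute names follow the walkthrough later in this thread; the model name and device values are placeholders, not the project's configuration.

    import threading
    from typing import Optional

    from sentence_transformers import SentenceTransformer

    MODEL_NAME = "all-MiniLM-L6-v2"   # placeholder; the real value comes from config
    EMBEDDING_DEVICE = "cpu"          # placeholder

    class EmbeddingService:
        _global_model: Optional[SentenceTransformer] = None
        _model_lock = threading.Lock()

        @property
        def model(self) -> SentenceTransformer:
            # Fast path: once loaded, no lock is taken.
            if EmbeddingService._global_model is None:
                with EmbeddingService._model_lock:
                    # Re-check inside the lock: another thread may have
                    # finished loading while this one was waiting.
                    if EmbeddingService._global_model is None:
                        EmbeddingService._global_model = SentenceTransformer(
                            MODEL_NAME, device=EMBEDDING_DEVICE
                        )
            return EmbeddingService._global_model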

Result

  • No concurrent model initialization
  • Prevents duplicate GPU memory allocation
  • Safe under async + executor workloads

Summary by CodeRabbit

  • New Features

    • Asynchronous profile search, summarization and embedding operations with explicit startup/shutdown/context management.
    • Public endpoints to request embeddings, profile summaries, searches, and richer model/tokenizer info.
  • Improvements

    • Shared model caching, thread/async executors and GPU concurrency controls for more efficient parallel processing.
    • Optional token-counting support and stricter guarantee that profile summaries include embeddings.



coderabbitai bot commented Dec 26, 2025

📝 Walkthrough


Refactored EmbeddingService to use a shared, thread-safe global model and explicit lifecycle; added executors, GPU concurrency controls, optional tokenization, async LLM invocation, expanded public API (embedding/LLM properties and methods), and made ProfileSummaryResult.embedding required.

Changes

All changes are in backend/app/services/embedding_service/service.py:

  • Core service implementation: Replaced the per-instance model with a class-level _global_model guarded by _model_lock; added lazy, thread-safe initialization, explicit shutdown / shutdown_global, and async context management (__aenter__/__aexit__). Exposed model, llm, tokenizer, embedding_executor, and llm_executor properties.
  • Concurrency & GPU controls: Added ThreadPoolExecutor pools, per-resource locks, a _gpu_semaphores map with a _get_gpu_semaphore() helper, and new constants (SAFE_BATCH_SIZE, EXECUTOR_MAX_WORKERS, DEFAULT_MAX_CONCURRENT_GPU_TASKS) to cap concurrent GPU-bound encoding tasks.
  • Tokenization & embedding flow: Introduced optional tiktoken detection (TIKTOKEN_AVAILABLE), a tokenizer property, _count_tokens() and _encode() helpers, and batch-safe embedding paths; updated get_embedding() / get_embeddings() to support async execution with fallback and max_concurrent_tasks.
  • LLM integration & profile workflows: Added _invoke_llm() with async and sync fallback; implemented summarize_user_profile(), process_user_profile(), and search_similar_profiles() to call the LLM, count tokens, produce embeddings, and interact with the datastore.
  • Data model / API surface: Changed ProfileSummaryResult.embedding to a required List[float]; expanded get_model_info() metadata (tokenizer availability, batch sizes, concurrency limits, model/state flags) and reorganized public methods and state exposure.

Sequence Diagram

sequenceDiagram
    participant Client
    participant EmbedSvc as EmbeddingService
    participant LLM as LLMService
    participant Model as SentenceTransformer
    participant Store as DataStore

    rect rgb(232,240,255)
    Note over Client,EmbedSvc: Profile summarization (async-aware)
    Client->>EmbedSvc: summarize_user_profile(profile)
    EmbedSvc->>EmbedSvc: build_prompt(profile)
    EmbedSvc->>LLM: _invoke_llm(prompt) (async or sync fallback)
    alt LLM returns
        LLM-->>EmbedSvc: summary_text
    end
    EmbedSvc->>EmbedSvc: _count_tokens(summary_text)
    EmbedSvc->>EmbedSvc: _get_gpu_semaphore(limit)
    EmbedSvc->>Model: _encode(summary_text) (via embedding_executor, gated by GPU semaphore)
    Model-->>EmbedSvc: embedding
    EmbedSvc->>Store: store/update(profile_summary with embedding)
    EmbedSvc-->>Client: ProfileSummaryResult
    end

    rect rgb(232,255,232)
    Note over Client,EmbedSvc: Search flow
    Client->>EmbedSvc: search_similar_profiles(query)
    EmbedSvc->>Model: _encode(query) (executor + GPU semaphore)
    Model-->>EmbedSvc: query_embedding
    EmbedSvc->>Store: lookup_similar(query_embedding)
    Store-->>EmbedSvc: similar_profiles
    EmbedSvc-->>Client: results
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 With thread-lock whiskers snug and neat,

I guard the model so it's loaded once complete.
Executors hum, semaphores softly sing,
Tokens counted, embeddings take wing.
One shared load — hop, hop — what a treat! 🥕

Pre-merge checks

❌ Failed checks (2 warnings)

  • Out of Scope Changes check (⚠️ Warning): The PR includes extensive out-of-scope changes beyond the thread-safe model loading fix, including GPU concurrency controls, async/executor infrastructure, tokenization support, and a refactored public API that were not required by issue #193. Resolution: scope the PR to address only the thread-safe model loading specified in issue #193, deferring the additional enhancements (GPU semaphores, tokenization, executor management, API refactoring) to separate focused PRs.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 10.00%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (3 passed)

  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): The PR title accurately summarizes the main change: fixing thread-safe lazy loading of the SentenceTransformer model, which is the core focus of the changeset.
  • Linked Issues check (✅ Passed): The PR addresses issue #193's requirements by implementing thread-safe lazy loading with a class-level lock and double-checked locking to prevent concurrent model initialization.



coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
backend/app/services/embedding_service/service.py (2)

138-144: Consider logging tokenization failures for observability.

The bare except Exception: pass silently swallows errors. While the fallback is reasonable, logging at debug level would help diagnose issues with tiktoken encoding.

Proposed improvement
     def _count_tokens(self, text: str) -> int:
         if self.tokenizer:
             try:
                 return len(self.tokenizer.encode(text))
-            except Exception:
-                pass
+            except Exception as e:
+                logger.debug("Tokenization failed, using fallback: %s", e)
         return max(1, int(len(text.split()) * 1.3))

240-252: Consider making min_distance configurable.

The min_distance=0.5 is hardcoded here, while the underlying search_similar_contributors function defaults to 0.7. Consider exposing this as a parameter for flexibility.

Proposed improvement
     async def search_similar_profiles(
         self,
         query_text: str,
         limit: int = 10,
+        min_distance: float = 0.5,
         max_concurrent_tasks: Optional[int] = None,
     ) -> List[Dict[str, Any]]:
         query_embedding = await self.get_embedding(query_text, max_concurrent_tasks)
         from app.database.weaviate.operations import search_similar_contributors
         return await search_similar_contributors(
             query_embedding=query_embedding,
             limit=limit,
-            min_distance=0.5,
+            min_distance=min_distance,
         )
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8eeacad and 805ed26.

📒 Files selected for processing (1)
  • backend/app/services/embedding_service/service.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/embedding_service/service.py (3)
backend/app/models/database/weaviate.py (1)
  • WeaviateUserProfile (32-128)
backend/app/database/falkor/code-graph-backend/api/graph.py (1)
  • stats (592-609)
backend/app/database/weaviate/operations.py (2)
  • search_similar_contributors (115-164)
  • search_similar_contributors (347-352)
🪛 Ruff (0.14.10)
backend/app/services/embedding_service/service.py

103-103: Avoid specifying long messages outside the exception class

(TRY003)


142-143: try-except-pass detected, consider logging the exception

(S110)


142-142: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (3)
backend/app/services/embedding_service/service.py (3)

100-113: Thread-safe model loading correctly implemented.

The double-checked locking pattern with a class-level lock and model storage correctly addresses the race condition described in Issue #193. This ensures only one thread can initialize the model while others wait, and subsequent accesses are lock-free due to the outer check.


254-265: Calling get_model_info triggers model loading.

Accessing self.model.get_sentence_embedding_dimension() on line 258 will initialize the model if not already loaded. If this is unintended (e.g., for lightweight health checks), consider conditionally accessing this field.

Alternative if lazy info is needed
     def get_model_info(self) -> Dict[str, Any]:
+        model_loaded = EmbeddingService._global_model is not None
         return {
             "model_name": MODEL_NAME,
             "device": EMBEDDING_DEVICE,
-            "embedding_size": self.model.get_sentence_embedding_dimension(),
+            "embedding_size": (
+                EmbeddingService._global_model.get_sentence_embedding_dimension()
+                if model_loaded else None
+            ),
             "tiktoken_available": TIKTOKEN_AVAILABLE,
             "safe_batch_size": SAFE_BATCH_SIZE,
             "max_batch_size": MAX_BATCH_SIZE,
             "default_max_concurrent_gpu_tasks": DEFAULT_MAX_CONCURRENT_GPU_TASKS,
             "executor_workers": EXECUTOR_MAX_WORKERS,
-            "model_loaded": EmbeddingService._global_model is not None,
+            "model_loaded": model_loaded,
         }

284-288: Async context manager correctly implemented for lifecycle management.

The async with support provides clean resource management, ensuring shutdown() is called even on exceptions.
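
For illustration, a minimal sketch of that protocol, assuming a synchronous shutdown() as described elsewhere in this thread (the body here is a stub, not the project's code):

    class EmbeddingService:
        def shutdown(self) -> None:
            # Stub for the sketch; the real method releases per-instance
            # executors and handles.
            ...

        async def __aenter__(self) -> "EmbeddingService":
            return self

        async def __aexit__(self, exc_type, exc, tb) -> None:
            # Runs even when the async-with body raises, so cleanup is guaranteed.
            self.shutdown()

    # Usage:
    # async with EmbeddingService() as service:
    #     ...  # use the service; shutdown() runs on exit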


coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
backend/app/services/embedding_service/service.py (2)

70-76: Mixing threading.Lock with asyncio primitives is functionally correct but worth reviewing.

The threading.Lock protects dictionary mutation in _gpu_semaphores, which is safe since dictionary operations are synchronous. However, since _get_gpu_semaphore is called from async code (line 156), consider whether asyncio.Lock would be more idiomatic.

Given that the service has both synchronous initialization and async execution paths, threading.Lock is appropriate here. This is an optional consideration for consistency.
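
For concreteness, a sketch of how such a helper might look, with the threading.Lock guarding only the synchronous dictionary check-and-insert. The keying by concurrency limit and the default value are assumptions, not taken from service.py:

    import asyncio
    import threading
    from typing import Dict, Optional

    DEFAULT_MAX_CONCURRENT_GPU_TASKS = 2  # placeholder default

    class EmbeddingService:
        _gpu_semaphores: Dict[int, asyncio.Semaphore] = {}
        _gpu_semaphores_lock = threading.Lock()

        def _get_gpu_semaphore(self, limit: Optional[int] = None) -> asyncio.Semaphore:
            limit = limit or DEFAULT_MAX_CONCURRENT_GPU_TASKS
            # The lock only guards a quick dict check-and-insert, so holding a
            # threading.Lock here never blocks the event loop for long.
            with self._gpu_semaphores_lock:
                if limit not in self._gpu_semaphores:
                    self._gpu_semaphores[limit] = asyncio.Semaphore(limit)
                return self._gpu_semaphores[limit]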


103-103: Optional: Extract exception message to class constant.

The static analysis tool suggests avoiding long exception messages outside the exception class. While this is a minor style issue, you could extract the message to a class constant for consistency.

🔎 Optional refactor
+    _SHUTDOWN_ERROR_MSG = "EmbeddingService is shutting down"
+
     @property
     def model(self) -> SentenceTransformer:
         if self._shutting_down:
-            raise RuntimeError("EmbeddingService is shutting down")
+            raise RuntimeError(self._SHUTDOWN_ERROR_MSG)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 805ed26 and ac9553d.

📒 Files selected for processing (1)
  • backend/app/services/embedding_service/service.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/embedding_service/service.py (3)
backend/app/models/database/weaviate.py (1)
  • WeaviateUserProfile (32-128)
backend/app/database/falkor/code-graph-backend/api/graph.py (1)
  • stats (592-609)
backend/app/database/weaviate/operations.py (2)
  • search_similar_contributors (115-164)
  • search_similar_contributors (347-352)
🪛 Ruff (0.14.10)
backend/app/services/embedding_service/service.py

103-103: Avoid specifying long messages outside the exception class

(TRY003)


142-143: try-except-pass detected, consider logging the exception

(S110)


142-142: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (7)
backend/app/services/embedding_service/service.py (7)

1-35: LGTM! Good defensive configuration loading.

The imports support the thread-safety improvements, and the optional tiktoken handling with the availability flag is a good defensive pattern. Using getattr with sensible defaults for new config values ensures backward compatibility.
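
For reference, a sketch of the two patterns described here: an optional tiktoken import with an availability flag, and getattr-based config defaults. The config module path and the default values are illustrative assumptions, not the project's actual settings:

    # Hypothetical settings module; the real import path may differ.
    from app.core import config

    # Optional dependency: record availability instead of failing at import time.
    try:
        import tiktoken
        TIKTOKEN_AVAILABLE = True
    except ImportError:
        tiktoken = None
        TIKTOKEN_AVAILABLE = False

    # New settings fall back to safe defaults so older configs keep working.
    SAFE_BATCH_SIZE = getattr(config, "SAFE_BATCH_SIZE", 16)
    EXECUTOR_MAX_WORKERS = getattr(config, "EXECUTOR_MAX_WORKERS", 4)
    DEFAULT_MAX_CONCURRENT_GPU_TASKS = getattr(config, "DEFAULT_MAX_CONCURRENT_GPU_TASKS", 2)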


78-98: LGTM! Double-checked locking correctly implemented for executors.

The thread pool executor initialization uses proper double-checked locking with a shared lock. The thread name prefixes are helpful for debugging.


146-178: Empty input handling correctly implemented.

The early return for empty input at lines 152-154 properly addresses the previous review comment about avoiding torch.cat([]) errors. The implementation returns an empty tensor with the correct embedding dimension.

The async batching logic with semaphore-based GPU concurrency control is well-structured.
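
Roughly, the shape of such a path. The method names follow the walkthrough and assume the model, embedding_executor, _get_gpu_semaphore, and _encode members sketched earlier in this thread; the batching details and defaults are assumptions:

    import asyncio
    from typing import List, Optional

    import torch

    SAFE_BATCH_SIZE = 16  # placeholder value

    class EmbeddingService:
        async def get_embeddings(
            self,
            texts: List[str],
            max_concurrent_tasks: Optional[int] = None,
        ) -> torch.Tensor:
            # Early return: torch.cat([]) raises, so hand back an empty
            # (0, embedding_dim) tensor instead.
            if not texts:
                dim = self.model.get_sentence_embedding_dimension()
                return torch.empty((0, dim))

            loop = asyncio.get_running_loop()
            semaphore = self._get_gpu_semaphore(max_concurrent_tasks)

            async def encode_batch(batch: List[str]) -> torch.Tensor:
                # The semaphore caps concurrent GPU-bound batches; the actual
                # encode runs in the embedding thread pool.
                async with semaphore:
                    return await loop.run_in_executor(
                        self.embedding_executor, self._encode, batch
                    )

            batches = [
                texts[i : i + SAFE_BATCH_SIZE]
                for i in range(0, len(texts), SAFE_BATCH_SIZE)
            ]
            results = await asyncio.gather(*(encode_batch(b) for b in batches))
            return torch.cat(results)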


196-208: Good fallback pattern for LLM invocation.

The async-first approach with synchronous fallback ensures reliability when async invocation fails. The executor-based fallback maintains thread safety.
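
A sketch of that shape, assuming a LangChain-style client exposing async ainvoke() and sync invoke(), plus the llm and llm_executor members described in the walkthrough (the actual client interface is not shown in this review):

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    class EmbeddingService:
        async def _invoke_llm(self, messages):
            try:
                # Preferred path: native async invocation.
                return await self.llm.ainvoke(messages)
            except Exception:
                logger.exception("Async LLM invocation failed; falling back to sync")
                loop = asyncio.get_running_loop()
                # Fallback: run the blocking call in the LLM thread pool so the
                # event loop stays responsive.
                return await loop.run_in_executor(
                    self.llm_executor, self.llm.invoke, messages
                )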


210-243: LGTM! Profile processing flows are well-structured.

The summarization and processing methods correctly use the async API and propagate the embedding through the result. The embedding is now required in ProfileSummaryResult, which is correctly populated at line 233.


289-293: LGTM! Async context manager properly delegates to shutdown.

The __aenter__ and __aexit__ methods correctly implement the async context manager protocol, ensuring cleanup when used with async with.


37-41: All code paths that construct ProfileSummaryResult provide the embedding field. The change to require the embedding is safe—there are no breaking usages.


coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
backend/app/services/embedding_service/service.py (1)

106-125: Race condition remains between instance shutdown check and model loading.

A past review comment flagged the race condition where self._shutting_down is checked at Line 109 outside the lock. The following sequence is still possible:

  1. Thread A checks self._shutting_down (False) at Line 109
  2. Thread B calls shutdown(), setting self._shutting_down = True
  3. Thread A acquires the lock and loads the model

While the impact is limited (the model is class-level and usable by other instances), it's inconsistent with the shutdown intent for this specific instance.

🔎 Proposed fix to recheck shutdown inside the lock
 @property
 def model(self) -> SentenceTransformer:
-    # First check instance shutdown flag
-    if self._shutting_down:
-        raise RuntimeError("This EmbeddingService instance is shutting down")
-
     # Then check and load model with proper locking
     if EmbeddingService._global_model is None:
         with EmbeddingService._global_model_lock:
+            # Check instance shutdown inside the lock
+            if self._shutting_down:
+                raise RuntimeError("This EmbeddingService instance is shutting down")
+            
             # Check global shutdown flag inside the lock
             if EmbeddingService._shutting_down_global:
                 raise RuntimeError("EmbeddingService globally is shutting down")
             
             if EmbeddingService._global_model is None:
                 logger.info("Loading embedding model: %s", MODEL_NAME)
                 EmbeddingService._global_model = SentenceTransformer(
                     MODEL_NAME,
                     device=EMBEDDING_DEVICE,
                 )
+    # Final check after lock
+    if self._shutting_down:
+        raise RuntimeError("This EmbeddingService instance is shutting down")
     return EmbeddingService._global_model
🧹 Nitpick comments (3)
backend/app/services/embedding_service/service.py (3)

208-223: Unused exception variable in fallback handler.

Line 212 catches Exception as e but never uses the variable. For consistency with Line 155 (in _count_tokens), consider including the exception in the log message.

🔎 Proposed fix
         except Exception as e:
             logger.exception(
-                "LLM invocation failed for profile=%s",
+                "LLM invocation failed for profile=%s: %s",
                 getattr(messages[0], "content", "")[:50],
+                e,
             )

275-288: get_model_info may raise during shutdown.

Line 279 accesses self.model, which raises RuntimeError if the instance or global service is shutting down. Consider whether get_model_info should gracefully handle this case (e.g., return shutdown state without attempting to load the model) or document that it may raise.

🔎 Proposed fix to handle shutdown gracefully
 def get_model_info(self) -> Dict[str, Any]:
+    try:
+        embedding_size = self.model.get_sentence_embedding_dimension()
+    except RuntimeError:
+        embedding_size = None
+    
     return {
         "model_name": MODEL_NAME,
         "device": EMBEDDING_DEVICE,
-        "embedding_size": self.model.get_sentence_embedding_dimension(),
+        "embedding_size": embedding_size,
         "tiktoken_available": TIKTOKEN_AVAILABLE,
         "safe_batch_size": SAFE_BATCH_SIZE,
         "max_batch_size": MAX_BATCH_SIZE,
         "default_max_concurrent_gpu_tasks": DEFAULT_MAX_CONCURRENT_GPU_TASKS,
         "executor_workers": EXECUTOR_MAX_WORKERS,
         "model_loaded": EmbeddingService._global_model is not None,
         "global_shutdown": EmbeddingService._shutting_down_global,
         "instance_shutdown": self._shutting_down,
     }

290-323: Clarify when to use shutdown() vs shutdown_global().

The separation of instance-level shutdown() and class-level shutdown_global() is good, but the usage contract is unclear:

  • When should callers invoke shutdown_global() vs just shutdown()?
  • Should shutdown() automatically call shutdown_global() when it's the last instance?
  • Can new instances be created after shutdown_global() is called?

Consider adding docstrings or comments explaining the lifecycle semantics.

📝 Suggested documentation
 def shutdown(self) -> None:
-    """Shutdown this instance and optionally the global model if no other instances exist."""
+    """
+    Shutdown this instance's resources (executors, LLM, tokenizer).
+    
+    The global shared model is NOT cleared. Other instances can continue
+    to use it. To fully unload the model and free GPU memory, call
+    shutdown_global() as a class method after all instances are shutdown.
+    """
     self._shutting_down = True
 @classmethod
 def shutdown_global(cls) -> None:
-    """Shutdown all global resources (model) shared across all instances."""
+    """
+    Shutdown all global resources (model) shared across all instances.
+    
+    This clears the global model and GPU cache. After calling this,
+    any existing or new instances attempting to access the model will
+    raise RuntimeError. This should only be called during application
+    shutdown after all instance-level shutdown() calls are complete.
+    """
     with cls._global_model_lock:
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ac9553d and 534be2a.

📒 Files selected for processing (1)
  • backend/app/services/embedding_service/service.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/embedding_service/service.py (2)
backend/app/models/database/weaviate.py (1)
  • WeaviateUserProfile (32-128)
backend/app/database/weaviate/operations.py (2)
  • search_similar_contributors (115-164)
  • search_similar_contributors (347-352)
🪛 Ruff (0.14.10)
backend/app/services/embedding_service/service.py

110-110: Avoid specifying long messages outside the exception class

(TRY003)


117-117: Avoid specifying long messages outside the exception class

(TRY003)


154-154: Do not catch blind exception: Exception

(BLE001)


212-212: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)

🔇 Additional comments (3)
backend/app/services/embedding_service/service.py (3)

150-157: Token counting fallback now includes proper logging.

The exception logging at Line 155 addresses the previous review comment. The broad Exception catch is acceptable here given the fallback behavior and logging.

Based on past review comments, the logging was added to make tokenization failures visible.


44-44: No action required: the embedding field is properly required throughout the codebase.

The embedding field in ProfileSummaryResult is now required (changed from Optional[List[float]]), and this breaking change is properly implemented. There is only one instantiation point for ProfileSummaryResult (line 245-248) and it always provides a valid embedding. No code in the repository checks if the embedding field is None, confirming that all callers have been updated to handle the required field.


260-273: Move inline import to module level.

The import of search_similar_contributors from app.database.weaviate.operations at line 267 should be moved to the module level imports. No circular dependency exists, and keeping imports at the top of the file follows PEP 8 and improves code readability.

Likely an incorrect or invalid review comment.

@g-k-s-03 (Author) commented

Hi @smokeyScraper,

I’ve updated the EmbeddingService based on the CodeRabbit review:

Clarified lifecycle: class-level _global_model stays shared; instance shutdown cleans only per-instance resources. Added shutdown_global() to explicitly release the shared model.

Moved shutdown check inside the global lock to prevent potential race conditions.

Tokenizer exceptions are now logged instead of silently ignored.

All changes are in service.py. Could you please review these updates quickly and approve if everything looks good?

Thanks!


Development

Successfully merging this pull request may close these issues.

BUG: Race Condition Risk in EmbeddingService Model Initialization
