
Conversation


@smahdavi4 smahdavi4 commented Jan 30, 2026

Fixing the following:

  • Added a missing semaphore in the recipe for concurrent generations.
  • LiteLLM's disk cache generates one file per request, which is not network-disk friendly. Implemented a hybrid cache that keeps the cache in memory and periodically saves it to a single file (a minimal sketch of the idea is below).
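For illustration only, here is a minimal Python sketch of the hybrid-cache idea (class and parameter names are hypothetical, not this PR's actual HybridCache): entries live in an in-memory dict guarded by a reentrant lock, a background thread periodically pickles a snapshot to a temporary file and atomically renames it over the target, and the dirty flag is cleared only after a successful write.

import os
import pickle
import threading
from pathlib import Path


class InMemoryCacheWithPeriodicSave:
    """Sketch: in-memory cache that a background thread periodically persists to one file."""

    def __init__(self, cache_file_path: str, save_interval: float = 300.0):
        self.cache_file_path = cache_file_path
        self._data = {}
        self._dirty = False
        self._lock = threading.RLock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._save_loop, args=(save_interval,), daemon=True)
        self._thread.start()

    def set(self, key, value):
        with self._lock:
            self._data[key] = value
            self._dirty = True

    def get(self, key):
        with self._lock:
            return self._data.get(key)

    def _save_loop(self, interval):
        # Wake up every `interval` seconds until shutdown is requested.
        while not self._stop.wait(interval):
            self.force_save()

    def force_save(self):
        with self._lock:
            if not self._dirty:
                return
            snapshot = dict(self._data)
        # Atomic write: dump to a temp file, then rename over the target path.
        Path(self.cache_file_path).parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.cache_file_path + ".tmp"
        with open(tmp_path, "wb") as f:
            pickle.dump(snapshot, f, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(tmp_path, self.cache_file_path)
        with self._lock:
            self._dirty = False  # cleared only after the write succeeded

    def shutdown(self):
        # Stop the background thread first, then do a final save.
        self._stop.set()
        self._thread.join()
        self.force_save()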

Summary by CodeRabbit

  • New Features

    • Implemented a stable hybrid caching system that combines in-memory storage with periodic disk persistence, providing deterministic cache operations and reliable state recovery across runs
  • Improvements

    • Improved proof-generation concurrency handling by throttling concurrent requests with an async semaphore


Signed-off-by: Sadegh Mahdavi <[email protected]>
@smahdavi4 smahdavi4 requested a review from Kipok January 30, 2026 19:05
Signed-off-by: Sadegh Mahdavi <[email protected]>

coderabbitai bot commented Jan 30, 2026

📝 Walkthrough

The changes introduce a hybrid caching system that combines in-memory storage with periodic disk persistence. StableLiteLLMCache replaces the previous disk-based litellm cache, providing deterministic cache key generation. Script generation is updated to throttle concurrent executions using an async semaphore.
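For illustration, deterministic key generation can be sketched as hashing a canonical, key-sorted serialization of the request parameters (a hypothetical example; the exact fields StableLiteLLMCache uses are not listed in this walkthrough):

import hashlib
import json


def stable_cache_key(**request_params) -> str:
    # Sorting keys makes the serialization, and therefore the hash, order-independent.
    canonical = json.dumps(request_params, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# The same parameters in a different order yield the same key.
assert stable_cache_key(model="gpt-4", temperature=0.0) == stable_cache_key(temperature=0.0, model="gpt-4")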

Changes

Cohort / File(s) Summary
Hybrid Cache Implementation
nemo_skills/inference/litellm_hybrid_cache.py
New module introducing HybridCache class with in-memory storage, background persistence via configurable thread, reentrant lock-based concurrency control, atomic disk I/O, and force_save capability. Includes StableLiteLLMCache subclass that normalizes cache keys by sorting parameters for deterministic behavior. Provides synchronous and asynchronous APIs for cache operations (set, get, batch_get, increment, flush, delete, disconnect) and set-add operations.
Cache Integration
nemo_skills/inference/generate.py
Updates cache setup to use StableLiteLLMCache instead of litellm.Cache with disk backend. Imports StableLiteLLMCache from new hybrid cache module. Adds force_save call during cleanup to persist cache state before directory removal.
Script Generation Concurrency Control
recipes/proof-gen-verification/scripts/script_generation.py
Wraps the user-provided script's process_single invocation with an async semaphore to throttle concurrent executions while preserving the existing kwargs assembly and generation-key handling (see the sketch below).
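A minimal sketch of this throttling pattern, with a hypothetical process_single coroutine and concurrency limit (the recipe's actual kwargs assembly and generation-key handling are omitted):

import asyncio


async def process_single(item):
    # Placeholder for the user-provided generation call.
    await asyncio.sleep(0.1)
    return item


async def generate_all(items, max_concurrent: int = 8):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def throttled(item):
        async with semaphore:  # at most `max_concurrent` generations in flight
            return await process_single(item)

    return await asyncio.gather(*(throttled(item) for item in items))


if __name__ == "__main__":
    print(asyncio.run(generate_all(range(20))))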

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HybridCache
    participant MemoryStore as Memory Store
    participant DiskThread as Background Thread
    participant Disk

    Client->>HybridCache: set_cache(key, value)
    HybridCache->>MemoryStore: store value (with lock)
    HybridCache->>HybridCache: mark dirty flag
    
    Client->>HybridCache: get_cache(key)
    HybridCache->>MemoryStore: retrieve value (with lock)
    HybridCache-->>Client: return value
    
    DiskThread->>HybridCache: periodic check (save_interval)
    alt dirty flag set
        DiskThread->>MemoryStore: read all entries (with lock)
        DiskThread->>Disk: atomic write temp file + rename
        DiskThread->>HybridCache: clear dirty flag
    end
    
    Client->>HybridCache: force_save()
    HybridCache->>MemoryStore: read all entries (with lock)
    HybridCache->>Disk: atomic write temp file + rename
    
    Client->>HybridCache: disconnect()
    HybridCache->>DiskThread: stop background thread
    HybridCache->>Disk: final save (if needed)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Title check ⚠️ Warning The title 'Improve tts' is unrelated to the actual changes, which focus on cache optimization and semaphore locking in inference/generation workflows, not text-to-speech functionality. Revise the title to reflect the main changes, such as 'Replace litellm disk cache with hybrid in-memory cache and add semaphore locking' or 'Optimize LLM caching and fix concurrent generation handling'.
Docstring Coverage ⚠️ Warning Docstring coverage is 75.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.




@greptile-apps greptile-apps bot left a comment


1 file reviewed, 1 comment


            data = {
                "cache_dict": self.cache_dict.copy(),
            }
            self._dirty = False

The _dirty flag is cleared before the disk write completes. If the write fails (lines 78-82), the cache will incorrectly believe it has already been saved and won't retry.

Suggested change
-            self._dirty = False
+            self._dirty = False  # Move this after successful write


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@nemo_skills/inference/generate.py`:
- Line 40: The background save thread in the HybridCache (created via
StableLiteLLMCache / HybridCache instance) must be stopped before deleting its
storage directory: instead of only calling force_save() then shutil.rmtree(),
retain the cache instance and call its _shutdown() method to stop the background
thread and perform a final save, then call shutil.rmtree() to remove the
directory; update any code that currently only calls force_save() to call
cache._shutdown() (or cache.shutdown wrapper if available) prior to deleting the
directory.

In `@nemo_skills/inference/litellm_hybrid_cache.py`:
- Around line 101-153: The cache methods currently only validate kwargs in
set_cache via _check_no_ttl, letting unsupported kwargs silently pass in
get_cache, batch_get_cache, increment_cache and async variants; replace
_check_no_ttl with a new _validate_kwargs(self, **kwargs) that raises ValueError
when kwargs is non-empty and call _validate_kwargs at the start of all cache
entry points (set_cache, async_set_cache, async_set_cache_pipeline, get_cache,
async_get_cache, batch_get_cache, async_batch_get_cache, increment_cache,
async_increment) so every method fails fast on unsupported parameters.
🧹 Nitpick comments (1)
nemo_skills/inference/litellm_hybrid_cache.py (1)

68-83: Consider clearing _dirty only after a successful write.
If the write fails, _dirty is already false and future saves won’t retry; clearing after os.replace (and only if no new updates happened) avoids that edge case.

🔧 Suggested adjustment
     def _save_to_disk(self):
         """Save cache to disk."""
         with self._lock:
             if not self._dirty:
                 return
             data = {
                 "cache_dict": self.cache_dict.copy(),
             }
-            self._dirty = False
 
         temp_path = self.cache_file_path + ".tmp"
         Path(self.cache_file_path).parent.mkdir(parents=True, exist_ok=True)
         with open(temp_path, "wb") as f:
             pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
         os.replace(temp_path, self.cache_file_path)
+        with self._lock:
+            if self.cache_dict == data["cache_dict"]:
+                self._dirty = False

    get_evaluator_class,
    supports_single_eval,
)
from nemo_skills.inference.litellm_hybrid_cache import StableLiteLLMCache

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find and examine the generate.py file
fd -t f "generate.py" | head -5


🏁 Script executed:

# Get the actual code around lines 40 and 821-833
cat -n nemo_skills/inference/generate.py | sed -n '35,45p'


🏁 Script executed:

# Get cleanup code around lines 821-833
cat -n nemo_skills/inference/generate.py | sed -n '815,840p'


🏁 Script executed:

# Find the StableLiteLLMCache implementation
fd -t f "litellm_hybrid_cache.py"


🏁 Script executed:

# Read the StableLiteLLMCache implementation to understand threading behavior
cat -n nemo_skills/inference/litellm_hybrid_cache.py


🏁 Script executed:

# Check if HybridCache is used and what shutdown methods it has
rg "class HybridCache|class StableLiteLLMCache|def shutdown|def close|def _shutdown" nemo_skills/inference/litellm_hybrid_cache.py -A 5


🏁 Script executed:

# Check for background threads or save loops in the file
rg "Thread|thread|save.*loop|while.*save" nemo_skills/inference/litellm_hybrid_cache.py -B 2 -A 2


🏁 Script executed:

# Check how cleanup_litellm_cache is called and if there are multiple instances
rg "cleanup_litellm_cache" nemo_skills/


🏁 Script executed:

# Check if disconnect() would be better - search for any async context in cleanup flow
rg "cleanup_litellm_cache|setup_litellm_cache" nemo_skills/inference/generate.py -B 5 -A 10


🏁 Script executed:

# Look for the calling context to see if we could use async disconnect
ast-grep --pattern 'def generate(self) {
  $$$
  cleanup_litellm_cache
  $$$
}' nemo_skills/inference/generate.py


🏁 Script executed:

# Find where cleanup_litellm_cache is actually invoked
rg "cleanup_litellm_cache\(\)" nemo_skills/ -B 3 -A 1


🏁 Script executed:

# Check if there are any error handling or try-catch around cleanup_litellm_cache
rg "try:|except|cleanup_litellm_cache" nemo_skills/inference/generate.py -B 2 -A 2


🏁 Script executed:

# Verify the async context - is generate() ever awaited or is it always sync?
rg "async def generate|await.*generate\(\)" nemo_skills/inference/generate.py


Stop the background save thread before deleting the cache directory to prevent race conditions.

The HybridCache instance runs a background thread that periodically saves to disk every 300 seconds. Calling force_save() only saves the current state but leaves the thread running. When rmtree() deletes the directory immediately after, the background thread may still attempt to write to a non-existent path or recreate the directory.

Store the cache instance and call its _shutdown() method before deleting the directory. This stops the background thread and performs a final save in the correct order:

Suggested change
     def setup_litellm_cache(self):
         if self.cfg.enable_litellm_cache:
             # One cache per (output_file_name, chunk_id) pair
             output_file_name = Path(self.cfg.output_file).name
             self.litellm_cache_dir = (
                 Path(self.cfg.output_file).parent / "litellm_cache" / f"{output_file_name}_{self.cfg.chunk_id or 0}"
             )
-            litellm.cache = StableLiteLLMCache(cache_file_path=str(self.litellm_cache_dir / "cache.pkl"))
+            self.litellm_cache = StableLiteLLMCache(
+                cache_file_path=str(self.litellm_cache_dir / "cache.pkl")
+            )
+            litellm.cache = self.litellm_cache
 
     def cleanup_litellm_cache(self):
         if self.cfg.enable_litellm_cache:
-            litellm.cache.cache.force_save()
+            self.litellm_cache.cache._shutdown()
             shutil.rmtree(self.litellm_cache_dir)

Comment on lines +101 to +153
    def set_cache(self, key, value, **kwargs):
        """Set a value in the cache."""
        self._check_no_ttl(**kwargs)
        with self._lock:
            self.cache_dict[key] = value
            self._dirty = True

    async def async_set_cache(self, key, value, **kwargs):
        """Async set - delegates to sync implementation since we're using in-memory."""
        self.set_cache(key=key, value=value, **kwargs)

    async def async_set_cache_pipeline(self, cache_list, **kwargs):
        """Set multiple cache entries."""
        for cache_key, cache_value in cache_list:
            self.set_cache(key=cache_key, value=cache_value, **kwargs)

    def get_cache(self, key, **kwargs):
        """Get a value from the cache."""
        with self._lock:
            if key not in self.cache_dict:
                return None
            cached_response = self.cache_dict[key]
            if isinstance(cached_response, str):
                try:
                    cached_response = json.loads(cached_response)
                except json.JSONDecodeError:
                    pass
            return cached_response

    async def async_get_cache(self, key, **kwargs):
        """Async get - delegates to sync implementation."""
        return self.get_cache(key=key, **kwargs)

    def batch_get_cache(self, keys: list, **kwargs):
        """Get multiple values from cache."""
        return [self.get_cache(key=k, **kwargs) for k in keys]

    async def async_batch_get_cache(self, keys: list, **kwargs):
        """Async batch get."""
        return self.batch_get_cache(keys=keys, **kwargs)

    def increment_cache(self, key, value: int, **kwargs) -> int:
        """Increment a cache value."""
        with self._lock:
            init_value = self.get_cache(key=key) or 0
            new_value = init_value + value
            self.set_cache(key, new_value, **kwargs)
            return new_value

    async def async_increment(self, key, value: float, **kwargs) -> float:
        """Async increment."""
        return self.increment_cache(key, int(value), **kwargs)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

find . -name "litellm_hybrid_cache.py" -type f


🏁 Script executed:

# Get file size and read the full implementation
wc -l nemo_skills/inference/litellm_hybrid_cache.py


🏁 Script executed:

# Read the file to understand _check_no_ttl and the full context
cat -n nemo_skills/inference/litellm_hybrid_cache.py


🌐 Web query:

LiteLLM cache interface documentation - what kwargs are supported by get_cache set_cache increment_cache methods

💡 Result:

LiteLLM’s cache backends expose a small “core” interface (get_cache, set_cache, increment_cache) and then accept cache-control knobs via kwargs (passed through from the cache={...} dict on a request).

Cache-control kwargs (passed via cache={...} on a request)

These are the per-request kwargs LiteLLM documents and routes into cache read/write behavior (i.e., they affect get_cache() / cache lookup and cache write):

  • ttl (int, seconds) – per-entry TTL override (used on write) [1][2]
  • s-maxage (int, seconds) – freshness requirement for accepting an entry (validated after a hit / on read) [1][2]
  • no-cache (bool) – bypass cache read (still allows write unless no-store) [1][2]
  • no-store (bool) – prevent cache write [1][2]
  • namespace (str) – per-request key prefix / cache segmentation [2]

Method-level kwargs / parameters you can rely on

Because the concrete backends differ, LiteLLM’s docs are most consistent about these method parameters:

  • get_cache(key, ...) (reads; influenced by no-cache, s-maxage, namespace) [2]
  • set_cache(key, value, ttl=None, ...) (writes; TTL commonly supported as an argument) [3]
  • increment_cache(key, ...) is used for counters (rate limits, etc.); docs don’t enumerate a separate, richer kwarg set beyond the same TTL/namespace concepts used throughout caching [2].

If you tell me which cache backend you’re implementing (e.g., InMemoryCache, RedisCache, DualCache, proxy cache), I can map these to the exact Python method signatures used in that backend.

Sources: [1] https://docs.litellm.ai/docs/caching/all_caches • [2] https://deepwiki.com/BerriAI/litellm/5.3-dynamic-cache-controls • [3] https://readmex.com/en-US/BerriAI/litellm/page-6d9ab5a93-b77f-4311-b15f-ac9ae2a3ec34


🏁 Script executed:

rg "\.get_cache\(|\.batch_get_cache\(|\.increment_cache\(" --type py -A 2 -B 2


🏁 Script executed:

# Also check calls to async variants
rg "\.async_get_cache\(|\.async_batch_get_cache\(|\.async_increment\(" --type py -A 2 -B 2


🏁 Script executed:

# Search for external calls to HybridCache methods
rg "HybridCache|StableLiteLLMCache" --type py -A 3 | head -100


🏁 Script executed:

# Check how LiteLLM uses cache interface - look for cache.get_cache or cache.set_cache patterns
rg "cache\.(get_cache|set_cache|batch_get_cache|increment_cache|async_get_cache)" --type py


🏁 Script executed:

# Search for where cache methods are actually invoked from LiteLLM
# Check the StableLiteLLMCache parent class inheritance and method overrides
rg "def (get_cache|set_cache|batch_get_cache|increment_cache)" nemo_skills/inference/litellm_hybrid_cache.py


🏁 Script executed:

# Check if HybridCache methods are actually called by LiteLLM by searching generate.py
cat -n nemo_skills/inference/generate.py | grep -A 30 "StableLiteLLMCache\|litellm.cache"


Validate kwargs across all cache methods to fail fast on unsupported parameters.

Currently, get_cache(), batch_get_cache(), and async variants silently drop **kwargs, while only set_cache() validates via _check_no_ttl(). This inconsistency violates the guideline to avoid silently ignoring user-passed parameters. Since HybridCache is in-memory persistent storage without support for cache-control features (ttl, namespace, s-maxage, no-cache, no-store), all methods should validate and reject any unsupported kwargs.

Replace _check_no_ttl() with a comprehensive validation method that fails on any kwargs:

def _validate_kwargs(self, **kwargs):
    if kwargs:
        raise ValueError(f"Unsupported cache kwargs: {', '.join(kwargs)}")

Apply this to all cache methods: get_cache(), batch_get_cache(), increment_cache(), and their async variants—not just set_cache().

🧰 Tools
🪛 Ruff (0.14.14)

[warning] 117-117: Unused method argument: kwargs

(ARG002)


@Kipok Kipok merged commit 3e65fbf into main Jan 31, 2026
6 of 7 checks passed
@Kipok Kipok deleted the improve-tts branch January 31, 2026 03:38