diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 3a432da25..1b7866273 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -79,6 +79,7 @@ from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache from langfuse.api.resources.commons.errors.error import Error +from langfuse.api.resources.commons.errors.not_found_error import NotFoundError from langfuse.api.resources.ingestion.types.score_body import ScoreBody from langfuse.api.resources.prompts.types import ( CreatePromptRequest_Chat, @@ -3597,6 +3598,14 @@ def fetch_prompts() -> Any: return prompt + except NotFoundError as not_found_error: + langfuse_logger.warning( + f"Prompt '{cache_key}' not found during refresh, evicting from cache." + ) + if self._resources is not None: + self._resources.prompt_cache.delete(cache_key) + raise not_found_error + except Exception as e: langfuse_logger.error( f"Error while fetching prompt '{cache_key}': {str(e)}" diff --git a/langfuse/_utils/prompt_cache.py b/langfuse/_utils/prompt_cache.py index 919333b6b..09d78bc8b 100644 --- a/langfuse/_utils/prompt_cache.py +++ b/langfuse/_utils/prompt_cache.py @@ -158,6 +158,9 @@ def set(self, key: str, value: PromptClient, ttl_seconds: Optional[int]) -> None self._cache[key] = PromptCacheItem(value, ttl_seconds) + def delete(self, key: str) -> None: + self._cache.pop(key, None) + def invalidate(self, prompt_name: str) -> None: """Invalidate all cached prompts with the given prompt name.""" for key in list(self._cache): diff --git a/tests/test_prompt.py b/tests/test_prompt.py index e5346debf..bc3a5b7eb 100644 --- a/tests/test_prompt.py +++ b/tests/test_prompt.py @@ -7,8 +7,10 @@ from langfuse._client.client import Langfuse from langfuse._utils.prompt_cache import ( DEFAULT_PROMPT_CACHE_TTL_SECONDS, + PromptCache, PromptCacheItem, ) +from langfuse.api.resources.commons.errors.not_found_error import NotFoundError from langfuse.api.resources.prompts import Prompt_Chat, Prompt_Text from langfuse.model import ChatPromptClient, TextPromptClient from tests.utils import create_uuid, get_api @@ -679,9 +681,15 @@ def test_prompt_end_to_end(): @pytest.fixture def langfuse(): + from langfuse._client.resource_manager import LangfuseResourceManager + langfuse_instance = Langfuse() langfuse_instance.api = Mock() + if langfuse_instance._resources is None: + langfuse_instance._resources = Mock(spec=LangfuseResourceManager) + langfuse_instance._resources.prompt_cache = PromptCache() + return langfuse_instance @@ -1157,6 +1165,71 @@ def test_get_expired_prompt_when_failing_fetch(mock_time, langfuse: Langfuse): assert result_call_2 == prompt_client +@patch.object(PromptCacheItem, "get_epoch_seconds") +def test_evict_prompt_cache_entry_when_refresh_returns_not_found( + mock_time, langfuse: Langfuse +) -> None: + mock_time.return_value = 0 + + prompt_name = "test_evict_prompt_cache_entry_when_refresh_returns_not_found" + ttl_seconds = 5 + fallback_prompt = "fallback text prompt" + + prompt = Prompt_Text( + name=prompt_name, + version=1, + prompt="Make me laugh", + labels=[], + type="text", + config={}, + tags=[], + ) + prompt_client = TextPromptClient(prompt) + cache_key = PromptCache.generate_cache_key(prompt_name, version=None, label=None) + + mock_server_call = langfuse.api.prompts.get + mock_server_call.return_value = prompt + + initial_result = langfuse.get_prompt( + prompt_name, + cache_ttl_seconds=ttl_seconds, + max_retries=0, + ) + assert initial_result == prompt_client + assert langfuse._resources.prompt_cache.get(cache_key) is not None + + # Expire cache entry and trigger background refresh + mock_time.return_value = ttl_seconds + 1 + + def raise_not_found(*_args: object, **_kwargs: object) -> None: + raise NotFoundError({"message": "Prompt not found"}) + + mock_server_call.side_effect = raise_not_found + + stale_result = langfuse.get_prompt( + prompt_name, + cache_ttl_seconds=ttl_seconds, + max_retries=0, + ) + assert stale_result == prompt_client + + while True: + if langfuse._resources.prompt_cache._task_manager.active_tasks() == 0: + break + sleep(0.1) + + assert langfuse._resources.prompt_cache.get(cache_key) is None + + fallback_result = langfuse.get_prompt( + prompt_name, + cache_ttl_seconds=ttl_seconds, + fallback=fallback_prompt, + max_retries=0, + ) + assert fallback_result.is_fallback + assert fallback_result.prompt == fallback_prompt + + # Should fetch new prompt if version changes def test_get_fresh_prompt_when_version_changes(langfuse: Langfuse): prompt_name = "test_get_fresh_prompt_when_version_changes"