-
Notifications
You must be signed in to change notification settings - Fork 172
Add Langfuse observability integration support #187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2d5722e
41b081b
2079a1d
c2a4e8a
1de9ec4
8c54f9b
9ef4bf4
f2b04c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,8 @@ | ||
| """Agent Factory for dynamic agent creation from definitions.""" | ||
|
|
||
| import inspect | ||
| import logging | ||
| from importlib import import_module | ||
| from typing import Any, Type, TypeVar | ||
|
|
||
| import httpx | ||
|
|
@@ -26,6 +28,44 @@ class AgentFactory: | |
| and create instances with the appropriate configuration. | ||
| """ | ||
|
|
||
| @classmethod | ||
| def _patch_langfuse_stream_close(cls) -> None: | ||
| """Patch OpenAI streaming implementation for Langfuse compatibility. | ||
|
|
||
| Langfuse wraps the underlying HTTP response so that it exposes | ||
| ``close()`` instead of ``aclose()``. OpenAI's | ||
| ``AsyncChatCompletionStream.close`` calls | ||
| ``self._response.aclose()`` which raises ``AttributeError``. | ||
|
|
||
| This patch replaces ``AsyncChatCompletionStream.close`` with a | ||
| version that prefers ``aclose()`` when available and falls back | ||
| to ``close()``, awaiting the result if necessary. | ||
| """ | ||
| try: | ||
| from openai.lib.streaming.chat._completions import AsyncChatCompletionStream | ||
| except Exception: # pragma: no cover | ||
| logger.warning("Failed to import OpenAI chat streaming module for Langfuse patch") | ||
| return | ||
|
|
||
| if getattr(AsyncChatCompletionStream.close, "_langfuse_patched", False): | ||
| return | ||
|
|
||
| async def safe_close(self) -> None: # type: ignore[override] | ||
| response = getattr(self, "_response", None) | ||
| if response is None: | ||
| return | ||
|
|
||
| close_method = getattr(response, "aclose", None) or getattr(response, "close", None) | ||
| if close_method is None: | ||
| return | ||
|
|
||
| result = close_method() | ||
| if inspect.isawaitable(result): | ||
| await result | ||
|
|
||
| safe_close._langfuse_patched = True # type: ignore[attr-defined] | ||
| AsyncChatCompletionStream.close = safe_close # type: ignore[assignment] | ||
|
|
||
| @classmethod | ||
| def _create_client(cls, llm_config: LLMConfig) -> AsyncOpenAI: | ||
| """Create OpenAI client from configuration. | ||
|
|
@@ -36,10 +76,35 @@ def _create_client(cls, llm_config: LLMConfig) -> AsyncOpenAI: | |
| Returns: | ||
| Configured AsyncOpenAI client | ||
| """ | ||
| config = GlobalConfig() | ||
| client_kwargs = {"base_url": llm_config.base_url, "api_key": llm_config.api_key} | ||
| if llm_config.proxy: | ||
| client_kwargs["http_client"] = httpx.AsyncClient(proxy=llm_config.proxy) | ||
|
|
||
| if getattr(config, "langfuse_enabled", False): | ||
| try: | ||
| lf_cfg = config.langfuse | ||
| if lf_cfg.public_key or lf_cfg.secret_key or lf_cfg.host: | ||
| LangfuseClient = getattr(import_module("langfuse"), "Langfuse") | ||
| kwargs = {} | ||
| if lf_cfg.public_key: | ||
| kwargs["public_key"] = lf_cfg.public_key | ||
| if lf_cfg.secret_key: | ||
| kwargs["secret_key"] = lf_cfg.secret_key | ||
| if lf_cfg.host: | ||
| kwargs["host"] = lf_cfg.host | ||
| LangfuseClient(**kwargs) | ||
| logger.info("Langfuse initialized with explicit credentials from config") | ||
| LangfuseAsyncOpenAI = getattr(import_module("langfuse.openai"), "AsyncOpenAI") | ||
| cls._patch_langfuse_stream_close() | ||
| logger.info("Creating Langfuse AsyncOpenAI client (langfuse_enabled=True)") | ||
| return LangfuseAsyncOpenAI(**client_kwargs) | ||
| except ImportError: | ||
| logger.warning( | ||
| "Langfuse is enabled but 'langfuse' package is not available. " | ||
| "Falling back to standard AsyncOpenAI client." | ||
| ) | ||
|
|
||
|
Comment on lines
+84
to
+107
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Что-то не очень здоровое.
прогонял на этом тесте, вроде всё работает
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| return AsyncOpenAI(**client_kwargs) | ||
|
|
||
| @classmethod | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Выглядит излишним