Skip to content

Commit f3b5f18

Browse files
authored
Merge pull request #1954 from theopenconversationkit/upgrade/langfusev3/DERCBOT-1746
[DERCBOT-1746] Langfuse V3 Callback Fixes
2 parents 4a38c1f + 003eccb commit f3b5f18

File tree

23 files changed

+3730
-2722
lines changed

23 files changed

+3730
-2722
lines changed

gen-ai/orchestrator-server/src/main/python/server/poetry.lock

Lines changed: 2998 additions & 2566 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen-ai/orchestrator-server/src/main/python/server/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ opensearch-py = "^2.8.0"
1919
path = "^17.1.0"
2020
colorlog = "^6.9.0"
2121
boto3 = "^1.35.96"
22-
urllib3 = "^2.3.0"
22+
urllib3 = "^2.6.0"
2323
jinja2 = "^3.1.5"
24-
langfuse = "^3.3.0"
24+
langfuse = "^3.10.6"
2525
httpx-auth-awssigv4 = "^0.1.4"
2626
langchain-postgres = "^0.0.12"
2727
google-cloud-secret-manager = "^2.22.0"

gen-ai/orchestrator-server/src/main/python/server/src/gen_ai_orchestrator/services/langchain/factories/callback_handlers/callback_handlers_factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from abc import ABC, abstractmethod
1919
from typing import Any
2020

21-
from langfuse.langchain.CallbackHandler import LangchainCallbackHandler
21+
from langfuse.langchain import CallbackHandler as LangfuseCallbackHandler
2222
from pydantic import BaseModel
2323

2424
from gen_ai_orchestrator.models.observability.observability_setting import (
@@ -34,7 +34,7 @@ class LangChainCallbackHandlerFactory(ABC, BaseModel):
3434
setting: BaseObservabilitySetting
3535

3636
@abstractmethod
37-
def get_callback_handler(self, **kwargs: Any) -> LangchainCallbackHandler:
37+
def get_callback_handler(self, **kwargs: Any) -> LangfuseCallbackHandler:
3838
"""
3939
Fabric a callback handler.
4040
:return: LangchainCallbackHandler.

gen-ai/orchestrator-server/src/main/python/server/src/gen_ai_orchestrator/services/langchain/factories/callback_handlers/langfuse_callback_handler_factory.py

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from langfuse import Langfuse
2626
from langfuse.api.core import ApiError
2727
from langfuse.langchain import CallbackHandler as LangfuseCallbackHandler
28+
from pydantic import PrivateAttr
2829

2930
from gen_ai_orchestrator.configurations.environment.settings import (
3031
application_settings,
@@ -59,30 +60,82 @@ class LangfuseCallbackHandlerFactory(LangChainCallbackHandlerFactory):
5960

6061
setting: ObservabilitySetting
6162

63+
# Internal client cache
64+
_langfuse_client: Optional[Langfuse] = PrivateAttr(default=None)
65+
66+
def _get_langfuse_client(self) -> Langfuse:
67+
"""
68+
Create or return the initialized Langfuse client
69+
"""
70+
if self._langfuse_client is None:
71+
settings = self._fetch_settings()
72+
self._langfuse_client = Langfuse(
73+
public_key=settings['public_key'],
74+
secret_key=settings['secret_key'],
75+
base_url=settings['base_url'],
76+
timeout=settings['timeout'],
77+
httpx_client=self._get_httpx_client(),
78+
)
79+
return self._langfuse_client
80+
6281
def get_callback_handler(self, **kwargs: Any) -> LangfuseCallbackHandler:
63-
return LangfuseCallbackHandler(**self._fetch_settings(), httpx_client=self._get_httpx_client(), **kwargs)
82+
"""
83+
Create Langfuse CallbackHandler
84+
"""
85+
self._get_langfuse_client()
86+
87+
# Ignore Langfuse V2 parameters to stay backward-compatible
88+
if kwargs:
89+
logger.debug(
90+
'Ignoring unsupported Langfuse CallbackHandler kwargs in V3: %s',
91+
list(kwargs.keys()),
92+
)
93+
94+
# Langfuse SDK maintains an internal map / pool of clients based on there public key, that why the client isn't passed to the callbackhandler constructor.
95+
return LangfuseCallbackHandler(
96+
public_key=self.setting.public_key,
97+
)
6498

6599
def check_observability_setting(self) -> bool:
66100
"""Check if the provided credentials (public and secret key) are valid,
67101
while tracing a sample phrase"""
68102
try:
69-
self.get_callback_handler().auth_check()
70-
Langfuse(**self._fetch_settings(), httpx_client=self._get_httpx_client()).trace(
71-
name=ObservabilityTrace.CHECK_OBSERVABILITY_SETTINGS.value, output='Check observability setting trace')
103+
client = self._get_langfuse_client()
104+
logger.debug('Lang')
105+
106+
if not client.auth_check():
107+
logger.error('Langfuse auth_check() returned False')
108+
raise GenAIObservabilityErrorException(
109+
'Langfuse authentication check failed'
110+
)
111+
112+
with client.start_as_current_observation(
113+
as_type='span',
114+
name=ObservabilityTrace.CHECK_OBSERVABILITY_SETTINGS.value,
115+
input={'message': 'Check observability setting'},
116+
) as span:
117+
span.update(output='Check observability setting trace')
118+
119+
client.flush()
120+
72121
except ApiError as exc:
73122
logger.error(exc)
74123
raise GenAIObservabilityErrorException(
75124
create_error_info_langfuse(exc)
76125
)
77126
return True
78127

79-
def _fetch_settings(self):
128+
def _fetch_settings(self) -> dict:
129+
"""
130+
Fetch necessary parameters to initialise Langfuse client.
131+
"""
80132
return {
81-
'host': str(self.setting.url),
133+
'base_url': str(self.setting.url),
82134
'public_key': self.setting.public_key,
83135
'secret_key': fetch_secret_key_value(self.setting.secret_key),
84136
'timeout': application_settings.observability_provider_timeout,
85-
'max_retries': application_settings.observability_provider_max_retries
137+
# kept for backward-compatibility, not used anymore
138+
'max_retries': application_settings.observability_provider_max_retries,
86139
}
87140

88141
def _get_httpx_client(self) -> Optional[Client]:

gen-ai/orchestrator-server/src/main/python/server/src/gen_ai_orchestrator/services/langchain/rag_chain.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
RunnableSerializable,
4242
)
4343
from langchain_core.vectorstores import VectorStoreRetriever
44-
from langfuse.langchain import CallbackHandler as LangfuseCallbackHandler
44+
from langfuse import get_client, propagate_attributes
4545
from typing_extensions import Any
4646

4747
from gen_ai_orchestrator.errors.exceptions.exceptions import (
@@ -131,7 +131,7 @@ async def execute_rag_chain(
131131
message_history.add_ai_message(msg.text)
132132
session_id = (request.dialog.dialog_id,)
133133
user_id = (request.dialog.user_id,)
134-
tags = (request.dialog.tags,)
134+
tags = (request.dialog.tags,) or []
135135

136136
logger.debug(
137137
'RAG chain - Use chat history: %s',
@@ -160,16 +160,23 @@ async def execute_rag_chain(
160160
# Langfuse callback handler
161161
observability_handler = create_observability_callback_handler(
162162
observability_setting=request.observability_setting,
163-
trace_name=ObservabilityTrace.RAG.value,
164-
session_id=session_id,
165-
user_id=user_id,
166-
tags=tags,
167163
)
168164
callback_handlers.append(observability_handler)
169165

166+
metadata = {}
167+
if user_id is not None:
168+
metadata['langfuse_user_id'] = str(user_id)
169+
if session_id is not None:
170+
metadata['langfuse_session_id'] = str(session_id)
171+
if tags:
172+
metadata['langfuse_tags'] = list(tags)
173+
170174
response = await conversational_retrieval_chain.ainvoke(
171175
input=inputs,
172-
config={'callbacks': callback_handlers},
176+
config={
177+
'callbacks': callback_handlers,
178+
'metadata': metadata,
179+
},
173180
)
174181

175182
# RAG Guard
@@ -204,7 +211,10 @@ async def execute_rag_chain(
204211
)
205212
),
206213
),
207-
observability_info=get_observability_info(observability_handler),
214+
observability_info=get_observability_info(
215+
observability_handler,
216+
ObservabilityTrace.RAG.value if observability_handler is not None else None,
217+
),
208218
debug=get_rag_debug_data(request, records_callback_handler, rag_duration)
209219
if debug
210220
else None,

gen-ai/orchestrator-server/src/main/python/server/src/gen_ai_orchestrator/services/observability/observabilty_service.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import logging
1818
from typing import Optional
1919

20+
from langfuse import get_client
2021
from langfuse.langchain import CallbackHandler as LangfuseCallbackHandler
2122

2223
from gen_ai_orchestrator.models.observability.observability_type import (
@@ -45,13 +46,20 @@ def check_observability_setting(setting: ObservabilitySetting) -> bool:
4546
return get_callback_handler_factory(setting).check_observability_setting()
4647

4748

48-
def get_observability_info(observability_handler) -> Optional[ObservabilityInfo]:
49+
def get_observability_info(observability_handler, trace_name: Optional[str] = None) -> Optional[ObservabilityInfo]:
4950
"""Get the observability Information"""
50-
if isinstance(observability_handler, LangfuseCallbackHandler):
51-
return ObservabilityInfo(
52-
trace_id=observability_handler.trace.id,
53-
trace_name=observability_handler.trace_name,
54-
trace_url=observability_handler.get_trace_url()
55-
)
56-
else:
57-
return None
51+
if not isinstance(observability_handler, LangfuseCallbackHandler):
52+
return None
53+
54+
trace_id = getattr(observability_handler, 'last_trace_id', None)
55+
if trace_id is None:
56+
return None
57+
58+
langfuse_client = observability_handler.client
59+
trace_url = langfuse_client.get_trace_url(trace_id=trace_id)
60+
61+
return ObservabilityInfo(
62+
trace_id=trace_id,
63+
trace_name=trace_name,
64+
trace_url=trace_url,
65+
)

gen-ai/orchestrator-server/src/main/python/server/tests/services/test_rag_chain.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
#
1515
import os
16-
from unittest.mock import AsyncMock, MagicMock, patch
16+
from unittest.mock import ANY, AsyncMock, MagicMock, patch
1717

1818
import pytest
1919
from langchain_core.documents import Document
@@ -184,7 +184,10 @@ async def test_rag_chain(
184184
# Assert qa chain is ainvoke()d with the expected settings from request
185185
mocked_chain.ainvoke.assert_called_once_with(
186186
input=inputs,
187-
config={'callbacks': [mocked_callback, mocked_langfuse_callback]},
187+
config={
188+
'callbacks': [mocked_callback, mocked_langfuse_callback],
189+
'metadata': ANY,
190+
},
188191
)
189192
# Assert the response is build using the expected settings
190193
mocked_rag_response.assert_called_once_with(
@@ -430,7 +433,7 @@ def test_rag_guard_accepts_no_answer_even_with_docs(mocked_log):
430433
'documents': ['a doc as a string'],
431434
}
432435
rag_chain.rag_guard(inputs, response, documents_required=True)
433-
assert response['documents'] == ['a doc as a string']
436+
assert response['documents'] == []
434437

435438

436439
@patch('gen_ai_orchestrator.services.langchain.rag_chain.rag_log')

gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/.idea/runConfigurations/create_dataset.xml

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/.idea/runConfigurations/export_experiments.xml

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/.idea/runConfigurations/run_evaluation.xml

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)