@@ -7,5 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Added support to call the GenAI utils handler for LangChain LLM invocations.
([#3889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3889))

- Added span support for GenAI LangChain LLM invocations.
([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665))
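
A minimal usage sketch of what these entries add up to, assuming a standard OpenTelemetry SDK setup (the exporter choice is illustrative, not part of this PR):

```python
# Hedged sketch: enable the LangChain instrumentation so chat-model invocations
# are reported as GenAI LLM spans via the shared util-genai telemetry handler.
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangChainInstrumentor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

# After this call, LangChain LLM invocations are routed through the
# OpenTelemetryLangChainCallbackHandler wired up in _instrument() below.
LangChainInstrumentor().instrument(tracer_provider=provider)
```

Once instrumented, the spans are created by the util-genai handler rather than by a tracer owned by this package, which is the coupling the diff below introduces.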
@@ -46,10 +46,8 @@
OpenTelemetryLangChainCallbackHandler,
)
from opentelemetry.instrumentation.langchain.package import _instruments
from opentelemetry.instrumentation.langchain.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import get_tracer
from opentelemetry.util.genai.handler import get_telemetry_handler

Review comment: After this coupling, opentelemetry-util-genai should be added as a dependency in pyproject.toml.



class LangChainInstrumentor(BaseInstrumentor):
@@ -72,15 +70,12 @@ def _instrument(self, **kwargs: Any):
Enable Langchain instrumentation.
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(
__name__,
__version__,
tracer_provider,
schema_url=Schemas.V1_37_0.value,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review comment: Looks like the version being used in opentelemetry-util-genai is Schemas.V1_36_0.value. Should it be updated over there, since we are downgrading here?

)

telemetry_handler = get_telemetry_handler(
tracer_provider=tracer_provider
)
otel_callback_handler = OpenTelemetryLangChainCallbackHandler(
tracer=tracer,
telemetry_handler=telemetry_handler
)

wrap_function_wrapper(
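
The hunk is cut off before the end of the `wrap_function_wrapper` call. As a hedged sketch, this is what the rewiring boils down to: the callback handler is now constructed from a shared util-genai `TelemetryHandler` instead of a raw tracer (the callback-handler import path below is assumed, since the import source is cut off above; `provider` stands in for any `TracerProvider`):

```python
from opentelemetry.util.genai.handler import get_telemetry_handler

# Assumed module path; the import source line is outside the hunk shown above.
from opentelemetry.instrumentation.langchain.callback_handler import (
    OpenTelemetryLangChainCallbackHandler,
)

# The handler is shared per tracer provider rather than created per instrumentor.
telemetry_handler = get_telemetry_handler(tracer_provider=provider)
callback_handler = OpenTelemetryLangChainCallbackHandler(
    telemetry_handler=telemetry_handler
)
```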
@@ -21,27 +21,28 @@
from langchain_core.messages import BaseMessage # type: ignore
from langchain_core.outputs import LLMResult # type: ignore

from opentelemetry.instrumentation.langchain.span_manager import _SpanManager
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
from opentelemetry.instrumentation.langchain.invocation_manager import (
_InvocationManager,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import (
Error,
InputMessage,
LLMInvocation,
OutputMessage,
Text,
)
from opentelemetry.trace import Tracer


class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): # type: ignore[misc]
"""
A callback handler for LangChain that uses OpenTelemetry to create spans for LLM calls, and for chains, tools, etc. in the future.
"""

def __init__(
self,
tracer: Tracer,
) -> None:
def __init__(self, telemetry_handler: TelemetryHandler) -> None:
super().__init__() # type: ignore

self.span_manager = _SpanManager(
tracer=tracer,
)
self._telemetry_handler = telemetry_handler
self._invocation_manager = _InvocationManager()

def on_chat_model_start(
self,
@@ -82,60 +83,79 @@ def on_chat_model_start(
if request_model == "unknown":
return

span = self.span_manager.create_chat_span(
# TODO: uncomment and modify the following after PR #3862 is merged
# if params is not None:
# top_p = params.get("top_p")
# if top_p is not None:
# span.set_attribute(GenAI.GEN_AI_REQUEST_TOP_P, top_p)
# frequency_penalty = params.get("frequency_penalty")
# if frequency_penalty is not None:
# span.set_attribute(
# GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, frequency_penalty
# )
# presence_penalty = params.get("presence_penalty")
# if presence_penalty is not None:
# span.set_attribute(
# GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, presence_penalty
# )
# stop_sequences = params.get("stop")
# if stop_sequences is not None:
# span.set_attribute(
# GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, stop_sequences
# )
# seed = params.get("seed")
# if seed is not None:
# span.set_attribute(GenAI.GEN_AI_REQUEST_SEED, seed)
# # ChatOpenAI
# temperature = params.get("temperature")
# if temperature is not None:
# span.set_attribute(
# GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature
# )
# # ChatOpenAI
# max_tokens = params.get("max_completion_tokens")
# if max_tokens is not None:
# span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
#
provider = "unknown"
if metadata is not None:
provider = metadata.get("ls_provider", "unknown")

# TODO: uncomment and modify the following after PR #3862 is merged

# # ChatBedrock
# temperature = metadata.get("ls_temperature")
# if temperature is not None:
# span.set_attribute(
# GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature
# )
# # ChatBedrock
# max_tokens = metadata.get("ls_max_tokens")
# if max_tokens is not None:
# span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)

input_messages: list[InputMessage] = []
for sub_messages in messages: # type: ignore[reportUnknownVariableType]
for message in sub_messages: # type: ignore[reportUnknownVariableType]
content = get_property_value(message, "content") # type: ignore[reportUnknownVariableType]
role = get_property_value(message, "type") # type: ignore[reportUnknownArgumentType, reportUnknownVariableType]
parts = [Text(content=content, type="text")] # type: ignore[reportArgumentType]
input_messages.append(InputMessage(parts=parts, role=role)) # type: ignore[reportArgumentType]

llm_invocation = LLMInvocation(
request_model=request_model,
input_messages=input_messages,
provider=provider,
)
llm_invocation = self._telemetry_handler.start_llm(
invocation=llm_invocation
)
self._invocation_manager.add_invocation_state(
run_id=run_id,
parent_run_id=parent_run_id,
request_model=request_model,
invocation=llm_invocation,
)

if params is not None:
top_p = params.get("top_p")
if top_p is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_TOP_P, top_p)
frequency_penalty = params.get("frequency_penalty")
if frequency_penalty is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, frequency_penalty
)
presence_penalty = params.get("presence_penalty")
if presence_penalty is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, presence_penalty
)
stop_sequences = params.get("stop")
if stop_sequences is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, stop_sequences
)
seed = params.get("seed")
if seed is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_SEED, seed)
# ChatOpenAI
temperature = params.get("temperature")
if temperature is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature
)
# ChatOpenAI
max_tokens = params.get("max_completion_tokens")
if max_tokens is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)

if metadata is not None:
provider = metadata.get("ls_provider")
if provider is not None:
span.set_attribute("gen_ai.provider.name", provider)
# ChatBedrock
temperature = metadata.get("ls_temperature")
if temperature is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature
)
# ChatBedrock
max_tokens = metadata.get("ls_max_tokens")
if max_tokens is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
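
As a standalone, hedged illustration of what the new `on_chat_model_start` path builds before handing off to the handler: LangChain messages are translated into util-genai `InputMessage`/`Text` values and the invocation is opened with `start_llm`. All literal values below are illustrative, and calling `get_telemetry_handler()` with no arguments assumes it falls back to the global tracer provider:

```python
from opentelemetry.util.genai.handler import get_telemetry_handler
from opentelemetry.util.genai.types import InputMessage, LLMInvocation, Text

handler = get_telemetry_handler()
invocation = LLMInvocation(
    request_model="gpt-4o-mini",  # illustrative request model
    input_messages=[
        InputMessage(
            parts=[Text(content="What is OpenTelemetry?", type="text")],
            role="human",  # the LangChain message "type" is used as the role
        )
    ],
    provider="openai",  # illustrative ls_provider value from LangChain metadata
)
# Opens the LLM invocation (span) via the shared telemetry handler.
invocation = handler.start_llm(invocation=invocation)
```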

def on_llm_end(
self,
response: LLMResult, # type: ignore [reportUnknownParameterType]
@@ -144,25 +164,28 @@ def on_llm_end(
parent_run_id: UUID | None,
**kwargs: Any,
) -> None:
span = self.span_manager.get_span(run_id)
invocation = self._invocation_manager.get_invocation(run_id=run_id)

if span is None:
# If the span does not exist, we cannot set attributes or end it
if invocation is None or not isinstance(invocation, LLMInvocation):
# If the invocation does not exist, we cannot set attributes or end it
return

finish_reasons: list[str] = []
llm_invocation: LLMInvocation = invocation

output_messages: list[OutputMessage] = []
for generation in getattr(response, "generations", []): # type: ignore
for chat_generation in generation:
# Get finish reason
generation_info = getattr(
chat_generation, "generation_info", None
)
if generation_info is not None:
finish_reason = generation_info.get(
"finish_reason", "unknown"
)
if finish_reason is not None:
finish_reasons.append(str(finish_reason))

if chat_generation.message:
# Get finish reason if generation_info is None above
if (
generation_info is None
and chat_generation.message.response_metadata
@@ -172,46 +195,57 @@
"stopReason", "unknown"
)
)
if finish_reason is not None:
finish_reasons.append(str(finish_reason))

# Get message content
parts = [
Text(
content=get_property_value( # type: ignore[reportArgumentType]
chat_generation.message, "content"
),
type="text",
)
]
role = get_property_value(chat_generation.message, "type") # type: ignore[reportUnknownVariableType]
output_message = OutputMessage(
role=role, # type: ignore[reportArgumentType]
parts=parts,
finish_reason=finish_reason, # type: ignore[reportPossiblyUnboundVariable, reportArgumentType]
)
output_messages.append(output_message)

# Get token usage if available
if chat_generation.message.usage_metadata:
input_tokens = (
chat_generation.message.usage_metadata.get(
"input_tokens", 0
)
)
llm_invocation.input_tokens = input_tokens

output_tokens = (
chat_generation.message.usage_metadata.get(
"output_tokens", 0
)
)
span.set_attribute(
GenAI.GEN_AI_USAGE_INPUT_TOKENS, input_tokens
)
span.set_attribute(
GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens
)
llm_invocation.output_tokens = output_tokens

span.set_attribute(
GenAI.GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons
)
llm_invocation.output_messages = output_messages

llm_output = getattr(response, "llm_output", None) # type: ignore
if llm_output is not None:
response_model = llm_output.get("model_name") or llm_output.get(
"model"
)
if response_model is not None:
span.set_attribute(
GenAI.GEN_AI_RESPONSE_MODEL, str(response_model)
)
llm_invocation.response_model_name = str(response_model)

response_id = llm_output.get("id")
if response_id is not None:
span.set_attribute(GenAI.GEN_AI_RESPONSE_ID, str(response_id))
llm_invocation.response_id = str(response_id)

# End the LLM span
self.span_manager.end_span(run_id)
invocation = self._telemetry_handler.stop_llm(invocation=invocation)
if not invocation.span.is_recording(): # type: ignore[reportOptionalMemberAccess]
self._invocation_manager.delete_invocation_state(run_id=run_id)
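
Continuing the hedged sketch started after `on_chat_model_start`, `on_llm_end` fills in the response side of the same invocation before closing it with `stop_llm`; all values below are illustrative:

```python
from opentelemetry.util.genai.types import OutputMessage, Text

# `handler` and `invocation` are from the sketch after on_chat_model_start above.
invocation.output_messages = [
    OutputMessage(
        role="ai",
        parts=[Text(content="OpenTelemetry is an observability framework.", type="text")],
        finish_reason="stop",
    )
]
invocation.input_tokens = 12   # from usage_metadata["input_tokens"]
invocation.output_tokens = 42  # from usage_metadata["output_tokens"]
invocation.response_model_name = "gpt-4o-mini-2024-07-18"  # illustrative
invocation.response_id = "chatcmpl-123"                    # illustrative
# Closes the LLM invocation and ends the underlying span.
invocation = handler.stop_llm(invocation=invocation)
```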

def on_llm_error(
self,
@@ -221,4 +255,25 @@ def on_llm_error(
parent_run_id: UUID | None,
**kwargs: Any,
) -> None:
self.span_manager.handle_error(error, run_id)
invocation = self._invocation_manager.get_invocation(run_id=run_id) # type: ignore[reportAssignmentType]

if invocation is None or not isinstance(invocation, LLMInvocation): # type: ignore[reportUnnecessaryIsInstance, reportUnnecessaryComparison]
# If the invocation does not exist, we cannot set attributes or end it
return

invocation: LLMInvocation = invocation

error = Error(message=str(error), type=type(error)) # type: ignore[reportAssignmentType]
invocation = self._telemetry_handler.fail_llm(
invocation=invocation,
error=error, # type: ignore[reportArgumentType]
)
if not invocation.span.is_recording(): # type: ignore[reportOptionalMemberAccess]
self._invocation_manager.delete_invocation_state(run_id=run_id)
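
And for the failure path, a hedged sketch of how an exception is recorded against the same invocation via `fail_llm` (the exception is illustrative):

```python
from opentelemetry.util.genai.types import Error

# `handler` and `invocation` are from the sketches above.
exc = TimeoutError("model endpoint timed out")  # illustrative exception
invocation = handler.fail_llm(
    invocation=invocation,
    error=Error(message=str(exc), type=type(exc)),
)
```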


def get_property_value(obj, property_name): # type: ignore[reportUnknownParameterType]
if isinstance(obj, dict):
return obj.get(property_name, None) # type: ignore[reportUnknownArgumentType]

return getattr(obj, property_name, None) # type: ignore[reportUnknownArgumentType]
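
A quick illustration of the duck-typed helper above; the stand-in class is purely illustrative:

```python
# get_property_value works for both dict-shaped and attribute-shaped messages.
print(get_property_value({"content": "hi", "type": "human"}, "content"))  # -> hi

class _StubMessage:  # illustrative stand-in for a LangChain message object
    content = "hello"
    type = "ai"

print(get_property_value(_StubMessage(), "type"))  # -> ai
```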