forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfigure_otel_providers_with_parameters.py
More file actions
182 lines (148 loc) · 7.24 KB
/
configure_otel_providers_with_parameters.py
File metadata and controls
182 lines (148 loc) · 7.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright (c) Microsoft. All rights reserved.
import argparse
import asyncio
import logging
from contextlib import suppress
from random import randint
from typing import TYPE_CHECKING, Annotated, Literal
from agent_framework import Message, tool
from agent_framework.observability import configure_otel_providers, get_tracer
from agent_framework.openai import OpenAIResponsesClient
from dotenv import load_dotenv
from opentelemetry import trace
from opentelemetry.trace.span import format_trace_id
from pydantic import Field
if TYPE_CHECKING:
from agent_framework import SupportsChatGetResponse
"""
This sample shows how you can configure observability with custom exporters passed directly
to the `configure_otel_providers()` function.
This approach gives you full control over exporter configuration (endpoints, headers, compression, etc.)
and allows you to add multiple exporters programmatically.
For standard OTLP setup, it's recommended to use environment variables (see configure_otel_providers_with_env_var.py).
Use this approach when you need custom exporter configuration beyond what environment variables provide.
"""
# Load environment variables (API keys, OTEL_* settings, etc.) from a .env file.
load_dotenv()
# Scenarios that can be run to show the telemetry data collected by the SDK.
# Keep in sync with the Literal type accepted by main() below.
SCENARIOS = ["client", "client_stream", "tool", "all"]
# NOTE: approval_mode="never_require" is for sample brevity.
# Use "always_require" in production; see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
async def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location.

    Args:
        location: The location to get the weather for.

    Returns:
        A human-readable sentence with a random condition and temperature.
    """
    await asyncio.sleep(randint(0, 10) / 10.0)  # Simulate a network call
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    # Derive the index bound from the list length instead of hard-coding 3,
    # so adding or removing a condition cannot cause an IndexError.
    condition = conditions[randint(0, len(conditions) - 1)]
    return f"The weather in {location} is {condition} with a high of {randint(10, 30)}°C."
async def run_chat_client(client: "SupportsChatGetResponse", stream: bool = False) -> None:
    """Send one question to a chat client and print the assistant's answer.

    Telemetry is collected for the service execution behind the scenes, and
    the traces are sent to the configured telemetry backend. The telemetry
    includes information about the AI service execution.

    Args:
        client: The chat client to use.
        stream: Whether to use streaming for the response.

    Remarks:
        For the scenario below, you should see the following:
        1 Client span, with 4 children:
        2 Internal span with gen_ai.operation.name=chat
        The first has finish_reason "tool_calls"
        The second has finish_reason "stop"
        2 Internal span with gen_ai.operation.name=execute_tool
    """
    if stream:
        label = "Chat Client Stream"
    else:
        label = "Chat Client"
    question = "What's the weather in Amsterdam and in Paris?"
    with get_tracer().start_as_current_span(name=f"Scenario: {label}", kind=trace.SpanKind.CLIENT):
        print(f"Running scenario: {label}")
        print(f"User: {question}")
        request = [Message(role="user", text=question)]
        if not stream:
            # Non-streaming: one round trip, print the full response.
            reply = await client.get_response(request, tools=get_weather)
            print(f"Assistant: {reply}")
            return
        # Streaming: print text deltas as they arrive, then a final newline.
        print("Assistant: ", end="")
        async for update in client.get_response(request, stream=True, tools=get_weather):
            if update.text:
                print(update.text, end="")
        print("")
async def run_tool() -> None:
    """Invoke the weather tool directly and print its result.

    Telemetry is collected for the function execution behind the scenes,
    and the traces are sent to the configured telemetry backend. The
    telemetry includes information about the AI function execution and the
    AI service execution.
    """
    span_name = "Scenario: AI Function"
    with get_tracer().start_as_current_span(span_name, kind=trace.SpanKind.CLIENT):
        print("Running scenario: AI Function")
        result = await get_weather.invoke(location="Amsterdam")
        print(f"Weather in Amsterdam:\n{result}")
async def main(scenario: Literal["client", "client_stream", "tool", "all"] = "all"):
"""Run the selected scenario(s)."""
# Setup the logging with the more complete format
logging.basicConfig(
format="[%(asctime)s - %(pathname)s:%(lineno)d - %(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Create custom OTLP exporters with specific configuration
# Note: You need to install opentelemetry-exporter-otlp-proto-grpc or -http separately
try:
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( # pyright: ignore[reportMissingImports]
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( # pyright: ignore[reportMissingImports]
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( # pyright: ignore[reportMissingImports]
OTLPSpanExporter,
)
# Create exporters with custom configuration
# These will be added to any exporters configured via environment variables
custom_exporters = [
OTLPSpanExporter(endpoint="http://localhost:4317"),
OTLPMetricExporter(endpoint="http://localhost:4317"),
OTLPLogExporter(endpoint="http://localhost:4317"),
]
except ImportError:
print(
"Warning: opentelemetry-exporter-otlp-proto-grpc not installed. "
"Install with: pip install opentelemetry-exporter-otlp-proto-grpc"
)
print("Continuing without custom exporters...\n")
custom_exporters = []
# Setup observability with custom exporters and sensitive data enabled
# The exporters parameter allows you to add custom exporters alongside
# those configured via environment variables (OTEL_EXPORTER_OTLP_*)
configure_otel_providers(
enable_sensitive_data=True,
exporters=custom_exporters,
)
with get_tracer().start_as_current_span("Sample Scenarios", kind=trace.SpanKind.CLIENT) as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
client = OpenAIResponsesClient()
# Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.
if scenario == "tool" or scenario == "all":
with suppress(Exception):
await run_tool()
if scenario == "client_stream" or scenario == "all":
with suppress(Exception):
await run_chat_client(client, stream=True)
if scenario == "client" or scenario == "all":
with suppress(Exception):
await run_chat_client(client, stream=False)
if __name__ == "__main__":
    # Command-line entry point: select a scenario and run it.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--scenario",
        type=str,
        choices=SCENARIOS,
        default="all",
        help="The scenario to run. Default is all.",
    )
    parsed = parser.parse_args()
    asyncio.run(main(parsed.scenario))