Skip to content

Commit f80ef95

Browse files
authored
Merge pull request #22 from volcengine/feat/support-agent-server-telemetry
feat: support agent server telemetry
2 parents 77b0633 + f7dfab3 commit f80ef95

File tree

3 files changed

+225
-0
lines changed

3 files changed

+225
-0
lines changed

agentkit/apps/agent_server_app/agent_server_app.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from fastapi import Request
2020
from fastapi import HTTPException
2121
from fastapi.responses import StreamingResponse
22+
from opentelemetry import trace
2223
from google.adk.agents.base_agent import BaseAgent
2324
from google.adk.artifacts.in_memory_artifact_service import (
2425
InMemoryArtifactService,
@@ -41,6 +42,8 @@
4142
from veadk.memory.short_term_memory import ShortTermMemory
4243

4344
from agentkit.apps.base_app import BaseAgentkitApp
45+
from agentkit.apps.agent_server_app.telemetry import telemetry
46+
from agentkit.apps.agent_server_app.middleware import AgentkitTelemetryHTTPMiddleware
4447

4548

4649
class AgentKitAgentLoader(BaseAgentLoader):
@@ -87,7 +90,13 @@ def __init__(
8790

8891
self.app = self.server.get_fast_api_app()
8992

93+
# Attach ASGI middleware for unified telemetry across all routes
94+
self.app.add_middleware(AgentkitTelemetryHTTPMiddleware)
95+
9096
async def _invoke_compat(request: Request):
97+
# Use current request span from middleware for telemetry
98+
span = trace.get_current_span()
99+
91100
# Extract headers (fallback keys supported)
92101
headers = request.headers
93102
user_id = (
@@ -126,6 +135,14 @@ async def _invoke_compat(request: Request):
126135
text = ""
127136
content = types.UserContent(parts=[types.Part(text=text or "")])
128137

138+
# trace request attributes on current span
139+
telemetry.trace_agent_server(
140+
func_name="_invoke_compat",
141+
span=span,
142+
headers=dict(headers),
143+
text=text or "",
144+
)
145+
129146
# Ensure session exists
130147
session = await self.server.session_service.get_session(
131148
app_name=app_name, user_id=user_id, session_id=session_id
@@ -154,8 +171,11 @@ async def event_generator():
154171
)
155172
+ "\n\n"
156173
)
174+
# finish span on successful end of stream handled by middleware
175+
pass
157176
except Exception as e:
158177
yield f'data: {{"error": "{str(e)}"}}\n\n'
178+
telemetry.trace_agent_server_finish(func_result="", exception=e)
159179

160180
return StreamingResponse(
161181
event_generator(),
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Callable
16+
17+
from opentelemetry import trace
18+
from opentelemetry import context as context_api
19+
20+
from agentkit.apps.agent_server_app.telemetry import telemetry
21+
22+
_EXCLUDED_HEADERS = {
23+
"authorization",
24+
"token"
25+
}
26+
27+
28+
class AgentkitTelemetryHTTPMiddleware:
29+
def __init__(self, app: Callable):
30+
self.app = app
31+
32+
async def __call__(self, scope, receive, send):
33+
print(f"test: {scope}")
34+
if scope["type"] != "http":
35+
return await self.app(scope, receive, send)
36+
37+
method = scope.get("method", "")
38+
path = scope.get("path", "")
39+
headers_list = scope.get("headers", [])
40+
headers = {k.decode("latin-1"): v.decode("latin-1") for k, v in headers_list}
41+
span = telemetry.tracer.start_span(name="agent_server_request")
42+
ctx = trace.set_span_in_context(span)
43+
context_api.attach(ctx)
44+
headers = {
45+
k: v for k, v in headers.items()
46+
if k.lower() not in _EXCLUDED_HEADERS
47+
}
48+
49+
# Currently unable to retrieve user_id and session_id from headers; keep logic for future use
50+
user_id = headers.get("user_id") or headers.get("x-user-id") or ""
51+
session_id = headers.get("session_id") or headers.get("x-session-id") or ""
52+
headers["user_id"] = user_id
53+
headers["session_id"] = session_id
54+
telemetry.trace_agent_server(
55+
func_name=f"{method} {path}",
56+
span=span,
57+
headers=headers,
58+
text="", # do not consume body in middleware
59+
)
60+
61+
async def send_wrapper(message):
62+
try:
63+
if message.get("type") == "http.response.body":
64+
more_body = message.get("more_body", False)
65+
if not more_body:
66+
telemetry.trace_agent_server_finish(
67+
path=path, func_result="", exception=None
68+
)
69+
elif message.get("type") == "http.response.start":
70+
# could record status code if needed
71+
pass
72+
finally:
73+
await send(message)
74+
75+
try:
76+
await self.app(scope, receive, send_wrapper)
77+
except Exception as e:
78+
telemetry.trace_agent_server_finish(path=path,func_result="", exception=e)
79+
raise
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
import time
17+
from typing import Optional
18+
19+
from opentelemetry import trace
20+
from opentelemetry.trace import get_tracer
21+
from opentelemetry.metrics import get_meter
22+
from opentelemetry.trace.span import Span
23+
24+
from agentkit.apps.utils import safe_serialize_to_json_string
25+
26+
_INVOKE_PATH = ["/run_sse", "/run", "/invoke"]
27+
28+
_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
29+
0.01,
30+
0.02,
31+
0.04,
32+
0.08,
33+
0.16,
34+
0.32,
35+
0.64,
36+
1.28,
37+
2.56,
38+
5.12,
39+
10.24,
40+
20.48,
41+
40.96,
42+
81.92,
43+
163.84,
44+
]
45+
46+
logger = logging.getLogger("agentkit." + __name__)
47+
48+
49+
class Telemetry:
50+
def __init__(self):
51+
self.tracer = get_tracer("agentkit.agent_server_app")
52+
self.meter = get_meter("agentkit.agent_server_app")
53+
self.latency_histogram = self.meter.create_histogram(
54+
name="agentkit_runtime_operation_latency",
55+
description="operation latency",
56+
unit="s",
57+
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
58+
)
59+
60+
def trace_agent_server(
61+
self,
62+
func_name: str,
63+
span: Span,
64+
headers: dict,
65+
text: str,
66+
) -> None:
67+
span.set_attribute(key="gen_ai.system", value="agentkit")
68+
span.set_attribute(key="gen_ai.func_name", value=func_name)
69+
70+
span.set_attribute(
71+
key="gen_ai.request.headers",
72+
value=safe_serialize_to_json_string(headers),
73+
)
74+
75+
session_id = headers.get("session_id") or headers.get("x-session-id") or ""
76+
if session_id:
77+
span.set_attribute(key="gen_ai.session.id", value=session_id)
78+
user_id = headers.get("user_id") or headers.get("x-user-id") or ""
79+
if user_id:
80+
span.set_attribute(key="gen_ai.user.id", value=user_id)
81+
82+
# Currently unable to retrieve input
83+
# span.set_attribute(
84+
# key="gen_ai.input", value=safe_serialize_to_json_string(text)
85+
# )
86+
87+
span.set_attribute(key="gen_ai.span.kind", value="agent_server")
88+
span.set_attribute(key="gen_ai.operation.name", value="invoke_agent")
89+
span.set_attribute(key="gen_ai.operation.type", value="agent_server")
90+
91+
def trace_agent_server_finish(
92+
self,
93+
path: str,
94+
func_result: str,
95+
exception: Optional[Exception],
96+
) -> None:
97+
span = trace.get_current_span()
98+
if span and span.is_recording():
99+
# Currently unable to retrieve output
100+
# span.set_attribute(key="gen_ai.output", value=func_result)
101+
102+
attributes = {
103+
"gen_ai_operation_name": "invoke_agent",
104+
"gen_ai_operation_type": "agent_server",
105+
}
106+
if exception:
107+
self.handle_exception(span, exception)
108+
attributes["error_type"] = exception.__class__.__name__
109+
110+
# only record invoke request latency metrics
111+
if hasattr(span, "start_time") and self.latency_histogram and path in _INVOKE_PATH:
112+
duration = (time.time_ns() - span.start_time) / 1e9 # type: ignore
113+
self.latency_histogram.record(duration, attributes)
114+
span.end()
115+
116+
@staticmethod
117+
def handle_exception(span: trace.Span, exception: Exception) -> None:
118+
status = trace.Status(
119+
status_code=trace.StatusCode.ERROR,
120+
description=f"{type(exception).__name__}: {exception}",
121+
)
122+
span.set_status(status)
123+
span.record_exception(exception)
124+
125+
126+
telemetry = Telemetry()

0 commit comments

Comments
 (0)