Skip to content

Commit b787fd5

Browse files
committed
Server side tracing is now working
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 02c294a commit b787fd5

File tree

10 files changed

+135
-94
lines changed

10 files changed

+135
-94
lines changed

extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,28 @@ public Object trace(InvocationContext context) throws Exception {
3535
.getMethod()
3636
.getAnnotation(Trace.class)
3737
.extractor();
38-
SpanBuilder spanBuilder = tracer.spanBuilder(context.getMethod().getName())
38+
String name = context.getTarget().getClass().getName();
39+
if (name != null && name.endsWith("_Subclass")) {
40+
name = name.substring(0, name.length() - "_Subclass".length());
41+
}
42+
name = name + '#' + context.getMethod().getName();
43+
SpanBuilder spanBuilder = tracer.spanBuilder(name)
3944
.setSpanKind(SpanKind.valueOf(kind.toString()));
45+
AttributeExtractor extractor = null;
4046
if (extractorClass != null) {
41-
Map<String, String> attributes = extractorClass.getConstructor(new Class[0]).newInstance(new Object[0]).extract(context.getTarget(), context.getMethod().getName(), context.getParameters());
47+
extractor = extractorClass.getConstructor(new Class[0]).newInstance(new Object[0]);
48+
Map<String, String> attributes = extractor.extract(context.getTarget(), context.getMethod().getName(), context.getParameters());
4249
for (Map.Entry<String, String> attribute : attributes.entrySet()) {
4350
spanBuilder.setAttribute(attribute.getKey(), attribute.getValue());
4451
}
4552
}
4653
Span span = spanBuilder.startSpan();
47-
Scope scope = span.makeCurrent();
48-
context.getContextData().put(OTEL_SCOPE_KEY_NAME, scope);
49-
context.getContextData().put(OTEL_SPAN_KEY_NAME, span);
50-
try {
54+
try (Scope scope = span.makeCurrent()) {
5155
Object ret = context.proceed();
5256
span.setStatus(StatusCode.OK);
57+
if (ret != null) {
58+
span.setAttribute("response", ret.toString());
59+
}
5360
span.end();
5461
return ret;
5562
} catch (Exception ex) {

extras/opentelemetry/src/main/resources/META-INF/beans.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,4 @@
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
44
xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd"
55
bean-discovery-mode="annotated">
6-
<interceptors>
7-
<class>io.a2a.extras.opentelemetry.SpanInterceptor</class>
8-
</interceptors>
96
</beans>

reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
/**
1414
* gRPC server interceptor that captures request metadata and context information,
1515
* providing equivalent functionality to Python's grpc.aio.ServicerContext.
16-
*
16+
*
1717
* This interceptor:
1818
* - Extracts A2A extension headers from incoming requests
1919
* - Captures ServerCall and Metadata for rich context access
@@ -23,26 +23,27 @@
2323
@ApplicationScoped
2424
public class A2AExtensionsInterceptor implements ServerInterceptor {
2525

26-
2726
@Override
2827
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
2928
ServerCall<ReqT, RespT> serverCall,
3029
Metadata metadata,
3130
ServerCallHandler<ReqT, RespT> serverCallHandler) {
3231

3332
// Extract A2A extensions header
34-
Metadata.Key<String> extensionsKey =
35-
Metadata.Key.of(A2AHeaders.X_A2A_EXTENSIONS, Metadata.ASCII_STRING_MARSHALLER);
33+
Metadata.Key<String> extensionsKey
34+
= Metadata.Key.of(A2AHeaders.X_A2A_EXTENSIONS, Metadata.ASCII_STRING_MARSHALLER);
3635
String extensions = metadata.get(extensionsKey);
3736

3837
// Create enhanced context with rich information (equivalent to Python's ServicerContext)
3938
Context context = Context.current()
40-
// Store complete metadata for full header access
41-
.withValue(GrpcContextKeys.METADATA_KEY, metadata)
42-
// Store method name (equivalent to Python's context.method())
43-
.withValue(GrpcContextKeys.METHOD_NAME_KEY, serverCall.getMethodDescriptor().getFullMethodName())
44-
// Store peer information for client connection details
45-
.withValue(GrpcContextKeys.PEER_INFO_KEY, getPeerInfo(serverCall));
39+
// Store complete metadata for full header access
40+
.withValue(GrpcContextKeys.METADATA_KEY, metadata)
41+
// Store Grpc method name
42+
.withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, serverCall.getMethodDescriptor().getFullMethodName())
43+
// Store method name (equivalent to Python's context.method())
44+
.withValue(GrpcContextKeys.METHOD_NAME_KEY, GrpcContextKeys.METHOD_MAPPING.get(serverCall.getMethodDescriptor().getBareMethodName()))
45+
// Store peer information for client connection details
46+
.withValue(GrpcContextKeys.PEER_INFO_KEY, getPeerInfo(serverCall));
4647

4748
// Store A2A extensions if present
4849
if (extensions != null) {
@@ -55,7 +56,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
5556

5657
/**
5758
* Safely extracts peer information from the ServerCall.
58-
*
59+
*
5960
* @param serverCall the gRPC ServerCall
6061
* @return peer information string, or "unknown" if not available
6162
*/

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.a2a.server.requesthandlers;
22

3+
import static io.a2a.server.interceptors.Kind.SERVER;
34
import static io.a2a.server.util.async.AsyncUtils.convertingProcessor;
45
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
56
import static io.a2a.server.util.async.AsyncUtils.processor;
@@ -32,6 +33,7 @@
3233
import io.a2a.server.events.EventQueueItem;
3334
import io.a2a.server.events.QueueManager;
3435
import io.a2a.server.events.TaskQueueExistsException;
36+
import io.a2a.server.interceptors.Trace;
3537
import io.a2a.server.tasks.PushNotificationConfigStore;
3638
import io.a2a.server.tasks.PushNotificationSender;
3739
import io.a2a.server.tasks.ResultAggregator;
@@ -131,6 +133,7 @@ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStor
131133
}
132134

133135
@Override
136+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
134137
public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
135138
LOGGER.debug("onGetTask {}", params.id());
136139
Task task = taskStore.get(params.id());
@@ -164,6 +167,7 @@ private static Task limitTaskHistory(Task task, int historyLength) {
164167
}
165168

166169
@Override
170+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
167171
public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws JSONRPCError {
168172
LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}",
169173
params.contextId(), params.status(), params.pageSize(), params.pageToken());
@@ -173,6 +177,7 @@ public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext con
173177
}
174178

175179
@Override
180+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
176181
public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws JSONRPCError {
177182
Task task = taskStore.get(params.id());
178183
if (task == null) {
@@ -225,6 +230,7 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws
225230
}
226231

227232
@Override
233+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
228234
public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws JSONRPCError {
229235
LOGGER.debug("onMessageSend - task: {}; context {}", params.message().getTaskId(), params.message().getContextId());
230236
MessageSendSetup mss = initMessageSend(params, context);
@@ -350,6 +356,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
350356
}
351357

352358
@Override
359+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
353360
public Flow.Publisher<StreamingEventKind> onMessageSendStream(
354361
MessageSendParams params, ServerCallContext context) throws JSONRPCError {
355362
LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}",
@@ -504,6 +511,7 @@ private void startBackgroundConsumption() {
504511
}
505512

506513
@Override
514+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
507515
public TaskPushNotificationConfig onSetTaskPushNotificationConfig(
508516
TaskPushNotificationConfig params, ServerCallContext context) throws JSONRPCError {
509517
if (pushConfigStore == null) {
@@ -519,6 +527,7 @@ public TaskPushNotificationConfig onSetTaskPushNotificationConfig(
519527
}
520528

521529
@Override
530+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
522531
public TaskPushNotificationConfig onGetTaskPushNotificationConfig(
523532
GetTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError {
524533
if (pushConfigStore == null) {
@@ -550,6 +559,7 @@ private PushNotificationConfig getPushNotificationConfig(List<PushNotificationCo
550559
}
551560

552561
@Override
562+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
553563
public Flow.Publisher<StreamingEventKind> onResubscribeToTask(
554564
TaskIdParams params, ServerCallContext context) throws JSONRPCError {
555565
LOGGER.debug("onResubscribeToTask - taskId: {}", params.id());
@@ -581,6 +591,7 @@ public Flow.Publisher<StreamingEventKind> onResubscribeToTask(
581591
}
582592

583593
@Override
594+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
584595
public List<TaskPushNotificationConfig> onListTaskPushNotificationConfig(
585596
ListTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError {
586597
if (pushConfigStore == null) {
@@ -604,6 +615,7 @@ public List<TaskPushNotificationConfig> onListTaskPushNotificationConfig(
604615
}
605616

606617
@Override
618+
@Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
607619
public void onDeleteTaskPushNotificationConfig(
608620
DeleteTaskPushNotificationConfigParams params, ServerCallContext context) {
609621
if (pushConfigStore == null) {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.a2a.server.requesthandlers;
2+
3+
import io.a2a.server.ServerCallContext;
4+
import io.a2a.server.interceptors.AttributeExtractor;
5+
import java.util.Collections;
6+
import java.util.Map;
7+
import java.util.stream.Collectors;
8+
9+
10+
public class RequestHandlerAttributeExtractor implements AttributeExtractor {
11+
12+
@Override
13+
public Map<String, String> extract(Object target, String method, Object[] parameters) {
14+
switch (method) {
15+
case "onMessageSend",
16+
"onMessageSendStream",
17+
"onCancelTask",
18+
"onResubscribeToTask",
19+
"getPushNotificationConfig",
20+
"setPushNotificationConfig",
21+
"onGetTask",
22+
"listPushNotificationConfig",
23+
"deletePushNotificationConfig",
24+
"onListTasks" -> {
25+
ServerCallContext context = (ServerCallContext) parameters[1];
26+
return Map.of("request", parameters[0].toString(), "extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(",")), "a2a.method", (String) context.getState().get("method"));
27+
}
28+
default -> {
29+
return Collections.emptyMap();
30+
}
31+
}
32+
}
33+
}

transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,73 @@
11
package io.a2a.transport.grpc.context;
22

3+
import io.a2a.spec.GetTaskPushNotificationConfigRequest;
4+
import io.a2a.spec.CancelTaskRequest;
5+
import io.a2a.spec.DeleteTaskPushNotificationConfigRequest;
6+
import io.a2a.spec.GetTaskRequest;
7+
import io.a2a.spec.ListTaskPushNotificationConfigRequest;
8+
import io.a2a.spec.ListTasksRequest;
9+
import io.a2a.spec.SendMessageRequest;
10+
import io.a2a.spec.SendStreamingMessageRequest;
11+
import io.a2a.spec.SetTaskPushNotificationConfigRequest;
12+
import io.a2a.spec.TaskResubscriptionRequest;
313
import io.grpc.Context;
14+
import java.util.Map;
415

516
/**
617
* Shared gRPC context keys for A2A protocol data.
7-
*
18+
*
819
* These keys provide access to gRPC context information similar to
920
* Python's grpc.aio.ServicerContext, enabling rich context access
1021
* in service method implementations.
1122
*/
1223
public final class GrpcContextKeys {
13-
24+
1425
/**
1526
* Context key for storing the X-A2A-Extensions header value.
1627
* Set by server interceptors and accessed by service handlers.
1728
*/
18-
public static final Context.Key<String> EXTENSIONS_HEADER_KEY =
19-
Context.key("x-a2a-extensions");
20-
29+
public static final Context.Key<String> EXTENSIONS_HEADER_KEY
30+
= Context.key("x-a2a-extensions");
31+
2132
/**
2233
* Context key for storing the complete gRPC Metadata object.
2334
* Provides access to all request headers and metadata.
2435
*/
25-
public static final Context.Key<io.grpc.Metadata> METADATA_KEY =
26-
Context.key("grpc-metadata");
27-
36+
public static final Context.Key<io.grpc.Metadata> METADATA_KEY
37+
= Context.key("grpc-metadata");
38+
39+
/**
40+
* Context key for storing the method name being called.
41+
* Equivalent to Python's context.method() functionality.
42+
*/
43+
public static final Context.Key<String> GRPC_METHOD_NAME_KEY
44+
= Context.key("grpc-method-name");
45+
2846
/**
2947
* Context key for storing the method name being called.
3048
* Equivalent to Python's context.method() functionality.
3149
*/
32-
public static final Context.Key<String> METHOD_NAME_KEY =
33-
Context.key("grpc-method-name");
34-
50+
public static final Context.Key<String> METHOD_NAME_KEY
51+
= Context.key("method");
52+
3553
/**
3654
* Context key for storing the peer information.
3755
* Provides access to client connection details.
3856
*/
39-
public static final Context.Key<String> PEER_INFO_KEY =
40-
Context.key("grpc-peer-info");
57+
public static final Context.Key<String> PEER_INFO_KEY
58+
= Context.key("grpc-peer-info");
59+
60+
public static final Map<String, String> METHOD_MAPPING = Map.of(
61+
"SendMessage", SendMessageRequest.METHOD,
62+
"SendStreamingMessage", SendStreamingMessageRequest.METHOD,
63+
"GetTask", GetTaskRequest.METHOD,
64+
"ListTask", ListTasksRequest.METHOD,
65+
"CancelTask", CancelTaskRequest.METHOD,
66+
"TaskSubscription", TaskResubscriptionRequest.METHOD,
67+
"CreateTaskPushNotification", SetTaskPushNotificationConfigRequest.METHOD,
68+
"GetTaskPushNotification", GetTaskPushNotificationConfigRequest.METHOD,
69+
"ListTaskPushNotification", ListTaskPushNotificationConfigRequest.METHOD,
70+
"DeleteTaskPushNotification", DeleteTaskPushNotificationConfigRequest.METHOD);
4171

4272
private GrpcContextKeys() {
4373
// Utility class

transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ private <V> ServerCallContext createCallContext(StreamObserver<V> responseObserv
390390
state.put("grpc_metadata", grpcMetadata);
391391
}
392392

393-
String methodName = GrpcContextKeys.METHOD_NAME_KEY.get(currentContext);
393+
String methodName = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext);
394394
if (methodName != null) {
395395
state.put("grpc_method_name", methodName);
396396
}
@@ -590,7 +590,7 @@ protected static io.grpc.Metadata getCurrentMetadata() {
590590
* @return the method name, or null if not available
591591
*/
592592
protected static String getCurrentMethodName() {
593-
return getFromContext(GrpcContextKeys.METHOD_NAME_KEY);
593+
return getFromContext(GrpcContextKeys.GRPC_METHOD_NAME_KEY);
594594
}
595595

596596
/**

transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpctAttributeExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public Map<String, String> extract(Object target, String method, Object[] parame
2222
"taskSubscription",
2323
"deleteTaskPushNotificationConfig" -> {
2424
Context currentContext = Context.current();
25-
return Map.of("body", parameters[0].toString(), "extensions", GrpcContextKeys.EXTENSIONS_HEADER_KEY.get(), "a2a.method", GrpcContextKeys.METHOD_NAME_KEY.get(currentContext));
25+
return Map.of("request", parameters[0].toString(), "extensions", GrpcContextKeys.EXTENSIONS_HEADER_KEY.get(), "a2a.method", GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext));
2626
}
2727

2828
default -> {

0 commit comments

Comments
 (0)