Skip to content

Commit ba60b47

Browse files
committed
Use StreamingEventKind.kind() instead of instanceof for seralization choices etc.
1 parent 654cbd4 commit ba60b47

File tree

9 files changed

+65
-86
lines changed

9 files changed

+65
-86
lines changed

extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public void testA2AMessageReplicatedToKafka() throws Exception {
177177
assertEquals(contextId, statusUpdateEvent.contextId(), "Event context ID should match original context ID");
178178
assertEquals(TaskState.SUBMITTED, statusUpdateEvent.status().state(), "Event should show SUBMITTED state");
179179
assertFalse(statusUpdateEvent.isFinal(), "Event should show final:false");
180-
assertEquals("status-update", statusUpdateEvent.kind(), "Event should indicate status-update type");
180+
assertEquals(TaskStatusUpdateEvent.STREAMING_EVENT_ID, statusUpdateEvent.kind(), "Event should indicate status-update type");
181181
}
182182

183183
@Test

spec-grpc/src/main/java/io/a2a/grpc/mapper/StreamResponseMapper.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,22 @@ default io.a2a.grpc.StreamResponse toProto(StreamingEventKind domain) {
4040
return null;
4141
}
4242

43-
if (domain instanceof Task task) {
44-
return io.a2a.grpc.StreamResponse.newBuilder()
45-
.setTask(TaskMapper.INSTANCE.toProto(task))
43+
return switch (domain.kind()) {
44+
case Task.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
45+
.setTask(TaskMapper.INSTANCE.toProto((Task) domain))
4646
.build();
47-
} else if (domain instanceof Message message) {
48-
return io.a2a.grpc.StreamResponse.newBuilder()
49-
.setMsg(MessageMapper.INSTANCE.toProto(message))
47+
case Message.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
48+
.setMsg(MessageMapper.INSTANCE.toProto((Message) domain))
5049
.build();
51-
} else if (domain instanceof TaskStatusUpdateEvent statusUpdate) {
52-
return io.a2a.grpc.StreamResponse.newBuilder()
53-
.setStatusUpdate(TaskStatusUpdateEventMapper.INSTANCE.toProto(statusUpdate))
50+
case TaskStatusUpdateEvent.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
51+
.setStatusUpdate(TaskStatusUpdateEventMapper.INSTANCE.toProto((TaskStatusUpdateEvent) domain))
5452
.build();
55-
} else if (domain instanceof TaskArtifactUpdateEvent artifactUpdate) {
56-
return io.a2a.grpc.StreamResponse.newBuilder()
57-
.setArtifactUpdate(TaskArtifactUpdateEventMapper.INSTANCE.toProto(artifactUpdate))
53+
case TaskArtifactUpdateEvent.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
54+
.setArtifactUpdate(TaskArtifactUpdateEventMapper.INSTANCE.toProto((TaskArtifactUpdateEvent) domain))
5855
.build();
59-
}
60-
61-
throw new IllegalArgumentException("Unknown StreamingEventKind type: " + domain.getClass().getName());
56+
default ->
57+
throw new IllegalArgumentException("Unknown StreamingEventKind type: " + domain.getClass().getName());
58+
};
6259
}
6360

6461
/**

spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -139,39 +139,33 @@ public static StreamResponse streamResponse(StreamingEventKind streamingEventKin
139139
}
140140

141141
public static io.a2a.grpc.SendMessageResponse taskOrMessage(EventKind eventKind) {
142-
if (eventKind instanceof Task) {
143-
return io.a2a.grpc.SendMessageResponse.newBuilder()
142+
return switch (eventKind.kind()) {
143+
case Task.STREAMING_EVENT_ID -> io.a2a.grpc.SendMessageResponse.newBuilder()
144144
.setTask(task((Task) eventKind))
145145
.build();
146-
} else if (eventKind instanceof Message) {
147-
return io.a2a.grpc.SendMessageResponse.newBuilder()
146+
case Message.STREAMING_EVENT_ID -> io.a2a.grpc.SendMessageResponse.newBuilder()
148147
.setMsg(message((Message) eventKind))
149148
.build();
150-
} else {
151-
throw new IllegalArgumentException("Unsupported event type: " + eventKind);
152-
}
149+
default -> throw new IllegalArgumentException("Unsupported event type: " + eventKind);
150+
};
153151
}
154152

155153
public static io.a2a.grpc.StreamResponse taskOrMessageStream(StreamingEventKind eventKind) {
156-
if (eventKind instanceof Task task) {
157-
return io.a2a.grpc.StreamResponse.newBuilder()
158-
.setTask(task(task))
154+
return switch (eventKind.kind()) {
155+
case Task.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
156+
.setTask(task((Task) eventKind))
159157
.build();
160-
} else if (eventKind instanceof Message msg) {
161-
return io.a2a.grpc.StreamResponse.newBuilder()
162-
.setMsg(message(msg))
158+
case Message.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
159+
.setMsg(message((Message) eventKind))
163160
.build();
164-
} else if (eventKind instanceof TaskArtifactUpdateEvent update) {
165-
return io.a2a.grpc.StreamResponse.newBuilder()
166-
.setArtifactUpdate(taskArtifactUpdateEvent(update))
161+
case TaskStatusUpdateEvent.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
162+
.setStatusUpdate(taskStatusUpdateEvent((TaskStatusUpdateEvent) eventKind))
167163
.build();
168-
} else if (eventKind instanceof TaskStatusUpdateEvent update) {
169-
return io.a2a.grpc.StreamResponse.newBuilder()
170-
.setStatusUpdate(taskStatusUpdateEvent(update))
164+
case TaskArtifactUpdateEvent.STREAMING_EVENT_ID -> io.a2a.grpc.StreamResponse.newBuilder()
165+
.setArtifactUpdate(taskArtifactUpdateEvent((TaskArtifactUpdateEvent) eventKind))
171166
.build();
172-
} else {
173-
throw new IllegalArgumentException("Unsupported event type: " + eventKind);
174-
}
167+
default -> throw new IllegalArgumentException("Unsupported event type: " + eventKind);
168+
};
175169
}
176170

177171
public static io.a2a.grpc.TaskPushNotificationConfig setTaskPushNotificationConfigResponse(TaskPushNotificationConfig config) {

spec/src/main/java/io/a2a/json/JsonUtil.java

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -631,26 +631,16 @@ public void write(JsonWriter out, StreamingEventKind value) throws java.io.IOExc
631631
// Write wrapper object with member name as discriminator
632632
out.beginObject();
633633

634-
if (value instanceof Task task) {
635-
// Task: { "task": {...} }
636-
out.name("task");
637-
delegateGson.toJson(task, Task.class, out);
638-
} else if (value instanceof Message message) {
639-
// Message: { "message": {...} }
640-
out.name("message");
641-
delegateGson.toJson(message, Message.class, out);
642-
} else if (value instanceof TaskStatusUpdateEvent event) {
643-
// TaskStatusUpdateEvent: { "statusUpdate": {...} } - camelCase per spec
644-
out.name("statusUpdate");
645-
delegateGson.toJson(event, TaskStatusUpdateEvent.class, out);
646-
} else if (value instanceof TaskArtifactUpdateEvent event) {
647-
// TaskArtifactUpdateEvent: { "artifactUpdate": {...} } - camelCase per spec
648-
out.name("artifactUpdate");
649-
delegateGson.toJson(event, TaskArtifactUpdateEvent.class, out);
650-
} else {
651-
throw new JsonSyntaxException("Unknown StreamingEventKind implementation: " + value.getClass().getName());
652-
}
634+
Type type = switch (value.kind()) {
635+
case Task.STREAMING_EVENT_ID -> Task.class;
636+
case Message.STREAMING_EVENT_ID -> Message.class;
637+
case TaskStatusUpdateEvent.STREAMING_EVENT_ID -> TaskStatusUpdateEvent.class;
638+
case TaskArtifactUpdateEvent.STREAMING_EVENT_ID -> TaskArtifactUpdateEvent.class;
639+
default -> throw new JsonSyntaxException("Unknown StreamingEventKind implementation: " + value.getClass().getName());
640+
};
653641

642+
out.name(value.kind());
643+
delegateGson.toJson(value, type, out);
654644
out.endObject();
655645
}
656646

@@ -671,14 +661,16 @@ StreamingEventKind read(JsonReader in) throws java.io.IOException {
671661
com.google.gson.JsonObject jsonObject = jsonElement.getAsJsonObject();
672662

673663
// Check for wrapped member name discriminators (v1.0 protocol - streaming format)
674-
if (jsonObject.has("task")) {
675-
return delegateGson.fromJson(jsonObject.get("task"), Task.class);
676-
} else if (jsonObject.has("message")) {
677-
return delegateGson.fromJson(jsonObject.get("message"), Message.class);
678-
} else if (jsonObject.has("statusUpdate")) {
679-
return delegateGson.fromJson(jsonObject.get("statusUpdate"), TaskStatusUpdateEvent.class);
680-
} else if (jsonObject.has("artifactUpdate")) {
681-
return delegateGson.fromJson(jsonObject.get("artifactUpdate"), TaskArtifactUpdateEvent.class);
664+
if (jsonObject.has(Task.STREAMING_EVENT_ID)) {
665+
return delegateGson.fromJson(jsonObject.get(Task.STREAMING_EVENT_ID), Task.class);
666+
} else if (jsonObject.has(Message.STREAMING_EVENT_ID)) {
667+
return delegateGson.fromJson(jsonObject.get(Message.STREAMING_EVENT_ID), Message.class);
668+
} else if (jsonObject.has(TaskStatusUpdateEvent.STREAMING_EVENT_ID)) {
669+
return delegateGson.fromJson(
670+
jsonObject.get(TaskStatusUpdateEvent.STREAMING_EVENT_ID), TaskStatusUpdateEvent.class);
671+
} else if (jsonObject.has(TaskArtifactUpdateEvent.STREAMING_EVENT_ID)) {
672+
return delegateGson.fromJson(
673+
jsonObject.get(TaskArtifactUpdateEvent.STREAMING_EVENT_ID), TaskArtifactUpdateEvent.class);
682674
}
683675

684676
// Check for unwrapped format (direct Task/Message deserialization)

spec/src/main/java/io/a2a/spec/Message.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66

77
import io.a2a.util.Assert;
88

9-
import static io.a2a.spec.Message.MESSAGE;
10-
import static io.a2a.util.Utils.SPEC_VERSION_1_0;
11-
129
/**
1310
* Represents a single message in the conversation between a user and an agent in the A2A Protocol.
1411
* <p>
@@ -42,7 +39,10 @@ public record Message(Role role, List<Part<?>> parts,
4239
Map<String, Object> metadata, List<String> extensions
4340
) implements EventKind, StreamingEventKind {
4441

45-
public static final String MESSAGE = "message";
42+
/**
43+
* The identifier when used in streaming responses
44+
*/
45+
public static final String STREAMING_EVENT_ID = "message";
4646

4747
/**
4848
* Compact constructor with validation and defensive copying.
@@ -64,7 +64,7 @@ public record Message(Role role, List<Part<?>> parts,
6464

6565
@Override
6666
public String kind() {
67-
return MESSAGE;
67+
return STREAMING_EVENT_ID;
6868
}
6969

7070
/**

spec/src/main/java/io/a2a/spec/StreamingEventKind.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public sealed interface StreamingEventKind extends Event permits Task, Message,
4848
* }
4949
* }</pre>
5050
*
51-
* @return the event type identifier (e.g., "task", "message", "status-update", "artifact-update")
51+
* @return the event type identifier (e.g., "task", "message", "statusUpdate", "artifactUpdate")
5252
*/
5353
String kind();
5454
}

spec/src/main/java/io/a2a/spec/Task.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55

66
import io.a2a.util.Assert;
77

8-
import static io.a2a.spec.Task.TASK;
9-
import static io.a2a.util.Utils.SPEC_VERSION_1_0;
10-
118
/**
129
* Represents a single, stateful operation or conversation between a client and an agent in the A2A Protocol.
1310
* <p>
@@ -54,7 +51,10 @@ public record Task(
5451
Map<String, Object> metadata
5552
) implements EventKind, StreamingEventKind {
5653

57-
public static final String TASK = "task";
54+
/**
55+
* The identifier when used in streaming responses
56+
*/
57+
public static final String STREAMING_EVENT_ID = "task";
5858

5959
/**
6060
* Compact constructor with validation and defensive copying.
@@ -72,7 +72,7 @@ public record Task(
7272

7373
@Override
7474
public String kind() {
75-
return TASK;
75+
return STREAMING_EVENT_ID;
7676
}
7777

7878
/**

spec/src/main/java/io/a2a/spec/TaskArtifactUpdateEvent.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
import io.a2a.util.Assert;
66

7-
import static io.a2a.spec.TaskArtifactUpdateEvent.ARTIFACT_UPDATE;
8-
97
/**
108
* Event notifying that a task artifact has been created, modified, or appended to.
119
* <p>
@@ -50,9 +48,9 @@ public record TaskArtifactUpdateEvent(
5048
) implements EventKind, StreamingEventKind, UpdateEvent {
5149

5250
/**
53-
* The kind identifier for artifact update events: "artifact-update".
51+
* The identifier when used in streaming responses
5452
*/
55-
public static final String ARTIFACT_UPDATE = "artifact-update";
53+
public static final String STREAMING_EVENT_ID = "artifactUpdate";
5654

5755
/**
5856
* Compact constructor with validation.
@@ -67,7 +65,7 @@ public record TaskArtifactUpdateEvent(
6765

6866
@Override
6967
public String kind() {
70-
return ARTIFACT_UPDATE;
68+
return STREAMING_EVENT_ID;
7169
}
7270

7371
/**

spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
import io.a2a.util.Assert;
77

8-
import static io.a2a.spec.TaskStatusUpdateEvent.STATUS_UPDATE;
9-
108
/**
119
* An event sent by the agent to notify the client of a change in a task's status.
1210
* This is typically used in streaming or subscription models.
@@ -26,9 +24,9 @@ public record TaskStatusUpdateEvent(
2624
) implements EventKind, StreamingEventKind, UpdateEvent {
2725

2826
/**
29-
* The kind identifier for status update events: "status-update".
27+
* The identifier when used in streaming responses
3028
*/
31-
public static final String STATUS_UPDATE = "status-update";
29+
public static final String STREAMING_EVENT_ID = "statusUpdate";
3230

3331
/**
3432
* Compact constructor with validation.
@@ -41,7 +39,7 @@ public record TaskStatusUpdateEvent(
4139

4240
@Override
4341
public String kind() {
44-
return STATUS_UPDATE;
42+
return STREAMING_EVENT_ID;
4543
}
4644

4745
/**

0 commit comments

Comments
 (0)