Skip to content

Commit b8215d2

Browse files
committed
chore: Ensure that the TaskStateProcessor passed to the ResultAggregator is never null
Signed-off-by: Jeff Mesnil <[email protected]>
1 parent bae253a commit b8215d2

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static io.a2a.server.util.async.AsyncUtils.consumer;
44
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
55
import static io.a2a.server.util.async.AsyncUtils.processor;
6+
import static io.a2a.util.Assert.checkNotNullParam;
67

78
import java.util.concurrent.CompletableFuture;
89
import java.util.concurrent.CompletionException;
@@ -34,6 +35,9 @@ public class ResultAggregator {
3435
private volatile Message message;
3536

3637
public ResultAggregator(TaskManager taskManager, Message message, Executor executor, TaskStateProcessor stateProcessor) {
38+
checkNotNullParam("taskManager", taskManager);
39+
checkNotNullParam("executor", executor);
40+
checkNotNullParam("stateProcessor", stateProcessor);
3741
this.taskManager = taskManager;
3842
this.message = message;
3943
this.executor = executor;
@@ -218,10 +222,8 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
218222
LOGGER.debug("Persisted final task state after background consumption: {}", taskIdForLogging());
219223
}
220224
// Remove task from state processor after final persistence
221-
if (stateProcessor != null) {
222-
stateProcessor.removeTask(finalTask.getId());
223-
LOGGER.debug("Removed task {} from state processor after background consumption", finalTask.getId());
224-
}
225+
stateProcessor.removeTask(finalTask.getId());
226+
LOGGER.debug("Removed task {} from state processor after background consumption", finalTask.getId());
225227
}
226228
}
227229
completionFuture.complete(null);

server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class ResultAggregatorTest {
4949
@BeforeEach
5050
void setUp() {
5151
MockitoAnnotations.openMocks(this);
52-
aggregator = new ResultAggregator(mockTaskManager, null, testExecutor, null);
52+
aggregator = new ResultAggregator(mockTaskManager, null, testExecutor, new TaskStateProcessor());
5353
}
5454

5555
// Helper methods for creating sample data
@@ -75,7 +75,7 @@ private Task createSampleTask(String taskId, TaskState statusState, String conte
7575
@Test
7676
void testConstructorWithMessage() {
7777
Message initialMessage = createSampleMessage("initial", "msg1", Message.Role.USER);
78-
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, initialMessage, testExecutor, null);
78+
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, initialMessage, testExecutor, new TaskStateProcessor());
7979

8080
// Test that the message is properly stored by checking getCurrentResult
8181
assertEquals(initialMessage, aggregatorWithMessage.getCurrentResult());
@@ -86,7 +86,7 @@ void testConstructorWithMessage() {
8686
@Test
8787
void testGetCurrentResultWithMessageSet() {
8888
Message sampleMessage = createSampleMessage("hola", "msg1", Message.Role.USER);
89-
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, sampleMessage, testExecutor, null);
89+
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, sampleMessage, testExecutor, new TaskStateProcessor());
9090

9191
EventKind result = aggregatorWithMessage.getCurrentResult();
9292

@@ -121,7 +121,7 @@ void testConstructorStoresTaskManagerCorrectly() {
121121

122122
@Test
123123
void testConstructorWithNullMessage() {
124-
ResultAggregator aggregatorWithNullMessage = new ResultAggregator(mockTaskManager, null, testExecutor, null);
124+
ResultAggregator aggregatorWithNullMessage = new ResultAggregator(mockTaskManager, null, testExecutor, new TaskStateProcessor());
125125
Task expectedTask = createSampleTask("null_msg_task", TaskState.WORKING, "ctx1");
126126
when(mockTaskManager.getTask()).thenReturn(expectedTask);
127127

@@ -181,7 +181,7 @@ void testMultipleGetCurrentResultCalls() {
181181
void testGetCurrentResultWithMessageTakesPrecedence() {
182182
// Test that when both message and task are available, message takes precedence
183183
Message message = createSampleMessage("priority message", "pri1", Message.Role.USER);
184-
ResultAggregator messageAggregator = new ResultAggregator(mockTaskManager, message, testExecutor, null);
184+
ResultAggregator messageAggregator = new ResultAggregator(mockTaskManager, message, testExecutor, new TaskStateProcessor());
185185

186186
// Even if we set up the task manager to return something, message should take precedence
187187
Task task = createSampleTask("should_not_be_returned", TaskState.WORKING, "ctx1");

0 commit comments

Comments
 (0)