diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index 3671da130..9b18b1cb8 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -9,7 +9,6 @@ import static org.wildfly.common.Assert.assertNotNull; import static org.wildfly.common.Assert.assertTrue; -import java.io.EOFException; import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; @@ -24,24 +23,18 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; import jakarta.ws.rs.core.MediaType; -import com.fasterxml.jackson.core.JsonProcessingException; - import io.a2a.client.A2AClient; import io.a2a.spec.A2AServerException; import io.a2a.spec.AgentCard; import io.a2a.spec.Artifact; -import io.a2a.spec.CancelTaskRequest; import io.a2a.spec.CancelTaskResponse; import io.a2a.spec.DeleteTaskPushNotificationConfigResponse; import io.a2a.spec.Event; import io.a2a.spec.GetTaskPushNotificationConfigParams; -import io.a2a.spec.GetTaskPushNotificationConfigRequest; import io.a2a.spec.GetTaskPushNotificationConfigResponse; -import io.a2a.spec.GetTaskRequest; import io.a2a.spec.GetTaskResponse; import io.a2a.spec.InvalidParamsError; import io.a2a.spec.InvalidRequestError; @@ -54,28 +47,20 @@ import io.a2a.spec.MethodNotFoundError; import io.a2a.spec.Part; import io.a2a.spec.PushNotificationConfig; -import io.a2a.spec.SendMessageRequest; import io.a2a.spec.SendMessageResponse; -import io.a2a.spec.SendStreamingMessageRequest; -import io.a2a.spec.SendStreamingMessageResponse; -import io.a2a.spec.SetTaskPushNotificationConfigRequest; import io.a2a.spec.SetTaskPushNotificationConfigResponse; -import io.a2a.spec.StreamingJSONRPCRequest; import io.a2a.spec.Task; import io.a2a.spec.TaskArtifactUpdateEvent; import io.a2a.spec.TaskIdParams; import io.a2a.spec.TaskNotFoundError; import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskQueryParams; -import io.a2a.spec.TaskResubscriptionRequest; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; import io.a2a.spec.UnsupportedOperationError; import io.a2a.util.Utils; -import io.restassured.RestAssured; -import io.restassured.specification.RequestSpecification; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -86,46 +71,45 @@ * which delegates to {@link TestUtilsBean}. */ public abstract class AbstractA2AServerTest { - private static final Task MINIMAL_TASK = new Task.Builder() .id("task-123") .contextId("session-xyz") .status(new TaskStatus(TaskState.SUBMITTED)) .build(); - + private static final Task CANCEL_TASK = new Task.Builder() .id("cancel-task-123") .contextId("session-xyz") .status(new TaskStatus(TaskState.SUBMITTED)) .build(); - + private static final Task CANCEL_TASK_NOT_SUPPORTED = new Task.Builder() .id("cancel-task-not-supported-123") .contextId("session-xyz") .status(new TaskStatus(TaskState.SUBMITTED)) .build(); - + private static final Task SEND_MESSAGE_NOT_SUPPORTED = new Task.Builder() .id("task-not-supported-123") .contextId("session-xyz") .status(new TaskStatus(TaskState.SUBMITTED)) .build(); - + private static final Message MESSAGE = new Message.Builder() .messageId("111") .role(Message.Role.AGENT) .parts(new TextPart("test message")) .build(); public static final String APPLICATION_JSON = "application/json"; - + private final int serverPort; private A2AClient client; - + protected AbstractA2AServerTest(int serverPort) { this.serverPort = serverPort; this.client = new A2AClient("http://localhost:" + serverPort); } - + @Test public void testTaskStoreMethodsSanityTest() throws Exception { Task task = new Task.Builder(MINIMAL_TASK).id("abcde").build(); @@ -134,139 +118,89 @@ public void testTaskStoreMethodsSanityTest() throws Exception { assertEquals(task.getId(), saved.getId()); assertEquals(task.getContextId(), saved.getContextId()); assertEquals(task.getStatus().state(), saved.getStatus().state()); - + deleteTaskInTaskStore(task.getId()); Task saved2 = getTaskFromTaskStore(task.getId()); assertNull(saved2); } - + @Test public void testGetTaskSuccess() throws Exception { testGetTask(); } - + private void testGetTask() throws Exception { testGetTask(null); } - + private void testGetTask(String mediaType) throws Exception { saveTaskInTaskStore(MINIMAL_TASK); try { - GetTaskRequest request = new GetTaskRequest("1", new TaskQueryParams(MINIMAL_TASK.getId())); - RequestSpecification requestSpecification = RestAssured.given() - .contentType(MediaType.APPLICATION_JSON) - .body(request); - if (mediaType != null) { - requestSpecification = requestSpecification.accept(mediaType); - } - GetTaskResponse response = requestSpecification - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(GetTaskResponse.class); + GetTaskResponse response = client.getTask("1", new TaskQueryParams(MINIMAL_TASK.getId())); assertEquals("1", response.getId()); assertEquals("task-123", response.getResult().getId()); assertEquals("session-xyz", response.getResult().getContextId()); assertEquals(TaskState.SUBMITTED, response.getResult().getStatus().state()); assertNull(response.getError()); + } catch (A2AServerException e) { + fail("Unexpected exception during getTask: " + e.getMessage(), e); } finally { deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testGetTaskNotFound() throws Exception { assertTrue(getTaskFromTaskStore("non-existent-task") == null); - GetTaskRequest request = new GetTaskRequest("1", new TaskQueryParams("non-existent-task")); - GetTaskResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(GetTaskResponse.class); - assertEquals("1", response.getId()); - // this should be an instance of TaskNotFoundError, see https://github.com/a2aproject/a2a-java/issues/23 - assertInstanceOf(JSONRPCError.class, response.getError()); - assertEquals(new TaskNotFoundError().getCode(), response.getError().getCode()); - assertNull(response.getResult()); + try { + client.getTask("1", new TaskQueryParams("non-existent-task")); + fail("Expected A2AServerException to be thrown"); + } catch (A2AServerException e) { + assertInstanceOf(TaskNotFoundError.class, e.getCause()); + } } - + @Test public void testCancelTaskSuccess() throws Exception { saveTaskInTaskStore(CANCEL_TASK); try { - CancelTaskRequest request = new CancelTaskRequest("1", new TaskIdParams(CANCEL_TASK.getId())); - CancelTaskResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(CancelTaskResponse.class); + CancelTaskResponse response = client.cancelTask("1", new TaskIdParams(CANCEL_TASK.getId())); + assertEquals("1", response.getId()); assertNull(response.getError()); - assertEquals(request.getId(), response.getId()); Task task = response.getResult(); assertEquals(CANCEL_TASK.getId(), task.getId()); assertEquals(CANCEL_TASK.getContextId(), task.getContextId()); assertEquals(TaskState.CANCELED, task.getStatus().state()); - } catch (Exception e) { + } catch (A2AServerException e) { + fail("Unexpected exception during cancel task success test: " + e.getMessage(), e); } finally { deleteTaskInTaskStore(CANCEL_TASK.getId()); } } - + @Test public void testCancelTaskNotSupported() throws Exception { saveTaskInTaskStore(CANCEL_TASK_NOT_SUPPORTED); try { - CancelTaskRequest request = new CancelTaskRequest("1", new TaskIdParams(CANCEL_TASK_NOT_SUPPORTED.getId())); - CancelTaskResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(CancelTaskResponse.class); - assertEquals(request.getId(), response.getId()); - assertNull(response.getResult()); - // this should be an instance of UnsupportedOperationError, see https://github.com/a2aproject/a2a-java/issues/23 - assertInstanceOf(JSONRPCError.class, response.getError()); - assertEquals(new UnsupportedOperationError().getCode(), response.getError().getCode()); - } catch (Exception e) { + client.cancelTask("1", new TaskIdParams(CANCEL_TASK_NOT_SUPPORTED.getId())); + fail("Expected A2AServerException to be thrown"); + } catch (A2AServerException e) { + assertInstanceOf(UnsupportedOperationError.class, e.getCause()); } finally { deleteTaskInTaskStore(CANCEL_TASK_NOT_SUPPORTED.getId()); } } - + @Test public void testCancelTaskNotFound() { - CancelTaskRequest request = new CancelTaskRequest("1", new TaskIdParams("non-existent-task")); - CancelTaskResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(CancelTaskResponse.class) - ; - assertEquals(request.getId(), response.getId()); - assertNull(response.getResult()); - // this should be an instance of UnsupportedOperationError, see https://github.com/a2aproject/a2a-java/issues/23 - assertInstanceOf(JSONRPCError.class, response.getError()); - assertEquals(new TaskNotFoundError().getCode(), response.getError().getCode()); + try { + client.cancelTask("1", new TaskIdParams("non-existent-task")); + fail("Expected A2AServerException to be thrown"); + } catch (A2AServerException e) { + assertInstanceOf(TaskNotFoundError.class, e.getCause()); + } } - + @Test public void testSendMessageNewMessageSuccess() throws Exception { assertTrue(getTaskFromTaskStore(MINIMAL_TASK.getId()) == null); @@ -274,25 +208,23 @@ public void testSendMessageNewMessageSuccess() throws Exception { .taskId(MINIMAL_TASK.getId()) .contextId(MINIMAL_TASK.getContextId()) .build(); - SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null)); - SendMessageResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(SendMessageResponse.class); - assertNull(response.getError()); - Message messageResponse = (Message) response.getResult(); - assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId()); - assertEquals(MESSAGE.getRole(), messageResponse.getRole()); - Part part = messageResponse.getParts().get(0); - assertEquals(Part.Kind.TEXT, part.getKind()); - assertEquals("test message", ((TextPart) part).getText()); + MessageSendParams messageSendParams = new MessageSendParams(message, null, null); + + try { + SendMessageResponse response = client.sendMessage("1", messageSendParams); + assertEquals("1", response.getId()); + assertNull(response.getError()); + Message messageResponse = (Message) response.getResult(); + assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId()); + assertEquals(MESSAGE.getRole(), messageResponse.getRole()); + Part part = messageResponse.getParts().get(0); + assertEquals(Part.Kind.TEXT, part.getKind()); + assertEquals("test message", ((TextPart) part).getText()); + } catch (A2AServerException e) { + fail("Unexpected exception during send new message test: " + e.getMessage(), e); + } } - + @Test public void testSendMessageExistingTaskSuccess() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -301,16 +233,10 @@ public void testSendMessageExistingTaskSuccess() throws Exception { .taskId(MINIMAL_TASK.getId()) .contextId(MINIMAL_TASK.getContextId()) .build(); - SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null)); - SendMessageResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(SendMessageResponse.class); + MessageSendParams messageSendParams = new MessageSendParams(message, null, null); + + SendMessageResponse response = client.sendMessage("1", messageSendParams); + assertEquals("1", response.getId()); assertNull(response.getError()); Message messageResponse = (Message) response.getResult(); assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId()); @@ -318,108 +244,78 @@ public void testSendMessageExistingTaskSuccess() throws Exception { Part part = messageResponse.getParts().get(0); assertEquals(Part.Kind.TEXT, part.getKind()); assertEquals("test message", ((TextPart) part).getText()); - } catch (Exception e) { + } catch (A2AServerException e) { + fail("Unexpected exception during send message to existing task test: " + e.getMessage(), e); } finally { deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testSetPushNotificationSuccess() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); try { - TaskPushNotificationConfig taskPushConfig = - new TaskPushNotificationConfig( - MINIMAL_TASK.getId(), new PushNotificationConfig.Builder().url("http://example.com").build()); - SetTaskPushNotificationConfigRequest request = new SetTaskPushNotificationConfigRequest("1", taskPushConfig); - SetTaskPushNotificationConfigResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(SetTaskPushNotificationConfigResponse.class); + PushNotificationConfig pushNotificationConfig = new PushNotificationConfig.Builder() + .url("http://example.com") + .build(); + SetTaskPushNotificationConfigResponse response = client.setTaskPushNotificationConfig("1", + MINIMAL_TASK.getId(), pushNotificationConfig); + assertEquals("1", response.getId()); assertNull(response.getError()); - assertEquals(request.getId(), response.getId()); TaskPushNotificationConfig config = response.getResult(); assertEquals(MINIMAL_TASK.getId(), config.taskId()); assertEquals("http://example.com", config.pushNotificationConfig().url()); - } catch (Exception e) { + } catch (A2AServerException e) { + fail("Unexpected exception during set push notification test: " + e.getMessage(), e); } finally { deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), MINIMAL_TASK.getId()); deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testGetPushNotificationSuccess() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); try { - TaskPushNotificationConfig taskPushConfig = - new TaskPushNotificationConfig( - MINIMAL_TASK.getId(), new PushNotificationConfig.Builder().url("http://example.com").build()); - - SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = new SetTaskPushNotificationConfigRequest("1", taskPushConfig); - SetTaskPushNotificationConfigResponse setTaskPushNotificationResponse = given() - .contentType(MediaType.APPLICATION_JSON) - .body(setTaskPushNotificationRequest) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(SetTaskPushNotificationConfigResponse.class); - assertNotNull(setTaskPushNotificationResponse); - - GetTaskPushNotificationConfigRequest request = - new GetTaskPushNotificationConfigRequest("111", new GetTaskPushNotificationConfigParams(MINIMAL_TASK.getId())); - GetTaskPushNotificationConfigResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(GetTaskPushNotificationConfigResponse.class); + PushNotificationConfig pushNotificationConfig = new PushNotificationConfig.Builder() + .url("http://example.com") + .build(); + + // First set the push notification config + SetTaskPushNotificationConfigResponse setResponse = client.setTaskPushNotificationConfig("1", + MINIMAL_TASK.getId(), pushNotificationConfig); + assertNotNull(setResponse); + + // Then get the push notification config + GetTaskPushNotificationConfigResponse response = client.getTaskPushNotificationConfig("2", new GetTaskPushNotificationConfigParams(MINIMAL_TASK.getId())); + assertEquals("2", response.getId()); assertNull(response.getError()); - assertEquals(request.getId(), response.getId()); TaskPushNotificationConfig config = response.getResult(); assertEquals(MINIMAL_TASK.getId(), config.taskId()); assertEquals("http://example.com", config.pushNotificationConfig().url()); - } catch (Exception e) { + } catch (A2AServerException e) { + fail("Unexpected exception during get push notification test: " + e.getMessage(), e); } finally { deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), MINIMAL_TASK.getId()); - deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testError() { Message message = new Message.Builder(MESSAGE) .taskId(SEND_MESSAGE_NOT_SUPPORTED.getId()) .contextId(SEND_MESSAGE_NOT_SUPPORTED.getContextId()) .build(); - SendMessageRequest request = new SendMessageRequest( - "1", new MessageSendParams(message, null, null)); - SendMessageResponse response = given() - .contentType(MediaType.APPLICATION_JSON) - .body(request) - .when() - .post("/") - .then() - .statusCode(200) - .extract() - .as(SendMessageResponse.class); - assertEquals(request.getId(), response.getId()); - assertNull(response.getResult()); - // this should be an instance of UnsupportedOperationError, see https://github.com/a2aproject/a2a-java/issues/23 - assertInstanceOf(JSONRPCError.class, response.getError()); - assertEquals(new UnsupportedOperationError().getCode(), response.getError().getCode()); - } + MessageSendParams messageSendParams = new MessageSendParams(message, null, null); + try { + client.sendMessage("1", messageSendParams); + fail("Expected A2AServerException to be thrown"); + } catch (A2AServerException e) { + assertInstanceOf(UnsupportedOperationError.class, e.getCause()); + } + } + @Test public void testGetAgentCard() { AgentCard agentCard = given() @@ -441,7 +337,7 @@ public void testGetAgentCard() { assertTrue(agentCard.capabilities().stateTransitionHistory()); assertTrue(agentCard.skills().isEmpty()); } - + @Test public void testGetExtendAgentCardNotSupported() { given() @@ -452,7 +348,7 @@ public void testGetExtendAgentCardNotSupported() { .statusCode(404) .body("error", equalTo("Extended agent card not supported or not enabled.")); } - + @Test public void testMalformedJSONRPCRequest() { // missing closing bracket @@ -469,20 +365,20 @@ public void testMalformedJSONRPCRequest() { assertNotNull(response.getError()); assertEquals(new JSONParseError().getCode(), response.getError().getCode()); } - + @Test public void testInvalidParamsJSONRPCRequest() { String invalidParamsRequest = """ {"jsonrpc": "2.0", "method": "message/send", "params": "not_a_dict", "id": "1"} """; testInvalidParams(invalidParamsRequest); - + invalidParamsRequest = """ {"jsonrpc": "2.0", "method": "message/send", "params": {"message": {"parts": "invalid"}}, "id": "1"} """; testInvalidParams(invalidParamsRequest); } - + private void testInvalidParams(String invalidParamsRequest) { JSONRPCErrorResponse response = given() .contentType(MediaType.APPLICATION_JSON) @@ -497,7 +393,7 @@ private void testInvalidParams(String invalidParamsRequest) { assertEquals(new InvalidParamsError().getCode(), response.getError().getCode()); assertEquals("1", response.getId()); } - + @Test public void testInvalidJSONRPCRequestMissingJsonrpc() { String invalidRequest = """ @@ -518,7 +414,7 @@ public void testInvalidJSONRPCRequestMissingJsonrpc() { assertNotNull(response.getError()); assertEquals(new InvalidRequestError().getCode(), response.getError().getCode()); } - + @Test public void testInvalidJSONRPCRequestMissingMethod() { String invalidRequest = """ @@ -536,7 +432,7 @@ public void testInvalidJSONRPCRequestMissingMethod() { assertNotNull(response.getError()); assertEquals(new InvalidRequestError().getCode(), response.getError().getCode()); } - + @Test public void testInvalidJSONRPCRequestInvalidId() { String invalidRequest = """ @@ -554,7 +450,7 @@ public void testInvalidJSONRPCRequestInvalidId() { assertNotNull(response.getError()); assertEquals(new InvalidRequestError().getCode(), response.getError().getCode()); } - + @Test public void testInvalidJSONRPCRequestNonExistentMethod() { String invalidRequest = """ @@ -572,13 +468,12 @@ public void testInvalidJSONRPCRequestNonExistentMethod() { assertNotNull(response.getError()); assertEquals(new MethodNotFoundError().getCode(), response.getError().getCode()); } - + @Test public void testNonStreamingMethodWithAcceptHeader() throws Exception { testGetTask(MediaType.APPLICATION_JSON); } - - + @Test public void testSendMessageStreamExistingTaskSuccess() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -587,115 +482,121 @@ public void testSendMessageStreamExistingTaskSuccess() throws Exception { .taskId(MINIMAL_TASK.getId()) .contextId(MINIMAL_TASK.getContextId()) .build(); - SendStreamingMessageRequest request = new SendStreamingMessageRequest( - "1", new MessageSendParams(message, null, null)); - - CompletableFuture>> responseFuture = initialiseStreamingRequest(request, null); - + MessageSendParams messageSendParams = new MessageSendParams(message, null, null); + CountDownLatch latch = new CountDownLatch(1); AtomicReference errorRef = new AtomicReference<>(); - - responseFuture.thenAccept(response -> { - if (response.statusCode() != 200) { - //errorRef.set(new IllegalStateException("Status code was " + response.statusCode())); - throw new IllegalStateException("Status code was " + response.statusCode()); - } - response.body().forEach(line -> { - try { - SendStreamingMessageResponse jsonResponse = extractJsonResponseFromSseLine(line); - if (jsonResponse != null) { - assertNull(jsonResponse.getError()); - Message messageResponse = (Message) jsonResponse.getResult(); - assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId()); - assertEquals(MESSAGE.getRole(), messageResponse.getRole()); - Part part = messageResponse.getParts().get(0); - assertEquals(Part.Kind.TEXT, part.getKind()); - assertEquals("test message", ((TextPart) part).getText()); + AtomicReference messageResponseRef = new AtomicReference<>(); + + // Replace the native HttpClient with A2AClient's sendStreamingMessage method. + client.sendStreamingMessage( + "1", + messageSendParams, + // eventHandler + (streamingEvent) -> { + try { + if (streamingEvent instanceof Message) { + messageResponseRef.set((Message) streamingEvent); + latch.countDown(); + } + } catch (Exception e) { + errorRef.set(e); latch.countDown(); } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + }, + // errorHandler + (jsonRpcError) -> { + errorRef.set(new RuntimeException("JSON-RPC Error: " + jsonRpcError.getMessage())); + latch.countDown(); + }, + // failureHandler + () -> { + if (errorRef.get() == null) { + errorRef.set(new RuntimeException("Stream processing failed")); + } + latch.countDown(); } - }); - }).exceptionally(t -> { - if (!isStreamClosedError(t)) { - errorRef.set(t); - } - latch.countDown(); - return null; - }); - + ); + boolean dataRead = latch.await(20, TimeUnit.SECONDS); Assertions.assertTrue(dataRead); Assertions.assertNull(errorRef.get()); + + Message messageResponse = messageResponseRef.get(); + Assertions.assertNotNull(messageResponse); + assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId()); + assertEquals(MESSAGE.getRole(), messageResponse.getRole()); + Part part = messageResponse.getParts().get(0); + assertEquals(Part.Kind.TEXT, part.getKind()); + assertEquals("test message", ((TextPart) part).getText()); } catch (Exception e) { + fail("Unexpected exception during send message stream to existing task test: " + e.getMessage(), e); } finally { deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test @Timeout(value = 3, unit = TimeUnit.MINUTES) public void testResubscribeExistingTaskSuccess() throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); saveTaskInTaskStore(MINIMAL_TASK); - + try { // attempting to send a streaming message instead of explicitly calling queueManager#createOrTap // does not work because after the message is sent, the queue becomes null but task resubscription // requires the queue to still be active ensureQueueForTask(MINIMAL_TASK.getId()); - + CountDownLatch taskResubscriptionRequestSent = new CountDownLatch(1); CountDownLatch taskResubscriptionResponseReceived = new CountDownLatch(2); - AtomicReference firstResponse = new AtomicReference<>(); - AtomicReference secondResponse = new AtomicReference<>(); - + AtomicReference firstResponse = new AtomicReference<>(); + AtomicReference secondResponse = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + // resubscribe to the task, requires the task and its queue to still be active - TaskResubscriptionRequest taskResubscriptionRequest = new TaskResubscriptionRequest("1", new TaskIdParams(MINIMAL_TASK.getId())); - + TaskIdParams taskIdParams = new TaskIdParams(MINIMAL_TASK.getId()); + // Count down the latch when the MultiSseSupport on the server has started subscribing awaitStreamingSubscription() .whenComplete((unused, throwable) -> taskResubscriptionRequestSent.countDown()); - - CompletableFuture>> responseFuture = initialiseStreamingRequest(taskResubscriptionRequest, null); - - AtomicReference errorRef = new AtomicReference<>(); - - responseFuture.thenAccept(response -> { - - if (response.statusCode() != 200) { - throw new IllegalStateException("Status code was " + response.statusCode()); - } - try { - response.body().forEach(line -> { + + // Use A2AClient-like resubscribeToTask Method + client.resubscribeToTask( + "1", // requestId + taskIdParams, + // eventHandler + (streamingEvent) -> { try { - SendStreamingMessageResponse jsonResponse = extractJsonResponseFromSseLine(line); - if (jsonResponse != null) { - SendStreamingMessageResponse sendStreamingMessageResponse = Utils.OBJECT_MAPPER.readValue(line.substring("data: ".length()).trim(), SendStreamingMessageResponse.class); - if (taskResubscriptionResponseReceived.getCount() == 2) { - firstResponse.set(sendStreamingMessageResponse); - } else { - secondResponse.set(sendStreamingMessageResponse); - } + if (streamingEvent instanceof TaskArtifactUpdateEvent artifactUpdateEvent) { + firstResponse.set(artifactUpdateEvent); + taskResubscriptionResponseReceived.countDown(); + } else if (streamingEvent instanceof TaskStatusUpdateEvent statusUpdateEvent) { + secondResponse.set(statusUpdateEvent); taskResubscriptionResponseReceived.countDown(); - if (taskResubscriptionResponseReceived.getCount() == 0) { - throw new BreakException(); - } } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + } catch (Exception e) { + errorRef.set(e); + taskResubscriptionResponseReceived.countDown(); + taskResubscriptionResponseReceived.countDown(); // Make sure the counter is zeroed } - }); - } catch (BreakException e) { - } - }).exceptionally(t -> { - if (!isStreamClosedError(t)) { - errorRef.set(t); - } - return null; - }); - + }, + // errorHandler + (jsonRpcError) -> { + errorRef.set(new RuntimeException("JSON-RPC Error: " + jsonRpcError.getMessage())); + taskResubscriptionResponseReceived.countDown(); + taskResubscriptionResponseReceived.countDown(); // Make sure the counter is zeroed + }, + // failureHandler + () -> { + if (errorRef.get() == null) { + errorRef.set(new RuntimeException("Stream processing failed")); + } + taskResubscriptionResponseReceived.countDown(); + taskResubscriptionResponseReceived.countDown(); // Make sure the counter is zeroed + } + ); + try { taskResubscriptionRequestSent.await(); List events = List.of( @@ -713,37 +614,34 @@ public void testResubscribeExistingTaskSuccess() throws Exception { .status(new TaskStatus(TaskState.COMPLETED)) .isFinal(true) .build()); - + for (Event event : events) { enqueueEventOnServer(event); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - + // wait for the client to receive the responses taskResubscriptionResponseReceived.await(); - + + Assertions.assertNull(errorRef.get()); + assertNotNull(firstResponse.get()); - SendStreamingMessageResponse sendStreamingMessageResponse = firstResponse.get(); - assertNull(sendStreamingMessageResponse.getError()); - TaskArtifactUpdateEvent taskArtifactUpdateEvent = (TaskArtifactUpdateEvent) sendStreamingMessageResponse.getResult(); + TaskArtifactUpdateEvent taskArtifactUpdateEvent = firstResponse.get(); assertEquals(MINIMAL_TASK.getId(), taskArtifactUpdateEvent.getTaskId()); assertEquals(MINIMAL_TASK.getContextId(), taskArtifactUpdateEvent.getContextId()); Part part = taskArtifactUpdateEvent.getArtifact().parts().get(0); assertEquals(Part.Kind.TEXT, part.getKind()); assertEquals("text", ((TextPart) part).getText()); - + assertNotNull(secondResponse.get()); - sendStreamingMessageResponse = secondResponse.get(); - assertNull(sendStreamingMessageResponse.getError()); - TaskStatusUpdateEvent taskStatusUpdateEvent = (TaskStatusUpdateEvent) sendStreamingMessageResponse.getResult(); + TaskStatusUpdateEvent taskStatusUpdateEvent = secondResponse.get(); assertEquals(MINIMAL_TASK.getId(), taskStatusUpdateEvent.getTaskId()); assertEquals(MINIMAL_TASK.getContextId(), taskStatusUpdateEvent.getContextId()); assertEquals(TaskState.COMPLETED, taskStatusUpdateEvent.getStatus().state()); assertNotNull(taskStatusUpdateEvent.getStatus().timestamp()); } finally { - //setStreamingSubscribedRunnable(null); deleteTaskInTaskStore(MINIMAL_TASK.getId()); executorService.shutdown(); if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { @@ -751,109 +649,114 @@ public void testResubscribeExistingTaskSuccess() throws Exception { } } } - + @Test public void testResubscribeNoExistingTaskError() throws Exception { - TaskResubscriptionRequest request = new TaskResubscriptionRequest("1", new TaskIdParams("non-existent-task")); - - CompletableFuture>> responseFuture = initialiseStreamingRequest(request, null); - + TaskIdParams taskIdParams = new TaskIdParams("non-existent-task"); + CountDownLatch latch = new CountDownLatch(1); AtomicReference errorRef = new AtomicReference<>(); - - responseFuture.thenAccept(response -> { - if (response.statusCode() != 200) { - //errorRef.set(new IllegalStateException("Status code was " + response.statusCode())); - throw new IllegalStateException("Status code was " + response.statusCode()); - } - response.body().forEach(line -> { - try { - SendStreamingMessageResponse jsonResponse = extractJsonResponseFromSseLine(line); - if (jsonResponse != null) { - assertEquals(request.getId(), jsonResponse.getId()); - assertNull(jsonResponse.getResult()); - // this should be an instance of TaskNotFoundError, see https://github.com/a2aproject/a2a-java/issues/23 - assertInstanceOf(JSONRPCError.class, jsonResponse.getError()); - assertEquals(new TaskNotFoundError().getCode(), jsonResponse.getError().getCode()); - latch.countDown(); + AtomicReference jsonRpcErrorRef = new AtomicReference<>(); + + // Use A2AClient-like resubscribeToTask Method + client.resubscribeToTask( + "1", // requestId + taskIdParams, + // eventHandler + (streamingEvent) -> { + // Do not expect to receive any success events, as the task does not exist + errorRef.set(new RuntimeException("Unexpected event received for non-existent task")); + latch.countDown(); + }, + // errorHandler + (jsonRpcError) -> { + jsonRpcErrorRef.set(jsonRpcError); + latch.countDown(); + }, + // failureHandler + () -> { + if (errorRef.get() == null && jsonRpcErrorRef.get() == null) { + errorRef.set(new RuntimeException("Expected error for non-existent task")); } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + latch.countDown(); } - }); - }).exceptionally(t -> { - if (!isStreamClosedError(t)) { - errorRef.set(t); - } - latch.countDown(); - return null; - }); - + ); + boolean dataRead = latch.await(20, TimeUnit.SECONDS); Assertions.assertTrue(dataRead); Assertions.assertNull(errorRef.get()); + + // Validation returns the expected TaskNotFoundError + JSONRPCError jsonRpcError = jsonRpcErrorRef.get(); + Assertions.assertNotNull(jsonRpcError); + assertEquals(new TaskNotFoundError().getCode(), jsonRpcError.getCode()); } - + @Test public void testStreamingMethodWithAcceptHeader() throws Exception { testSendStreamingMessage(MediaType.SERVER_SENT_EVENTS); } - + @Test public void testSendMessageStreamNewMessageSuccess() throws Exception { testSendStreamingMessage(null); } - + private void testSendStreamingMessage(String mediaType) throws Exception { Message message = new Message.Builder(MESSAGE) .taskId(MINIMAL_TASK.getId()) .contextId(MINIMAL_TASK.getContextId()) .build(); - SendStreamingMessageRequest request = new SendStreamingMessageRequest( - "1", new MessageSendParams(message, null, null)); - - CompletableFuture>> responseFuture = initialiseStreamingRequest(request, mediaType); - + MessageSendParams messageSendParams = new MessageSendParams(message, null, null); + CountDownLatch latch = new CountDownLatch(1); AtomicReference errorRef = new AtomicReference<>(); - - responseFuture.thenAccept(response -> { - if (response.statusCode() != 200) { - //errorRef.set(new IllegalStateException("Status code was " + response.statusCode())); - throw new IllegalStateException("Status code was " + response.statusCode()); - } - response.body().forEach(line -> { - try { - SendStreamingMessageResponse jsonResponse = extractJsonResponseFromSseLine(line); - if (jsonResponse != null) { - assertNull(jsonResponse.getError()); - Message messageResponse = (Message) jsonResponse.getResult(); - assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId()); - assertEquals(MESSAGE.getRole(), messageResponse.getRole()); - Part part = messageResponse.getParts().get(0); - assertEquals(Part.Kind.TEXT, part.getKind()); - assertEquals("test message", ((TextPart) part).getText()); + AtomicReference messageResponseRef = new AtomicReference<>(); + + // Using A2AClient's sendStreamingMessage method + client.sendStreamingMessage( + "1", // requestId + messageSendParams, + // eventHandler + (streamingEvent) -> { + try { + if (streamingEvent instanceof Message) { + messageResponseRef.set((Message) streamingEvent); + latch.countDown(); + } + } catch (Exception e) { + errorRef.set(e); latch.countDown(); } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + }, + // errorHandler + (jsonRpcError) -> { + errorRef.set(new RuntimeException("JSON-RPC Error: " + jsonRpcError.getMessage())); + latch.countDown(); + }, + // failureHandler + () -> { + if (errorRef.get() == null) { + errorRef.set(new RuntimeException("Stream processing failed")); + } + latch.countDown(); } - }); - }).exceptionally(t -> { - if (!isStreamClosedError(t)) { - errorRef.set(t); - } - latch.countDown(); - return null; - }); - - + ); + boolean dataRead = latch.await(20, TimeUnit.SECONDS); Assertions.assertTrue(dataRead); Assertions.assertNull(errorRef.get()); - + + Message messageResponse = messageResponseRef.get(); + Assertions.assertNotNull(messageResponse); + assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId()); + assertEquals(MESSAGE.getRole(), messageResponse.getRole()); + Part part = messageResponse.getParts().get(0); + assertEquals(Part.Kind.TEXT, part.getKind()); + assertEquals("test message", ((TextPart) part).getText()); + } - + @Test public void testListPushNotificationConfigWithConfigId() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -869,7 +772,7 @@ public void testListPushNotificationConfigWithConfigId() throws Exception { .build(); savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig1); savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig2); - + try { ListTaskPushNotificationConfigResponse listResponse = client.listTaskPushNotificationConfig("111", MINIMAL_TASK.getId()); assertEquals("111", listResponse.getId()); @@ -884,7 +787,7 @@ public void testListPushNotificationConfigWithConfigId() throws Exception { deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testListPushNotificationConfigWithoutConfigId() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -897,14 +800,14 @@ public void testListPushNotificationConfigWithoutConfigId() throws Exception { .url("http://2.example.com") .build(); savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig1); - + // will overwrite the previous one savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig2); try { ListTaskPushNotificationConfigResponse listResponse = client.listTaskPushNotificationConfig("111", MINIMAL_TASK.getId()); assertEquals("111", listResponse.getId()); assertEquals(1, listResponse.getResult().size()); - + PushNotificationConfig expectedNotificationConfig = new PushNotificationConfig.Builder() .url("http://2.example.com") .id(MINIMAL_TASK.getId()) @@ -918,7 +821,7 @@ public void testListPushNotificationConfigWithoutConfigId() throws Exception { deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testListPushNotificationConfigTaskNotFound() { try { @@ -928,7 +831,7 @@ public void testListPushNotificationConfigTaskNotFound() { assertInstanceOf(TaskNotFoundError.class, e.getCause()); } } - + @Test public void testListPushNotificationConfigEmptyList() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -942,7 +845,7 @@ public void testListPushNotificationConfigEmptyList() throws Exception { deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testDeletePushNotificationConfigWithValidConfigId() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -951,7 +854,7 @@ public void testDeletePushNotificationConfigWithValidConfigId() throws Exception .contextId("session-xyz") .status(new TaskStatus(TaskState.SUBMITTED)) .build()); - + PushNotificationConfig notificationConfig1 = new PushNotificationConfig.Builder() .url("http://example.com") @@ -965,18 +868,18 @@ public void testDeletePushNotificationConfigWithValidConfigId() throws Exception savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig1); savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig2); savePushNotificationConfigInStore("task-456", notificationConfig1); - + try { // specify the config ID to delete DeleteTaskPushNotificationConfigResponse deleteResponse = client.deleteTaskPushNotificationConfig(MINIMAL_TASK.getId(), "config1"); assertNull(deleteResponse.getError()); assertNull(deleteResponse.getResult()); - + // should now be 1 left ListTaskPushNotificationConfigResponse listResponse = client.listTaskPushNotificationConfig(MINIMAL_TASK.getId()); assertEquals(1, listResponse.getResult().size()); - + // should remain unchanged, this is a different task listResponse = client.listTaskPushNotificationConfig("task-456"); assertEquals(1, listResponse.getResult().size()); @@ -990,7 +893,7 @@ public void testDeletePushNotificationConfigWithValidConfigId() throws Exception deleteTaskInTaskStore("task-456"); } } - + @Test public void testDeletePushNotificationConfigWithNonExistingConfigId() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -1006,13 +909,13 @@ public void testDeletePushNotificationConfigWithNonExistingConfigId() throws Exc .build(); savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig1); savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig2); - + try { DeleteTaskPushNotificationConfigResponse deleteResponse = client.deleteTaskPushNotificationConfig(MINIMAL_TASK.getId(), "non-existent-config-id"); assertNull(deleteResponse.getError()); assertNull(deleteResponse.getResult()); - + // should remain unchanged ListTaskPushNotificationConfigResponse listResponse = client.listTaskPushNotificationConfig(MINIMAL_TASK.getId()); assertEquals(2, listResponse.getResult().size()); @@ -1024,7 +927,7 @@ public void testDeletePushNotificationConfigWithNonExistingConfigId() throws Exc deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - + @Test public void testDeletePushNotificationConfigTaskNotFound() { try { @@ -1034,7 +937,7 @@ public void testDeletePushNotificationConfigTaskNotFound() { assertInstanceOf(TaskNotFoundError.class, e.getCause()); } } - + @Test public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); @@ -1047,16 +950,16 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio .url("http://2.example.com") .build(); savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig1); - + // this one will overwrite the previous one savePushNotificationConfigInStore(MINIMAL_TASK.getId(), notificationConfig2); - + try { DeleteTaskPushNotificationConfigResponse deleteResponse = client.deleteTaskPushNotificationConfig(MINIMAL_TASK.getId(), MINIMAL_TASK.getId()); assertNull(deleteResponse.getError()); assertNull(deleteResponse.getResult()); - + // should now be 0 ListTaskPushNotificationConfigResponse listResponse = client.listTaskPushNotificationConfig(MINIMAL_TASK.getId()); assertEquals(0, listResponse.getResult().size()); @@ -1067,59 +970,7 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio deleteTaskInTaskStore(MINIMAL_TASK.getId()); } } - - private SendStreamingMessageResponse extractJsonResponseFromSseLine(String line) throws JsonProcessingException { - line = extractSseData(line); - if (line != null) { - return Utils.OBJECT_MAPPER.readValue(line, SendStreamingMessageResponse.class); - } - return null; - } - - private static String extractSseData(String line) { - if (line.startsWith("data:")) { - line = line.substring(5).trim(); - return line; - } - return null; - } - - private boolean isStreamClosedError(Throwable throwable) { - // Unwrap the CompletionException - Throwable cause = throwable; - - while (cause != null) { - if (cause instanceof EOFException) { - return true; - } - cause = cause.getCause(); - } - return false; - } - - private CompletableFuture>> initialiseStreamingRequest( - StreamingJSONRPCRequest request, String mediaType) throws Exception { - - // Create the client - HttpClient client = HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_2) - .build(); - - // Create the request - HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:" + serverPort + "/")) - .POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(request))) - .header("Content-Type", APPLICATION_JSON); - if (mediaType != null) { - builder.header("Accept", mediaType); - } - HttpRequest httpRequest = builder.build(); - - - // Send request async and return the CompletableFuture - return client.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofLines()); - } - + protected void saveTaskInTaskStore(Task task) throws Exception { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) @@ -1129,13 +980,13 @@ protected void saveTaskInTaskStore(Task task) throws Exception { .POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(task))) .header("Content-Type", APPLICATION_JSON) .build(); - + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); if (response.statusCode() != 200) { -throw new RuntimeException(String.format("Saving task failed! Status: %d, Body: %s", response.statusCode(), response.body())); + throw new RuntimeException(String.format("Saving task failed! Status: %d, Body: %s", response.statusCode(), response.body())); } } - + protected Task getTaskFromTaskStore(String taskId) throws Exception { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) @@ -1144,17 +995,17 @@ protected Task getTaskFromTaskStore(String taskId) throws Exception { .uri(URI.create("http://localhost:" + serverPort + "/test/task/" + taskId)) .GET() .build(); - + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); if (response.statusCode() == 404) { return null; } if (response.statusCode() != 200) { -throw new RuntimeException(String.format("Getting task failed! Status: %d, Body: %s", response.statusCode(), response.body())); + throw new RuntimeException(String.format("Getting task failed! Status: %d, Body: %s", response.statusCode(), response.body())); } return Utils.OBJECT_MAPPER.readValue(response.body(), Task.TYPE_REFERENCE); } - + protected void deleteTaskInTaskStore(String taskId) throws Exception { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) @@ -1168,7 +1019,7 @@ protected void deleteTaskInTaskStore(String taskId) throws Exception { throw new RuntimeException(response.statusCode() + ": Deleting task failed!" + response.body()); } } - + protected void ensureQueueForTask(String taskId) throws Exception { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) @@ -1179,10 +1030,10 @@ protected void ensureQueueForTask(String taskId) throws Exception { .build(); HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); if (response.statusCode() != 200) { -throw new RuntimeException(String.format("Ensuring queue failed! Status: %d, Body: %s", response.statusCode(), response.body())); + throw new RuntimeException(String.format("Ensuring queue failed! Status: %d, Body: %s", response.statusCode(), response.body())); } } - + protected void enqueueEventOnServer(Event event) throws Exception { String path; if (event instanceof TaskArtifactUpdateEvent e) { @@ -1201,17 +1052,17 @@ protected void enqueueEventOnServer(Event event) throws Exception { .header("Content-Type", APPLICATION_JSON) .POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(event))) .build(); - + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); if (response.statusCode() != 200) { throw new RuntimeException(response.statusCode() + ": Queueing event failed!" + response.body()); } } - + private CompletableFuture awaitStreamingSubscription() { int cnt = getStreamingSubscribedCount(); AtomicInteger initialCount = new AtomicInteger(cnt); - + return CompletableFuture.runAsync(() -> { try { boolean done = false; @@ -1233,7 +1084,7 @@ private CompletableFuture awaitStreamingSubscription() { } }); } - + private int getStreamingSubscribedCount() { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) @@ -1250,7 +1101,7 @@ private int getStreamingSubscribedCount() { throw new RuntimeException(e); } } - + protected void deletePushNotificationConfigInStore(String taskId, String configId) throws Exception { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) @@ -1264,7 +1115,7 @@ protected void deletePushNotificationConfigInStore(String taskId, String configI throw new RuntimeException(response.statusCode() + ": Deleting task failed!" + response.body()); } } - + protected void savePushNotificationConfigInStore(String taskId, PushNotificationConfig notificationConfig) throws Exception { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) @@ -1274,14 +1125,10 @@ protected void savePushNotificationConfigInStore(String taskId, PushNotification .POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(notificationConfig))) .header("Content-Type", APPLICATION_JSON) .build(); - + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); if (response.statusCode() != 200) { throw new RuntimeException(response.statusCode() + ": Creating task push notification config failed! " + response.body()); } } - - private static class BreakException extends RuntimeException { - - } }