Skip to content

Commit 12187a0

Browse files
fix: fix sse deserialization (#35)
* fix sse deserialization * remove unused code * address pr review feedback * address pr review feedback
1 parent 35e0142 commit 12187a0

File tree

3 files changed

+106
-14
lines changed

3 files changed

+106
-14
lines changed

samples/Client/Program.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ private static async Task<AgentTask> SendMessageToAgentStreaming(string messageT
187187
{
188188
Message = new Message
189189
{
190+
MessageId = Guid.NewGuid().ToString("N"),
191+
ContextId = currentSessionId,
190192
Role = MessageRole.User,
191193
Parts =
192194
[
@@ -219,6 +221,18 @@ private static async Task<AgentTask> SendMessageToAgentStreaming(string messageT
219221
Console.WriteLine($"Artifacts count: {agentTask.Artifacts.Count}");
220222
}
221223
break;
224+
case Message message:
225+
taskId = message.TaskId;
226+
Console.WriteLine($"Received message {message.MessageId} with role {message.Role}");
227+
if (message.Parts.Count > 0)
228+
{
229+
Console.WriteLine($"Message parts count: {message.Parts.Count}");
230+
foreach (var part in message.Parts.OfType<TextPart>())
231+
{
232+
Console.WriteLine(part.Text);
233+
}
234+
}
235+
break;
222236
default:
223237
Console.WriteLine($"Unknown event type: {item.EventType}");
224238
break;

src/A2A/Client/A2AClient.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,14 @@ private async Task<TOutput> SendRpcRequestAsync<TInput, TOutput>(
100100
"application/json",
101101
cancellationToken).ConfigureAwait(false);
102102

103-
var responseObject = await JsonSerializer.DeserializeAsync(responseStream, A2AJsonUtilities.JsonContext.Default.JsonRpcResponse, cancellationToken) ??
104-
throw new InvalidOperationException("Failed to deserialize the response.");
103+
var responseObject = await JsonSerializer.DeserializeAsync(responseStream, A2AJsonUtilities.JsonContext.Default.JsonRpcResponse, cancellationToken);
105104

106-
return responseObject.Result?.Deserialize(outputTypeInfo) ??
105+
if (responseObject?.Error is { } error)
106+
{
107+
throw new InvalidOperationException($"JSON-RPC error ({error.Code}): {error.Message}");
108+
}
109+
110+
return responseObject?.Result?.Deserialize(outputTypeInfo) ??
107111
throw new InvalidOperationException("Response does not contain a result.");
108112
}
109113

@@ -124,7 +128,16 @@ private async IAsyncEnumerable<SseItem<TOutput>> SendRpcSseRequestAsync<TInput,
124128
var sseParser = SseParser.Create(responseStream, (eventType, data) =>
125129
{
126130
var reader = new Utf8JsonReader(data);
127-
return JsonSerializer.Deserialize(ref reader, outputTypeInfo) ?? throw new InvalidOperationException("Failed to deserialize the event.");
131+
132+
var responseObject = JsonSerializer.Deserialize(ref reader, A2AJsonUtilities.JsonContext.Default.JsonRpcResponse);
133+
134+
if (responseObject?.Error is { } error)
135+
{
136+
throw new InvalidOperationException($"JSON-RPC error ({error.Code}): {error.Message}");
137+
}
138+
139+
return JsonSerializer.Deserialize(responseObject?.Result, outputTypeInfo) ??
140+
throw new InvalidOperationException("Failed to deserialize the event.");
128141
});
129142

130143
await foreach (var item in sseParser.EnumerateAsync(cancellationToken))

tests/A2A.UnitTests/Client/A2AClientTests.cs

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,13 @@ public async Task SendMessageStreamAsync_MapsRequestParamsCorrectly()
379379
};
380380

381381
HttpRequestMessage? capturedRequest = null;
382-
// Simulate a minimal valid SSE response (empty event stream)
383-
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes("event: message\ndata: {}\n\n"));
382+
// Simulate a minimal valid SSE response
383+
var jsonRpcResponse = JsonSerializer.Serialize(new JsonRpcResponse
384+
{
385+
Id = "test-id",
386+
Result = JsonSerializer.SerializeToNode(new { })
387+
});
388+
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes($"event: message\ndata: {jsonRpcResponse}\n\n"));
384389
var response = new HttpResponseMessage(HttpStatusCode.OK)
385390
{
386391
Content = new StreamContent(sseStream)
@@ -436,9 +441,12 @@ public async Task SendMessageStreamAsync_MapsResponseCorrectly()
436441
TaskId = "task-456",
437442
ContextId = "ctx-789"
438443
};
439-
var sseEventJson = JsonSerializer.Serialize<A2AEvent>(expectedMessage, A2AJsonUtilities.DefaultOptions);
440-
var sseData = $"event: message\ndata: {sseEventJson}\n\n";
441-
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes(sseData));
444+
var jsonRpcResponse = JsonSerializer.Serialize(new JsonRpcResponse
445+
{
446+
Id = "test-id",
447+
Result = JsonSerializer.SerializeToNode<A2AEvent>(expectedMessage, A2AJsonUtilities.DefaultOptions)
448+
});
449+
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes($"event: message\ndata: {jsonRpcResponse}\n\n"));
442450
var response = new HttpResponseMessage(HttpStatusCode.OK)
443451
{
444452
Content = new StreamContent(sseStream)
@@ -477,8 +485,13 @@ public async Task ResubscribeToTaskAsync_MapsRequestParamsCorrectly()
477485
// Arrange
478486
var taskId = "task-123";
479487
HttpRequestMessage? capturedRequest = null;
480-
// Simulate a minimal valid SSE response (empty event stream)
481-
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes("event: message\ndata: {}\n\n"));
488+
// Simulate a minimal valid SSE response
489+
var jsonRpcResponse = JsonSerializer.Serialize(new JsonRpcResponse
490+
{
491+
Id = "test-id",
492+
Result = JsonSerializer.SerializeToNode(new { })
493+
});
494+
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes($"event: message\ndata: {jsonRpcResponse}\n\n"));
482495
var response = new HttpResponseMessage(HttpStatusCode.OK)
483496
{
484497
Content = new StreamContent(sseStream)
@@ -520,9 +533,12 @@ public async Task ResubscribeToTaskAsync_MapsResponseCorrectly()
520533
TaskId = "task-456",
521534
ContextId = "ctx-789"
522535
};
523-
var sseEventJson = JsonSerializer.Serialize<A2AEvent>(expectedMessage, A2AJsonUtilities.DefaultOptions);
524-
var sseData = $"event: message\ndata: {sseEventJson}\n\n";
525-
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes(sseData));
536+
var jsonRpcResponse = JsonSerializer.Serialize(new JsonRpcResponse
537+
{
538+
Id = "test-id",
539+
Result = JsonSerializer.SerializeToNode<A2AEvent>(expectedMessage, A2AJsonUtilities.DefaultOptions)
540+
});
541+
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes($"event: message\ndata: {jsonRpcResponse}\n\n"));
526542
var response = new HttpResponseMessage(HttpStatusCode.OK)
527543
{
528544
Content = new StreamContent(sseStream)
@@ -554,6 +570,55 @@ public async Task ResubscribeToTaskAsync_MapsResponseCorrectly()
554570
Assert.Equal(expectedMessage.ContextId, message.ContextId);
555571
}
556572

573+
[Fact]
574+
public async Task SendMessageStreamAsync_ThrowsOnJsonRpcError()
575+
{
576+
// Arrange
577+
var jsonRpcErrorResponse = JsonSerializer.Serialize(JsonRpcResponse.InvalidParamsResponse("test-id"));
578+
var sseStream = new MemoryStream(Encoding.UTF8.GetBytes($"event: message\ndata: {jsonRpcErrorResponse}\n\n"));
579+
var response = new HttpResponseMessage(HttpStatusCode.OK)
580+
{
581+
Content = new StreamContent(sseStream)
582+
};
583+
response.Content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("text/event-stream");
584+
var sut = CreateA2AClient(response);
585+
var sendParams = new MessageSendParams();
586+
587+
// Act & Assert
588+
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
589+
{
590+
await foreach (var _ in sut.SendMessageStreamAsync(sendParams))
591+
{
592+
// Should throw before yielding any items
593+
}
594+
});
595+
596+
Assert.Contains("-32602", exception.Message);
597+
Assert.Contains("Invalid parameters", exception.Message);
598+
}
599+
600+
[Fact]
601+
public async Task SendMessageAsync_ThrowsOnJsonRpcError()
602+
{
603+
// Arrange
604+
var jsonRpcErrorResponse = JsonSerializer.Serialize(JsonRpcResponse.MethodNotFoundResponse("test-id"));
605+
var response = new HttpResponseMessage(HttpStatusCode.OK)
606+
{
607+
Content = new StringContent(jsonRpcErrorResponse, Encoding.UTF8, "application/json")
608+
};
609+
var sut = CreateA2AClient(response);
610+
var sendParams = new MessageSendParams();
611+
612+
// Act & Assert
613+
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
614+
{
615+
await sut.SendMessageAsync(sendParams);
616+
});
617+
618+
Assert.Contains("-32601", exception.Message);
619+
Assert.Contains("Method not found", exception.Message);
620+
}
621+
557622
private static A2AClient CreateA2AClient<T>(T result, Action<HttpRequestMessage>? onRequest = null)
558623
{
559624
var jsonResponse = JsonSerializer.Serialize(new JsonRpcResponse

0 commit comments

Comments
 (0)