-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathJsonRpcStreamedResult.cs
More file actions
61 lines (54 loc) · 2.6 KB
/
JsonRpcStreamedResult.cs
File metadata and controls
61 lines (54 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
using Microsoft.AspNetCore.Http;
using System.Net.ServerSentEvents;
using System.Text.Encodings.Web;
using System.Text.Json;
namespace A2A.AspNetCore;
/// <summary>
/// Result type for streaming JSON-RPC responses as Server-Sent Events (SSE) in HTTP responses.
/// </summary>
/// <remarks>
/// Implements IResult to provide real-time streaming of JSON-RPC responses for continuous
/// event streams like task updates, status changes, and artifact notifications.
/// </remarks>
public class JsonRpcStreamedResult : IResult
{
private readonly IAsyncEnumerable<A2AEvent> _events;
private readonly JsonRpcId requestId;
/// <summary>
/// Initializes a new instance of the JsonRpcStreamedResult class.
/// </summary>
/// <param name="events">The async enumerable stream of A2A events to send as Server-Sent Events.</param>
/// <param name="requestId">The JSON-RPC request ID used for correlating responses with the original request.</param>
public JsonRpcStreamedResult(IAsyncEnumerable<A2AEvent> events, JsonRpcId requestId)
{
ArgumentNullException.ThrowIfNull(events);
_events = events;
this.requestId = requestId;
}
/// <summary>
/// Executes the result by streaming JSON-RPC responses as Server-Sent Events to the HTTP response.
/// </summary>
/// <remarks>
/// Sets appropriate SSE headers, wraps each A2A event in a JSON-RPC response format,
/// and streams them using the SSE protocol with proper formatting and encoding.
/// </remarks>
/// <param name="httpContext">The HTTP context to stream the responses to.</param>
/// <returns>A task representing the asynchronous streaming operation.</returns>
public async Task ExecuteAsync(HttpContext httpContext)
{
ArgumentNullException.ThrowIfNull(httpContext);
httpContext.Response.StatusCode = StatusCodes.Status200OK;
httpContext.Response.ContentType = "text/event-stream";
httpContext.Response.Headers.Append("Cache-Control", "no-cache");
var responseTypeInfo = A2AJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcResponse));
await SseFormatter.WriteAsync(
_events.Select(e => new SseItem<JsonRpcResponse>(JsonRpcResponse.CreateJsonRpcResponse(requestId, e))),
httpContext.Response.Body,
(item, writer) =>
{
using Utf8JsonWriter json = new(writer, new() { Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping });
JsonSerializer.Serialize(json, item.Data, responseTypeInfo);
},
httpContext.RequestAborted).ConfigureAwait(false);
}
}