Skip to content

Commit b92f584

Browse files
authored
Merge pull request #514 from serverlessworkflow/fix-sse-streaming
Fixed both the `ClusterResourceController` and `NamespacedResourceController` by ensuring that SSE-based actions do not set the response's status code after streaming
2 parents edb666c + e274803 commit b92f584

File tree

4 files changed

+54
-27
lines changed

4 files changed

+54
-27
lines changed

src/api/Synapse.Api.Http/ClusterResourceController.cs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,10 @@ namespace Synapse.Api.Http;
2020
/// <param name="mediator">The service used to mediate calls</param>
2121
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
2222
public abstract class ClusterResourceController<TResource>(IMediator mediator, IJsonSerializer jsonSerializer)
23-
: ResourceController<TResource>(mediator)
23+
: ResourceController<TResource>(mediator, jsonSerializer)
2424
where TResource : class, IResource, new()
2525
{
2626

27-
/// <summary>
28-
/// Gets the service used to serialize/deserialize data to/from JSON
29-
/// </summary>
30-
protected IJsonSerializer JsonSerializer { get; } = jsonSerializer;
31-
3227
/// <summary>
3328
/// Gets the resource with the specified name
3429
/// </summary>
@@ -97,13 +92,17 @@ public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> Watc
9792
/// </summary>
9893
/// <param name="labelSelector">A comma-separated list of label selectors, if any</param>
9994
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
100-
/// <returns>A new <see cref="IActionResult"/></returns>
95+
/// <returns>A new awaitable <see cref="Task"/></returns>
10196
[HttpGet("watch/sse")]
10297
[ProducesResponseType(typeof(IAsyncEnumerable<ResourceWatchEvent>), (int)HttpStatusCode.OK)]
10398
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
104-
public virtual async Task<IActionResult> WatchResourcesUsingSSE(string? labelSelector = null, CancellationToken cancellationToken = default)
99+
public virtual async Task WatchResourcesUsingSSE(string? labelSelector = null, CancellationToken cancellationToken = default)
105100
{
106-
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!);
101+
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors))
102+
{
103+
await WriteInvalidLabelSelectorResponseAsync(labelSelector!, cancellationToken).ConfigureAwait(false);
104+
return;
105+
}
107106
var response = await this.Mediator.ExecuteAsync(new WatchResourcesQuery<TResource>(null, labelSelectors), cancellationToken).ConfigureAwait(false);
108107
this.Response.Headers.ContentType = "text/event-stream";
109108
this.Response.Headers.CacheControl = "no-cache";
@@ -119,7 +118,6 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string? labelSel
119118
}
120119
}
121120
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
122-
return this.Ok();
123121
}
124122

125123
/// <summary>
@@ -142,11 +140,11 @@ public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> Moni
142140
/// </summary>
143141
/// <param name="name">The name of the cluster resource to monitor</param>
144142
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
145-
/// <returns>A new <see cref="IActionResult"/></returns>
143+
/// <returns>A new awaitable <see cref="Task"/></returns>
146144
[HttpGet("{name}/monitor/sse")]
147145
[ProducesResponseType(typeof(IAsyncEnumerable<ResourceWatchEvent>), (int)HttpStatusCode.OK)]
148146
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
149-
public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, CancellationToken cancellationToken = default)
147+
public virtual async Task MonitorResourceUsingSSE(string name, CancellationToken cancellationToken = default)
150148
{
151149
var response = await this.Mediator.ExecuteAsync(new MonitorResourceQuery<TResource>(name, null), cancellationToken).ConfigureAwait(false);
152150
this.Response.Headers.ContentType = "text/event-stream";
@@ -163,7 +161,6 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, Ca
163161
}
164162
}
165163
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
166-
return this.Ok();
167164
}
168165

169166
/// <summary>

src/api/Synapse.Api.Http/NamespacedResourceController.cs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,10 @@ namespace Synapse.Api.Http;
2020
/// <param name="mediator">The service used to mediate calls</param>
2121
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
2222
public abstract class NamespacedResourceController<TResource>(IMediator mediator, IJsonSerializer jsonSerializer)
23-
: ResourceController<TResource>(mediator)
23+
: ResourceController<TResource>(mediator, jsonSerializer)
2424
where TResource : class, IResource, new()
2525
{
2626

27-
/// <summary>
28-
/// Gets the service used to serialize/deserialize data to/from JSON
29-
/// </summary>
30-
protected IJsonSerializer JsonSerializer { get; } = jsonSerializer;
31-
3227
/// <summary>
3328
/// Gets the resource with the specified name and namespace
3429
/// </summary>
@@ -150,13 +145,17 @@ public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> Watc
150145
/// <param name="namespace">The namespace the resources to watch belong to</param>
151146
/// <param name="labelSelector">A comma-separated list of label selectors, if any</param>
152147
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
153-
/// <returns>A new <see cref="IActionResult"/></returns>
148+
/// <returns>A new awaitable <see cref="Task"/></returns>
154149
[HttpGet("{namespace}/watch/sse")]
155150
[ProducesResponseType(typeof(IAsyncEnumerable<ResourceWatchEvent>), (int)HttpStatusCode.OK)]
156151
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
157-
public virtual async Task<IActionResult> WatchResourcesUsingSSE(string @namespace, string? labelSelector = null, CancellationToken cancellationToken = default)
152+
public virtual async Task WatchResourcesUsingSSE(string @namespace, string? labelSelector = null, CancellationToken cancellationToken = default)
158153
{
159-
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!);
154+
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors))
155+
{
156+
await WriteInvalidLabelSelectorResponseAsync(labelSelector!, cancellationToken).ConfigureAwait(false);
157+
return;
158+
}
160159
var response = await this.Mediator.ExecuteAsync(new WatchResourcesQuery<TResource>(@namespace, labelSelectors), cancellationToken).ConfigureAwait(false);
161160
this.Response.Headers.ContentType = "text/event-stream";
162161
this.Response.Headers.CacheControl = "no-cache";
@@ -172,7 +171,6 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string @namespac
172171
}
173172
}
174173
catch (Exception ex) when(ex is TaskCanceledException || ex is OperationCanceledException) { }
175-
return this.Ok();
176174
}
177175

178176
/// <summary>
@@ -197,11 +195,11 @@ public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> Moni
197195
/// <param name="namespace">The namespace the resource to monitor belongs to</param>
198196
/// <param name="name">The name of the resource to monitor</param>
199197
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
200-
/// <returns>A new <see cref="IActionResult"/></returns>
198+
/// <returns>A new awaitable <see cref="Task"/></returns>
201199
[HttpGet("{namespace}/{name}/monitor/sse")]
202200
[ProducesResponseType(typeof(IAsyncEnumerable<ResourceWatchEvent>), (int)HttpStatusCode.OK)]
203201
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
204-
public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, string @namespace, CancellationToken cancellationToken = default)
202+
public virtual async Task MonitorResourceUsingSSE(string name, string @namespace, CancellationToken cancellationToken = default)
205203
{
206204
var response = await this.Mediator.ExecuteAsync(new MonitorResourceQuery<TResource>(name, @namespace), cancellationToken).ConfigureAwait(false);
207205
this.Response.Headers.ContentType = "text/event-stream";
@@ -218,7 +216,6 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, st
218216
}
219217
}
220218
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
221-
return this.Ok();
222219
}
223220

224221
/// <summary>

src/api/Synapse.Api.Http/ResourceController.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ namespace Synapse.Api.Http;
1818
/// </summary>
1919
/// <typeparam name="TResource">The type of <see cref="IResource"/> to manage</typeparam>
2020
/// <param name="mediator">The service used to mediate calls</param>
21-
public abstract class ResourceController<TResource>(IMediator mediator)
21+
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON.</param>
22+
public abstract class ResourceController<TResource>(IMediator mediator, IJsonSerializer jsonSerializer)
2223
: Controller
2324
where TResource : class, IResource, new()
2425
{
@@ -28,6 +29,11 @@ public abstract class ResourceController<TResource>(IMediator mediator)
2829
/// </summary>
2930
protected IMediator Mediator { get; } = mediator;
3031

32+
/// <summary>
33+
/// Gets the service used to serialize/deserialize data to/from JSON.
34+
/// </summary>
35+
protected IJsonSerializer JsonSerializer { get; } = jsonSerializer;
36+
3137
/// <summary>
3238
/// Creates a new resource of the specified type
3339
/// </summary>
@@ -117,4 +123,30 @@ protected IActionResult InvalidLabelSelector(string labelSelector)
117123
return this.ValidationProblem("Bad Request", statusCode: (int)HttpStatusCode.BadRequest, title: "Bad Request", modelStateDictionary: this.ModelState);
118124
}
119125

126+
/// <summary>
127+
/// Writes to the response the description of a validation problem that occurred while processing the request.
128+
/// </summary>
129+
/// <param name="cancellationToken">A <see cref="CancellationToken"/>.</param>
130+
/// <returns>A new awaitable <see cref="Task"/>.</returns>
131+
protected virtual async Task WriteValidationProblemResponseAsync(CancellationToken cancellationToken)
132+
{
133+
var problem = new ValidationProblemDetails(ModelState);
134+
var json = JsonSerializer.SerializeToText(problem);
135+
Response.StatusCode = (int)HttpStatusCode.BadRequest;
136+
Response.ContentType = MediaTypeNames.Application.Json;
137+
await Response.WriteAsync(json, cancellationToken).ConfigureAwait(false);
138+
}
139+
140+
/// <summary>
141+
/// Writes to the response the description of an error that occurred while parsing the request's label selector.
142+
/// </summary>
143+
/// <param name="labelSelector">The invalid label selector.</param>
144+
/// <param name="cancellationToken">A <see cref="CancellationToken"/>.</param>
145+
/// <returns>A new awaitable <see cref="Task"/>.</returns>
146+
protected virtual async Task WriteInvalidLabelSelectorResponseAsync(string labelSelector, CancellationToken cancellationToken)
147+
{
148+
ModelState.AddModelError(nameof(labelSelector), $"The specified value '{labelSelector}' is not a valid comma-separated label selector list");
149+
await WriteValidationProblemResponseAsync(cancellationToken).ConfigureAwait(false);
150+
}
151+
120152
}

src/api/Synapse.Api.Http/Usings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@
2727
global using Synapse.Resources;
2828
global using System.Collections.Concurrent;
2929
global using System.Net;
30+
global using System.Net.Mime;
3031
global using System.Text;

0 commit comments

Comments
 (0)