From 20d6fac559b78289ee20a7a8eed3231f159d020b Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Thu, 24 Oct 2024 17:19:02 +0200 Subject: [PATCH] fix(ApiClient): Fixed the API client to use SSEs for watching and monitoring resources fix(Api): Fixed the actions to monitor and watch resources using SSEs in both cluster and namespaced resource controllers to flush headers directly Signed-off-by: Charles d'Avernas --- .../Services/IClusterResourceApiClient.cs | 4 +- .../Services/INamespacedResourceApiClient.cs | 4 +- .../Services/ResourceHttpApiClient.cs | 61 ++++++++++++++----- .../ClusterResourceController.cs | 2 + .../NamespacedResourceController.cs | 2 + .../MonitorWorkflowInstancesCommand.cs | 3 +- .../ResourceEditor/ResourceEditor.razor | 2 +- .../Executors/WorkflowProcessExecutor.cs | 9 ++- .../Services/WorkflowExecutionContext.cs | 2 +- .../Services/MockClusterResourceApiClient.cs | 15 +++-- .../MockNamespacedResourceApiClient.cs | 11 +++- 11 files changed, 81 insertions(+), 34 deletions(-) diff --git a/src/api/Synapse.Api.Client.Core/Services/IClusterResourceApiClient.cs b/src/api/Synapse.Api.Client.Core/Services/IClusterResourceApiClient.cs index 19806bcb8..f7e6b6478 100644 --- a/src/api/Synapse.Api.Client.Core/Services/IClusterResourceApiClient.cs +++ b/src/api/Synapse.Api.Client.Core/Services/IClusterResourceApiClient.cs @@ -38,7 +38,7 @@ public interface IClusterResourceApiClient /// Defines the expected labels, if any, of the resources to watch /// A /// A new used to asynchronously enumerate resulting s - Task>> WatchAsync(IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default); + IAsyncEnumerable> WatchAsync(IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default); /// /// Monitors the resource with the specified name @@ -46,7 +46,7 @@ public interface IClusterResourceApiClient /// The name of the resource to monitor /// A /// A new used to asynchronously enumerate resulting s - Task>> MonitorAsync(string name, CancellationToken cancellationToken = default); + IAsyncEnumerable> MonitorAsync(string name, CancellationToken cancellationToken = default); /// /// Gets the resource with the specified name diff --git a/src/api/Synapse.Api.Client.Core/Services/INamespacedResourceApiClient.cs b/src/api/Synapse.Api.Client.Core/Services/INamespacedResourceApiClient.cs index 2283fac9e..201a6b49c 100644 --- a/src/api/Synapse.Api.Client.Core/Services/INamespacedResourceApiClient.cs +++ b/src/api/Synapse.Api.Client.Core/Services/INamespacedResourceApiClient.cs @@ -40,7 +40,7 @@ public interface INamespacedResourceApiClient /// Defines the expected labels, if any, of the resources to watch /// A /// A new used to asynchronously enumerate resulting s - Task>> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default); + IAsyncEnumerable> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default); /// /// Monitors the resource with the specified name @@ -49,7 +49,7 @@ public interface INamespacedResourceApiClient /// The namespace the resource to monitor belongs to /// A /// A new used to asynchronously enumerate resulting s - Task>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default); + IAsyncEnumerable> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default); /// /// Gets the resource with the specified name diff --git a/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs b/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs index 8ed809e38..b8deddf8e 100644 --- a/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs +++ b/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.Runtime.CompilerServices; + namespace Synapse.Api.Client.Services; /// @@ -105,25 +107,32 @@ public virtual async Task> ListAsync(IEnumerable - public virtual async Task>> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) + public virtual async IAsyncEnumerable> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { var resource = new TResource(); - var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/watch"; + var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch/sse" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/watch"; var queryStringArguments = new Dictionary(); if (labelSelectors?.Any() == true) queryStringArguments.Add("labelSelector", labelSelectors.Select(s => s.ToString()).Join(',')); if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}"; using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false); - request.EnableWebAssemblyStreamingResponse(); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - return this.JsonSerializer.DeserializeAsyncEnumerable>(responseStream, cancellationToken)!; + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + while (!streamReader.EndOfStream) + { + var sseMessage = await streamReader.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(sseMessage)) continue; + var json = sseMessage["data: ".Length..].Trim(); + var e = JsonSerializer.Deserialize>(json)!; + yield return e; + } } /// - public virtual async Task>> WatchAsync(IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) + public virtual async IAsyncEnumerable> WatchAsync(IEnumerable? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { var resource = new TResource(); - var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch"; + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch/sse"; var queryStringArguments = new Dictionary(); if (labelSelectors?.Any() == true) queryStringArguments.Add("labelSelector", labelSelectors.Select(s => s.ToString()).Join(',')); if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}"; @@ -131,34 +140,56 @@ public virtual async Task>> Watc request.EnableWebAssemblyStreamingResponse(); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - return this.JsonSerializer.DeserializeAsyncEnumerable>(responseStream, cancellationToken)!; + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + while (!streamReader.EndOfStream) + { + var sseMessage = await streamReader.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(sseMessage)) continue; + var json = sseMessage["data: ".Length..].Trim(); + var e = JsonSerializer.Deserialize>(json)!; + yield return e; + } } /// - public virtual async Task>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default) + public virtual async IAsyncEnumerable> MonitorAsync(string name, string @namespace, [EnumeratorCancellation]CancellationToken cancellationToken = default) { ArgumentException.ThrowIfNullOrWhiteSpace(name); ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); var resource = new TResource(); - var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/monitor"; + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/monitor/sse"; using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false); - request.EnableWebAssemblyStreamingResponse(); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - return this.JsonSerializer.DeserializeAsyncEnumerable>(responseStream, cancellationToken)!; + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + while (!streamReader.EndOfStream) + { + var sseMessage = await streamReader.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(sseMessage)) continue; + var json = sseMessage["data: ".Length..].Trim(); + var e = JsonSerializer.Deserialize>(json)!; + yield return e; + } } /// - public virtual async Task>> MonitorAsync(string name, CancellationToken cancellationToken = default) + public virtual async IAsyncEnumerable> MonitorAsync(string name, [EnumeratorCancellation]CancellationToken cancellationToken = default) { ArgumentException.ThrowIfNullOrWhiteSpace(name); var resource = new TResource(); - var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{name}/monitor"; + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{name}/monitor/sse"; using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false); - request.EnableWebAssemblyStreamingResponse(); var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - return this.JsonSerializer.DeserializeAsyncEnumerable>(responseStream, cancellationToken)!; + using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync()); + while (!streamReader.EndOfStream) + { + var sseMessage = await streamReader.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(sseMessage)) continue; + var json = sseMessage["data: ".Length..].Trim(); + var e = JsonSerializer.Deserialize>(json)!; + yield return e; + } } /// diff --git a/src/api/Synapse.Api.Http/ClusterResourceController.cs b/src/api/Synapse.Api.Http/ClusterResourceController.cs index 9b7b356e0..3428bfc7d 100644 --- a/src/api/Synapse.Api.Http/ClusterResourceController.cs +++ b/src/api/Synapse.Api.Http/ClusterResourceController.cs @@ -108,6 +108,7 @@ public virtual async Task WatchResourcesUsingSSE(string? labelSel this.Response.Headers.ContentType = "text/event-stream"; this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); await foreach (var e in response.Data!) { var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; @@ -147,6 +148,7 @@ public virtual async Task MonitorResourceUsingSSE(string name, Ca this.Response.Headers.ContentType = "text/event-stream"; this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); await foreach (var e in response.Data!) { var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; diff --git a/src/api/Synapse.Api.Http/NamespacedResourceController.cs b/src/api/Synapse.Api.Http/NamespacedResourceController.cs index e6bdae09b..65ee55980 100644 --- a/src/api/Synapse.Api.Http/NamespacedResourceController.cs +++ b/src/api/Synapse.Api.Http/NamespacedResourceController.cs @@ -170,6 +170,7 @@ public virtual async Task WatchResourcesUsingSSE(string @namespac this.Response.Headers.ContentType = "text/event-stream"; this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); await foreach (var e in response.Data!) { var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; @@ -211,6 +212,7 @@ public virtual async Task MonitorResourceUsingSSE(string name, st this.Response.Headers.ContentType = "text/event-stream"; this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); await foreach(var e in response.Data!) { var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n"; diff --git a/src/cli/Synapse.Cli/Commands/WorkflowInstances/MonitorWorkflowInstancesCommand.cs b/src/cli/Synapse.Cli/Commands/WorkflowInstances/MonitorWorkflowInstancesCommand.cs index f034007a9..eeb72c96f 100644 --- a/src/cli/Synapse.Cli/Commands/WorkflowInstances/MonitorWorkflowInstancesCommand.cs +++ b/src/cli/Synapse.Cli/Commands/WorkflowInstances/MonitorWorkflowInstancesCommand.cs @@ -64,8 +64,7 @@ public MonitorWorkflowInstancesCommand(IServiceProvider serviceProvider, ILogger public async Task HandleAsync(string name, string @namespace, string output) { this.EnsureConfigured(); - var enumerable = await this.Api.WorkflowInstances.MonitorAsync(name, @namespace); - await foreach (var e in enumerable) + await foreach (var e in this.Api.WorkflowInstances.MonitorAsync(name, @namespace)) { string outputText = output.ToLowerInvariant() switch { diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceEditor/ResourceEditor.razor b/src/dashboard/Synapse.Dashboard/Components/ResourceEditor/ResourceEditor.razor index ddaf0ec71..f3106341f 100644 --- a/src/dashboard/Synapse.Dashboard/Components/ResourceEditor/ResourceEditor.razor +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceEditor/ResourceEditor.razor @@ -258,7 +258,7 @@ await this.JSRuntime.InvokeVoidAsync("navigator.clipboard.writeText", text); this.ToastService.Notify(new(ToastType.Success, "Copied to the clipboard!")); } - catch (Exception ex) + catch { this.ToastService.Notify(new(ToastType.Danger, "Failed to copy the definition to the clipboard.")); } diff --git a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs index 2997d654e..c08873e39 100644 --- a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs @@ -93,17 +93,16 @@ await this.SetErrorAsync(new() { Definition = new() { - Namespace = this.ProcessDefinition.Namespace, - Name = this.ProcessDefinition.Name, - Version = this.ProcessDefinition.Version + Namespace = workflowDefinition.Document.Namespace, + Name = workflowDefinition.Document.Name, + Version = workflowDefinition.Document.Version }, Input = input } }; workflowInstance = await this.Api.WorkflowInstances.CreateAsync(workflowInstance, cancellationToken).ConfigureAwait(false); } - var watchEvents = await this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken).ConfigureAwait(false); - await foreach(var watchEvent in watchEvents) + await foreach(var watchEvent in this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken)) { switch (watchEvent.Resource.Status?.Phase) { diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs index ac3a16c12..3dc42d29e 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs @@ -335,7 +335,7 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte } var taskCompletionSource = new TaskCompletionSource(); using var cancellationTokenRegistration = cancellationToken.Register(() => taskCompletionSource.TrySetCanceled()); - using var subscription = (await this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken)) + using var subscription = this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken) .ToObservable() .Where(e => e.Type == ResourceWatchEventType.Updated) .Select(e => e.Resource.Status?.Correlation?.Contexts) diff --git a/tests/Synapse.UnitTests/Services/MockClusterResourceApiClient.cs b/tests/Synapse.UnitTests/Services/MockClusterResourceApiClient.cs index f7bfef3f9..7132bda12 100644 --- a/tests/Synapse.UnitTests/Services/MockClusterResourceApiClient.cs +++ b/tests/Synapse.UnitTests/Services/MockClusterResourceApiClient.cs @@ -15,6 +15,7 @@ using Neuroglia.Data.Infrastructure.ResourceOriented; using Neuroglia.Data.Infrastructure.ResourceOriented.Services; using Synapse.Api.Client.Services; +using System.Runtime.CompilerServices; namespace Synapse.UnitTests.Services; @@ -26,10 +27,16 @@ internal class MockClusterResourceApiClient(IResourceRepository resou public Task CreateAsync(TResource resource, CancellationToken cancellationToken = default) => resources.AddAsync(resource, false, cancellationToken); public Task> ListAsync(IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => Task.FromResult(resources.GetAllAsync(null, labelSelectors, cancellationToken)!); - - public async Task>> WatchAsync(IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => (await resources.WatchAsync(null!, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); - - public async Task>> MonitorAsync(string name, CancellationToken cancellationToken = default) => (await resources.MonitorAsync(name, null!, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); + + public async IAsyncEnumerable> WatchAsync(IEnumerable? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach (var e in (await resources.WatchAsync(null!, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; + } + + public async IAsyncEnumerable> MonitorAsync(string name, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach(var e in (await resources.MonitorAsync(name, null!, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; + } public Task GetAsync(string name, CancellationToken cancellationToken = default) => resources.GetAsync(name, null, cancellationToken)!; diff --git a/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs b/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs index a83462ad1..6cd2c82ae 100644 --- a/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs +++ b/tests/Synapse.UnitTests/Services/MockNamespacedResourceApiClient.cs @@ -15,6 +15,7 @@ using Neuroglia.Data.Infrastructure.ResourceOriented; using Neuroglia.Data.Infrastructure.ResourceOriented.Services; using Synapse.Api.Client.Services; +using System.Runtime.CompilerServices; namespace Synapse.UnitTests.Services; @@ -29,9 +30,15 @@ internal class MockNamespacedResourceApiClient(IResourceRepository re public Task> ListAsync(string? @namespace, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => Task.FromResult(resources.GetAllAsync(@namespace, labelSelectors, cancellationToken: cancellationToken)!); - public async Task>> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) => (await resources.WatchAsync(@namespace, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); + public async IAsyncEnumerable> WatchAsync(string? @namespace = null, IEnumerable? labelSelectors = null, [EnumeratorCancellation]CancellationToken cancellationToken = default) + { + await foreach (var e in (await resources.WatchAsync(@namespace, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; + } - public async Task>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default) => (await resources.MonitorAsync(name, @namespace, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable(); + public async IAsyncEnumerable> MonitorAsync(string name, string @namespace, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach (var e in (await resources.MonitorAsync(name, @namespace, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e; + } public Task GetAsync(string name, string @namespace, CancellationToken cancellationToken = default) => resources.GetAsync(name, @namespace, cancellationToken)!;