Skip to content

Commit fd08b50

Browse files
authored
[Fusion] Added Subscription Support (#8479)
1 parent 0e318cd commit fd08b50

26 files changed

+952
-128
lines changed

src/HotChocolate/AspNetCore/src/Transport.Http/GraphQLHttpEventStreamEnumerable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public IAsyncEnumerator<OperationResult> GetAsyncEnumerator(CancellationToken ca
1919
=> new GraphQLHttpEventStreamEnumerator(_message, cancellationToken);
2020
}
2121

22-
internal class GraphQLHttpEventStreamEnumerator : IAsyncEnumerator<OperationResult>
22+
internal sealed class GraphQLHttpEventStreamEnumerator : IAsyncEnumerator<OperationResult>
2323
{
2424
private static ReadOnlySpan<byte> Event => "event:"u8;
2525
private static ReadOnlySpan<byte> Next => "next\n"u8;

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using HotChocolate.Fusion.Execution.Clients;
66
using Microsoft.Extensions.DependencyInjection.Extensions;
77
using Microsoft.Extensions.ObjectPool;
8+
using Microsoft.Extensions.Options;
89

910
namespace Microsoft.Extensions.DependencyInjection;
1011

@@ -33,24 +34,30 @@ public static IFusionGatewayBuilder AddGraphQLGateway(
3334
private static void AddRequestExecutorManager(
3435
IServiceCollection services)
3536
{
36-
services.TryAddSingleton<FusionRequestExecutorManager>();
37-
services.TryAddSingleton<IRequestExecutorProvider>(sp => sp.GetRequiredService<FusionRequestExecutorManager>());
38-
services.TryAddSingleton<IRequestExecutorEvents>(sp => sp.GetRequiredService<FusionRequestExecutorManager>());
37+
services.TryAddSingleton(
38+
static sp => new FusionRequestExecutorManager(
39+
sp.GetRequiredService<IOptionsMonitor<FusionGatewaySetup>>(),
40+
sp));
41+
services.TryAddSingleton<IRequestExecutorProvider>(
42+
static sp => sp.GetRequiredService<FusionRequestExecutorManager>());
43+
services.TryAddSingleton<IRequestExecutorEvents>(
44+
static sp => sp.GetRequiredService<FusionRequestExecutorManager>());
3945
}
4046

4147
private static void AddSourceSchemaScope(
4248
IServiceCollection services)
4349
{
44-
services.TryAddSingleton<ISourceSchemaClientScopeFactory>(static sp =>
45-
new DefaultSourceSchemaClientScopeFactory(
50+
services.TryAddSingleton<ISourceSchemaClientScopeFactory>(
51+
static sp => new DefaultSourceSchemaClientScopeFactory(
4652
sp.GetRequiredService<IHttpClientFactory>()));
4753
}
4854

4955
internal static void AddResultObjectPools(
5056
IServiceCollection services,
5157
FusionMemoryPoolOptions options)
5258
{
53-
services.TryAddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
59+
services.TryAddSingleton<ObjectPoolProvider>(
60+
static _ => new DefaultObjectPoolProvider());
5461

5562
services.TryAddSingleton(options);
5663

@@ -129,9 +136,11 @@ internal static void AddResultObjectPools(
129136
});
130137

131138
services.TryAddScoped(static provider =>
132-
new ResultPoolSessionHolder(provider.GetRequiredService<ObjectPool<ResultPoolSession>>()));
139+
new ResultPoolSessionHolder(
140+
provider.GetRequiredService<ObjectPool<ResultPoolSession>>()));
133141

134-
services.TryAddScoped(static provider => provider.GetRequiredService<ResultPoolSessionHolder>().Session);
142+
services.TryAddScoped(static provider =>
143+
provider.GetRequiredService<ResultPoolSessionHolder>().Session);
135144
}
136145

137146
private static DefaultFusionGatewayBuilder CreateBuilder(

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Diagnostics/AggregateFusionExecutionDiagnosticEvents.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using HotChocolate.Execution;
22
using HotChocolate.Execution.Instrumentation;
3+
using HotChocolate.Fusion.Execution;
4+
using HotChocolate.Fusion.Execution.Nodes;
35
using HotChocolate.Language;
46

57
namespace HotChocolate.Fusion.Diagnostics;
@@ -137,6 +139,44 @@ public void DocumentNotFoundInStorage(RequestContext context, OperationDocumentI
137139
}
138140
}
139141

142+
public IDisposable ExecuteOperation(OperationPlanContext context, OperationExecutionNode node)
143+
{
144+
var scopes = new IDisposable[_listeners.Length];
145+
146+
for (var i = 0; i < _listeners.Length; i++)
147+
{
148+
scopes[i] = _listeners[i].ExecuteOperation(context, node);
149+
}
150+
151+
return new AggregateActivityScope(scopes);
152+
}
153+
154+
public IDisposable ExecuteSubscriptionEvent(
155+
OperationPlanContext context,
156+
OperationExecutionNode node)
157+
{
158+
var scopes = new IDisposable[_listeners.Length];
159+
160+
for (var i = 0; i < _listeners.Length; i++)
161+
{
162+
scopes[i] = _listeners[i].ExecuteSubscriptionEvent(context, node);
163+
}
164+
165+
return new AggregateActivityScope(scopes);
166+
}
167+
168+
public IDisposable ExecuteIntrospection(OperationPlanContext context, IntrospectionExecutionNode node)
169+
{
170+
var scopes = new IDisposable[_listeners.Length];
171+
172+
for (var i = 0; i < _listeners.Length; i++)
173+
{
174+
scopes[i] = _listeners[i].ExecuteIntrospection(context, node);
175+
}
176+
177+
return new AggregateActivityScope(scopes);
178+
}
179+
140180
public void ExecutorCreated(string name, IRequestExecutor executor)
141181
{
142182
for (var i = 0; i < _listeners.Length; i++)

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Diagnostics/FusionExecutionDiagnosticEventListener.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using HotChocolate.Execution;
22
using HotChocolate.Execution.Instrumentation;
3+
using HotChocolate.Fusion.Execution;
4+
using HotChocolate.Fusion.Execution.Nodes;
35
using HotChocolate.Language;
46

57
namespace HotChocolate.Fusion.Diagnostics;
@@ -58,6 +60,15 @@ public virtual void RetrievedDocumentFromStorage(RequestContext context) { }
5860
/// <inheritdoc />
5961
public virtual void DocumentNotFoundInStorage(RequestContext context, OperationDocumentId documentId) { }
6062

63+
public virtual IDisposable ExecuteOperation(OperationPlanContext context, OperationExecutionNode node)
64+
=> EmptyScope;
65+
66+
public virtual IDisposable ExecuteSubscriptionEvent(OperationPlanContext context, OperationExecutionNode node)
67+
=> EmptyScope;
68+
69+
public virtual IDisposable ExecuteIntrospection(OperationPlanContext context, IntrospectionExecutionNode node)
70+
=> EmptyScope;
71+
6172
/// <inheritdoc />
6273
public virtual void ExecutorCreated(string name, IRequestExecutor executor) { }
6374

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using HotChocolate.Execution.Instrumentation;
2+
using HotChocolate.Fusion.Execution;
3+
using HotChocolate.Fusion.Execution.Nodes;
4+
5+
namespace HotChocolate.Fusion.Diagnostics;
6+
7+
public interface IFusionExecutionDiagnosticEvents : ICoreExecutionDiagnosticEvents
8+
{
9+
IDisposable ExecuteOperation(
10+
OperationPlanContext context,
11+
OperationExecutionNode node);
12+
13+
IDisposable ExecuteSubscriptionEvent(
14+
OperationPlanContext context,
15+
OperationExecutionNode node);
16+
17+
IDisposable ExecuteIntrospection(
18+
OperationPlanContext context,
19+
IntrospectionExecutionNode node);
20+
}

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Diagnostics/IFusions.cs

Lines changed: 0 additions & 5 deletions
This file was deleted.

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Diagnostics/NoopFusionExecutionDiagnosticEvents.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using HotChocolate.Execution;
22
using HotChocolate.Execution.Instrumentation;
3+
using HotChocolate.Fusion.Execution;
4+
using HotChocolate.Fusion.Execution.Nodes;
35
using HotChocolate.Language;
46

57
namespace HotChocolate.Fusion.Diagnostics;
@@ -30,6 +32,15 @@ public void RetrievedDocumentFromStorage(RequestContext context) { }
3032

3133
public void DocumentNotFoundInStorage(RequestContext context, OperationDocumentId documentId) { }
3234

35+
public IDisposable ExecuteOperation(OperationPlanContext context, OperationExecutionNode node)
36+
=> this;
37+
38+
public IDisposable ExecuteSubscriptionEvent(OperationPlanContext context, OperationExecutionNode node)
39+
=> this;
40+
41+
public IDisposable ExecuteIntrospection(OperationPlanContext context, IntrospectionExecutionNode node)
42+
=> this;
43+
3344
public void ExecutorCreated(string name, IRequestExecutor executor) { }
3445

3546
public void ExecutorEvicted(string name, IRequestExecutor executor) { }
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace HotChocolate.Fusion.Execution.Clients;
2+
3+
public enum FinalMessage : byte
4+
{
5+
Yes,
6+
No,
7+
Undefined
8+
}

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/Clients/SourceSchemaHttpClient.cs

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public async ValueTask<SourceSchemaClientResponse> ExecuteAsync(
5252
};
5353

5454
var httpResponse = await _client.SendAsync(httpRequest, cancellationToken);
55-
return new Response(httpResponse, request.Variables);
55+
return new Response(request.Operation.Operation, httpResponse, request.Variables);
5656
}
5757

5858
private GraphQLHttpRequest CreateHttpRequest(
@@ -129,54 +129,71 @@ public ValueTask DisposeAsync()
129129
}
130130

131131
private sealed class Response(
132+
OperationType operation,
132133
GraphQLHttpResponse response,
133134
ImmutableArray<VariableValues> variables)
134135
: SourceSchemaClientResponse
135136
{
136137
public override async IAsyncEnumerable<SourceSchemaResult> ReadAsResultStreamAsync(
137138
[EnumeratorCancellation] CancellationToken cancellationToken = default)
138139
{
139-
switch (variables.Length)
140+
if (operation == OperationType.Subscription)
140141
{
141-
case 0:
142+
await foreach (var result in response.ReadAsResultStreamAsync().WithCancellation(cancellationToken))
142143
{
143-
var result = await response.ReadAsResultAsync(cancellationToken);
144144
yield return new SourceSchemaResult(
145145
Path.Root,
146146
result,
147147
result.Data,
148148
result.Errors,
149149
result.Extensions);
150-
break;
151150
}
152-
153-
case 1:
151+
}
152+
else
153+
{
154+
switch (variables.Length)
154155
{
155-
var result = await response.ReadAsResultAsync(cancellationToken);
156-
yield return new SourceSchemaResult(
157-
variables[0].Path,
158-
result,
159-
result.Data,
160-
result.Errors,
161-
result.Extensions);
162-
break;
163-
}
156+
case 0:
157+
{
158+
var result = await response.ReadAsResultAsync(cancellationToken);
159+
yield return new SourceSchemaResult(
160+
Path.Root,
161+
result,
162+
result.Data,
163+
result.Errors,
164+
result.Extensions);
165+
break;
166+
}
164167

165-
default:
166-
{
167-
await foreach (var result in response.ReadAsResultStreamAsync().WithCancellation(cancellationToken))
168+
case 1:
168169
{
169-
var index = result.VariableIndex!.Value;
170-
var (path, _) = variables[index];
170+
var result = await response.ReadAsResultAsync(cancellationToken);
171171
yield return new SourceSchemaResult(
172-
path,
172+
variables[0].Path,
173173
result,
174174
result.Data,
175175
result.Errors,
176176
result.Extensions);
177+
break;
177178
}
178179

179-
break;
180+
default:
181+
{
182+
await foreach (var result in response.ReadAsResultStreamAsync()
183+
.WithCancellation(cancellationToken))
184+
{
185+
var index = result.VariableIndex!.Value;
186+
var (path, _) = variables[index];
187+
yield return new SourceSchemaResult(
188+
path,
189+
result,
190+
result.Data,
191+
result.Errors,
192+
result.Extensions);
193+
}
194+
195+
break;
196+
}
180197
}
181198
}
182199
}

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/Clients/SourceSchemaResult.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ public sealed class SourceSchemaResult : IDisposable
66
{
77
private readonly IDisposable? _resource;
88

9-
public SourceSchemaResult(Path path, JsonDocument document)
9+
public SourceSchemaResult(Path path, JsonDocument document, FinalMessage final = FinalMessage.Undefined)
1010
{
11+
ArgumentNullException.ThrowIfNull(path);
12+
ArgumentNullException.ThrowIfNull(document);
13+
1114
_resource = document;
1215
Path = path;
1316

@@ -31,15 +34,27 @@ public SourceSchemaResult(Path path, JsonDocument document)
3134
{
3235
Extensions = extensions;
3336
}
37+
38+
Final = final;
3439
}
3540

36-
public SourceSchemaResult(Path path, IDisposable resource, JsonElement data, JsonElement errors, JsonElement extensions)
41+
public SourceSchemaResult(
42+
Path path,
43+
IDisposable resource,
44+
JsonElement data,
45+
JsonElement errors,
46+
JsonElement extensions,
47+
FinalMessage final = FinalMessage.Undefined)
3748
{
49+
ArgumentNullException.ThrowIfNull(path);
50+
ArgumentNullException.ThrowIfNull(resource);
51+
3852
_resource = resource;
3953
Path = path;
4054
Data = data;
4155
Errors = errors;
4256
Extensions = extensions;
57+
Final = final;
4358
}
4459

4560
public Path Path { get; }
@@ -50,5 +65,7 @@ public SourceSchemaResult(Path path, IDisposable resource, JsonElement data, Jso
5065

5166
public JsonElement Extensions { get; }
5267

68+
public FinalMessage Final { get; }
69+
5370
public void Dispose() => _resource?.Dispose();
5471
}

0 commit comments

Comments
 (0)