Skip to content

Commit 5a86835

Browse files
[Fusion] Properly update RequestExecutorProxy (#8813)
1 parent 7d1bc0c commit 5a86835

File tree

6 files changed

+190
-104
lines changed

6 files changed

+190
-104
lines changed

src/HotChocolate/Core/src/Execution/RequestExecutorManager.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,7 @@ await OnRequestExecutorCreatedAsync(context, executor, setup, cancellationToken)
197197
registeredExecutor.Executor);
198198

199199
_events.RaiseEvent(
200-
new RequestExecutorEvent(
201-
RequestExecutorEventType.Created,
202-
schemaName,
203-
registeredExecutor.Executor));
200+
RequestExecutorEvent.Created(registeredExecutor.Executor));
204201

205202
return registeredExecutor;
206203
}
@@ -219,11 +216,7 @@ await CreateRequestExecutorAsync(schemaName, isInitialCreation: false, Cancellat
219216

220217
try
221218
{
222-
_events.RaiseEvent(
223-
new RequestExecutorEvent(
224-
RequestExecutorEventType.Evicted,
225-
schemaName,
226-
previousExecutor.Executor));
219+
_events.RaiseEvent(RequestExecutorEvent.Evicted(previousExecutor.Executor));
227220
}
228221
finally
229222
{

src/HotChocolate/Core/test/Execution.Tests/RequestExecutorProxyTests.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public class RequestExecutorProxyTests
99
public async Task Ensure_Executor_Is_Cached()
1010
{
1111
// arrange
12-
var resolver =
12+
var manager =
1313
new ServiceCollection()
1414
.AddGraphQL()
1515
.AddStarWarsRepositories()
@@ -19,7 +19,7 @@ public async Task Ensure_Executor_Is_Cached()
1919
.GetRequiredService<RequestExecutorManager>();
2020

2121
// act
22-
var proxy = new RequestExecutorProxy(resolver, resolver, ISchemaDefinition.DefaultName);
22+
var proxy = new RequestExecutorProxy(manager, manager, ISchemaDefinition.DefaultName);
2323
var a = await proxy.GetExecutorAsync(CancellationToken.None);
2424
var b = await proxy.GetExecutorAsync(CancellationToken.None);
2525

@@ -33,7 +33,7 @@ public async Task Ensure_Executor_Is_Correctly_Swapped_When_Evicted()
3333
// arrange
3434
var executorUpdatedResetEvent = new ManualResetEventSlim(false);
3535
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
36-
var resolver =
36+
var manager =
3737
new ServiceCollection()
3838
.AddGraphQL()
3939
.AddStarWarsRepositories()
@@ -43,7 +43,7 @@ public async Task Ensure_Executor_Is_Correctly_Swapped_When_Evicted()
4343
.GetRequiredService<RequestExecutorManager>();
4444
var updated = false;
4545

46-
var proxy = new TestProxy(resolver, resolver, ISchemaDefinition.DefaultName);
46+
var proxy = new TestProxy(manager, manager, ISchemaDefinition.DefaultName);
4747
proxy.ExecutorUpdated += () =>
4848
{
4949
updated = true;
@@ -52,7 +52,7 @@ public async Task Ensure_Executor_Is_Correctly_Swapped_When_Evicted()
5252

5353
// act
5454
var a = await proxy.GetExecutorAsync(CancellationToken.None);
55-
resolver.EvictExecutor();
55+
manager.EvictExecutor();
5656
executorUpdatedResetEvent.Wait(cts.Token);
5757
var b = await proxy.GetExecutorAsync(CancellationToken.None);
5858

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/FusionRequestExecutorManager.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ internal sealed class FusionRequestExecutorManager
5050
private ImmutableArray<ObserverSession> _observers = [];
5151

5252
private bool _disposed;
53+
private ulong _version;
5354

5455
public FusionRequestExecutorManager(
5556
IOptionsMonitor<FusionGatewaySetup> optionsMonitor,
@@ -107,7 +108,6 @@ private async ValueTask<IRequestExecutor> GetOrCreateRequestExecutorAsync(
107108

108109
registration = await CreateInitialRegistrationAsync(schemaName, cancellationToken).ConfigureAwait(false);
109110
_registry.TryAdd(schemaName, registration);
110-
await _executorEvents.WriteCreatedAsync(registration.Executor, cancellationToken).ConfigureAwait(false);
111111
return registration.Executor;
112112
}
113113
finally
@@ -121,9 +121,14 @@ private SemaphoreSlim GetSemaphoreForSchema(string schemaName)
121121

122122
private async ValueTask EvictExecutorAsync(FusionRequestExecutor executor, CancellationToken cancellationToken)
123123
{
124-
await _executorEvents.WriteEvictedAsync(executor, cancellationToken);
125-
126-
EvictRequestExecutorAsync(executor).FireAndForget();
124+
try
125+
{
126+
await _executorEvents.WriteEvictedAsync(executor, cancellationToken);
127+
}
128+
finally
129+
{
130+
EvictRequestExecutorAsync(executor).FireAndForget();
131+
}
127132
}
128133

129134
private static async Task EvictRequestExecutorAsync(FusionRequestExecutor previousExecutor)
@@ -150,6 +155,8 @@ private async ValueTask<RequestExecutorRegistration> CreateInitialRegistrationAs
150155

151156
await WarmupExecutorAsync(executor, true, cancellationToken).ConfigureAwait(false);
152157

158+
await _executorEvents.WriteCreatedAsync(executor, cancellationToken).ConfigureAwait(false);
159+
153160
return new RequestExecutorRegistration(
154161
this,
155162
documentProvider,
@@ -161,6 +168,13 @@ private FusionRequestExecutor CreateRequestExecutor(
161168
string schemaName,
162169
FusionConfiguration configuration)
163170
{
171+
ulong version;
172+
173+
unchecked
174+
{
175+
version = ++_version;
176+
}
177+
164178
var setup = _optionsMonitor.Get(schemaName);
165179

166180
var options = CreateOptions(setup);
@@ -179,7 +193,7 @@ private FusionRequestExecutor CreateRequestExecutor(
179193
var pipeline = CreatePipeline(setup, schema, schemaServices, requestOptions);
180194

181195
var contextPool = schemaServices.GetRequiredService<ObjectPool<PooledRequestContext>>();
182-
var executor = new FusionRequestExecutor(schema, _applicationServices, pipeline, contextPool, 0);
196+
var executor = new FusionRequestExecutor(schema, _applicationServices, pipeline, contextPool, version);
183197
var requestExecutorAccessor = schemaServices.GetRequiredService<RequestExecutorAccessor>();
184198
requestExecutorAccessor.RequestExecutor = executor;
185199

@@ -646,6 +660,8 @@ private async Task WaitForUpdatesAsync()
646660

647661
await _manager.WarmupExecutorAsync(nextExecutor, false, _cancellationToken).ConfigureAwait(false);
648662

663+
await _manager._executorEvents.WriteCreatedAsync(nextExecutor, _cancellationToken).ConfigureAwait(false);
664+
649665
Executor = nextExecutor;
650666

651667
await _manager.EvictExecutorAsync(previousExecutor, _cancellationToken);

src/HotChocolate/Fusion-vnext/test/Fusion.Execution.Tests/Execution/FusionRequestExecutorManagerTests.cs

Lines changed: 1 addition & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
1-
using System.Buffers;
2-
using System.Net.Security;
3-
using System.Text.Json;
4-
using HotChocolate.Buffers;
51
using HotChocolate.Caching.Memory;
62
using HotChocolate.Execution;
73
using HotChocolate.Fusion.Configuration;
84
using HotChocolate.Fusion.Execution.Nodes;
95
using HotChocolate.Fusion.Execution.Pipeline;
106
using HotChocolate.Language;
117
using Microsoft.Extensions.DependencyInjection;
12-
using Microsoft.Extensions.Hosting;
138

149
namespace HotChocolate.Fusion.Execution;
1510

@@ -22,10 +17,6 @@ public async Task GetExecutorAsync_Throws_If_Schema_Does_Not_Exist()
2217
var schemaDocument =
2318
ComposeSchemaDocument(
2419
"""
25-
schema {
26-
query: Query
27-
}
28-
2920
type Query {
3021
foo: String
3122
}
@@ -54,10 +45,6 @@ public async Task CreateExecutor()
5445
var schemaDocument =
5546
ComposeSchemaDocument(
5647
"""
57-
schema {
58-
query: Query
59-
}
60-
6148
type Query {
6249
foo: String
6350
}
@@ -86,10 +73,6 @@ public async Task GetOperationPlanFromExecution()
8673
var schemaDocument =
8774
ComposeSchemaDocument(
8875
"""
89-
schema {
90-
query: Query
91-
}
92-
9376
type Query {
9477
foo: String
9578
}
@@ -181,10 +164,6 @@ public async Task Plan_Cache_Should_Be_Scoped_To_Executor()
181164
configProvider.UpdateConfiguration(
182165
CreateConfiguration(
183166
"""
184-
schema {
185-
query: Query
186-
}
187-
188167
type Query {
189168
field2: String!
190169
}
@@ -241,10 +220,6 @@ public async Task Executor_Should_Only_Be_Switched_Once_It_Is_Warmed_Up()
241220
configProvider.UpdateConfiguration(
242221
CreateConfiguration(
243222
"""
244-
schema {
245-
query: Query
246-
}
247-
248223
type Query {
249224
field2: String!
250225
}
@@ -304,10 +279,6 @@ public async Task WarmupTasks_Are_Applied_Correct_Number_Of_Times()
304279
configProvider.UpdateConfiguration(
305280
CreateConfiguration(
306281
"""
307-
schema {
308-
query: Query
309-
}
310-
311282
type Query {
312283
field2: String!
313284
}
@@ -426,66 +397,11 @@ private static FusionConfiguration CreateConfiguration(string? sourceSchemaText
426397
{
427398
sourceSchemaText ??=
428399
"""
429-
schema {
430-
query: Query
431-
}
432-
433400
type Query {
434401
field: String!
435402
}
436403
""";
437404

438-
var schema = ComposeSchemaDocument(sourceSchemaText);
439-
440-
return new FusionConfiguration(
441-
schema,
442-
new JsonDocumentOwner(
443-
JsonDocument.Parse("{ }"),
444-
new EmptyMemoryOwner()));
445-
}
446-
447-
private sealed class TestFusionConfigurationProvider(FusionConfiguration initialConfig) : IFusionConfigurationProvider
448-
{
449-
private List<IObserver<FusionConfiguration>> _observers = [];
450-
451-
public IDisposable Subscribe(IObserver<FusionConfiguration> observer)
452-
{
453-
if (Configuration is not null)
454-
{
455-
observer.OnNext(Configuration);
456-
}
457-
458-
_observers.Add(observer);
459-
460-
return new Observer();
461-
}
462-
463-
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
464-
465-
public FusionConfiguration? Configuration { get; private set; } = initialConfig;
466-
467-
public void UpdateConfiguration(FusionConfiguration configuration)
468-
{
469-
Configuration = configuration;
470-
471-
foreach (var observer in _observers)
472-
{
473-
observer.OnNext(Configuration);
474-
}
475-
}
476-
477-
private sealed class Observer : IDisposable
478-
{
479-
public void Dispose()
480-
{
481-
}
482-
}
483-
}
484-
485-
private class EmptyMemoryOwner : IMemoryOwner<byte>
486-
{
487-
public Memory<byte> Memory => default;
488-
489-
public void Dispose() { }
405+
return CreateFusionConfiguration(sourceSchemaText);
490406
}
491407
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
using HotChocolate.Execution;
2+
using Microsoft.Extensions.DependencyInjection;
3+
4+
namespace HotChocolate.Fusion.Execution;
5+
6+
public class FusionRequestExecutorProxyTests : FusionTestBase
7+
{
8+
[Fact]
9+
public async Task Ensure_Executor_Is_Cached()
10+
{
11+
// arrange
12+
var manager =
13+
new ServiceCollection()
14+
.AddGraphQLGateway()
15+
.AddInMemoryConfiguration(
16+
ComposeSchemaDocument(
17+
"""
18+
type Query {
19+
field: String!
20+
}
21+
"""))
22+
.Services
23+
.BuildServiceProvider()
24+
.GetRequiredService<FusionRequestExecutorManager>();
25+
26+
// act
27+
var proxy = new RequestExecutorProxy(manager, manager, ISchemaDefinition.DefaultName);
28+
var a = await proxy.GetExecutorAsync(CancellationToken.None);
29+
var b = await proxy.GetExecutorAsync(CancellationToken.None);
30+
31+
// assert
32+
Assert.Same(a, b);
33+
}
34+
35+
[Fact]
36+
public async Task Ensure_Executor_Is_Correctly_Swapped_When_Evicted()
37+
{
38+
// arrange
39+
var executorUpdatedResetEvent = new ManualResetEventSlim(false);
40+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
41+
42+
var configProvider = new TestFusionConfigurationProvider(
43+
CreateFusionConfiguration(
44+
"""
45+
type Query {
46+
field: String!
47+
}
48+
"""));
49+
50+
var manager =
51+
new ServiceCollection()
52+
.AddGraphQLGateway()
53+
.AddConfigurationProvider(_ => configProvider)
54+
.Services
55+
.BuildServiceProvider()
56+
.GetRequiredService<FusionRequestExecutorManager>();
57+
var updated = false;
58+
59+
var proxy = new TestProxy(manager, manager, ISchemaDefinition.DefaultName);
60+
proxy.ExecutorUpdated += () =>
61+
{
62+
updated = true;
63+
executorUpdatedResetEvent.Set();
64+
};
65+
66+
// act
67+
var a = await proxy.GetExecutorAsync(CancellationToken.None);
68+
69+
configProvider.UpdateConfiguration(
70+
CreateFusionConfiguration(
71+
"""
72+
type Query {
73+
field2: String!
74+
}
75+
"""));
76+
77+
executorUpdatedResetEvent.Wait(cts.Token);
78+
var b = await proxy.GetExecutorAsync(CancellationToken.None);
79+
80+
// assert
81+
Assert.NotSame(a, b);
82+
Assert.True(updated);
83+
}
84+
85+
private class TestProxy(
86+
IRequestExecutorProvider executorProvider,
87+
IRequestExecutorEvents executorEvents,
88+
string schemaName)
89+
: RequestExecutorProxy(executorProvider, executorEvents, schemaName)
90+
{
91+
public event Action? ExecutorUpdated;
92+
93+
protected override void OnAfterRequestExecutorSwapped(
94+
IRequestExecutor newExecutor,
95+
IRequestExecutor oldExecutor)
96+
{
97+
ExecutorUpdated?.Invoke();
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)