Skip to content

Commit 5d7a5c2

Browse files
authored
Integrated Archive in Gateway (#8532)
1 parent 61ca587 commit 5d7a5c2

File tree

8 files changed

+259
-108
lines changed

8 files changed

+259
-108
lines changed
Lines changed: 146 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1+
using System.Buffers;
12
using System.Collections.Immutable;
23
using System.IO.Hashing;
34
using System.IO.Pipelines;
5+
using System.Runtime.InteropServices;
6+
using System.Text.Json;
7+
using System.Threading.Channels;
48
using HotChocolate.Buffers;
9+
using HotChocolate.Fusion.Packaging;
510
using HotChocolate.Language;
611
using HotChocolate.Utilities;
712
using IOPath = System.IO.Path;
813

914
namespace HotChocolate.Fusion.Configuration;
1015

11-
public class FileSystemFusionConfigurationProvider : IFusionSchemaDocumentProvider
16+
public class FileSystemFusionConfigurationProvider : IFusionConfigurationProvider
1217
{
1318
#if NET9_0_OR_GREATER
1419
private readonly Lock _syncRoot = new();
@@ -17,11 +22,20 @@ public class FileSystemFusionConfigurationProvider : IFusionSchemaDocumentProvid
1722
#endif
1823
private readonly string _fileName;
1924
private readonly FileSystemWatcher _watcher;
20-
private readonly SemaphoreSlim _semaphore;
25+
26+
private readonly Channel<bool> _schemaUpdateEvents =
27+
Channel.CreateBounded<bool>(
28+
new BoundedChannelOptions(1)
29+
{
30+
FullMode = BoundedChannelFullMode.DropNewest, SingleReader = true, SingleWriter = false
31+
});
32+
2133
private readonly CancellationTokenSource _cts;
22-
private readonly CancellationToken _ct;
2334
private ImmutableArray<ObserverSession> _sessions = [];
35+
private readonly bool _isPackage;
2436
private ulong _schemaDocumentHash;
37+
private ulong _settingsHash;
38+
private ulong _packageHash;
2539
private bool _disposed;
2640

2741
public FileSystemFusionConfigurationProvider(string fileName)
@@ -38,15 +52,13 @@ public FileSystemFusionConfigurationProvider(string fileName)
3852
throw new FileNotFoundException("The file must contain a path.", fileName);
3953
}
4054

41-
_semaphore = new SemaphoreSlim(1, 1);
55+
_isPackage = IOPath.GetExtension(fileName)?.ToLowerInvariant() is ".far";
4256
_cts = new CancellationTokenSource();
43-
_ct = _cts.Token;
4457

4558
_watcher = new FileSystemWatcher
4659
{
4760
Path = directory,
4861
Filter = "*.*",
49-
5062
NotifyFilter =
5163
NotifyFilters.FileName
5264
| NotifyFilters.DirectoryName
@@ -60,27 +72,30 @@ public FileSystemFusionConfigurationProvider(string fileName)
6072
{
6173
if (fullPath.Equals(e.FullPath, StringComparison.Ordinal))
6274
{
63-
BeginLoadSchemaDocument();
75+
_schemaUpdateEvents.Writer.TryWrite(true);
6476
}
6577
};
6678

6779
_watcher.Changed += (_, e) =>
6880
{
6981
if (fullPath.Equals(e.FullPath, StringComparison.Ordinal))
7082
{
71-
BeginLoadSchemaDocument();
83+
_schemaUpdateEvents.Writer.TryWrite(true);
7284
}
7385
};
7486

7587
_watcher.EnableRaisingEvents = true;
76-
BeginLoadSchemaDocument();
88+
_schemaUpdateEvents.Writer.TryWrite(true);
89+
90+
SchemaUpdateProcessorAsync(_cts.Token).FireAndForget();
7791
}
7892

79-
public DocumentNode? SchemaDocument { get; private set; }
93+
public FusionConfiguration? Configuration { get; private set; }
8094

81-
public IDisposable Subscribe(IObserver<DocumentNode> observer)
95+
public IDisposable Subscribe(IObserver<FusionConfiguration> observer)
8296
{
8397
ArgumentNullException.ThrowIfNull(observer);
98+
ObjectDisposedException.ThrowIf(_disposed, this);
8499

85100
var session = new ObserverSession(this, observer);
86101

@@ -89,11 +104,11 @@ public IDisposable Subscribe(IObserver<DocumentNode> observer)
89104
_sessions = _sessions.Add(session);
90105
}
91106

92-
var schemaDocument = SchemaDocument;
107+
var configuration = Configuration;
93108

94-
if (schemaDocument is not null)
109+
if (configuration is not null)
95110
{
96-
observer.OnNext(SchemaDocument!);
111+
observer.OnNext(configuration);
97112
}
98113

99114
return session;
@@ -107,61 +122,118 @@ private void Unsubscribe(ObserverSession session)
107122
}
108123
}
109124

110-
private void BeginLoadSchemaDocument()
111-
=> LoadSchemaDocumentAsync(_ct).FireAndForget();
112-
113-
private async Task LoadSchemaDocumentAsync(CancellationToken cancellationToken)
125+
private async Task SchemaUpdateProcessorAsync(CancellationToken ct)
114126
{
115-
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
127+
var defaultSettings = JsonDocument.Parse("{ }");
128+
var defaultSettingsHash = XxHash64.HashToUInt64(JsonMarshal.GetRawUtf8Value(defaultSettings.RootElement));
116129

117-
try
130+
await foreach (var _ in _schemaUpdateEvents.Reader.ReadAllAsync(ct))
118131
{
119-
using var buffer = new PooledArrayWriter();
120-
await using var fileStream = File.OpenRead(_fileName);
121-
var pipeReader = PipeReader.Create(fileStream);
122-
123-
while (true)
132+
try
124133
{
125-
var result = await pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);
126-
var readBuffer = result.Buffer;
134+
var settings = new JsonDocumentOwner(defaultSettings, EmptyMemoryOwner.Instance);
135+
DocumentNode schema;
136+
ulong settingsHash;
137+
ulong schemaHash;
127138

128-
foreach (var segment in readBuffer)
139+
if (_isPackage)
129140
{
130-
var span = segment.Span;
131-
span.CopyTo(buffer.GetSpan(span.Length));
132-
buffer.Advance(span.Length);
141+
await using (var fileStream = File.OpenRead(_fileName))
142+
{
143+
var hash = new XxHash64();
144+
await hash.AppendAsync(fileStream, ct);
145+
var packageHash = hash.GetCurrentHashAsUInt64();
146+
147+
if (packageHash == _packageHash)
148+
{
149+
continue;
150+
}
151+
152+
_packageHash = packageHash;
153+
}
154+
155+
using var archive = FusionArchive.Open(_fileName);
156+
using var config = await archive.TryGetGatewayConfigurationAsync(new Version(2, 0, 0), ct);
157+
158+
if (config is null)
159+
{
160+
// ignore and wait for next update
161+
continue;
162+
}
163+
164+
await using var stream = await config.OpenReadSchemaAsync(ct);
165+
(schema, schemaHash) = await ReadSchemaDocumentAsync(stream, ct);
166+
var settingsSpan = JsonMarshal.GetRawUtf8Value(config.Settings.RootElement);
167+
var buffer = new PooledArrayWriter(settingsSpan.Length);
168+
buffer.Write(settingsSpan);
169+
settingsHash = XxHash64.HashToUInt64(settingsSpan);
170+
settings = new JsonDocumentOwner(JsonDocument.Parse(buffer.WrittenMemory), buffer);
171+
}
172+
else
173+
{
174+
await using var stream = File.OpenRead(_fileName);
175+
(schema, schemaHash) = await ReadSchemaDocumentAsync(stream, ct);
176+
settingsHash = defaultSettingsHash;
133177
}
134178

135-
pipeReader.AdvanceTo(readBuffer.End);
136-
137-
if (result.IsCompleted)
179+
if (_schemaDocumentHash == schemaHash && _settingsHash == settingsHash)
138180
{
139-
break;
181+
settings.Dispose();
182+
continue;
140183
}
184+
185+
_settingsHash = settingsHash;
186+
_schemaDocumentHash = schemaHash;
187+
NotifyObservers(new FusionConfiguration(schema, settings));
141188
}
189+
catch
190+
{
191+
// ignore and wait for next update
192+
}
193+
}
194+
}
142195

143-
await pipeReader.CompleteAsync().ConfigureAwait(false);
196+
private async ValueTask<(DocumentNode, ulong)> ReadSchemaDocumentAsync(Stream stream, CancellationToken ct)
197+
{
198+
using var buffer = new PooledArrayWriter();
199+
var pipeReader = PipeReader.Create(stream);
144200

145-
var hash = XxHash64.HashToUInt64(buffer.WrittenSpan);
201+
while (true)
202+
{
203+
var result = await pipeReader.ReadAsync(ct).ConfigureAwait(false);
204+
var readBuffer = result.Buffer;
146205

147-
if (_schemaDocumentHash != hash)
206+
foreach (var segment in readBuffer)
148207
{
149-
_schemaDocumentHash = hash;
208+
var span = segment.Span;
209+
span.CopyTo(buffer.GetSpan(span.Length));
210+
buffer.Advance(span.Length);
211+
}
212+
213+
pipeReader.AdvanceTo(readBuffer.End);
150214

151-
var schemaDocument = Utf8GraphQLParser.Parse(buffer.WrittenSpan);
152-
SchemaDocument = schemaDocument;
153-
NotifyObservers(schemaDocument);
215+
if (result.IsCompleted)
216+
{
217+
break;
154218
}
155219
}
156-
finally
157-
{
158-
_semaphore.Release();
159-
}
220+
221+
await pipeReader.CompleteAsync().ConfigureAwait(false);
222+
223+
var hash = XxHash64.HashToUInt64(buffer.WrittenSpan);
224+
var document = Utf8GraphQLParser.Parse(buffer.WrittenSpan);
225+
return (document, hash);
160226
}
161227

162-
private void NotifyObservers(DocumentNode schemaDocument)
228+
private void NotifyObservers(FusionConfiguration configuration)
163229
{
164-
var sessions = _sessions;
230+
ImmutableArray<ObserverSession> sessions;
231+
232+
lock (_syncRoot)
233+
{
234+
sessions = _sessions;
235+
Configuration = configuration;
236+
}
165237

166238
if (sessions.IsEmpty)
167239
{
@@ -170,7 +242,7 @@ private void NotifyObservers(DocumentNode schemaDocument)
170242

171243
foreach (var session in sessions)
172244
{
173-
session.Notify(schemaDocument);
245+
session.Notify(configuration);
174246
}
175247
}
176248

@@ -187,48 +259,43 @@ public ValueTask DisposeAsync()
187259
_watcher.Dispose();
188260

189261
_cts.Cancel();
190-
191-
_semaphore.Dispose();
192262
_cts.Dispose();
193263

194264
foreach (var session in _sessions)
195265
{
196266
session.Complete();
197267
}
198268

269+
// drain events
270+
while (_schemaUpdateEvents.Reader.TryRead(out _))
271+
{
272+
}
273+
199274
return ValueTask.CompletedTask;
200275
}
201276

202-
private sealed class ObserverSession : IDisposable
277+
private sealed class ObserverSession(
278+
FileSystemFusionConfigurationProvider provider,
279+
IObserver<FusionConfiguration> observer)
280+
: IDisposable
203281
{
204-
private readonly FileSystemFusionConfigurationProvider _provider;
205-
private readonly IObserver<DocumentNode> _observer;
206-
207-
public ObserverSession(
208-
FileSystemFusionConfigurationProvider provider,
209-
IObserver<DocumentNode> observer)
210-
{
211-
_observer = observer;
212-
_provider = provider;
213-
}
214-
215-
public void Notify(DocumentNode schemaDocument)
282+
public void Notify(FusionConfiguration schemaDocument)
216283
{
217284
try
218285
{
219-
_observer.OnNext(schemaDocument);
286+
observer.OnNext(schemaDocument);
220287
}
221288
catch (Exception ex)
222289
{
223-
_observer.OnError(ex);
290+
observer.OnError(ex);
224291
}
225292
}
226293

227294
public void Complete()
228295
{
229296
try
230297
{
231-
_observer.OnCompleted();
298+
observer.OnCompleted();
232299
}
233300
catch
234301
{
@@ -238,6 +305,17 @@ public void Complete()
238305
}
239306

240307
public void Dispose()
241-
=> _provider.Unsubscribe(this);
308+
=> provider.Unsubscribe(this);
309+
}
310+
311+
private sealed class EmptyMemoryOwner : IMemoryOwner<byte>
312+
{
313+
public static readonly EmptyMemoryOwner Instance = new();
314+
315+
public Memory<byte> Memory => default;
316+
317+
public void Dispose()
318+
{
319+
}
242320
}
243321
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using HotChocolate.Buffers;
2+
using HotChocolate.Language;
3+
4+
namespace HotChocolate.Fusion.Configuration;
5+
6+
/// <summary>
7+
/// The fusion configuration consists of the fusion execution schema document and
8+
/// the fusion execution schema settings.
9+
/// </summary>
10+
/// <param name="Schema">
11+
/// The fusion execution schema document.
12+
/// </param>
13+
/// <param name="Settings">
14+
/// The fusion execution schema settings.
15+
/// </param>
16+
public sealed record FusionConfiguration(
17+
DocumentNode Schema,
18+
JsonDocumentOwner Settings)
19+
: IDisposable
20+
{
21+
public void Dispose() => Settings.Dispose();
22+
}

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Configuration/FusionGatewaySetup.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace HotChocolate.Fusion.Configuration;
99

1010
internal sealed class FusionGatewaySetup
1111
{
12-
public Func<IServiceProvider, IFusionSchemaDocumentProvider>? DocumentProvider { get; set; }
12+
public Func<IServiceProvider, IFusionConfigurationProvider>? DocumentProvider { get; set; }
1313

1414
public List<Action<FusionRequestOptions>> RequestOptionsModifiers { get; } = [];
1515

0 commit comments

Comments
 (0)