Skip to content

Commit 9c4c7e7

Browse files
[Fusion] Support request batching for legacy servers (#9050)
1 parent 6982e62 commit 9c4c7e7

File tree

76 files changed

+3855
-262
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+3855
-262
lines changed

src/HotChocolate/AspNetCore/src/Transport.Http/GraphQLHttpResponse.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ private async ValueTask<OperationResult> ReadAsResultInternalAsync(string? charS
278278
chunks,
279279
lastLength: currentChunkPosition,
280280
usedChunks: chunkIndex,
281-
options: default,
282281
pooledMemory: true);
283282
}
284283
finally
@@ -340,8 +339,8 @@ public IAsyncEnumerable<OperationResult> ReadAsResultStreamAsync()
340339
if (contentType?.MediaType?.Equals(ContentType.Json, StringComparison.Ordinal) ?? false)
341340
{
342341
_message.EnsureSuccessStatusCode();
343-
return new GraphQLHttpSingleResultEnumerable(
344-
ct => ReadAsResultInternalAsync(contentType.CharSet, ct));
342+
343+
return new JsonResultEnumerable(_message, contentType.CharSet);
345344
}
346345

347346
_message.EnsureSuccessStatusCode();
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
#if FUSION
2+
using System.Buffers;
3+
using System.IO.Pipelines;
4+
using System.Text.Json;
5+
using HotChocolate.Fusion.Text.Json;
6+
#else
7+
using System.Buffers;
8+
using System.IO.Pipelines;
9+
using System.Text.Json;
10+
using HotChocolate.Buffers;
11+
#endif
12+
13+
#if FUSION
14+
namespace HotChocolate.Fusion.Transport.Http;
15+
#else
16+
namespace HotChocolate.Transport.Http;
17+
#endif
18+
19+
/// <summary>
20+
/// Reads a JSON response that can be either a single object or an array of GraphQL responses.
21+
/// </summary>
22+
#if FUSION
23+
internal sealed class JsonResultEnumerable(HttpResponseMessage message, string? charSet) : IAsyncEnumerable<SourceResultDocument>
24+
#else
25+
internal sealed class JsonResultEnumerable(HttpResponseMessage message, string? charSet) : IAsyncEnumerable<OperationResult>
26+
#endif
27+
{
28+
private static readonly StreamPipeReaderOptions s_options = new(
29+
pool: MemoryPool<byte>.Shared,
30+
bufferSize: 4096,
31+
minimumReadSize: 1,
32+
leaveOpen: true,
33+
useZeroByteReads: true);
34+
35+
#if FUSION
36+
public async IAsyncEnumerator<SourceResultDocument> GetAsyncEnumerator(
37+
#else
38+
public async IAsyncEnumerator<OperationResult> GetAsyncEnumerator(
39+
#endif
40+
CancellationToken cancellationToken = default)
41+
{
42+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
43+
await using var contentStream = await message.Content.ReadAsStreamAsync(cts.Token).ConfigureAwait(false);
44+
45+
var stream = contentStream;
46+
var sourceEncoding = HttpTransportUtilities.GetEncoding(charSet);
47+
if (HttpTransportUtilities.NeedsTranscoding(sourceEncoding))
48+
{
49+
stream = HttpTransportUtilities.GetTranscodingStream(contentStream, sourceEncoding);
50+
}
51+
52+
var reader = PipeReader.Create(stream, s_options);
53+
54+
#if FUSION
55+
var chunks = ArrayPool<byte[]>.Shared.Rent(64);
56+
var currentChunk = JsonMemory.Rent();
57+
var currentChunkPosition = 0;
58+
var chunkIndex = 0;
59+
#else
60+
var buffer = new PooledArrayWriter();
61+
var bufferOwnershipTransferred = false;
62+
#endif
63+
64+
try
65+
{
66+
// Read the entire response into memory
67+
while (true)
68+
{
69+
var result = await reader.ReadAsync(cts.Token).ConfigureAwait(false);
70+
if (result.IsCanceled)
71+
{
72+
yield break;
73+
}
74+
75+
var pipeBuffer = result.Buffer;
76+
77+
#if FUSION
78+
foreach (var segment in pipeBuffer)
79+
{
80+
var segmentSpan = segment.Span;
81+
var segmentOffset = 0;
82+
83+
while (segmentOffset < segmentSpan.Length)
84+
{
85+
var spaceInCurrentChunk = JsonMemory.BufferSize - currentChunkPosition;
86+
var bytesToCopy = Math.Min(spaceInCurrentChunk, segmentSpan.Length - segmentOffset);
87+
88+
segmentSpan.Slice(segmentOffset, bytesToCopy).CopyTo(currentChunk.AsSpan(currentChunkPosition));
89+
currentChunkPosition += bytesToCopy;
90+
segmentOffset += bytesToCopy;
91+
92+
if (currentChunkPosition == JsonMemory.BufferSize)
93+
{
94+
if (chunkIndex >= chunks.Length)
95+
{
96+
var newChunks = ArrayPool<byte[]>.Shared.Rent(chunks.Length * 2);
97+
Array.Copy(chunks, 0, newChunks, 0, chunks.Length);
98+
chunks.AsSpan().Clear();
99+
ArrayPool<byte[]>.Shared.Return(chunks);
100+
chunks = newChunks;
101+
}
102+
103+
chunks[chunkIndex++] = currentChunk;
104+
currentChunk = JsonMemory.Rent();
105+
currentChunkPosition = 0;
106+
}
107+
}
108+
}
109+
#else
110+
foreach (var segment in pipeBuffer)
111+
{
112+
var span = buffer.GetSpan(segment.Length);
113+
segment.Span.CopyTo(span);
114+
buffer.Advance(segment.Length);
115+
}
116+
#endif
117+
118+
reader.AdvanceTo(pipeBuffer.End);
119+
120+
if (result.IsCompleted)
121+
{
122+
break;
123+
}
124+
}
125+
126+
#if FUSION
127+
// Add the final partial chunk
128+
if (chunkIndex >= chunks.Length)
129+
{
130+
var newChunks = ArrayPool<byte[]>.Shared.Rent(chunks.Length * 2);
131+
Array.Copy(chunks, 0, newChunks, 0, chunks.Length);
132+
chunks.AsSpan().Clear();
133+
ArrayPool<byte[]>.Shared.Return(chunks);
134+
chunks = newChunks;
135+
}
136+
chunks[chunkIndex++] = currentChunk;
137+
138+
if (IsJsonArray(chunks, chunkIndex, currentChunkPosition))
139+
{
140+
Utf8JsonReader jsonReader;
141+
if (chunkIndex > 1)
142+
{
143+
SequenceSegment? first = null;
144+
SequenceSegment? previous = null;
145+
var dataChunksSpan = chunks.AsSpan(0, chunkIndex);
146+
147+
for (var i = 0; i < dataChunksSpan.Length; i++)
148+
{
149+
var chunk = dataChunksSpan[i];
150+
var chunkDataLength =
151+
(i == dataChunksSpan.Length - 1) ? currentChunkPosition : JsonMemory.BufferSize;
152+
var current = new SequenceSegment(chunk, chunkDataLength);
153+
154+
first ??= current;
155+
previous?.SetNext(current);
156+
previous = current;
157+
}
158+
159+
if (first is null || previous is null)
160+
{
161+
throw new InvalidOperationException("Sequence segments cannot be empty.");
162+
}
163+
164+
var sequence = new ReadOnlySequence<byte>(first, 0, previous, currentChunkPosition);
165+
jsonReader = new Utf8JsonReader(sequence, default);
166+
}
167+
else
168+
{
169+
jsonReader = new Utf8JsonReader(chunks[0].AsSpan(0, currentChunkPosition), default);
170+
}
171+
172+
jsonReader.Read();
173+
174+
if (jsonReader.TokenType != JsonTokenType.StartArray)
175+
{
176+
throw new InvalidOperationException("Expected first JSON token to be a StartArray.");
177+
}
178+
179+
var documents = new List<SourceResultDocument>();
180+
181+
var isFirstDocument = true;
182+
while (jsonReader.Read())
183+
{
184+
if (jsonReader.TokenType == JsonTokenType.EndArray)
185+
{
186+
break;
187+
}
188+
189+
var document = SourceResultDocument.Parse(
190+
ref jsonReader,
191+
chunks,
192+
usedChunks: chunkIndex,
193+
skipInitialRead: true,
194+
pooledMemory: isFirstDocument);
195+
196+
documents.Add(document);
197+
198+
isFirstDocument = false;
199+
}
200+
201+
foreach (var document in documents)
202+
{
203+
yield return document;
204+
}
205+
}
206+
else
207+
{
208+
yield return SourceResultDocument.Parse(
209+
chunks,
210+
lastLength: currentChunkPosition,
211+
usedChunks: chunkIndex,
212+
pooledMemory: true);
213+
}
214+
#else
215+
var memory = buffer.WrittenMemory;
216+
217+
if (IsJsonArray(memory.Span))
218+
{
219+
var jsonReader = new Utf8JsonReader(memory.Span);
220+
var documents = new List<JsonDocument>();
221+
222+
if (!jsonReader.Read() || jsonReader.TokenType != JsonTokenType.StartArray)
223+
{
224+
throw new JsonException("Expected first JSON token to be a StartArray.");
225+
}
226+
227+
while (jsonReader.Read())
228+
{
229+
if (jsonReader.TokenType == JsonTokenType.EndArray)
230+
{
231+
break;
232+
}
233+
234+
if (jsonReader.TokenType == JsonTokenType.StartObject)
235+
{
236+
var doc = JsonDocument.ParseValue(ref jsonReader);
237+
documents.Add(doc);
238+
}
239+
}
240+
241+
foreach (var document in documents)
242+
{
243+
yield return OperationResult.Parse(document);
244+
}
245+
}
246+
else
247+
{
248+
var document = JsonDocument.Parse(memory);
249+
var documentOwner = new JsonDocumentOwner(document, buffer);
250+
yield return OperationResult.Parse(documentOwner);
251+
252+
bufferOwnershipTransferred = true;
253+
}
254+
#endif
255+
}
256+
finally
257+
{
258+
#if !FUSION
259+
// If we haven't transferred ownership of the buffer via a JsonDocumentOwner
260+
// or we've encountered an exception, we need to free the allocated memory.
261+
if (!bufferOwnershipTransferred)
262+
{
263+
buffer.Dispose();
264+
}
265+
#endif
266+
267+
await cts.CancelAsync().ConfigureAwait(false);
268+
await reader.CompleteAsync().ConfigureAwait(false);
269+
}
270+
}
271+
272+
#if FUSION
273+
private static bool IsJsonArray(byte[][] chunks, int usedChunks, int lastChunkLength)
274+
{
275+
for (var i = 0; i < usedChunks; i++)
276+
{
277+
var chunkLength = (i == usedChunks - 1) ? lastChunkLength : JsonMemory.BufferSize;
278+
var chunk = chunks[i].AsSpan(0, chunkLength);
279+
280+
foreach (var b in chunk)
281+
{
282+
// Skip whitespaces.
283+
if (b is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
284+
{
285+
continue;
286+
}
287+
288+
return b == (byte)'[';
289+
}
290+
}
291+
292+
return false;
293+
}
294+
#else
295+
private static bool IsJsonArray(ReadOnlySpan<byte> span)
296+
{
297+
foreach (var b in span)
298+
{
299+
if (b is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
300+
{
301+
continue;
302+
}
303+
304+
return b == (byte)'[';
305+
}
306+
307+
return false;
308+
}
309+
#endif
310+
}

src/HotChocolate/AspNetCore/src/Transport.Http/JsonLines/JsonLinesReader.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ private static SourceResultDocument ParseDocument(ReadOnlySequence<byte> lineBuf
132132
chunks,
133133
lastBufferSize,
134134
chunksNeeded,
135-
options: default,
136135
pooledMemory: true);
137136
}
138137

src/HotChocolate/AspNetCore/src/Transport.Http/Sse/SseEventParser.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,12 @@ bool FindLineEnd(Span<byte> chunk, out int end, out int nextPosition)
176176
if (lineFeedIndex > 0 && chunk[lineFeedIndex - 1] == (byte)'\r')
177177
{
178178
nextPosition = possibleEnd + 1;
179-
end = possibleEnd - 2;
179+
end = possibleEnd - 1;
180180
return true;
181181
}
182182

183183
nextPosition = possibleEnd + 1;
184-
end = possibleEnd - 1;
184+
end = possibleEnd;
185185
return true;
186186
}
187187

src/HotChocolate/AspNetCore/src/Transport.Http/Sse/SseReader.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ public async IAsyncEnumerator<OperationResult> GetAsyncEnumerator(
107107
case SseEventType.Complete:
108108
reader.AdvanceTo(buffer.GetPosition(1, position.Value));
109109
#if FUSION
110-
JsonMemory.Return(eventBuffers);
111110
eventBuffers.Clear();
111+
JsonMemory.Return(eventBuffers);
112112
#endif
113113
yield break;
114114

115115
case SseEventType.Next when eventMessage.Data is not null:
116116
#if FUSION
117-
var leftOver = eventBuffers.Count - eventMessage.Data.Length;
117+
var leftOver = eventBuffers.Count - eventMessage.UsedChunks;
118118
currentPosition = 0;
119119

120120
if (leftOver == 0)
@@ -130,7 +130,6 @@ public async IAsyncEnumerator<OperationResult> GetAsyncEnumerator(
130130
eventMessage.Data,
131131
eventMessage.LastChunkSize,
132132
eventMessage.UsedChunks,
133-
options: default,
134133
pooledMemory: true);
135134
#else
136135
eventBuffer.Reset();

0 commit comments

Comments
 (0)