1
1
using System . Collections . Immutable ;
2
2
using System . Runtime . CompilerServices ;
3
- using System . Runtime . InteropServices ;
4
3
using HotChocolate . Execution ;
5
4
using HotChocolate . Fusion . Execution . Nodes ;
6
5
using HotChocolate . Language ;
@@ -14,10 +13,18 @@ public async Task<IExecutionResult> ExecuteAsync(
14
13
CancellationToken cancellationToken = default )
15
14
{
16
15
context . Begin ( ) ;
17
- await ExecutorSession . ExecuteAsync ( context , cancellationToken ) ;
16
+ var strategy = DetermineExecutionStrategy ( context ) ;
17
+ await ExecutorSession . ExecuteAsync ( context , strategy , cancellationToken ) ;
18
18
return context . Complete ( ) ;
19
19
}
20
20
21
+ private static ExecutionStrategy DetermineExecutionStrategy ( OperationPlanContext context )
22
+ => context . OperationPlan . Operation . Definition . Operation switch
23
+ {
24
+ OperationType . Mutation => ExecutionStrategy . SequentialRoots ,
25
+ _ => ExecutionStrategy . Parallel
26
+ } ;
27
+
21
28
private sealed class ExecutorSession
22
29
{
23
30
private readonly List < ExecutionNode > _stack = [ ] ;
@@ -29,33 +36,96 @@ private sealed class ExecutorSession
29
36
private readonly OperationPlan _plan ;
30
37
private readonly ImmutableArray < ExecutionNodeTrace > . Builder ? _traces ;
31
38
private readonly CancellationToken _cancellationToken ;
39
+ private readonly ExecutionStrategy _strategy ;
40
+ private int _nextRootNode ;
32
41
33
- private ExecutorSession ( OperationPlanContext context , CancellationToken cancellationToken )
42
+ private ExecutorSession ( OperationPlanContext context , ExecutionStrategy strategy , CancellationToken cancellationToken )
34
43
{
35
44
_context = context ;
45
+ _strategy = strategy ;
36
46
_cancellationToken = cancellationToken ;
37
47
_plan = context . OperationPlan ;
38
48
_backlog = [ .. context . OperationPlan . AllNodes ] ;
49
+
50
+ // For sequential execution (mutations), remove root nodes from backlog initially
51
+ if ( _strategy == ExecutionStrategy . SequentialRoots )
52
+ {
53
+ foreach ( var root in context . OperationPlan . RootNodes )
54
+ {
55
+ _backlog . Remove ( root ) ;
56
+ }
57
+ }
58
+
39
59
var collectTracing = context . Schema . GetRequestOptions ( ) . CollectOperationPlanTelemetry ;
40
60
_traces = collectTracing ? ImmutableArray . CreateBuilder < ExecutionNodeTrace > ( ) : null ;
41
61
}
42
62
43
- public static Task ExecuteAsync ( OperationPlanContext context , CancellationToken cancellationToken )
44
- => new ExecutorSession ( context , cancellationToken ) . ExecuteInternalAsync ( ) ;
63
+ public static Task ExecuteAsync ( OperationPlanContext context , ExecutionStrategy strategy , CancellationToken cancellationToken )
64
+ => new ExecutorSession ( context , strategy , cancellationToken ) . ExecuteInternalAsync ( ) ;
45
65
46
66
private async Task ExecuteInternalAsync ( )
47
67
{
48
- Start ( ) ;
68
+ if ( _strategy == ExecutionStrategy . Parallel )
69
+ {
70
+ await ExecuteQueryAsync ( ) ;
71
+ }
72
+ else
73
+ {
74
+ await ExecuteMutationAsync ( ) ;
75
+ }
76
+
77
+ if ( _traces is { Count : > 0 } )
78
+ {
79
+ _context . Traces = [ .. _traces ] ;
80
+ }
81
+ }
82
+
83
+ private async Task ExecuteQueryAsync ( )
84
+ {
85
+ // Start all root nodes immediately for parallel execution
86
+ StartAllRootNodes ( ) ;
49
87
88
+ // Process until all nodes complete
50
89
while ( IsProcessing ( ) )
51
90
{
52
91
await WaitForNextCompletionAsync ( ) ;
53
92
EnqueueNextNodes ( ) ;
54
93
}
55
94
}
56
95
57
- private ReadOnlySpan < ExecutionNode > WaitingToRun
58
- => CollectionsMarshal . AsSpan ( _backlog ) ;
96
+ private async Task ExecuteMutationAsync ( )
97
+ {
98
+ // Sequential root processing - one root at a time
99
+ while ( StartNextRootNode ( ) )
100
+ {
101
+ // Complete the entire subtree of current root before starting next
102
+ var enqueued = true ;
103
+ while ( enqueued )
104
+ {
105
+ await WaitForNextCompletionAsync ( ) ;
106
+ enqueued = EnqueueNextNodes ( ) ;
107
+ }
108
+ }
109
+ }
110
+
111
+ private void StartAllRootNodes ( )
112
+ {
113
+ foreach ( var node in _context . OperationPlan . RootNodes )
114
+ {
115
+ StartNode ( node ) ;
116
+ }
117
+ }
118
+
119
+ private bool StartNextRootNode ( )
120
+ {
121
+ var roots = _context . OperationPlan . RootNodes ;
122
+ if ( _nextRootNode < roots . Length )
123
+ {
124
+ StartNode ( roots [ _nextRootNode ++ ] ) ;
125
+ return true ;
126
+ }
127
+ return false ;
128
+ }
59
129
60
130
[ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
61
131
private bool IsProcessing ( ) => _backlog . Count > 0 || _activeTasks . Count > 0 ;
@@ -69,7 +139,7 @@ private void SkipNode(ExecutionNode node)
69
139
{
70
140
_backlog . Remove ( current ) ;
71
141
72
- foreach ( var enqueuedNode in WaitingToRun )
142
+ foreach ( var enqueuedNode in _backlog )
73
143
{
74
144
if ( enqueuedNode . Dependencies . Contains ( current ) )
75
145
{
@@ -79,14 +149,6 @@ private void SkipNode(ExecutionNode node)
79
149
}
80
150
}
81
151
82
- private void Start ( )
83
- {
84
- foreach ( var node in _context . OperationPlan . RootNodes )
85
- {
86
- StartNode ( node ) ;
87
- }
88
- }
89
-
90
152
[ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
91
153
private void StartNode ( ExecutionNode node )
92
154
{
@@ -109,7 +171,7 @@ private async Task WaitForNextCompletionAsync()
109
171
_traces ? . Add ( new ExecutionNodeTrace
110
172
{
111
173
Id = task . Result . Id ,
112
- SpanId = task . Result . Activity ? . Id ,
174
+ SpanId = task . Result . Activity ? . SpanId . ToHexString ( ) ,
113
175
Status = task . Result . Status ,
114
176
Duration = task . Result . Duration
115
177
} ) ;
@@ -122,7 +184,7 @@ private async Task WaitForNextCompletionAsync()
122
184
else if ( task . IsFaulted || task . IsCanceled )
123
185
{
124
186
// execution nodes are not expected to throw as exception should be handled within.
125
- // if they do its a fatal error for the execution, so we await failed task here
187
+ // if they do it's a fatal error for the execution, so we await failed task here
126
188
// so that they can throw and terminate the execution.
127
189
await task ;
128
190
}
@@ -134,16 +196,14 @@ private async Task WaitForNextCompletionAsync()
134
196
}
135
197
136
198
_completedTasks . Clear ( ) ;
137
-
138
- if ( _backlog . Count == 0 && _traces is { Count : > 0 } )
139
- {
140
- _context . Traces = [ .. _traces ] ;
141
- }
142
199
}
143
200
144
- private void EnqueueNextNodes ( )
201
+ private bool EnqueueNextNodes ( )
145
202
{
146
- foreach ( var node in WaitingToRun )
203
+ var enqueued = false ;
204
+ _stack . Clear ( ) ;
205
+
206
+ foreach ( var node in _backlog )
147
207
{
148
208
var dependenciesFulfilled = true ;
149
209
@@ -159,9 +219,17 @@ private void EnqueueNextNodes()
159
219
160
220
if ( dependenciesFulfilled )
161
221
{
162
- StartNode ( node ) ;
222
+ _stack . Push ( node ) ;
223
+ enqueued = true ;
163
224
}
164
225
}
226
+
227
+ foreach ( var node in _stack )
228
+ {
229
+ StartNode ( node ) ;
230
+ }
231
+
232
+ return enqueued ;
165
233
}
166
234
}
167
235
}
0 commit comments