@@ -35,7 +35,7 @@ internal sealed class FusionRequestExecutorManager
3535 , IAsyncDisposable
3636{
3737 private readonly object _lock = new ( ) ;
38- private readonly SemaphoreSlim _semaphore = new ( 1 , 1 ) ;
38+ private readonly ConcurrentDictionary < string , SemaphoreSlim > _semaphoreBySchema = new ( ) ;
3939 private readonly ConcurrentDictionary < string , RequestExecutorRegistration > _registry = [ ] ;
4040 private readonly IOptionsMonitor < FusionGatewaySetup > _optionsMonitor ;
4141 private readonly IServiceProvider _applicationServices ;
@@ -90,7 +90,8 @@ private async ValueTask<IRequestExecutor> GetOrCreateRequestExecutorAsync(
9090 string schemaName ,
9191 CancellationToken cancellationToken )
9292 {
93- await _semaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
93+ var semaphore = GetSemaphoreForSchema ( schemaName ) ;
94+ await semaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
9495
9596 try
9697 {
@@ -106,10 +107,32 @@ private async ValueTask<IRequestExecutor> GetOrCreateRequestExecutorAsync(
106107 }
107108 finally
108109 {
109- _semaphore . Release ( ) ;
110+ semaphore . Release ( ) ;
110111 }
111112 }
112113
114+ private SemaphoreSlim GetSemaphoreForSchema ( string schemaName )
115+ => _semaphoreBySchema . GetOrAdd ( schemaName , _ => new SemaphoreSlim ( 1 , 1 ) ) ;
116+
117+ private async ValueTask EvictExecutorAsync ( FusionRequestExecutor executor , CancellationToken cancellationToken )
118+ {
119+ await _executorEvents . WriteEvictedAsync ( executor , cancellationToken ) ;
120+
121+ EvictRequestExecutorAsync ( executor ) . FireAndForget ( ) ;
122+ }
123+
124+ private static async Task EvictRequestExecutorAsync ( FusionRequestExecutor previousExecutor )
125+ {
126+ var evictionTimeout = previousExecutor . Schema . Features
127+ . GetRequired < FusionRequestOptions > ( ) . EvictionTimeout ;
128+
129+ // we will give the request executor some grace period to finish all requests
130+ // in the pipeline.
131+ await Task . Delay ( evictionTimeout ) . ConfigureAwait ( false ) ;
132+
133+ await previousExecutor . DisposeAsync ( ) . ConfigureAwait ( false ) ;
134+ }
135+
113136 private async ValueTask < RequestExecutorRegistration > CreateInitialRegistrationAsync (
114137 string schemaName ,
115138 CancellationToken cancellationToken )
@@ -119,10 +142,14 @@ private async ValueTask<RequestExecutorRegistration> CreateInitialRegistrationAs
119142 var ( configuration , documentProvider ) =
120143 await GetSchemaDocumentAsync ( setup . DocumentProvider , cancellationToken ) . ConfigureAwait ( false ) ;
121144
145+ var executor = CreateRequestExecutor ( schemaName , configuration ) ;
146+
147+ await WarmupExecutorAsync ( executor , cancellationToken ) . ConfigureAwait ( false ) ;
148+
122149 return new RequestExecutorRegistration (
123150 this ,
124151 documentProvider ,
125- CreateRequestExecutor ( schemaName , configuration ) ,
152+ executor ,
126153 configuration ) ;
127154 }
128155
@@ -149,6 +176,15 @@ private FusionRequestExecutor CreateRequestExecutor(
149176 return executor ;
150177 }
151178
179+ private async Task WarmupExecutorAsync ( IRequestExecutor executor , CancellationToken cancellationToken )
180+ {
181+ var warmupTasks = executor . Schema . Services . GetServices < IRequestExecutorWarmupTask > ( ) ;
182+ foreach ( var warmupTask in warmupTasks )
183+ {
184+ await warmupTask . WarmupAsync ( executor , cancellationToken ) . ConfigureAwait ( false ) ;
185+ }
186+ }
187+
152188 private async ValueTask < ( FusionConfiguration , IFusionConfigurationProvider ) > GetSchemaDocumentAsync (
153189 Func < IServiceProvider , IFusionConfigurationProvider > ? documentProviderFactory ,
154190 CancellationToken cancellationToken )
@@ -165,7 +201,7 @@ private FusionRequestExecutor CreateRequestExecutor(
165201 return ( await documentPromise . Task . ConfigureAwait ( false ) , documentProvider ) ;
166202 }
167203
168- private static FusionRequestOptions CreateRequestOptions ( FusionGatewaySetup setup )
204+ internal static FusionRequestOptions CreateRequestOptions ( FusionGatewaySetup setup )
169205 {
170206 var options = new FusionRequestOptions ( ) ;
171207
@@ -174,15 +210,7 @@ private static FusionRequestOptions CreateRequestOptions(FusionGatewaySetup setu
174210 configure . Invoke ( options ) ;
175211 }
176212
177- if ( options . OperationExecutionPlanCacheSize < 16 )
178- {
179- options . OperationExecutionPlanCacheSize = 16 ;
180- }
181-
182- if ( options . OperationDocumentCacheSize < 16 )
183- {
184- options . OperationDocumentCacheSize = 16 ;
185- }
213+ options . MakeReadOnly ( ) ;
186214
187215 return options ;
188216 }
@@ -490,6 +518,13 @@ public async ValueTask DisposeAsync()
490518 session . OnCompleted ( ) ;
491519 }
492520
521+ foreach ( var semaphore in _semaphoreBySchema . Values )
522+ {
523+ semaphore . Dispose ( ) ;
524+ }
525+
526+ _semaphoreBySchema . Clear ( ) ;
527+
493528 _observers = [ ] ;
494529 }
495530
@@ -555,7 +590,7 @@ private async Task WaitForUpdatesAsync()
555590 break ;
556591 }
557592
558- var documentHash = XxHash64 . HashToUInt64 ( Encoding . UTF8 . GetBytes ( configuration . ToString ( ) ) ) ;
593+ var documentHash = XxHash64 . HashToUInt64 ( Encoding . UTF8 . GetBytes ( configuration . Schema . ToString ( ) ) ) ;
559594 var settingsHash = XxHash64 . HashToUInt64 ( GetRawUtf8Value ( configuration . Settings . Document . RootElement ) ) ;
560595
561596 if ( documentHash == _documentHash && settingsHash == _settingsHash )
@@ -569,12 +604,12 @@ private async Task WaitForUpdatesAsync()
569604 var previousExecutor = Executor ;
570605 var nextExecutor = _manager . CreateRequestExecutor ( Executor . Schema . Name , configuration ) ;
571606
572- // TODO : should we have the warmup tasks here?
607+ await _manager . WarmupExecutorAsync ( nextExecutor , _cancellationToken ) . ConfigureAwait ( false ) ;
573608
574609 Executor = nextExecutor ;
575610
576- // we need to free the resources of the current schema as well as for the configuration object.
577- await previousExecutor . DisposeAsync ( ) . ConfigureAwait ( false ) ;
611+ await _manager . EvictExecutorAsync ( previousExecutor , _cancellationToken ) ;
612+
578613 configuration . Dispose ( ) ;
579614 }
580615 }
@@ -718,4 +753,13 @@ public static async ValueTask WriteCreatedAsync(
718753 var eventArgs = RequestExecutorEvent . Created ( executor ) ;
719754 await executorEvents . Writer . WriteAsync ( eventArgs , cancellationToken ) . ConfigureAwait ( false ) ;
720755 }
756+
757+ public static async ValueTask WriteEvictedAsync (
758+ this Channel < RequestExecutorEvent > executorEvents ,
759+ FusionRequestExecutor executor ,
760+ CancellationToken cancellationToken )
761+ {
762+ var eventArgs = RequestExecutorEvent . Evicted ( executor ) ;
763+ await executorEvents . Writer . WriteAsync ( eventArgs , cancellationToken ) . ConfigureAwait ( false ) ;
764+ }
721765}
0 commit comments