@@ -108,21 +108,14 @@ impl SubgraphExecutorMap {
108108 client_request : & ClientRequestDetails < ' a , ' req > ,
109109 ) -> HttpExecutionResponse {
110110 match self . get_or_create_executor ( subgraph_name, client_request) {
111- Ok ( Some ( executor) ) => executor. execute ( execution_request) . await ,
111+ Ok ( executor) => executor. execute ( execution_request) . await ,
112112 Err ( err) => {
113113 error ! (
114114 "Subgraph executor error for subgraph '{}': {}" ,
115115 subgraph_name, err,
116116 ) ;
117117 self . internal_server_error_response ( err. into ( ) , subgraph_name)
118118 }
119- Ok ( None ) => {
120- error ! (
121- "Subgraph executor not found for subgraph '{}'" ,
122- subgraph_name
123- ) ;
124- self . internal_server_error_response ( "Internal server error" . into ( ) , subgraph_name)
125- }
126119 }
127120 }
128121
@@ -151,15 +144,17 @@ impl SubgraphExecutorMap {
151144 & self ,
152145 subgraph_name : & str ,
153146 client_request : & ClientRequestDetails < ' _ , ' _ > ,
154- ) -> Result < Option < SubgraphExecutorBoxedArc > , SubgraphExecutorError > {
155- let from_expression =
156- self . get_or_create_executor_from_expression ( subgraph_name, client_request) ?;
157-
158- if from_expression. is_some ( ) {
159- return Ok ( from_expression) ;
160- }
161-
162- Ok ( self . get_executor_from_static_endpoint ( subgraph_name) )
147+ ) -> Result < SubgraphExecutorBoxedArc , SubgraphExecutorError > {
148+ self . expressions_by_subgraph
149+ . get ( subgraph_name)
150+ . map ( |expression| {
151+ self . get_or_create_executor_from_expression (
152+ subgraph_name,
153+ expression,
154+ client_request,
155+ )
156+ } )
157+ . unwrap_or_else ( || self . get_executor_from_static_endpoint ( subgraph_name) )
163158 }
164159
165160 /// Looks up a subgraph executor,
@@ -169,65 +164,50 @@ impl SubgraphExecutorMap {
169164 fn get_or_create_executor_from_expression (
170165 & self ,
171166 subgraph_name : & str ,
167+ expression : & VrlProgram ,
172168 client_request : & ClientRequestDetails < ' _ , ' _ > ,
173- ) -> Result < Option < SubgraphExecutorBoxedArc > , SubgraphExecutorError > {
174- if let Some ( expression) = self . expressions_by_subgraph . get ( subgraph_name) {
175- let original_url_value = VrlValue :: Bytes ( Bytes :: from (
176- self . static_endpoints_by_subgraph
177- . get ( subgraph_name)
178- . map ( |endpoint| endpoint. value ( ) . clone ( ) )
179- . ok_or_else ( || {
180- SubgraphExecutorError :: StaticEndpointNotFound ( subgraph_name. to_string ( ) )
181- } ) ?,
182- ) ) ;
183- let value = VrlValue :: Object ( BTreeMap :: from ( [
184- ( "request" . into ( ) , client_request. into ( ) ) ,
185- ( "original_url" . into ( ) , original_url_value) ,
186- ] ) ) ;
187-
188- // Resolve the expression to get an endpoint URL.
189- let endpoint_result =
190- execute_expression_with_value ( expression, value) . map_err ( |err| {
191- SubgraphExecutorError :: new_endpoint_expression_resolution_failure (
192- subgraph_name. to_string ( ) ,
193- err,
194- )
195- } ) ?;
196- let endpoint_str = match endpoint_result. as_str ( ) {
197- Some ( s) => Ok ( s) ,
198- None => Err ( SubgraphExecutorError :: EndpointExpressionWrongType (
199- subgraph_name. to_string ( ) ,
200- ) ) ,
201- } ?;
202-
203- // Check if an executor for this endpoint already exists.
204- let existing_executor = self
205- . executors_by_subgraph
169+ ) -> Result < SubgraphExecutorBoxedArc , SubgraphExecutorError > {
170+ let original_url_value = VrlValue :: Bytes ( Bytes :: from (
171+ self . static_endpoints_by_subgraph
206172 . get ( subgraph_name)
207- . and_then ( |endpoints| endpoints. get ( endpoint_str. as_ref ( ) ) . map ( |e| e. clone ( ) ) ) ;
208-
209- if let Some ( executor) = existing_executor {
210- return Ok ( Some ( executor) ) ;
211- }
212-
173+ . map ( |endpoint| endpoint. value ( ) . clone ( ) )
174+ . ok_or_else ( || {
175+ SubgraphExecutorError :: StaticEndpointNotFound ( subgraph_name. to_string ( ) )
176+ } ) ?,
177+ ) ) ;
178+ let value = VrlValue :: Object ( BTreeMap :: from ( [
179+ ( "request" . into ( ) , client_request. into ( ) ) ,
180+ ( "original_url" . into ( ) , original_url_value) ,
181+ ] ) ) ;
182+
183+ // Resolve the expression to get an endpoint URL.
184+ let endpoint_result = execute_expression_with_value ( expression, value) . map_err ( |err| {
185+ SubgraphExecutorError :: new_endpoint_expression_resolution_failure (
186+ subgraph_name. to_string ( ) ,
187+ err,
188+ )
189+ } ) ?;
190+ let endpoint_str = match endpoint_result. as_str ( ) {
191+ Some ( s) => Ok ( s) ,
192+ None => Err ( SubgraphExecutorError :: EndpointExpressionWrongType (
193+ subgraph_name. to_string ( ) ,
194+ ) ) ,
195+ } ?;
196+
197+ // Check if an executor for this endpoint already exists.
198+ self . executors_by_subgraph
199+ . get ( subgraph_name)
200+ . and_then ( |endpoints| endpoints. get ( endpoint_str. as_ref ( ) ) . map ( |e| e. clone ( ) ) )
201+ . map ( Ok )
213202 // If not, create and register a new one.
214- self . register_executor ( subgraph_name, & endpoint_str) ?;
215-
216- let endpoints = self
217- . executors_by_subgraph
218- . get ( subgraph_name)
219- . expect ( "Executor was just registered, should be present" ) ;
220- return Ok ( endpoints. get ( endpoint_str. as_ref ( ) ) . map ( |e| e. clone ( ) ) ) ;
221- }
222-
223- Ok ( None )
203+ . unwrap_or_else ( || self . register_executor ( subgraph_name, endpoint_str. as_ref ( ) ) )
224204 }
225205
226206 /// Looks up a subgraph executor based on a static endpoint URL.
227207 fn get_executor_from_static_endpoint (
228208 & self ,
229209 subgraph_name : & str ,
230- ) -> Option < SubgraphExecutorBoxedArc > {
210+ ) -> Result < SubgraphExecutorBoxedArc , SubgraphExecutorError > {
231211 self . static_endpoints_by_subgraph
232212 . get ( subgraph_name)
233213 . and_then ( |endpoint_ref| {
@@ -236,6 +216,7 @@ impl SubgraphExecutorMap {
236216 . get ( subgraph_name)
237217 . and_then ( |endpoints| endpoints. get ( endpoint_str) . map ( |e| e. clone ( ) ) )
238218 } )
219+ . ok_or_else ( || SubgraphExecutorError :: StaticEndpointNotFound ( subgraph_name. to_string ( ) ) )
239220 }
240221
241222 /// Registers a VRL expression for the given subgraph name.
@@ -269,7 +250,7 @@ impl SubgraphExecutorMap {
269250 & self ,
270251 subgraph_name : & str ,
271252 endpoint_str : & str ,
272- ) -> Result < ( ) , SubgraphExecutorError > {
253+ ) -> Result < SubgraphExecutorBoxedArc , SubgraphExecutorError > {
273254 let endpoint_uri = endpoint_str. parse :: < Uri > ( ) . map_err ( |e| {
274255 SubgraphExecutorError :: EndpointParseFailure ( endpoint_str. to_string ( ) , e. to_string ( ) )
275256 } ) ?;
@@ -302,11 +283,13 @@ impl SubgraphExecutorMap {
302283 self . in_flight_requests . clone ( ) ,
303284 ) ;
304285
286+ let executor_arc = executor. to_boxed_arc ( ) ;
287+
305288 self . executors_by_subgraph
306289 . entry ( subgraph_name. to_string ( ) )
307290 . or_default ( )
308- . insert ( endpoint_str. to_string ( ) , executor . to_boxed_arc ( ) ) ;
291+ . insert ( endpoint_str. to_string ( ) , executor_arc . clone ( ) ) ;
309292
310- Ok ( ( ) )
293+ Ok ( executor_arc )
311294 }
312295}
0 commit comments