Skip to content

Commit 5522cca

Browse files
committed
Simplify
1 parent 68945ff commit 5522cca

File tree

1 file changed

+53
-70
lines changed
  • lib/executor/src/executors

1 file changed

+53
-70
lines changed

lib/executor/src/executors/map.rs

Lines changed: 53 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -108,21 +108,14 @@ impl SubgraphExecutorMap {
108108
client_request: &ClientRequestDetails<'a, 'req>,
109109
) -> HttpExecutionResponse {
110110
match self.get_or_create_executor(subgraph_name, client_request) {
111-
Ok(Some(executor)) => executor.execute(execution_request).await,
111+
Ok(executor) => executor.execute(execution_request).await,
112112
Err(err) => {
113113
error!(
114114
"Subgraph executor error for subgraph '{}': {}",
115115
subgraph_name, err,
116116
);
117117
self.internal_server_error_response(err.into(), subgraph_name)
118118
}
119-
Ok(None) => {
120-
error!(
121-
"Subgraph executor not found for subgraph '{}'",
122-
subgraph_name
123-
);
124-
self.internal_server_error_response("Internal server error".into(), subgraph_name)
125-
}
126119
}
127120
}
128121

@@ -151,15 +144,17 @@ impl SubgraphExecutorMap {
151144
&self,
152145
subgraph_name: &str,
153146
client_request: &ClientRequestDetails<'_, '_>,
154-
) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
155-
let from_expression =
156-
self.get_or_create_executor_from_expression(subgraph_name, client_request)?;
157-
158-
if from_expression.is_some() {
159-
return Ok(from_expression);
160-
}
161-
162-
Ok(self.get_executor_from_static_endpoint(subgraph_name))
147+
) -> Result<SubgraphExecutorBoxedArc, SubgraphExecutorError> {
148+
self.expressions_by_subgraph
149+
.get(subgraph_name)
150+
.map(|expression| {
151+
self.get_or_create_executor_from_expression(
152+
subgraph_name,
153+
expression,
154+
client_request,
155+
)
156+
})
157+
.unwrap_or_else(|| self.get_executor_from_static_endpoint(subgraph_name))
163158
}
164159

165160
/// Looks up a subgraph executor,
@@ -169,65 +164,50 @@ impl SubgraphExecutorMap {
169164
fn get_or_create_executor_from_expression(
170165
&self,
171166
subgraph_name: &str,
167+
expression: &VrlProgram,
172168
client_request: &ClientRequestDetails<'_, '_>,
173-
) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
174-
if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) {
175-
let original_url_value = VrlValue::Bytes(Bytes::from(
176-
self.static_endpoints_by_subgraph
177-
.get(subgraph_name)
178-
.map(|endpoint| endpoint.value().clone())
179-
.ok_or_else(|| {
180-
SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())
181-
})?,
182-
));
183-
let value = VrlValue::Object(BTreeMap::from([
184-
("request".into(), client_request.into()),
185-
("original_url".into(), original_url_value),
186-
]));
187-
188-
// Resolve the expression to get an endpoint URL.
189-
let endpoint_result =
190-
execute_expression_with_value(expression, value).map_err(|err| {
191-
SubgraphExecutorError::new_endpoint_expression_resolution_failure(
192-
subgraph_name.to_string(),
193-
err,
194-
)
195-
})?;
196-
let endpoint_str = match endpoint_result.as_str() {
197-
Some(s) => Ok(s),
198-
None => Err(SubgraphExecutorError::EndpointExpressionWrongType(
199-
subgraph_name.to_string(),
200-
)),
201-
}?;
202-
203-
// Check if an executor for this endpoint already exists.
204-
let existing_executor = self
205-
.executors_by_subgraph
169+
) -> Result<SubgraphExecutorBoxedArc, SubgraphExecutorError> {
170+
let original_url_value = VrlValue::Bytes(Bytes::from(
171+
self.static_endpoints_by_subgraph
206172
.get(subgraph_name)
207-
.and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone()));
208-
209-
if let Some(executor) = existing_executor {
210-
return Ok(Some(executor));
211-
}
212-
173+
.map(|endpoint| endpoint.value().clone())
174+
.ok_or_else(|| {
175+
SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())
176+
})?,
177+
));
178+
let value = VrlValue::Object(BTreeMap::from([
179+
("request".into(), client_request.into()),
180+
("original_url".into(), original_url_value),
181+
]));
182+
183+
// Resolve the expression to get an endpoint URL.
184+
let endpoint_result = execute_expression_with_value(expression, value).map_err(|err| {
185+
SubgraphExecutorError::new_endpoint_expression_resolution_failure(
186+
subgraph_name.to_string(),
187+
err,
188+
)
189+
})?;
190+
let endpoint_str = match endpoint_result.as_str() {
191+
Some(s) => Ok(s),
192+
None => Err(SubgraphExecutorError::EndpointExpressionWrongType(
193+
subgraph_name.to_string(),
194+
)),
195+
}?;
196+
197+
// Check if an executor for this endpoint already exists.
198+
self.executors_by_subgraph
199+
.get(subgraph_name)
200+
.and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone()))
201+
.map(Ok)
213202
// If not, create and register a new one.
214-
self.register_executor(subgraph_name, &endpoint_str)?;
215-
216-
let endpoints = self
217-
.executors_by_subgraph
218-
.get(subgraph_name)
219-
.expect("Executor was just registered, should be present");
220-
return Ok(endpoints.get(endpoint_str.as_ref()).map(|e| e.clone()));
221-
}
222-
223-
Ok(None)
203+
.unwrap_or_else(|| self.register_executor(subgraph_name, endpoint_str.as_ref()))
224204
}
225205

226206
/// Looks up a subgraph executor based on a static endpoint URL.
227207
fn get_executor_from_static_endpoint(
228208
&self,
229209
subgraph_name: &str,
230-
) -> Option<SubgraphExecutorBoxedArc> {
210+
) -> Result<SubgraphExecutorBoxedArc, SubgraphExecutorError> {
231211
self.static_endpoints_by_subgraph
232212
.get(subgraph_name)
233213
.and_then(|endpoint_ref| {
@@ -236,6 +216,7 @@ impl SubgraphExecutorMap {
236216
.get(subgraph_name)
237217
.and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone()))
238218
})
219+
.ok_or_else(|| SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()))
239220
}
240221

241222
/// Registers a VRL expression for the given subgraph name.
@@ -269,7 +250,7 @@ impl SubgraphExecutorMap {
269250
&self,
270251
subgraph_name: &str,
271252
endpoint_str: &str,
272-
) -> Result<(), SubgraphExecutorError> {
253+
) -> Result<SubgraphExecutorBoxedArc, SubgraphExecutorError> {
273254
let endpoint_uri = endpoint_str.parse::<Uri>().map_err(|e| {
274255
SubgraphExecutorError::EndpointParseFailure(endpoint_str.to_string(), e.to_string())
275256
})?;
@@ -302,11 +283,13 @@ impl SubgraphExecutorMap {
302283
self.in_flight_requests.clone(),
303284
);
304285

286+
let executor_arc = executor.to_boxed_arc();
287+
305288
self.executors_by_subgraph
306289
.entry(subgraph_name.to_string())
307290
.or_default()
308-
.insert(endpoint_str.to_string(), executor.to_boxed_arc());
291+
.insert(endpoint_str.to_string(), executor_arc.clone());
309292

310-
Ok(())
293+
Ok(executor_arc)
311294
}
312295
}

0 commit comments

Comments
 (0)