Skip to content

Commit a65567d

Browse files
authored
Merge pull request #637 from apollographql/AMS-403
Keep server running when collection sync encounters invalid operations
2 parents ad5d61a + a712d9f commit a65567d

File tree

8 files changed

+434
-12
lines changed

8 files changed

+434
-12
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
default: patch
3+
---
4+
5+
# Fix server crash on collection sync with invalid operations
6+
7+
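Previously, a single operation with malformed variables JSON in a collection would crash the entire server. Invalid operations are now skipped with a warning, and the server continues serving the remaining valid operations. In addition, a collection error received while the server is already running is now logged and the existing operations are kept; collection errors during startup remain fatal.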

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo-mcp-registry/src/platform_api/operation_collections/collection_poller.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,13 @@ async fn handle_poll_result(
140140

141141
#[derive(Clone, Debug)]
142142
pub struct OperationData {
143-
id: String,
144-
last_updated_at: String,
143+
pub id: String,
144+
pub last_updated_at: String,
145145
pub source_text: String,
146146
pub headers: Option<Vec<(String, String)>>,
147147
pub variables: Option<String>,
148148
}
149+
149150
impl From<&OperationCollectionEntry> for OperationData {
150151
fn from(operation: &OperationCollectionEntry) -> Self {
151152
Self {

crates/apollo-mcp-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ insta.workspace = true
7878
mockito = "1.7.0"
7979
opentelemetry_sdk = { version = "0.31.0", features = ["testing"] }
8080
rstest.workspace = true
81+
secrecy.workspace = true
8182
serde_yaml = "0.9.34"
8283
tokio.workspace = true
8384
tower = "0.5.2"

crates/apollo-mcp-server/src/operations/operation_source.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,17 @@ impl OperationSource {
5656
.into_stream()
5757
.map(|event| match event {
5858
CollectionEvent::UpdateOperationCollection(operations) => {
59-
match operations
59+
let raw_operations = operations
6060
.iter()
61-
.map(RawOperation::try_from)
62-
.collect::<Result<Vec<_>, _>>()
63-
{
64-
Ok(operations) => Event::OperationsUpdated(operations),
65-
Err(e) => Event::CollectionError(e),
66-
}
61+
.filter_map(|op| {
62+
RawOperation::try_from(op)
63+
.inspect_err(|e| {
64+
warn!("Skipping invalid operation in collection: {e}");
65+
})
66+
.ok()
67+
})
68+
.collect();
69+
Event::OperationsUpdated(raw_operations)
6770
}
6871
CollectionEvent::CollectionError(error) => Event::CollectionError(error),
6972
})

crates/apollo-mcp-server/src/operations/raw_operation.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,53 @@ impl serde::Serialize for RawOperation {
144144
state.end()
145145
}
146146
}
147+
148+
#[cfg(test)]
149+
mod tests {
150+
use super::*;
151+
152+
fn operation_data(source_text: &str, variables: Option<&str>) -> OperationData {
153+
OperationData {
154+
id: String::new(),
155+
last_updated_at: String::new(),
156+
source_text: source_text.to_string(),
157+
variables: variables.map(|v| v.to_string()),
158+
headers: None,
159+
}
160+
}
161+
162+
#[test]
163+
fn try_from_fails_with_malformed_variables_json() {
164+
let data = operation_data(
165+
"query GetUser($id: ID!) { user(id: $id) { name } }",
166+
Some(r#"{id: "123"}"#), // unquoted key
167+
);
168+
let result = RawOperation::try_from(&data);
169+
assert!(
170+
matches!(result, Err(CollectionError::InvalidVariables(_))),
171+
"expected InvalidVariables error for malformed JSON, got {result:?}"
172+
);
173+
}
174+
175+
#[test]
176+
fn one_bad_operation_does_not_poison_batch() {
177+
let operations = [
178+
operation_data(
179+
"query GetUser($id: ID!) { user(id: $id) { name } }",
180+
Some(r#"{"id": "123"}"#),
181+
),
182+
operation_data("query ListUsers { users { name } }", None),
183+
operation_data(
184+
"query SearchUsers($term: String!) { search(term: $term) { name } }",
185+
Some(r#"not valid json"#),
186+
),
187+
];
188+
189+
let recovered: Vec<RawOperation> = operations
190+
.iter()
191+
.filter_map(|op| RawOperation::try_from(op).ok())
192+
.collect();
193+
194+
assert_eq!(recovered.len(), 2);
195+
}
196+
}

crates/apollo-mcp-server/src/server/states.rs

Lines changed: 197 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,15 @@ impl StateMachine {
153153
ServerEvent::OperationError(e, _) => {
154154
State::Error(ServerError::Operation(OperationError::File(e)))
155155
}
156-
ServerEvent::CollectionError(e) => {
157-
State::Error(ServerError::Operation(OperationError::Collection(e)))
158-
}
156+
ServerEvent::CollectionError(e) => match state {
157+
State::Running(running) => {
158+
tracing::error!(
159+
"Collection error while running, keeping existing operations: {e}"
160+
);
161+
running.into()
162+
}
163+
_ => State::Error(ServerError::Operation(OperationError::Collection(e))),
164+
},
159165
ServerEvent::Shutdown => match state {
160166
State::Running(running) => {
161167
running.cancellation_token.cancel();
@@ -305,3 +311,191 @@ impl From<ServerError> for State {
305311
State::Error(error)
306312
}
307313
}
314+
315+
#[cfg(test)]
316+
mod tests {
317+
use std::sync::Arc;
318+
319+
use apollo_compiler::Schema;
320+
use apollo_mcp_registry::platform_api::operation_collections::error::CollectionError;
321+
use reqwest::header::HeaderMap;
322+
use tokio::sync::RwLock;
323+
use tokio_util::sync::CancellationToken;
324+
325+
use crate::cors::CorsConfig;
326+
use crate::errors::OperationError;
327+
use crate::event::Event as ServerEvent;
328+
use crate::health::HealthCheckConfig;
329+
use crate::host_validation::HostValidationConfig;
330+
use crate::operations::{MutationMode, RawOperation};
331+
use crate::server::Transport;
332+
use crate::server_info::ServerInfoConfig;
333+
334+
use super::{Config, Configuring, Running, State};
335+
336+
fn create_running_server() -> Running {
337+
let schema = Schema::parse("type Query { id: String }", "schema.graphql")
338+
.unwrap()
339+
.validate()
340+
.unwrap();
341+
342+
Running {
343+
schema: Arc::new(RwLock::new(schema)),
344+
operations: Arc::new(RwLock::new(vec![])),
345+
apps: vec![],
346+
headers: HeaderMap::new(),
347+
forward_headers: vec![],
348+
endpoint: "http://localhost:4000".parse().unwrap(),
349+
execute_tool: None,
350+
introspect_tool: None,
351+
search_tool: None,
352+
explorer_tool: None,
353+
validate_tool: None,
354+
custom_scalar_map: None,
355+
peers: Arc::new(RwLock::new(vec![])),
356+
cancellation_token: CancellationToken::new(),
357+
mutation_mode: MutationMode::None,
358+
disable_type_description: false,
359+
disable_schema_description: false,
360+
enable_output_schema: false,
361+
disable_auth_token_passthrough: false,
362+
health_check: None,
363+
server_info: ServerInfoConfig::default(),
364+
}
365+
}
366+
367+
fn test_config() -> Config {
368+
Config {
369+
transport: Transport::StreamableHttp {
370+
auth: None,
371+
address: "127.0.0.1".parse().unwrap(),
372+
port: 0,
373+
stateful_mode: false,
374+
host_validation: HostValidationConfig::default(),
375+
},
376+
endpoint: "http://localhost:4000".parse().unwrap(),
377+
headers: HeaderMap::new(),
378+
forward_headers: vec![],
379+
execute_introspection: false,
380+
validate_introspection: false,
381+
introspect_introspection: false,
382+
search_introspection: false,
383+
introspect_minify: false,
384+
search_minify: false,
385+
explorer_graph_ref: None,
386+
execute_tool_hint: None,
387+
introspect_tool_hint: None,
388+
search_tool_hint: None,
389+
validate_tool_hint: None,
390+
custom_scalar_map: None,
391+
mutation_mode: MutationMode::None,
392+
disable_type_description: false,
393+
disable_schema_description: false,
394+
enable_output_schema: false,
395+
disable_auth_token_passthrough: false,
396+
search_leaf_depth: 5,
397+
index_memory_bytes: 1024 * 1024,
398+
health_check: HealthCheckConfig::default(),
399+
cors: CorsConfig::default(),
400+
server_info: ServerInfoConfig::default(),
401+
}
402+
}
403+
404+
// Replicate the event-processing match from StateMachine::start() to test
405+
// how each event variant is handled when the server is in the Running state.
406+
async fn process_event(state: State, event: ServerEvent) -> State {
407+
match event {
408+
ServerEvent::OperationsUpdated(operations) => match state {
409+
State::Running(running) => {
410+
running.update_operations(operations).await;
411+
running.into()
412+
}
413+
other => other,
414+
},
415+
ServerEvent::OperationError(e, _) => State::Error(
416+
crate::errors::ServerError::Operation(OperationError::File(e)),
417+
),
418+
ServerEvent::CollectionError(e) => match state {
419+
State::Running(running) => running.into(),
420+
_ => State::Error(crate::errors::ServerError::Operation(
421+
OperationError::Collection(e),
422+
)),
423+
},
424+
_ => state,
425+
}
426+
}
427+
428+
#[tokio::test]
429+
async fn operations_updated_keeps_server_running() {
430+
let running = create_running_server();
431+
let state = State::Running(running);
432+
433+
let event = ServerEvent::OperationsUpdated(vec![RawOperation::from((
434+
"query Valid { id }".to_string(),
435+
Some("valid.graphql".to_string()),
436+
))]);
437+
438+
let new_state = process_event(state, event).await;
439+
440+
assert!(
441+
matches!(new_state, State::Running(_)),
442+
"expected server to remain Running after operations update"
443+
);
444+
}
445+
446+
// A CollectionError while Running should NOT kill the server.
447+
// The server keeps its existing operations and stays alive.
448+
#[tokio::test]
449+
async fn collection_error_keeps_running_server_alive() {
450+
let running = create_running_server();
451+
let state = State::Running(running);
452+
453+
let event = ServerEvent::CollectionError(CollectionError::InvalidVariables(
454+
r#"not valid json"#.to_string(),
455+
));
456+
457+
let new_state = process_event(state, event).await;
458+
459+
assert!(
460+
matches!(new_state, State::Running(_)),
461+
"expected server to remain Running after CollectionError"
462+
);
463+
}
464+
465+
// A CollectionError from a Platform API error while Running should also
466+
// keep the server alive.
467+
#[tokio::test]
468+
async fn collection_api_error_keeps_running_server_alive() {
469+
let running = create_running_server();
470+
let state = State::Running(running);
471+
472+
let event =
473+
ServerEvent::CollectionError(CollectionError::Response("missing data".to_string()));
474+
475+
let new_state = process_event(state, event).await;
476+
477+
assert!(
478+
matches!(new_state, State::Running(_)),
479+
"expected server to remain Running after API collection error"
480+
);
481+
}
482+
483+
// A CollectionError during startup (before Running) should still be fatal.
484+
#[tokio::test]
485+
async fn collection_error_during_startup_is_fatal() {
486+
let event = ServerEvent::CollectionError(CollectionError::InvalidVariables(
487+
r#"bad json"#.to_string(),
488+
));
489+
490+
let state = State::Configuring(Configuring {
491+
config: test_config(),
492+
});
493+
494+
let new_state = process_event(state, event).await;
495+
496+
assert!(
497+
matches!(new_state, State::Error(_)),
498+
"expected CollectionError during startup to be fatal"
499+
);
500+
}
501+
}

0 commit comments

Comments
 (0)