Skip to content

Commit 0394dcc

Browse files
authored
Delete operations if they are removed (#144)
* wip * cleanup * remove deleted operations * remove default specific code * undo schema change
1 parent 93eceae commit 0394dcc

File tree

2 files changed

+156
-159
lines changed

2 files changed

+156
-159
lines changed

crates/apollo-mcp-registry/src/platform_api/operation_collections/collection_poller.rs

Lines changed: 152 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,7 @@ use tokio::sync::mpsc::channel;
88
use tokio_stream::wrappers::ReceiverStream;
99

1010
use super::{error::CollectionError, event::CollectionEvent};
11-
use crate::platform_api::{
12-
PlatformApiConfig,
13-
operation_collections::collection_poller::operation_collection_query::{
14-
OperationCollectionQueryOperationCollectionOnNotFoundError as NotFoundError,
15-
OperationCollectionQueryOperationCollectionOnPermissionError as PermissionError,
16-
OperationCollectionQueryOperationCollectionOnValidationError as ValidationError,
17-
},
18-
};
11+
use crate::platform_api::PlatformApiConfig;
1912
use operation_collection_entries_query::OperationCollectionEntriesQueryOperationCollectionEntries;
2013
use operation_collection_polling_query::{
2114
OperationCollectionPollingQueryOperationCollection as PollingOperationCollectionResult,
@@ -25,7 +18,10 @@ use operation_collection_polling_query::{
2518
};
2619
use operation_collection_query::{
2720
OperationCollectionQueryOperationCollection as OperationCollectionResult,
21+
OperationCollectionQueryOperationCollectionOnNotFoundError as NotFoundError,
2822
OperationCollectionQueryOperationCollectionOnOperationCollectionOperations as OperationCollectionEntry,
23+
OperationCollectionQueryOperationCollectionOnPermissionError as PermissionError,
24+
OperationCollectionQueryOperationCollectionOnValidationError as ValidationError,
2925
};
3026

3127
const MAX_COLLECTION_SIZE_FOR_POLLING: usize = 100;
@@ -59,57 +55,67 @@ struct OperationCollectionPollingQuery;
5955
)]
6056
struct OperationCollectionQuery;
6157

62-
fn changed_ids(
63-
previous_updated_at: &mut HashMap<String, CollectionCache>,
64-
poll: operation_collection_polling_query::OperationCollectionPollingQueryOperationCollectionOnOperationCollection,
65-
) -> Vec<String> {
66-
poll.operations
67-
.iter()
68-
.filter_map(|operation| {
69-
let updated_at = operation.last_updated_at.clone();
70-
if let Some(previous_operation) = previous_updated_at.get(&operation.id) {
71-
if updated_at == *previous_operation.last_updated_at {
72-
None
73-
} else {
74-
previous_updated_at.insert(
75-
operation.id.clone(),
76-
CollectionCache {
77-
last_updated_at: updated_at,
78-
operation_data: previous_operation.operation_data.clone(),
79-
},
80-
);
81-
Some(operation.id.clone())
82-
}
83-
} else {
84-
previous_updated_at.insert(
85-
operation.id.clone(),
86-
CollectionCache {
87-
last_updated_at: updated_at,
88-
operation_data: None,
89-
},
90-
);
91-
Some(operation.id.clone())
58+
async fn handle_poll_result(
59+
previous_updated_at: &mut HashMap<String, OperationData>,
60+
poll: Vec<(String, String)>,
61+
platform_api_config: &PlatformApiConfig,
62+
) -> Result<Option<Vec<OperationData>>, CollectionError> {
63+
let mut keep_ids = poll.iter().map(|(id, _)| id);
64+
for id in previous_updated_at.clone().keys() {
65+
if keep_ids.all(|keep_id| keep_id != id) {
66+
previous_updated_at.remove(id);
67+
}
68+
}
69+
70+
let changed_ids: Vec<String> = poll
71+
.into_iter()
72+
.filter_map(|(id, last_updated_at)| match previous_updated_at.get(&id) {
73+
Some(previous_operation) if last_updated_at == previous_operation.last_updated_at => {
74+
None
9275
}
76+
_ => Some(id.clone()),
9377
})
94-
.collect()
78+
.collect();
79+
80+
if changed_ids.is_empty() {
81+
tracing::debug!("no operation changed");
82+
Ok(None)
83+
} else {
84+
tracing::debug!("changed operation ids: {:?}", changed_ids);
85+
let full_response = graphql_request::<OperationCollectionEntriesQuery>(
86+
&OperationCollectionEntriesQuery::build_query(
87+
operation_collection_entries_query::Variables {
88+
collection_entry_ids: changed_ids,
89+
},
90+
),
91+
platform_api_config,
92+
)
93+
.await?;
94+
95+
for operation in full_response.operation_collection_entries {
96+
previous_updated_at.insert(
97+
operation.id.clone(),
98+
OperationData::from(&operation).clone(),
99+
);
100+
}
101+
102+
Ok(Some(previous_updated_at.clone().into_values().collect()))
103+
}
95104
}
96105

97106
#[derive(Clone)]
98107
pub struct OperationData {
108+
id: String,
109+
last_updated_at: String,
99110
pub source_text: String,
100111
pub headers: Option<Vec<(String, String)>>,
101112
pub variables: Option<String>,
102113
}
103-
104-
#[derive(Clone)]
105-
pub struct CollectionCache {
106-
last_updated_at: String,
107-
operation_data: Option<OperationData>,
108-
}
109-
110114
impl From<&OperationCollectionEntry> for OperationData {
111115
fn from(operation: &OperationCollectionEntry) -> Self {
112116
Self {
117+
id: operation.id.clone(),
118+
last_updated_at: operation.last_updated_at.clone(),
113119
source_text: operation
114120
.operation_data
115121
.current_operation_revision
@@ -137,6 +143,8 @@ impl From<&OperationCollectionEntry> for OperationData {
137143
impl From<&OperationCollectionEntriesQueryOperationCollectionEntries> for OperationData {
138144
fn from(operation: &OperationCollectionEntriesQueryOperationCollectionEntries) -> Self {
139145
Self {
146+
id: operation.id.clone(),
147+
last_updated_at: operation.last_updated_at.clone(),
140148
source_text: operation
141149
.operation_data
142150
.current_operation_revision
@@ -163,22 +171,62 @@ impl From<&OperationCollectionEntriesQueryOperationCollectionEntries> for Operat
163171
}
164172

165173
#[derive(Clone)]
166-
pub struct CollectionSource {
167-
pub collection_id: String,
168-
pub platform_api_config: PlatformApiConfig,
174+
pub enum CollectionSource {
175+
Id(String, PlatformApiConfig),
169176
}
170177

178+
async fn write_init_response(
179+
sender: &tokio::sync::mpsc::Sender<CollectionEvent>,
180+
previous_updated_at: &mut HashMap<String, OperationData>,
181+
operations: impl Iterator<Item = OperationData>,
182+
) -> bool {
183+
let operations = operations
184+
.inspect(|operation_data| {
185+
previous_updated_at.insert(operation_data.id.clone(), operation_data.clone());
186+
})
187+
.collect::<Vec<_>>();
188+
let operation_count = operations.len();
189+
if let Err(e) = sender
190+
.send(CollectionEvent::UpdateOperationCollection(operations))
191+
.await
192+
{
193+
tracing::debug!(
194+
"failed to push to stream. This is likely to be because the server is shutting down: {e}"
195+
);
196+
false
197+
} else if operation_count > MAX_COLLECTION_SIZE_FOR_POLLING {
198+
tracing::warn!(
199+
"Operation Collection polling disabled. Collection has {} operations which exceeds the maximum of {}.",
200+
operation_count,
201+
MAX_COLLECTION_SIZE_FOR_POLLING
202+
);
203+
false
204+
} else {
205+
true
206+
}
207+
}
171208
impl CollectionSource {
172-
pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = CollectionEvent> + Send>> {
209+
pub fn into_stream(&self) -> Pin<Box<dyn Stream<Item = CollectionEvent> + Send>> {
210+
match self {
211+
CollectionSource::Id(id, platform_api_config) => {
212+
self.collection_id_stream(id.clone(), platform_api_config.clone())
213+
}
214+
}
215+
}
216+
217+
fn collection_id_stream(
218+
&self,
219+
collection_id: String,
220+
platform_api_config: PlatformApiConfig,
221+
) -> Pin<Box<dyn Stream<Item = CollectionEvent> + Send>> {
173222
let (sender, receiver) = channel(2);
174223
tokio::task::spawn(async move {
175224
let mut previous_updated_at = HashMap::new();
176-
177225
match graphql_request::<OperationCollectionQuery>(
178226
&OperationCollectionQuery::build_query(operation_collection_query::Variables {
179-
operation_collection_id: self.collection_id.clone(),
227+
operation_collection_id: collection_id.clone(),
180228
}),
181-
&self.platform_api_config,
229+
&platform_api_config,
182230
)
183231
.await
184232
{
@@ -199,38 +247,13 @@ impl CollectionSource {
199247
}
200248
}
201249
OperationCollectionResult::OperationCollection(collection) => {
202-
let operation_count = collection.operations.len();
203-
let operations = collection
204-
.operations
205-
.into_iter()
206-
.map(|operation| {
207-
let operation_id = operation.id.clone();
208-
let operation_data = OperationData::from(&operation);
209-
previous_updated_at.insert(
210-
operation_id.clone(),
211-
CollectionCache {
212-
last_updated_at: operation.last_updated_at,
213-
operation_data: Some(operation_data.clone()),
214-
},
215-
);
216-
operation_data
217-
})
218-
.collect::<Vec<_>>();
219-
220-
if let Err(e) = sender
221-
.send(CollectionEvent::UpdateOperationCollection(operations))
222-
.await
223-
{
224-
tracing::debug!(
225-
"failed to push to stream. This is likely to be because the server is shutting down: {e}"
226-
);
227-
return;
228-
} else if operation_count > MAX_COLLECTION_SIZE_FOR_POLLING {
229-
tracing::warn!(
230-
"Operation Collection polling disabled. Collection has {} operations which exceeds the maximum of {}.",
231-
operation_count,
232-
MAX_COLLECTION_SIZE_FOR_POLLING
233-
);
250+
let should_poll = write_init_response(
251+
&sender,
252+
&mut previous_updated_at,
253+
collection.operations.iter().map(OperationData::from),
254+
)
255+
.await;
256+
if !should_poll {
234257
return;
235258
}
236259
}
@@ -246,11 +269,11 @@ impl CollectionSource {
246269
};
247270

248271
loop {
249-
tokio::time::sleep(self.platform_api_config.poll_interval).await;
272+
tokio::time::sleep(platform_api_config.poll_interval).await;
250273

251-
match poll_operation_collection(
252-
self.collection_id.clone(),
253-
&self.platform_api_config,
274+
match poll_operation_collection_id(
275+
collection_id.clone(),
276+
&platform_api_config,
254277
&mut previous_updated_at,
255278
)
256279
.await
@@ -290,6 +313,42 @@ impl CollectionSource {
290313
}
291314
}
292315

316+
async fn poll_operation_collection_id(
317+
collection_id: String,
318+
platform_api_config: &PlatformApiConfig,
319+
previous_updated_at: &mut HashMap<String, OperationData>,
320+
) -> Result<Option<Vec<OperationData>>, CollectionError> {
321+
let response = graphql_request::<OperationCollectionPollingQuery>(
322+
&OperationCollectionPollingQuery::build_query(
323+
operation_collection_polling_query::Variables {
324+
operation_collection_id: collection_id.clone(),
325+
},
326+
),
327+
platform_api_config,
328+
)
329+
.await?;
330+
331+
match response.operation_collection {
332+
PollingOperationCollectionResult::OperationCollection(collection) => {
333+
handle_poll_result(
334+
previous_updated_at,
335+
collection
336+
.operations
337+
.into_iter()
338+
.map(|operation| (operation.id, operation.last_updated_at))
339+
.collect(),
340+
platform_api_config,
341+
)
342+
.await
343+
}
344+
PollingOperationCollectionResult::NotFoundError(PollingNotFoundError { message })
345+
| PollingOperationCollectionResult::PermissionError(PollingPermissionError { message })
346+
| PollingOperationCollectionResult::ValidationError(PollingValidationError { message }) => {
347+
Err(CollectionError::Response(message))
348+
}
349+
}
350+
}
351+
293352
async fn graphql_request<Query>(
294353
request_body: &graphql_client::QueryBody<Query::Variables>,
295354
platform_api_config: &PlatformApiConfig,
@@ -304,7 +363,10 @@ where
304363
HeaderName::from_static("apollographql-client-name"),
305364
HeaderValue::from_static("apollo-mcp-server"),
306365
),
307-
// TODO: add apollographql-client-version header
366+
(
367+
HeaderName::from_static("apollographql-client-version"),
368+
HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
369+
),
308370
(
309371
HeaderName::from_static("x-api-key"),
310372
HeaderValue::from_str(platform_api_config.apollo_key.expose_secret())
@@ -323,68 +385,3 @@ where
323385
.data
324386
.ok_or(CollectionError::Response("missing data".to_string()))
325387
}
326-
327-
async fn poll_operation_collection(
328-
collection_id: String,
329-
platform_api_config: &PlatformApiConfig,
330-
previous_updated_at: &mut HashMap<String, CollectionCache>,
331-
) -> Result<Option<Vec<OperationData>>, CollectionError> {
332-
let response = graphql_request::<OperationCollectionPollingQuery>(
333-
&OperationCollectionPollingQuery::build_query(
334-
operation_collection_polling_query::Variables {
335-
operation_collection_id: collection_id.clone(),
336-
},
337-
),
338-
platform_api_config,
339-
)
340-
.await?;
341-
342-
match response.operation_collection {
343-
PollingOperationCollectionResult::OperationCollection(collection) => {
344-
let changed_ids = changed_ids(previous_updated_at, collection);
345-
346-
if changed_ids.is_empty() {
347-
tracing::debug!("no operation changed");
348-
Ok(None)
349-
} else {
350-
tracing::debug!("changed operation ids: {:?}", changed_ids);
351-
let full_response = graphql_request::<OperationCollectionEntriesQuery>(
352-
&OperationCollectionEntriesQuery::build_query(
353-
operation_collection_entries_query::Variables {
354-
collection_entry_ids: changed_ids,
355-
},
356-
),
357-
platform_api_config,
358-
)
359-
.await?;
360-
361-
let mut updated_operations = HashMap::new();
362-
for (id, collection_data) in previous_updated_at.clone() {
363-
if let Some(operation_data) = collection_data.operation_data.as_ref() {
364-
updated_operations.insert(id, operation_data.clone());
365-
}
366-
}
367-
368-
for operation in full_response.operation_collection_entries {
369-
let operation_id = operation.id.clone();
370-
let operation_data = OperationData::from(&operation);
371-
previous_updated_at.insert(
372-
operation_id.clone(),
373-
CollectionCache {
374-
last_updated_at: operation.last_updated_at,
375-
operation_data: Some(operation_data.clone()),
376-
},
377-
);
378-
updated_operations.insert(operation_id.clone(), operation_data.clone());
379-
}
380-
381-
Ok(Some(updated_operations.into_values().collect()))
382-
}
383-
}
384-
PollingOperationCollectionResult::NotFoundError(PollingNotFoundError { message })
385-
| PollingOperationCollectionResult::PermissionError(PollingPermissionError { message })
386-
| PollingOperationCollectionResult::ValidationError(PollingValidationError { message }) => {
387-
Err(CollectionError::Response(message))
388-
}
389-
}
390-
}

0 commit comments

Comments
 (0)