@@ -9,6 +9,15 @@ use tokio_stream::wrappers::ReceiverStream;
99
1010use super :: { error:: CollectionError , event:: CollectionEvent } ;
1111use crate :: platform_api:: PlatformApiConfig ;
12+ use operation_collection_default_polling_query:: {
13+ OperationCollectionDefaultPollingQueryVariant as PollingDefaultGraphVariant ,
14+ OperationCollectionDefaultPollingQueryVariantOnGraphVariantMcpDefaultCollection as PollingDefaultCollection ,
15+ } ;
16+ use operation_collection_default_query:: {
17+ OperationCollectionDefaultQueryVariant ,
18+ OperationCollectionDefaultQueryVariantOnGraphVariantMcpDefaultCollection as DefaultCollectionResult ,
19+ OperationCollectionDefaultQueryVariantOnGraphVariantMcpDefaultCollectionOnOperationCollectionOperations as OperationCollectionDefaultEntry ,
20+ } ;
1221use operation_collection_entries_query:: OperationCollectionEntriesQueryOperationCollectionEntries ;
1322use operation_collection_polling_query:: {
1423 OperationCollectionPollingQueryOperationCollection as PollingOperationCollectionResult ,
@@ -55,6 +64,24 @@ struct OperationCollectionPollingQuery;
5564) ]
5665struct OperationCollectionQuery ;
5766
67+ #[ derive( GraphQLQuery ) ]
68+ #[ graphql(
69+ query_path = "src/platform_api/operation_collections/operation_collections.graphql" ,
70+ schema_path = "src/platform_api/platform-api.graphql" ,
71+ request_derives = "Debug" ,
72+ response_derives = "PartialEq, Debug, Deserialize"
73+ ) ]
74+ struct OperationCollectionDefaultQuery ;
75+
76+ #[ derive( GraphQLQuery ) ]
77+ #[ graphql(
78+ query_path = "src/platform_api/operation_collections/operation_collections.graphql" ,
79+ schema_path = "src/platform_api/platform-api.graphql" ,
80+ request_derives = "Debug" ,
81+ response_derives = "PartialEq, Debug, Deserialize"
82+ ) ]
83+ struct OperationCollectionDefaultPollingQuery ;
84+
5885async fn handle_poll_result (
5986 previous_updated_at : & mut HashMap < String , OperationData > ,
6087 poll : Vec < ( String , String ) > ,
@@ -89,7 +116,7 @@ async fn handle_poll_result(
89116 }
90117
91118 if !changed_ids. is_empty ( ) {
92- tracing:: info !( "changed operation ids: {:?}" , changed_ids) ;
119+ tracing:: debug !( "changed operation ids: {:?}" , changed_ids) ;
93120 let full_response = graphql_request :: < OperationCollectionEntriesQuery > (
94121 & OperationCollectionEntriesQuery :: build_query (
95122 operation_collection_entries_query:: Variables {
@@ -176,10 +203,40 @@ impl From<&OperationCollectionEntriesQueryOperationCollectionEntries> for Operat
176203 }
177204 }
178205}
206+ impl From < & OperationCollectionDefaultEntry > for OperationData {
207+ fn from ( operation : & OperationCollectionDefaultEntry ) -> Self {
208+ Self {
209+ id : operation. id . clone ( ) ,
210+ last_updated_at : operation. last_updated_at . clone ( ) ,
211+ source_text : operation
212+ . operation_data
213+ . current_operation_revision
214+ . body
215+ . clone ( ) ,
216+ headers : operation
217+ . operation_data
218+ . current_operation_revision
219+ . headers
220+ . as_ref ( )
221+ . map ( |headers| {
222+ headers
223+ . iter ( )
224+ . map ( |h| ( h. name . clone ( ) , h. value . clone ( ) ) )
225+ . collect ( )
226+ } ) ,
227+ variables : operation
228+ . operation_data
229+ . current_operation_revision
230+ . variables
231+ . clone ( ) ,
232+ }
233+ }
234+ }
179235
180236#[ derive( Clone ) ]
181237pub enum CollectionSource {
182238 Id ( String , PlatformApiConfig ) ,
239+ Default ( String , PlatformApiConfig ) ,
183240}
184241
185242async fn write_init_response (
@@ -213,11 +270,14 @@ async fn write_init_response(
213270 }
214271}
215272impl CollectionSource {
216- pub fn into_stream ( & self ) -> Pin < Box < dyn Stream < Item = CollectionEvent > + Send > > {
273+ pub fn into_stream ( self ) -> Pin < Box < dyn Stream < Item = CollectionEvent > + Send > > {
217274 match self {
218- CollectionSource :: Id ( id, platform_api_config) => {
275+ CollectionSource :: Id ( ref id, ref platform_api_config) => {
219276 self . collection_id_stream ( id. clone ( ) , platform_api_config. clone ( ) )
220277 }
278+ CollectionSource :: Default ( ref graph_ref, ref platform_api_config) => {
279+ self . default_collection_stream ( graph_ref. clone ( ) , platform_api_config. clone ( ) )
280+ }
221281 }
222282 }
223283
@@ -318,6 +378,134 @@ impl CollectionSource {
318378 } ) ;
319379 Box :: pin ( ReceiverStream :: new ( receiver) )
320380 }
381+
382+ pub fn default_collection_stream (
383+ & self ,
384+ graph_ref : String ,
385+ platform_api_config : PlatformApiConfig ,
386+ ) -> Pin < Box < dyn Stream < Item = CollectionEvent > + Send > > {
387+ let ( sender, receiver) = channel ( 2 ) ;
388+ tokio:: task:: spawn ( async move {
389+ let mut previous_updated_at = HashMap :: new ( ) ;
390+ match graphql_request :: < OperationCollectionDefaultQuery > (
391+ & OperationCollectionDefaultQuery :: build_query (
392+ operation_collection_default_query:: Variables {
393+ graph_ref : graph_ref. clone ( ) ,
394+ } ,
395+ ) ,
396+ & platform_api_config,
397+ )
398+ . await
399+ {
400+ Ok ( response) => match response. variant {
401+ Some ( OperationCollectionDefaultQueryVariant :: GraphVariant ( variant) ) => {
402+ match variant. mcp_default_collection {
403+ DefaultCollectionResult :: OperationCollection ( collection) => {
404+ let should_poll = write_init_response (
405+ & sender,
406+ & mut previous_updated_at,
407+ collection. operations . iter ( ) . map ( OperationData :: from) ,
408+ )
409+ . await ;
410+ if !should_poll {
411+ return ;
412+ }
413+ }
414+ DefaultCollectionResult :: PermissionError ( error) => {
415+ if let Err ( e) = sender
416+ . send ( CollectionEvent :: CollectionError (
417+ CollectionError :: Response ( error. message ) ,
418+ ) )
419+ . await
420+ {
421+ tracing:: debug!(
422+ "failed to send error to collection stream. This is likely to be because the server is shutting down: {e}"
423+ ) ;
424+ return ;
425+ }
426+ }
427+ }
428+ }
429+ Some ( OperationCollectionDefaultQueryVariant :: InvalidRefFormat ( err) ) => {
430+ if let Err ( e) = sender
431+ . send ( CollectionEvent :: CollectionError ( CollectionError :: Response (
432+ err. message ,
433+ ) ) )
434+ . await
435+ {
436+ tracing:: debug!(
437+ "failed to send error to collection stream. This is likely to be because the server is shutting down: {e}"
438+ ) ;
439+ return ;
440+ }
441+ }
442+ None => {
443+ if let Err ( e) = sender
444+ . send ( CollectionEvent :: CollectionError ( CollectionError :: Response (
445+ format ! ( "{graph_ref} not found" ) ,
446+ ) ) )
447+ . await
448+ {
449+ tracing:: debug!(
450+ "failed to send error to collection stream. This is likely to be because the server is shutting down: {e}"
451+ ) ;
452+ }
453+ return ;
454+ }
455+ } ,
456+ Err ( err) => {
457+ if let Err ( e) = sender. send ( CollectionEvent :: CollectionError ( err) ) . await {
458+ tracing:: debug!(
459+ "failed to send error to collection stream. This is likely to be because the server is shutting down: {e}"
460+ ) ;
461+ }
462+ return ;
463+ }
464+ } ;
465+
466+ loop {
467+ tokio:: time:: sleep ( platform_api_config. poll_interval ) . await ;
468+
469+ match poll_operation_collection_default (
470+ graph_ref. clone ( ) ,
471+ & platform_api_config,
472+ & mut previous_updated_at,
473+ )
474+ . await
475+ {
476+ Ok ( Some ( operations) ) => {
477+ let operations_count = operations. len ( ) ;
478+ if let Err ( e) = sender
479+ . send ( CollectionEvent :: UpdateOperationCollection ( operations) )
480+ . await
481+ {
482+ tracing:: debug!(
483+ "failed to push to stream. This is likely to be because the server is shutting down: {e}"
484+ ) ;
485+ break ;
486+ } else if operations_count > MAX_COLLECTION_SIZE_FOR_POLLING {
487+ tracing:: warn!(
488+ "Operation Collection polling disabled. Collection has {operations_count} operations which exceeds the maximum of {MAX_COLLECTION_SIZE_FOR_POLLING}."
489+ ) ;
490+ break ;
491+ }
492+ }
493+ Ok ( None ) => {
494+ tracing:: debug!( "Operation collection unchanged" ) ;
495+ }
496+ Err ( err) => {
497+ if let Err ( e) = sender. send ( CollectionEvent :: CollectionError ( err) ) . await {
498+ tracing:: debug!(
499+ "failed to send error to collection stream. This is likely to be because the server is shutting down: {e}"
500+ ) ;
501+ break ;
502+ }
503+ }
504+ }
505+ }
506+ } ) ;
507+ Box :: pin ( ReceiverStream :: new ( receiver) )
508+ }
321509}
322510
323511async fn poll_operation_collection_id (
@@ -356,12 +544,56 @@ async fn poll_operation_collection_id(
356544 }
357545}
358546
547+ async fn poll_operation_collection_default (
548+ graph_ref : String ,
549+ platform_api_config : & PlatformApiConfig ,
550+ previous_updated_at : & mut HashMap < String , OperationData > ,
551+ ) -> Result < Option < Vec < OperationData > > , CollectionError > {
552+ let response = graphql_request :: < OperationCollectionDefaultPollingQuery > (
553+ & OperationCollectionDefaultPollingQuery :: build_query (
554+ operation_collection_default_polling_query:: Variables { graph_ref } ,
555+ ) ,
556+ platform_api_config,
557+ )
558+ . await ?;
559+
560+ match response. variant {
561+ Some ( PollingDefaultGraphVariant :: GraphVariant ( variant) ) => {
562+ match variant. mcp_default_collection {
563+ PollingDefaultCollection :: OperationCollection ( collection) => {
564+ handle_poll_result (
565+ previous_updated_at,
566+ collection
567+ . operations
568+ . into_iter ( )
569+ . map ( |operation| ( operation. id , operation. last_updated_at ) )
570+ . collect ( ) ,
571+ platform_api_config,
572+ )
573+ . await
574+ }
575+
576+ PollingDefaultCollection :: PermissionError ( error) => {
577+ Err ( CollectionError :: Response ( error. message ) )
578+ }
579+ }
580+ }
581+ Some ( PollingDefaultGraphVariant :: InvalidRefFormat ( err) ) => {
582+ Err ( CollectionError :: Response ( err. message ) )
583+ }
584+ None => Err ( CollectionError :: Response (
585+ "Default collection not found" . to_string ( ) ,
586+ ) ) ,
587+ }
588+ }
589+
359590async fn graphql_request < Query > (
360591 request_body : & graphql_client:: QueryBody < Query :: Variables > ,
361592 platform_api_config : & PlatformApiConfig ,
362593) -> Result < Query :: ResponseData , CollectionError >
363594where
364595 Query : graphql_client:: GraphQLQuery ,
596+ <Query as graphql_client:: GraphQLQuery >:: ResponseData : std:: fmt:: Debug ,
365597{
366598 let res = reqwest:: Client :: new ( )
367599 . post ( platform_api_config. registry_url . clone ( ) )
0 commit comments