@@ -23,7 +23,9 @@ use rabbitmq_http_client::commons::{PolicyTarget, VirtualHostLimitTarget};
2323use rabbitmq_http_client:: requests:: {
2424 Amqp10ShovelDestinationParams , Amqp10ShovelParams , Amqp10ShovelSourceParams ,
2525 Amqp091ShovelDestinationParams , Amqp091ShovelParams , Amqp091ShovelSourceParams ,
26- EnforcedLimitParams ,
26+ EnforcedLimitParams , ExchangeFederationParams , FEDERATION_UPSTREAM_COMPONENT ,
27+ FederationResourceCleanupMode , FederationUpstreamParams , QueueFederationParams ,
28+ RuntimeParameterDefinition ,
2729} ;
2830use std:: fs;
2931use std:: process;
@@ -362,6 +364,248 @@ pub fn delete_shovel(
362364 client. delete_shovel ( vhost, & name, true )
363365}
364366
367+ //
368+ // Federation
369+ //
370+
371+ pub fn list_federation_upstreams (
372+ client : APIClient ,
373+ ) -> ClientResult < Vec < responses:: FederationUpstream > > {
374+ client. list_federation_upstreams ( )
375+ }
376+
377+ pub fn declare_federation_upstream (
378+ client : APIClient ,
379+ vhost : & str ,
380+ command_args : & ArgMatches ,
381+ ) -> ClientResult < ( ) > {
382+ // common settings
383+ let name = command_args. get_one :: < String > ( "name" ) . cloned ( ) . unwrap ( ) ;
384+ let uri = command_args. get_one :: < String > ( "uri" ) . cloned ( ) . unwrap ( ) ;
385+ let reconnect_delay = command_args
386+ . get_one :: < u16 > ( "reconnect_delay" )
387+ . cloned ( )
388+ . unwrap ( ) ;
389+ let trust_user_id = command_args
390+ . get_one :: < bool > ( "trust_user_id" )
391+ . cloned ( )
392+ . unwrap ( ) ;
393+ let prefetch_count = command_args
394+ . get_one :: < u16 > ( "prefetch_count" )
395+ . cloned ( )
396+ . unwrap ( ) ;
397+ let ack_mode = command_args
398+ . get_one :: < MessageTransferAcknowledgementMode > ( "ack_mode" )
399+ . cloned ( )
400+ . unwrap ( ) ;
401+
402+ // optional queue federation settings
403+ let queue_name = command_args. get_one :: < String > ( "queue_name" ) . cloned ( ) ;
404+ let consumer_tag = command_args. get_one :: < String > ( "consumer_tag" ) . cloned ( ) ;
405+ let qn: String ;
406+ let ct: String ;
407+ let qfp = match ( queue_name, consumer_tag) {
408+ ( Some ( queue_name) , Some ( consumer_tag) ) => {
409+ qn = queue_name. clone ( ) ;
410+ ct = consumer_tag. clone ( ) ;
411+ let qfp = QueueFederationParams :: new_with_consumer_tag ( & qn, & ct) ;
412+ Some ( qfp)
413+ }
414+ ( Some ( queue_name) , None ) => {
415+ qn = queue_name. clone ( ) ;
416+ let qfp = QueueFederationParams :: new ( & qn) ;
417+ Some ( qfp)
418+ }
419+ ( None , Some ( _) ) => None ,
420+ ( None , None ) => None ,
421+ } ;
422+
423+ // optional exchange federation settings
424+ let exchange_name = command_args
425+ . get_one :: < String > ( "exchange_name" )
426+ . map ( |s| s. as_str ( ) ) ;
427+ let queue_type = command_args
428+ . get_one :: < String > ( "queue_type" )
429+ . map ( |s| Into :: < QueueType > :: into ( s. as_str ( ) ) )
430+ . unwrap_or_default ( ) ;
431+ let max_hops = command_args. get_one :: < u8 > ( "max_hops" ) . copied ( ) ;
432+ let resource_cleanup_mode = command_args
433+ . get_one :: < FederationResourceCleanupMode > ( "resource_cleanup_mode" )
434+ . cloned ( )
435+ . unwrap_or_default ( ) ;
436+ let bind_using_nowait = command_args
437+ . get_one :: < bool > ( "bind_nowait" )
438+ . cloned ( )
439+ . unwrap_or_default ( ) ;
440+ let ttl = command_args. get_one :: < u32 > ( "ttl" ) . cloned ( ) ;
441+ let message_ttl = command_args. get_one :: < u32 > ( "message_ttl" ) . cloned ( ) ;
442+ let efp = Some ( ExchangeFederationParams {
443+ exchange : exchange_name,
444+ max_hops,
445+ queue_type,
446+ ttl,
447+ message_ttl,
448+ resource_cleanup_mode,
449+ } ) ;
450+
451+ // putting it all together
452+ let upstream = FederationUpstreamParams {
453+ name : & name,
454+ vhost,
455+ uri : & uri,
456+ reconnect_delay,
457+ trust_user_id,
458+ prefetch_count,
459+ ack_mode,
460+ bind_using_nowait,
461+ queue_federation : qfp,
462+ exchange_federation : efp,
463+ } ;
464+ let param = RuntimeParameterDefinition :: from ( upstream) ;
465+ client. upsert_runtime_parameter ( & param)
466+ }
467+
468+ pub fn declare_federation_upstream_for_exchange_federation (
469+ client : APIClient ,
470+ vhost : & str ,
471+ command_args : & ArgMatches ,
472+ ) -> ClientResult < ( ) > {
473+ let name = command_args. get_one :: < String > ( "name" ) . cloned ( ) . unwrap ( ) ;
474+ let uri = command_args. get_one :: < String > ( "uri" ) . cloned ( ) . unwrap ( ) ;
475+ let reconnect_delay = command_args
476+ . get_one :: < u16 > ( "reconnect_delay" )
477+ . cloned ( )
478+ . unwrap ( ) ;
479+ let trust_user_id = command_args
480+ . get_one :: < bool > ( "trust_user_id" )
481+ . cloned ( )
482+ . unwrap ( ) ;
483+ let prefetch_count = command_args
484+ . get_one :: < u16 > ( "prefetch_count" )
485+ . cloned ( )
486+ . unwrap ( ) ;
487+ let ack_mode = command_args
488+ . get_one :: < MessageTransferAcknowledgementMode > ( "ack_mode" )
489+ . cloned ( )
490+ . unwrap ( ) ;
491+
492+ let exchange_name = command_args
493+ . get_one :: < String > ( "exchange_name" )
494+ . map ( |s| s. as_str ( ) ) ;
495+ let queue_type = command_args
496+ . get_one :: < String > ( "queue_type" )
497+ . map ( |s| Into :: < QueueType > :: into ( s. as_str ( ) ) )
498+ . unwrap_or_default ( ) ;
499+ let max_hops = command_args. get_one :: < u8 > ( "max_hops" ) . copied ( ) ;
500+ let resource_cleanup_mode = command_args
501+ . get_one :: < FederationResourceCleanupMode > ( "resource_cleanup_mode" )
502+ . cloned ( )
503+ . unwrap_or_default ( ) ;
504+ let bind_using_nowait = command_args
505+ . get_one :: < bool > ( "bind_nowait" )
506+ . cloned ( )
507+ . unwrap_or_default ( ) ;
508+ let ttl = command_args. get_one :: < u32 > ( "ttl" ) . cloned ( ) ;
509+ let message_ttl = command_args. get_one :: < u32 > ( "message_ttl" ) . cloned ( ) ;
510+ let efp = Some ( ExchangeFederationParams {
511+ exchange : exchange_name,
512+ max_hops,
513+ queue_type,
514+ ttl,
515+ message_ttl,
516+ resource_cleanup_mode,
517+ } ) ;
518+
519+ // putting it all together
520+ let upstream = FederationUpstreamParams {
521+ name : & name,
522+ vhost,
523+ uri : & uri,
524+ reconnect_delay,
525+ trust_user_id,
526+ prefetch_count,
527+ ack_mode,
528+ bind_using_nowait,
529+ queue_federation : None ,
530+ exchange_federation : efp,
531+ } ;
532+ let param = RuntimeParameterDefinition :: from ( upstream) ;
533+ client. upsert_runtime_parameter ( & param)
534+ }
535+
536+ pub fn declare_federation_upstream_for_queue_federation (
537+ client : APIClient ,
538+ vhost : & str ,
539+ command_args : & ArgMatches ,
540+ ) -> ClientResult < ( ) > {
541+ let name = command_args. get_one :: < String > ( "name" ) . cloned ( ) . unwrap ( ) ;
542+ let uri = command_args. get_one :: < String > ( "uri" ) . cloned ( ) . unwrap ( ) ;
543+ let reconnect_delay = command_args
544+ . get_one :: < u16 > ( "reconnect_delay" )
545+ . cloned ( )
546+ . unwrap ( ) ;
547+ let trust_user_id = command_args
548+ . get_one :: < bool > ( "trust_user_id" )
549+ . cloned ( )
550+ . unwrap ( ) ;
551+ let prefetch_count = command_args
552+ . get_one :: < u16 > ( "prefetch_count" )
553+ . cloned ( )
554+ . unwrap ( ) ;
555+ let ack_mode = command_args
556+ . get_one :: < MessageTransferAcknowledgementMode > ( "ack_mode" )
557+ . cloned ( )
558+ . unwrap ( ) ;
559+
560+ let queue_name = command_args. get_one :: < String > ( "queue_name" ) . cloned ( ) ;
561+ let consumer_tag = command_args. get_one :: < String > ( "consumer_tag" ) . cloned ( ) ;
562+ let qn: String ;
563+ let ct: String ;
564+ let qfp = match ( queue_name, consumer_tag) {
565+ ( Some ( queue_name) , Some ( consumer_tag) ) => {
566+ qn = queue_name. clone ( ) ;
567+ ct = consumer_tag. clone ( ) ;
568+ let qfp = QueueFederationParams :: new_with_consumer_tag ( & qn, & ct) ;
569+ Some ( qfp)
570+ }
571+ ( Some ( queue_name) , None ) => {
572+ qn = queue_name. clone ( ) ;
573+ let qfp = QueueFederationParams :: new ( & qn) ;
574+ Some ( qfp)
575+ }
576+ ( None , Some ( _) ) => None ,
577+ ( None , None ) => None ,
578+ } ;
579+
580+ let upstream = FederationUpstreamParams {
581+ name : & name,
582+ vhost,
583+ uri : & uri,
584+ reconnect_delay,
585+ trust_user_id,
586+ prefetch_count,
587+ ack_mode,
588+ bind_using_nowait : false ,
589+ queue_federation : qfp,
590+ exchange_federation : None ,
591+ } ;
592+ let param = RuntimeParameterDefinition :: from ( upstream) ;
593+ client. upsert_runtime_parameter ( & param)
594+ }
595+
596+ pub fn delete_federation_upstream (
597+ client : APIClient ,
598+ vhost : & str ,
599+ command_args : & ArgMatches ,
600+ ) -> ClientResult < ( ) > {
601+ let name = command_args. get_one :: < String > ( "name" ) . cloned ( ) . unwrap ( ) ;
602+ client. clear_runtime_parameter ( FEDERATION_UPSTREAM_COMPONENT , vhost, & name)
603+ }
604+
605+ //
606+ // Feature flags
607+ //
608+
365609pub fn enable_feature_flag ( client : APIClient , command_args : & ArgMatches ) -> ClientResult < ( ) > {
366610 let name = command_args. get_one :: < String > ( "name" ) . cloned ( ) . unwrap ( ) ;
367611 client. enable_feature_flag ( & name)
@@ -371,6 +615,10 @@ pub fn enable_all_stable_feature_flags(client: APIClient) -> ClientResult<()> {
371615 client. enable_all_stable_feature_flags ( )
372616}
373617
618+ //
619+ // Deprecated features
620+ //
621+
374622pub fn list_deprecated_features (
375623 client : APIClient ,
376624) -> ClientResult < responses:: DeprecatedFeatureList > {
@@ -383,6 +631,10 @@ pub fn list_deprecated_features_in_use(
383631 client. list_deprecated_features_in_use ( )
384632}
385633
634+ //
635+ // Declaration of core resources
636+ //
637+
386638pub fn declare_vhost ( client : APIClient , command_args : & ArgMatches ) -> ClientResult < ( ) > {
387639 // the flag is required
388640 let name = command_args. get_one :: < String > ( "name" ) . unwrap ( ) ;
0 commit comments