@@ -461,9 +461,13 @@ mod tests_in_gcp {
461461 use crate :: client:: { Client , ClientConfig } ;
462462 use crate :: publisher:: PublisherConfig ;
463463 use google_cloud_gax:: conn:: Environment ;
464+ use google_cloud_gax:: grpc:: codegen:: tokio_stream:: StreamExt ;
464465 use google_cloud_googleapis:: pubsub:: v1:: PubsubMessage ;
465466 use serial_test:: serial;
467+ use std:: collections:: HashMap ;
468+
466469 use std:: time:: Duration ;
470+ use tokio_util:: sync:: CancellationToken ;
467471
468472 fn make_msg ( key : & str ) -> PubsubMessage {
469473 PubsubMessage {
@@ -581,4 +585,70 @@ mod tests_in_gcp {
581585 tracing:: info!( "msg id {}" , awaiter. get( ) . await . unwrap( ) ) ;
582586 }
583587 }
588+ #[ tokio:: test]
589+ #[ serial]
590+ #[ ignore]
591+ async fn test_subscribe_exactly_once_delivery ( ) {
592+ let client = Client :: new ( ClientConfig :: default ( ) . with_auth ( ) . await . unwrap ( ) )
593+ . await
594+ . unwrap ( ) ;
595+
596+ // Check if the subscription is exactly_once_delivery
597+ let subscription = client. subscription ( "eod-test" ) ;
598+ let config = subscription. config ( None ) . await . unwrap ( ) . 1 ;
599+ assert ! ( config. enable_exactly_once_delivery) ;
600+
601+ // publish message
602+ let ctx = CancellationToken :: new ( ) ;
603+ let ctx_pub = ctx. clone ( ) ;
604+ let publisher = client. topic ( "eod-test" ) . new_publisher ( None ) ;
605+ let pub_task = tokio:: spawn ( async move {
606+ tracing:: info!( "start publisher" ) ;
607+ loop {
608+ if ctx_pub. is_cancelled ( ) {
609+ tracing:: info!( "finish publisher" ) ;
610+ return ;
611+ }
612+ publisher
613+ . publish_blocking ( PubsubMessage {
614+ data : "msg" . into ( ) ,
615+ ..Default :: default ( )
616+ } )
617+ . get ( )
618+ . await
619+ . unwrap ( ) ;
620+ }
621+ } ) ;
622+
623+ // subscribe message
624+ let ctx_sub = ctx. clone ( ) ;
625+ let sub_task = tokio:: spawn ( async move {
626+ tracing:: info!( "start subscriber" ) ;
627+ let mut stream = subscription. subscribe ( None ) . await . unwrap ( ) ;
628+ let mut msgs = HashMap :: new ( ) ;
629+ while let Some ( message) = stream. next ( ) . await {
630+ if ctx_sub. is_cancelled ( ) {
631+ break ;
632+ }
633+ let msg_id = & message. message . message_id ;
634+ * msgs. entry ( msg_id. clone ( ) ) . or_insert ( 0 ) += 1 ;
635+ message. ack ( ) . await . unwrap ( ) ;
636+ }
637+ tracing:: info!( "finish subscriber" ) ;
638+ msgs
639+ } ) ;
640+
641+ tokio:: time:: sleep ( Duration :: from_secs ( 30 ) ) . await ;
642+
643+ // check redelivered messages
644+ ctx. cancel ( ) ;
645+ pub_task. await . unwrap ( ) ;
646+ let received_msgs = sub_task. await . unwrap ( ) ;
647+ assert ! ( !received_msgs. is_empty( ) ) ;
648+
649+ tracing:: info!( "Number of received messages = {}" , received_msgs. len( ) ) ;
650+ for ( msg_id, count) in received_msgs {
651+ assert_eq ! ( count, 1 , "msg_id = {msg_id}, count = {count}" ) ;
652+ }
653+ }
584654}
0 commit comments