@@ -122,6 +122,18 @@ void ReadStreamLayerClientTest::SetUpCommonNetworkMockCalls() {
122122 olp::http::HttpStatusCode::CREATED),
123123 HTTP_RESPONSE_STREAM_LAYER_SUBSCRIPTION));
124124
125+ ON_CALL (*network_mock_,
126+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_SERIAL), _, _, _, _))
127+ .WillByDefault (ReturnHttpResponse (olp::http::NetworkResponse ().WithStatus (
128+ olp::http::HttpStatusCode::OK),
129+ HTTP_RESPONSE_EMPTY));
130+
131+ ON_CALL (*network_mock_,
132+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_PARALLEL), _, _, _, _))
133+ .WillByDefault (ReturnHttpResponse (olp::http::NetworkResponse ().WithStatus (
134+ olp::http::HttpStatusCode::OK),
135+ HTTP_RESPONSE_EMPTY));
136+
125137 // Catch any non-interesting network calls that don't need to be verified
126138 EXPECT_CALL (*network_mock_, Send (_, _, _, _, _)).Times (AtLeast (0 ));
127139}
@@ -471,24 +483,248 @@ TEST_F(ReadStreamLayerClientTest, SubscribeCancelFuture) {
471483 EXPECT_EQ (ErrorCode::Cancelled, response.GetError ().GetErrorCode ());
472484}
473485
474- TEST_F (ReadStreamLayerClientTest, CancelPendingRequestsSubscribe) {
486+ TEST_F (ReadStreamLayerClientTest, Unsubscribe) {
487+ HRN hrn (GetTestCatalog ());
488+
489+ {
490+ SCOPED_TRACE (" Unsubscribe succeeds, serial subscription" );
491+
492+ EXPECT_CALL (*network_mock_,
493+ Send (IsGetRequest (URL_LOOKUP_STREAM), _, _, _, _))
494+ .Times (1 );
495+
496+ EXPECT_CALL (*network_mock_,
497+ Send (IsPostRequest (URL_STREAM_SUBSCRIBE_SERIAL), _, _, _, _))
498+ .Times (1 );
499+
500+ EXPECT_CALL (
501+ *network_mock_,
502+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_SERIAL), _, _, _, _))
503+ .Times (1 );
504+
505+ StreamLayerClient client (hrn, kLayerId , settings_);
506+
507+ auto subscribe_future = client.Subscribe (SubscribeRequest ()).GetFuture ();
508+ ASSERT_EQ (subscribe_future.wait_for (kTimeout ), std::future_status::ready);
509+ ASSERT_TRUE (subscribe_future.get ().IsSuccessful ());
510+
511+ // Unsubscribe part
512+ UnsubscribeResponse unsubscribe_response;
513+ Condition condition;
514+ client.Unsubscribe ([&](UnsubscribeResponse response) {
515+ unsubscribe_response = std::move (response);
516+ condition.Notify ();
517+ });
518+
519+ ASSERT_TRUE (condition.Wait (kTimeout ));
520+ ASSERT_TRUE (unsubscribe_response.IsSuccessful ());
521+ EXPECT_EQ (kSubscriptionId , unsubscribe_response.GetResult ());
522+
523+ ::Mock::VerifyAndClearExpectations (network_mock_.get());
524+ }
525+ {
526+ SCOPED_TRACE (" Unsubscribe succeeds, parallel subscription" );
527+
528+ EXPECT_CALL (*network_mock_,
529+ Send (IsGetRequest (URL_LOOKUP_STREAM), _, _, _, _))
530+ .Times (1 );
531+
532+ EXPECT_CALL (*network_mock_,
533+ Send (IsPostRequest (URL_STREAM_SUBSCRIBE_PARALLEL), _, _, _, _))
534+ .Times (1 );
535+
536+ EXPECT_CALL (
537+ *network_mock_,
538+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_PARALLEL), _, _, _, _))
539+ .Times (1 );
540+
541+ StreamLayerClient client (hrn, kLayerId , settings_);
542+
543+ auto subscribe_future =
544+ client
545+ .Subscribe (SubscribeRequest ().WithSubscriptionMode (
546+ SubscribeRequest::SubscriptionMode::kParallel ))
547+ .GetFuture ();
548+ ASSERT_EQ (subscribe_future.wait_for (kTimeout ), std::future_status::ready);
549+ ASSERT_TRUE (subscribe_future.get ().IsSuccessful ());
550+
551+ // Unsubscribe part
552+ UnsubscribeResponse unsubscribe_response;
553+ Condition condition;
554+ client.Unsubscribe ([&](UnsubscribeResponse response) {
555+ unsubscribe_response = std::move (response);
556+ condition.Notify ();
557+ });
558+
559+ ASSERT_TRUE (condition.Wait (kTimeout ));
560+ ASSERT_TRUE (unsubscribe_response.IsSuccessful ());
561+ EXPECT_EQ (kSubscriptionId , unsubscribe_response.GetResult ());
562+
563+ ::Mock::VerifyAndClearExpectations (network_mock_.get());
564+ }
565+ {
566+ SCOPED_TRACE (
567+ " Unsubscribe succeeds, parallel subscription with provided ConsumerID "
568+ " and SubscriptionID" );
569+
570+ EXPECT_CALL (*network_mock_,
571+ Send (IsGetRequest (URL_LOOKUP_STREAM), _, _, _, _))
572+ .Times (1 );
573+
574+ EXPECT_CALL (
575+ *network_mock_,
576+ Send (IsPostRequest (URL_STREAM_SUBSCRIBE_ALL_PARAMETERS), _, _, _, _))
577+ .Times (1 );
578+
579+ EXPECT_CALL (
580+ *network_mock_,
581+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_PARALLEL), _, _, _, _))
582+ .Times (1 );
583+
584+ StreamLayerClient client (hrn, kLayerId , settings_);
585+
586+ auto subscribe_future =
587+ client
588+ .Subscribe (SubscribeRequest ()
589+ .WithConsumerId (kConsumerID )
590+ .WithSubscriptionId (kSubscriptionId )
591+ .WithSubscriptionMode (
592+ SubscribeRequest::SubscriptionMode::kParallel ))
593+ .GetFuture ();
594+ ASSERT_EQ (subscribe_future.wait_for (kTimeout ), std::future_status::ready);
595+ ASSERT_TRUE (subscribe_future.get ().IsSuccessful ());
596+
597+ UnsubscribeResponse unsubscribe_response;
598+ Condition condition;
599+ client.Unsubscribe ([&](UnsubscribeResponse response) {
600+ unsubscribe_response = std::move (response);
601+ condition.Notify ();
602+ });
603+
604+ ASSERT_TRUE (condition.Wait (kTimeout ));
605+ ASSERT_TRUE (unsubscribe_response.IsSuccessful ());
606+ EXPECT_EQ (kSubscriptionId , unsubscribe_response.GetResult ());
607+
608+ ::Mock::VerifyAndClearExpectations (network_mock_.get());
609+ }
610+ {
611+ SCOPED_TRACE (" Unsubscribe fails, subscription missing" );
612+
613+ StreamLayerClient client (hrn, kLayerId , settings_);
614+
615+ UnsubscribeResponse unsubscribe_response;
616+ Condition condition;
617+ client.Unsubscribe ([&](UnsubscribeResponse response) {
618+ unsubscribe_response = std::move (response);
619+ condition.Notify ();
620+ });
621+
622+ ASSERT_TRUE (condition.Wait (kTimeout ));
623+ ASSERT_FALSE (unsubscribe_response.IsSuccessful ());
624+
625+ EXPECT_EQ (unsubscribe_response.GetError ().GetErrorCode (),
626+ ErrorCode::PreconditionFailed);
627+
628+ ::Mock::VerifyAndClearExpectations (network_mock_.get());
629+ }
630+ {
631+ SCOPED_TRACE (" Unsubscribe fails, server error" );
632+
633+ EXPECT_CALL (*network_mock_,
634+ Send (IsGetRequest (URL_LOOKUP_STREAM), _, _, _, _))
635+ .Times (1 );
636+
637+ EXPECT_CALL (*network_mock_,
638+ Send (IsPostRequest (URL_STREAM_SUBSCRIBE_SERIAL), _, _, _, _))
639+ .Times (1 );
640+
641+ EXPECT_CALL (
642+ *network_mock_,
643+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_SERIAL), _, _, _, _))
644+ .WillOnce (ReturnHttpResponse (olp::http::NetworkResponse ().WithStatus (
645+ olp::http::HttpStatusCode::NOT_FOUND),
646+ HTTP_RESPONSE_UNSUBSCRIBE_404));
647+
648+ StreamLayerClient client (hrn, kLayerId , settings_);
649+
650+ auto subscribe_future = client.Subscribe (SubscribeRequest ()).GetFuture ();
651+ ASSERT_EQ (subscribe_future.wait_for (kTimeout ), std::future_status::ready);
652+ ASSERT_TRUE (subscribe_future.get ().IsSuccessful ());
653+
654+ // Unsubscribe part
655+ UnsubscribeResponse unsubscribe_response;
656+ Condition condition;
657+ client.Unsubscribe ([&](UnsubscribeResponse response) {
658+ unsubscribe_response = std::move (response);
659+ condition.Notify ();
660+ });
661+
662+ ASSERT_TRUE (condition.Wait (kTimeout ));
663+ ASSERT_FALSE (unsubscribe_response.IsSuccessful ());
664+ EXPECT_EQ (unsubscribe_response.GetError ().GetErrorCode (),
665+ ErrorCode::NotFound);
666+
667+ ::Mock::VerifyAndClearExpectations (network_mock_.get());
668+ }
669+ }
670+
671+ TEST_F (ReadStreamLayerClientTest, UnsubscribeCancellableFuture) {
672+ HRN hrn (GetTestCatalog ());
673+
674+ EXPECT_CALL (*network_mock_, Send (IsGetRequest (URL_LOOKUP_STREAM), _, _, _, _))
675+ .Times (1 );
676+
677+ EXPECT_CALL (*network_mock_,
678+ Send (IsPostRequest (URL_STREAM_SUBSCRIBE_SERIAL), _, _, _, _))
679+ .Times (1 );
680+
681+ EXPECT_CALL (*network_mock_,
682+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_SERIAL), _, _, _, _))
683+ .Times (1 );
684+
685+ StreamLayerClient client (hrn, kLayerId , settings_);
686+
687+ auto subscribe_future = client.Subscribe (SubscribeRequest ()).GetFuture ();
688+ ASSERT_EQ (subscribe_future.wait_for (kTimeout ), std::future_status::ready);
689+ ASSERT_TRUE (subscribe_future.get ().IsSuccessful ());
690+
691+ // Unsubscribe part
692+ UnsubscribeResponse unsubscribe_response;
693+ auto unsubscribe_future = client.Unsubscribe ().GetFuture ();
694+
695+ ASSERT_EQ (unsubscribe_future.wait_for (kTimeout ), std::future_status::ready);
696+
697+ const auto & response = unsubscribe_future.get ();
698+ ASSERT_TRUE (response.IsSuccessful ());
699+
700+ EXPECT_EQ (response.GetResult (), kSubscriptionId );
701+ }
702+
703+ TEST_F (ReadStreamLayerClientTest, UnsubscribeCancelFuture) {
475704 HRN hrn (GetTestCatalog ());
476705
477706 auto request_started = std::make_shared<std::promise<void >>();
478707 auto continue_request = std::make_shared<std::promise<void >>();
479-
480708 {
481709 olp::http::RequestId request_id;
482710 NetworkCallback send_mock;
483711 CancelCallback cancel_mock;
484712
485- std::tie (request_id, send_mock, cancel_mock) =
486- GenerateNetworkMockActions (request_started, continue_request,
487- {olp::http::HttpStatusCode::OK,
488- HTTP_RESPONSE_STREAM_LAYER_SUBSCRIPTION});
713+ std::tie (request_id, send_mock, cancel_mock) = GenerateNetworkMockActions (
714+ request_started, continue_request,
715+ {olp::http::HttpStatusCode::OK, HTTP_RESPONSE_EMPTY});
489716
490717 EXPECT_CALL (*network_mock_,
491718 Send (IsGetRequest (URL_LOOKUP_STREAM), _, _, _, _))
719+ .Times (1 );
720+
721+ EXPECT_CALL (*network_mock_,
722+ Send (IsPostRequest (URL_STREAM_SUBSCRIBE_SERIAL), _, _, _, _))
723+ .Times (1 );
724+
725+ EXPECT_CALL (
726+ *network_mock_,
727+ Send (IsDeleteRequest (URL_STREAM_UNSUBSCRIBE_SERIAL), _, _, _, _))
492728 .Times (1 )
493729 .WillOnce (Invoke (std::move (send_mock)));
494730
@@ -498,19 +734,56 @@ TEST_F(ReadStreamLayerClientTest, CancelPendingRequestsSubscribe) {
498734
499735 StreamLayerClient client (hrn, kLayerId , settings_);
500736
501- auto future = client.Subscribe (SubscribeRequest ());
737+ auto subscribe_future = client.Subscribe (SubscribeRequest ()).GetFuture ();
738+ ASSERT_EQ (subscribe_future.wait_for (kTimeout ), std::future_status::ready);
739+ ASSERT_TRUE (subscribe_future.get ().IsSuccessful ());
740+
741+ auto cancellable_future = client.Unsubscribe ();
502742
503743 request_started->get_future ().get ();
504- client. CancelPendingRequests ();
744+ cancellable_future. GetCancellationToken (). Cancel ();
505745 continue_request->set_value ();
506746
507- auto subscribe_response = future .GetFuture ().get ();
747+ auto unsubscribe_response = cancellable_future .GetFuture ().get ();
508748
509- ASSERT_FALSE (subscribe_response.IsSuccessful ());
749+ ASSERT_FALSE (unsubscribe_response.IsSuccessful ());
750+
751+ EXPECT_EQ (static_cast <int >(olp::http::ErrorCode::CANCELLED_ERROR),
752+ unsubscribe_response.GetError ().GetHttpStatusCode ());
753+ EXPECT_EQ (ErrorCode::Cancelled,
754+ unsubscribe_response.GetError ().GetErrorCode ());
755+ }
756+
757+ TEST_F (ReadStreamLayerClientTest, CancelPendingRequests) {
758+ HRN hrn (GetTestCatalog ());
510759
760+ // Simulate a loaded queue
761+ std::promise<void > promise;
762+ auto future = promise.get_future ();
763+ settings_.task_scheduler ->ScheduleTask ([&future]() { future.get (); });
764+
765+ StreamLayerClient client (hrn, kLayerId , settings_);
766+
767+ auto subscribe_future = client.Subscribe (SubscribeRequest ());
768+ auto unsubscribe_future = client.Unsubscribe ();
769+
770+ client.CancelPendingRequests ();
771+
772+ promise.set_value ();
773+
774+ auto subscribe_response = subscribe_future.GetFuture ().get ();
775+
776+ ASSERT_FALSE (subscribe_response.IsSuccessful ());
511777 EXPECT_EQ (static_cast <int >(olp::http::ErrorCode::CANCELLED_ERROR),
512778 subscribe_response.GetError ().GetHttpStatusCode ());
513779 EXPECT_EQ (ErrorCode::Cancelled, subscribe_response.GetError ().GetErrorCode ());
780+
781+ auto unsubscribe_response = unsubscribe_future.GetFuture ().get ();
782+ ASSERT_FALSE (unsubscribe_response.IsSuccessful ());
783+ EXPECT_EQ (static_cast <int >(olp::http::ErrorCode::CANCELLED_ERROR),
784+ unsubscribe_response.GetError ().GetHttpStatusCode ());
785+ EXPECT_EQ (ErrorCode::Cancelled,
786+ unsubscribe_response.GetError ().GetErrorCode ());
514787}
515788
516789} // namespace
0 commit comments