@@ -36,6 +36,7 @@ using namespace testing;
3636const std::string kCatalog =
3737 " hrn:here:data::olp-here-test:hereos-internal-test-v2" ;
3838const std::string kConsumerID = " consumer_id_1234" ;
39+ const std::string kDataHandle = " 4eed6ed1-0d32-43b9-ae79-043cb4256432" ;
3940const std::string kLayerId = " testlayer" ;
4041const std::string kSubscriptionId = " subscribe_id_12345" ;
4142const auto kTimeout = std::chrono::seconds(5 );
@@ -77,7 +78,7 @@ void ReadStreamLayerClientTest::SetUp() {
7778}
7879
7980void ReadStreamLayerClientTest::TearDown () {
80- :: Mock::VerifyAndClearExpectations (network_mock_.get());
81+ Mock::VerifyAndClearExpectations (network_mock_.get ());
8182 network_mock_.reset ();
8283}
8384
@@ -134,6 +135,16 @@ void ReadStreamLayerClientTest::SetUpCommonNetworkMockCalls() {
134135 olp::http::HttpStatusCode::OK),
135136 HTTP_RESPONSE_EMPTY));
136137
138+ ON_CALL (*network_mock_, Send (IsGetRequest (URL_LOOKUP_BLOB), _, _, _, _))
139+ .WillByDefault (
140+ ReturnHttpResponse (olp::http::NetworkResponse ().WithStatus (200 ),
141+ HTTP_RESPONSE_LOOKUP_BLOB));
142+
143+ ON_CALL (*network_mock_, Send (IsGetRequest (URL_BLOB_DATA_269), _, _, _, _))
144+ .WillByDefault (
145+ ReturnHttpResponse (olp::http::NetworkResponse ().WithStatus (200 ),
146+ HTTP_RESPONSE_BLOB_DATA_STREAM_MESSAGE));
147+
137148 // Catch any non-interesting network calls that don't need to be verified
138149 EXPECT_CALL (*network_mock_, Send (_, _, _, _, _)).Times (AtLeast (0 ));
139150}
@@ -167,7 +178,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
167178
168179 EXPECT_EQ (kSubscriptionId , subscribe_response.GetResult ().c_str ());
169180
170- :: Mock::VerifyAndClearExpectations (network_mock_.get());
181+ Mock::VerifyAndClearExpectations (network_mock_.get ());
171182 }
172183 {
173184 SCOPED_TRACE (" Subscribe succeeds, parallel" );
@@ -198,7 +209,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
198209
199210 EXPECT_EQ (kSubscriptionId , subscribe_response.GetResult ().c_str ());
200211
201- :: Mock::VerifyAndClearExpectations (network_mock_.get());
212+ Mock::VerifyAndClearExpectations (network_mock_.get ());
202213 }
203214 {
204215 SCOPED_TRACE (" Subscribe succeeds, with subscriptionID" );
@@ -229,7 +240,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
229240
230241 EXPECT_EQ (kSubscriptionId , subscribe_response.GetResult ().c_str ());
231242
232- :: Mock::VerifyAndClearExpectations (network_mock_.get());
243+ Mock::VerifyAndClearExpectations (network_mock_.get ());
233244 }
234245 {
235246 SCOPED_TRACE (" Subscribe succeeds, with consumerID" );
@@ -260,7 +271,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
260271
261272 EXPECT_EQ (kSubscriptionId , subscribe_response.GetResult ().c_str ());
262273
263- :: Mock::VerifyAndClearExpectations (network_mock_.get());
274+ Mock::VerifyAndClearExpectations (network_mock_.get ());
264275 }
265276 {
266277 SCOPED_TRACE (" Subscribe succeeds, multiple query parameters" );
@@ -295,7 +306,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
295306
296307 EXPECT_EQ (kSubscriptionId , subscribe_response.GetResult ().c_str ());
297308
298- :: Mock::VerifyAndClearExpectations (network_mock_.get());
309+ Mock::VerifyAndClearExpectations (network_mock_.get ());
299310 }
300311 {
301312 SCOPED_TRACE (" Subscribe fails, incorrect request" );
@@ -323,7 +334,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
323334
324335 EXPECT_FALSE (subscribe_response.IsSuccessful ());
325336
326- :: Mock::VerifyAndClearExpectations (network_mock_.get());
337+ Mock::VerifyAndClearExpectations (network_mock_.get ());
327338 }
328339 {
329340 SCOPED_TRACE (" Subscribe fails, incorrect hrn" );
@@ -347,7 +358,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
347358
348359 EXPECT_FALSE (subscribe_response.IsSuccessful ());
349360
350- :: Mock::VerifyAndClearExpectations (network_mock_.get());
361+ Mock::VerifyAndClearExpectations (network_mock_.get ());
351362 }
352363 {
353364 SCOPED_TRACE (" Subscribe fails, incorrect layer" );
@@ -375,7 +386,7 @@ TEST_F(ReadStreamLayerClientTest, Subscribe) {
375386
376387 EXPECT_FALSE (subscribe_response.IsSuccessful ());
377388
378- :: Mock::VerifyAndClearExpectations (network_mock_.get());
389+ Mock::VerifyAndClearExpectations (network_mock_.get ());
379390 }
380391}
381392
@@ -520,7 +531,7 @@ TEST_F(ReadStreamLayerClientTest, Unsubscribe) {
520531 ASSERT_TRUE (unsubscribe_response.IsSuccessful ());
521532 EXPECT_EQ (kSubscriptionId , unsubscribe_response.GetResult ());
522533
523- :: Mock::VerifyAndClearExpectations (network_mock_.get());
534+ Mock::VerifyAndClearExpectations (network_mock_.get ());
524535 }
525536 {
526537 SCOPED_TRACE (" Unsubscribe succeeds, parallel subscription" );
@@ -560,7 +571,7 @@ TEST_F(ReadStreamLayerClientTest, Unsubscribe) {
560571 ASSERT_TRUE (unsubscribe_response.IsSuccessful ());
561572 EXPECT_EQ (kSubscriptionId , unsubscribe_response.GetResult ());
562573
563- :: Mock::VerifyAndClearExpectations (network_mock_.get());
574+ Mock::VerifyAndClearExpectations (network_mock_.get ());
564575 }
565576 {
566577 SCOPED_TRACE (
@@ -605,7 +616,7 @@ TEST_F(ReadStreamLayerClientTest, Unsubscribe) {
605616 ASSERT_TRUE (unsubscribe_response.IsSuccessful ());
606617 EXPECT_EQ (kSubscriptionId , unsubscribe_response.GetResult ());
607618
608- :: Mock::VerifyAndClearExpectations (network_mock_.get());
619+ Mock::VerifyAndClearExpectations (network_mock_.get ());
609620 }
610621 {
611622 SCOPED_TRACE (" Unsubscribe fails, subscription missing" );
@@ -625,7 +636,7 @@ TEST_F(ReadStreamLayerClientTest, Unsubscribe) {
625636 EXPECT_EQ (unsubscribe_response.GetError ().GetErrorCode (),
626637 ErrorCode::PreconditionFailed);
627638
628- :: Mock::VerifyAndClearExpectations (network_mock_.get());
639+ Mock::VerifyAndClearExpectations (network_mock_.get ());
629640 }
630641 {
631642 SCOPED_TRACE (" Unsubscribe fails, server error" );
@@ -664,7 +675,7 @@ TEST_F(ReadStreamLayerClientTest, Unsubscribe) {
664675 EXPECT_EQ (unsubscribe_response.GetError ().GetErrorCode (),
665676 ErrorCode::NotFound);
666677
667- :: Mock::VerifyAndClearExpectations (network_mock_.get());
678+ Mock::VerifyAndClearExpectations (network_mock_.get ());
668679 }
669680}
670681
@@ -754,6 +765,204 @@ TEST_F(ReadStreamLayerClientTest, UnsubscribeCancelFuture) {
754765 unsubscribe_response.GetError ().GetErrorCode ());
755766}
756767
768+ TEST_F (ReadStreamLayerClientTest, GetData) {
769+ HRN hrn (GetTestCatalog ());
770+
771+ {
772+ SCOPED_TRACE (" GetData success" );
773+
774+ EXPECT_CALL (*network_mock_, Send (IsGetRequest (URL_LOOKUP_BLOB), _, _, _, _))
775+ .Times (1 );
776+
777+ EXPECT_CALL (*network_mock_,
778+ Send (IsGetRequest (URL_BLOB_DATA_269), _, _, _, _))
779+ .Times (1 );
780+
781+ StreamLayerClient client (hrn, kLayerId , settings_);
782+
783+ std::promise<DataResponse> promise;
784+ auto future = promise.get_future ();
785+
786+ model::Metadata metadata;
787+ metadata.SetDataHandle (kDataHandle );
788+ model::Message message;
789+ message.SetMetaData (metadata);
790+
791+ client.GetData (message,
792+ [&](DataResponse response) { promise.set_value (response); });
793+
794+ ASSERT_EQ (future.wait_for (kTimeout ), std::future_status::ready);
795+
796+ const auto & response = future.get ();
797+ EXPECT_TRUE (response.IsSuccessful ());
798+ ASSERT_TRUE (response.GetResult ());
799+
800+ const std::string blob_data = HTTP_RESPONSE_BLOB_DATA_STREAM_MESSAGE;
801+ EXPECT_THAT (*response.GetResult (),
802+ ElementsAreArray (blob_data.begin (), blob_data.end ()));
803+
804+ Mock::VerifyAndClearExpectations (network_mock_.get ());
805+ }
806+ {
807+ SCOPED_TRACE (" GetData fails, no data handle" );
808+
809+ StreamLayerClient client (hrn, kLayerId , settings_);
810+
811+ std::promise<DataResponse> promise;
812+ auto future = promise.get_future ();
813+
814+ client.GetData (model::Message{},
815+ [&](DataResponse response) { promise.set_value (response); });
816+
817+ ASSERT_EQ (future.wait_for (kTimeout ), std::future_status::ready);
818+
819+ const auto & response = future.get ();
820+ ASSERT_FALSE (response.IsSuccessful ());
821+ EXPECT_EQ (response.GetError ().GetErrorCode (), ErrorCode::InvalidArgument);
822+
823+ Mock::VerifyAndClearExpectations (network_mock_.get ());
824+ }
825+ {
826+ SCOPED_TRACE (" GetData fails, lookup server error" );
827+
828+ EXPECT_CALL (*network_mock_, Send (IsGetRequest (URL_LOOKUP_BLOB), _, _, _, _))
829+ .WillOnce (ReturnHttpResponse (
830+ olp::http::NetworkResponse ().WithStatus (
831+ olp::http::HttpStatusCode::AUTHENTICATION_TIMEOUT),
832+ HTTP_RESPONSE_EMPTY));
833+
834+ StreamLayerClient client (hrn, kLayerId , settings_);
835+
836+ std::promise<DataResponse> promise;
837+ auto future = promise.get_future ();
838+
839+ model::Metadata metadata;
840+ metadata.SetDataHandle (kDataHandle );
841+ model::Message message;
842+ message.SetMetaData (metadata);
843+
844+ client.GetData (message,
845+ [&](DataResponse response) { promise.set_value (response); });
846+
847+ ASSERT_EQ (future.wait_for (kTimeout ), std::future_status::ready);
848+
849+ const auto & response = future.get ();
850+ EXPECT_FALSE (response.IsSuccessful ());
851+ EXPECT_EQ (response.GetError ().GetHttpStatusCode (),
852+ olp::http::HttpStatusCode::AUTHENTICATION_TIMEOUT);
853+
854+ Mock::VerifyAndClearExpectations (network_mock_.get ());
855+ }
856+ {
857+ SCOPED_TRACE (" GetData fails, blob server error" );
858+
859+ EXPECT_CALL (*network_mock_, Send (IsGetRequest (URL_LOOKUP_BLOB), _, _, _, _))
860+ .Times (1 );
861+
862+ EXPECT_CALL (*network_mock_,
863+ Send (IsGetRequest (URL_BLOB_DATA_269), _, _, _, _))
864+ .WillOnce (ReturnHttpResponse (olp::http::NetworkResponse ().WithStatus (
865+ olp::http::HttpStatusCode::NOT_FOUND),
866+ HTTP_RESPONSE_EMPTY));
867+
868+ StreamLayerClient client (hrn, kLayerId , settings_);
869+
870+ std::promise<DataResponse> promise;
871+ auto future = promise.get_future ();
872+
873+ model::Metadata metadata;
874+ metadata.SetDataHandle (kDataHandle );
875+ model::Message message;
876+ message.SetMetaData (metadata);
877+
878+ client.GetData (message,
879+ [&](DataResponse response) { promise.set_value (response); });
880+
881+ ASSERT_EQ (future.wait_for (kTimeout ), std::future_status::ready);
882+
883+ const auto & response = future.get ();
884+ ASSERT_FALSE (response.IsSuccessful ());
885+ EXPECT_EQ (response.GetError ().GetHttpStatusCode (),
886+ olp::http::HttpStatusCode::NOT_FOUND);
887+
888+ Mock::VerifyAndClearExpectations (network_mock_.get ());
889+ }
890+ }
891+
892+ TEST_F (ReadStreamLayerClientTest, GetDataCancellableFuture) {
893+ HRN hrn (GetTestCatalog ());
894+
895+ EXPECT_CALL (*network_mock_, Send (IsGetRequest (URL_LOOKUP_BLOB), _, _, _, _))
896+ .Times (1 );
897+
898+ EXPECT_CALL (*network_mock_, Send (IsGetRequest (URL_BLOB_DATA_269), _, _, _, _))
899+ .Times (1 );
900+
901+ StreamLayerClient client (hrn, kLayerId , settings_);
902+
903+ model::Metadata metadata;
904+ metadata.SetDataHandle (kDataHandle );
905+ model::Message message;
906+ message.SetMetaData (metadata);
907+
908+ auto future = client.GetData (message).GetFuture ();
909+
910+ ASSERT_EQ (future.wait_for (kTimeout ), std::future_status::ready);
911+
912+ const auto & response = future.get ();
913+ EXPECT_TRUE (response.IsSuccessful ());
914+ ASSERT_TRUE (response.GetResult ());
915+
916+ const std::string blob_data = HTTP_RESPONSE_BLOB_DATA_STREAM_MESSAGE;
917+ EXPECT_THAT (*response.GetResult (),
918+ ElementsAreArray (blob_data.begin (), blob_data.end ()));
919+ }
920+
921+ TEST_F (ReadStreamLayerClientTest, GetDataCancel) {
922+ HRN hrn (GetTestCatalog ());
923+
924+ auto request_started = std::make_shared<std::promise<void >>();
925+ auto continue_request = std::make_shared<std::promise<void >>();
926+
927+ {
928+ olp::http::RequestId request_id;
929+ NetworkCallback send_mock;
930+ CancelCallback cancel_mock;
931+
932+ std::tie (request_id, send_mock, cancel_mock) = GenerateNetworkMockActions (
933+ request_started, continue_request,
934+ {olp::http::HttpStatusCode::OK, HTTP_RESPONSE_LOOKUP_BLOB});
935+
936+ EXPECT_CALL (*network_mock_, Send (IsGetRequest (URL_LOOKUP_BLOB), _, _, _, _))
937+ .Times (1 )
938+ .WillOnce (Invoke (std::move (send_mock)));
939+
940+ EXPECT_CALL (*network_mock_, Cancel (request_id))
941+ .WillOnce (Invoke (std::move (cancel_mock)));
942+ }
943+
944+ StreamLayerClient client (hrn, kLayerId , settings_);
945+
946+ model::Metadata metadata;
947+ metadata.SetDataHandle (kDataHandle );
948+ model::Message message;
949+ message.SetMetaData (metadata);
950+
951+ auto future = client.GetData (message);
952+
953+ request_started->get_future ().get ();
954+ future.GetCancellationToken ().Cancel ();
955+ continue_request->set_value ();
956+
957+ auto response = future.GetFuture ().get ();
958+
959+ ASSERT_FALSE (response.IsSuccessful ());
960+
961+ EXPECT_EQ (static_cast <int >(olp::http::ErrorCode::CANCELLED_ERROR),
962+ response.GetError ().GetHttpStatusCode ());
963+ EXPECT_EQ (ErrorCode::Cancelled, response.GetError ().GetErrorCode ());
964+ }
965+
757966TEST_F (ReadStreamLayerClientTest, CancelPendingRequests) {
758967 HRN hrn (GetTestCatalog ());
759968
@@ -765,19 +974,25 @@ TEST_F(ReadStreamLayerClientTest, CancelPendingRequests) {
765974 StreamLayerClient client (hrn, kLayerId , settings_);
766975
767976 auto subscribe_future = client.Subscribe (SubscribeRequest ());
977+ auto get_data_future = client.GetData (model::Message ());
768978 auto unsubscribe_future = client.Unsubscribe ();
769979
770980 client.CancelPendingRequests ();
771981
772982 promise.set_value ();
773983
774984 auto subscribe_response = subscribe_future.GetFuture ().get ();
775-
776985 ASSERT_FALSE (subscribe_response.IsSuccessful ());
777986 EXPECT_EQ (static_cast <int >(olp::http::ErrorCode::CANCELLED_ERROR),
778987 subscribe_response.GetError ().GetHttpStatusCode ());
779988 EXPECT_EQ (ErrorCode::Cancelled, subscribe_response.GetError ().GetErrorCode ());
780989
990+ auto get_data_response = get_data_future.GetFuture ().get ();
991+ ASSERT_FALSE (get_data_response.IsSuccessful ());
992+ EXPECT_EQ (static_cast <int >(olp::http::ErrorCode::CANCELLED_ERROR),
993+ get_data_response.GetError ().GetHttpStatusCode ());
994+ EXPECT_EQ (ErrorCode::Cancelled, get_data_response.GetError ().GetErrorCode ());
995+
781996 auto unsubscribe_response = unsubscribe_future.GetFuture ().get ();
782997 ASSERT_FALSE (unsubscribe_response.IsSuccessful ());
783998 EXPECT_EQ (static_cast <int >(olp::http::ErrorCode::CANCELLED_ERROR),
0 commit comments