@@ -106,6 +106,12 @@ constexpr auto kUrlSubscribeNoQueryParams =
106106constexpr auto kUrlSubscribeWithQueryParams =
107107 R"( https://some.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/subscribe?consumerId=test-consumer-id-987&mode=serial&subscriptionId=test-subscription-id-123)" ;
108108
109+ constexpr auto kUrlConsumeDataNoQueryParams =
110+ R"( https://some.node.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/partitions)" ;
111+
112+ constexpr auto kUrlConsumeDataWithQueryParams =
113+ R"( https://some.node.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/partitions?mode=parallel&subscriptionId=test-subscription-id-123)" ;
114+
109115constexpr auto kUrlCommitOffsetsNoQueryParams =
110116 R"( https://some.node.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/offsets)" ;
111117
@@ -124,9 +130,15 @@ constexpr auto kUrlUnsubscribe =
124130constexpr auto kHttpResponseSubscribeSucceeds =
125131 R"jsonString( { "nodeBaseURL": "https://some.node.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2", "subscriptionId": "test-subscription-id-123" })jsonString" ;
126132
133+ constexpr auto kHttpResponseConsumeDataSucceeds =
134+ R"jsonString( { "messages": [ { "metaData": { "partition": "314010583", "checksum": "ff7494d6f17da702862e550c907c0a91", "data": "iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAABGdBTUEAALGPC", "timestamp": 1517916706 }, "offset": { "partition": 7, "offset": 38562 } }, { "metaData": { "partition": "385010413", "checksum": "19a0c709c055e268207ad71f6d4947ff", "compressedDataSize": 152417, "dataSize": 250110, "dataHandle": "1b2ca68f-d4a0-4379-8120-cd025640510c", "timestamp": 1517918813 }, "offset": { "partition": 8, "offset": 27458 } } ] })jsonString" ;
135+
127136constexpr auto kHttpResponseSubscribeFails =
128137 R"jsonString( { "title": "Subscription mode not supported", "status": 400, "code": "E213002", "cause": "Subscription mode 'singleton' not supported", "action": "Retry with valid subscription mode 'serial' or 'parallel'", "correlationId": "4199533b-6290-41db-8d79-edf4f4019a74" })jsonString" ;
129138
139+ constexpr auto kHttpResponseConsumeDataFails =
140+ R"jsonString( { "title": "Subscription not found", "status": 404, "code": "E213003", "cause": "SubscriptionId -1920183912.123e4567-e89b-12d3-a456-556642440000 not found", "action": "Subscribe again", "correlationId": "4199533b-6290-41db-8d79-edf4f4019a74" })jsonString" ;
141+
130142constexpr auto kHttpResponseCommitOffsetsFails =
131143 R"jsonString( { "title": "Unable to commit offset", "status": 409, "code": "E213028", "cause": "Unable to commit offset", "action": "Commit cannot be completed. Continue with reading and committing new messages", "correlationId": "4199533b-6290-41db-8d79-edf4f4019a74" })jsonString" ;
132144
@@ -225,6 +237,84 @@ TEST_F(StreamApiTest, Subscribe) {
225237 }
226238}
227239
240+ TEST_F (StreamApiTest, ConsumeData) {
241+ {
242+ SCOPED_TRACE (" ConsumeData without optional input fields succeeds" );
243+
244+ EXPECT_CALL (*network_mock_,
245+ Send (AllOf (IsGetRequest (kUrlConsumeDataNoQueryParams ),
246+ HeadersContain (kCorrelationIdHeader )),
247+ _, _, _, _))
248+ .WillOnce (ReturnHttpResponse (
249+ http::NetworkResponse ().WithStatus (http::HttpStatusCode::OK),
250+ kHttpResponseConsumeDataSucceeds ));
251+
252+ olp_client_.SetBaseUrl (kNodeBaseUrl );
253+ std::string x_correlation_id = kCorrelationId ;
254+ CancellationContext context;
255+ const auto consume_data_response =
256+ StreamApi::ConsumeData (olp_client_, kLayerId , boost::none, boost::none,
257+ context, x_correlation_id);
258+
259+ EXPECT_TRUE (consume_data_response.IsSuccessful ())
260+ << ApiErrorToString (consume_data_response.GetError ());
261+ EXPECT_EQ (consume_data_response.GetResult ().GetMessages ().size (), 2 );
262+
263+ Mock::VerifyAndClearExpectations (network_mock_.get ());
264+ }
265+ {
266+ SCOPED_TRACE (" ConsumeData with all optional input fields succeeds" );
267+
268+ EXPECT_CALL (*network_mock_,
269+ Send (AllOf (IsGetRequest (kUrlConsumeDataWithQueryParams ),
270+ HeadersContain (kCorrelationIdHeader )),
271+ _, _, _, _))
272+ .WillOnce (ReturnHttpResponse (
273+ http::NetworkResponse ().WithStatus (http::HttpStatusCode::OK),
274+ kHttpResponseConsumeDataSucceeds ));
275+
276+ olp_client_.SetBaseUrl (kNodeBaseUrl );
277+ std::string x_correlation_id = kCorrelationId ;
278+ CancellationContext context;
279+ const auto consume_data_response =
280+ StreamApi::ConsumeData (olp_client_, kLayerId , kSubscriptionId ,
281+ kParallelMode , context, x_correlation_id);
282+
283+ EXPECT_TRUE (consume_data_response.IsSuccessful ())
284+ << ApiErrorToString (consume_data_response.GetError ());
285+ EXPECT_EQ (consume_data_response.GetResult ().GetMessages ().size (), 2 );
286+
287+ Mock::VerifyAndClearExpectations (network_mock_.get ());
288+ }
289+ {
290+ SCOPED_TRACE (" ConsumeData fails" );
291+
292+ EXPECT_CALL (*network_mock_,
293+ Send (AllOf (IsGetRequest (kUrlConsumeDataNoQueryParams ),
294+ HeadersContain (kCorrelationIdHeader )),
295+ _, _, _, _))
296+ .WillOnce (ReturnHttpResponse (
297+ http::NetworkResponse ().WithStatus (http::HttpStatusCode::NOT_FOUND),
298+ kHttpResponseConsumeDataFails ));
299+
300+ olp_client_.SetBaseUrl (kNodeBaseUrl );
301+ std::string x_correlation_id = kCorrelationId ;
302+ CancellationContext context;
303+ const auto consume_data_response =
304+ StreamApi::ConsumeData (olp_client_, kLayerId , boost::none, boost::none,
305+ context, x_correlation_id);
306+
307+ EXPECT_FALSE (consume_data_response.IsSuccessful ());
308+ EXPECT_EQ (consume_data_response.GetError ().GetHttpStatusCode (),
309+ http::HttpStatusCode::NOT_FOUND);
310+ EXPECT_EQ (consume_data_response.GetError ().GetMessage (),
311+ kHttpResponseConsumeDataFails );
312+ EXPECT_EQ (consume_data_response.GetResult ().GetMessages ().size (), 0 );
313+
314+ Mock::VerifyAndClearExpectations (network_mock_.get ());
315+ }
316+ }
317+
228318TEST_F (StreamApiTest, CommitOffsets) {
229319 const auto stream_offsets = GetStreamOffsets ();
230320
0 commit comments