@@ -39,6 +39,18 @@ std::string ApiErrorToString(const ApiError& error) {
3939 return result_stream.str ();
4040}
4141
42+ model::StreamOffsets GetStreamOffsets () {
43+ model::StreamOffset offset1;
44+ offset1.SetPartition (7 );
45+ offset1.SetOffset (38562 );
46+ model::StreamOffset offset2;
47+ offset2.SetPartition (8 );
48+ offset2.SetOffset (27458 );
49+ model::StreamOffsets offsets;
50+ offsets.SetOffsets ({offset1, offset2});
51+ return offsets;
52+ }
53+
4254MATCHER_P (BodyEq, expected_body, " " ) {
4355 std::string expected_body_str (expected_body);
4456
@@ -88,13 +100,19 @@ const std::string kCorrelationId{"test-correlation-id"};
88100const std::pair<std::string, std::string> kCorrelationIdHeader {
89101 " X-Correlation-Id" , kCorrelationId };
90102
91- constexpr auto kUrlStreamSubscribeNoQueryParams =
103+ constexpr auto kUrlSubscribeNoQueryParams =
92104 R"( https://some.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/subscribe)" ;
93105
94- constexpr auto kUrlStreamSubscribeWithQueryParams =
106+ constexpr auto kUrlSubscribeWithQueryParams =
95107 R"( https://some.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/subscribe?consumerId=test-consumer-id-987&mode=serial&subscriptionId=test-subscription-id-123)" ;
96108
97- constexpr auto kUrlStreamUnsubscribe =
109+ constexpr auto kUrlCommitOffsetsNoQueryParams =
110+ R"( https://some.node.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/offsets)" ;
111+
112+ constexpr auto kUrlCommitOffsetsWithQueryParams =
113+ R"( https://some.node.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/offsets?mode=parallel&subscriptionId=test-subscription-id-123)" ;
114+
115+ constexpr auto kUrlUnsubscribe =
98116 R"( https://some.node.base.url/stream/v2/catalogs/hrn:here:data::olp-here-test:hereos-internal-test-v2/layers/test-layer/subscribe?mode=parallel&subscriptionId=test-subscription-id-123)" ;
99117
100118constexpr auto kHttpResponseSubscribeSucceeds =
@@ -103,19 +121,24 @@ constexpr auto kHttpResponseSubscribeSucceeds =
103121constexpr auto kHttpResponseSubscribeFails =
104122 R"jsonString( { "title": "Subscription mode not supported", "status": 400, "code": "E213002", "cause": "Subscription mode 'singleton' not supported", "action": "Retry with valid subscription mode 'serial' or 'parallel'", "correlationId": "4199533b-6290-41db-8d79-edf4f4019a74" })jsonString" ;
105123
124+ constexpr auto kHttpResponseCommitOffsetsFails =
125+ R"jsonString( { "title": "Unable to commit offset", "status": 409, "code": "E213028", "cause": "Unable to commit offset", "action": "Commit cannot be completed. Continue with reading and committing new messages", "correlationId": "4199533b-6290-41db-8d79-edf4f4019a74" })jsonString" ;
126+
106127constexpr auto kHttpResponseUnsubscribeFails =
107128 R"jsonString( { "error": "Unauthorized", "error_description": "Token Validation Failure - invalid time in token" })jsonString" ;
108129
109130constexpr auto kHttpRequestBodyWithConsumerProperties =
110131 R"jsonString( {"kafkaConsumerProperties":{"field_string":"abc","field_int":"456","field_bool":"1"}})jsonString" ;
111132
133+ constexpr auto kHttpRequestBodyWithStreamOffsets =
134+ R"jsonString( {"offsets":[{"partition":7,"offset":38562},{"partition":8,"offset":27458}]})jsonString" ;
135+
112136TEST_F (StreamApiTest, Subscribe) {
113137 {
114138 SCOPED_TRACE (" Subscribe without optional input fields succeeds" );
115139
116- EXPECT_CALL (
117- *network_mock_,
118- Send (IsPostRequest (kUrlStreamSubscribeNoQueryParams ), _, _, _, _))
140+ EXPECT_CALL (*network_mock_,
141+ Send (IsPostRequest (kUrlSubscribeNoQueryParams ), _, _, _, _))
119142 .WillOnce (ReturnHttpResponse (
120143 http::NetworkResponse ().WithStatus (http::HttpStatusCode::CREATED),
121144 kHttpResponseSubscribeSucceeds ));
@@ -139,7 +162,7 @@ TEST_F(StreamApiTest, Subscribe) {
139162 SCOPED_TRACE (" Subscribe with all optional input fields succeeds" );
140163
141164 EXPECT_CALL (*network_mock_,
142- Send (AllOf (IsPostRequest (kUrlStreamSubscribeWithQueryParams ),
165+ Send (AllOf (IsPostRequest (kUrlSubscribeWithQueryParams ),
143166 BodyEq (kHttpRequestBodyWithConsumerProperties )),
144167 _, _, _, _))
145168 .WillOnce (ReturnHttpResponse (
@@ -170,9 +193,8 @@ TEST_F(StreamApiTest, Subscribe) {
170193 {
171194 SCOPED_TRACE (" Subscribe fails" );
172195
173- EXPECT_CALL (
174- *network_mock_,
175- Send (IsPostRequest (kUrlStreamSubscribeNoQueryParams ), _, _, _, _))
196+ EXPECT_CALL (*network_mock_,
197+ Send (IsPostRequest (kUrlSubscribeNoQueryParams ), _, _, _, _))
176198 .WillOnce (ReturnHttpResponse (
177199 http::NetworkResponse ().WithStatus (http::HttpStatusCode::FORBIDDEN),
178200 kHttpResponseSubscribeFails ));
@@ -194,12 +216,92 @@ TEST_F(StreamApiTest, Subscribe) {
194216 }
195217}
196218
219+ TEST_F (StreamApiTest, CommitOffsets) {
220+ const auto stream_offsets = GetStreamOffsets ();
221+
222+ {
223+ SCOPED_TRACE (" CommitOffsets without optional input fields succeeds" );
224+
225+ EXPECT_CALL (*network_mock_,
226+ Send (AllOf (IsPutRequest (kUrlCommitOffsetsNoQueryParams ),
227+ HeadersContain (kCorrelationIdHeader ),
228+ BodyEq (kHttpRequestBodyWithStreamOffsets )),
229+ _, _, _, _))
230+ .WillOnce (ReturnHttpResponse (
231+ http::NetworkResponse ().WithStatus (http::HttpStatusCode::OK), " " ));
232+
233+ olp_client_.SetBaseUrl (kNodeBaseUrl );
234+ std::string x_correlation_id = kCorrelationId ;
235+ CancellationContext context;
236+ const auto commit_offsets_response = StreamApi::CommitOffsets (
237+ olp_client_, kLayerId , stream_offsets, boost::none, boost::none,
238+ context, x_correlation_id);
239+
240+ EXPECT_TRUE (commit_offsets_response.IsSuccessful ())
241+ << ApiErrorToString (commit_offsets_response.GetError ());
242+ EXPECT_EQ (commit_offsets_response.GetResult (), http::HttpStatusCode::OK);
243+
244+ Mock::VerifyAndClearExpectations (network_mock_.get ());
245+ }
246+ {
247+ SCOPED_TRACE (" CommitOffsets with all optional input fields succeeds" );
248+
249+ EXPECT_CALL (*network_mock_,
250+ Send (AllOf (IsPutRequest (kUrlCommitOffsetsWithQueryParams ),
251+ HeadersContain (kCorrelationIdHeader ),
252+ BodyEq (kHttpRequestBodyWithStreamOffsets )),
253+ _, _, _, _))
254+ .WillOnce (ReturnHttpResponse (
255+ http::NetworkResponse ().WithStatus (http::HttpStatusCode::OK), " " ));
256+
257+ olp_client_.SetBaseUrl (kNodeBaseUrl );
258+ std::string x_correlation_id = kCorrelationId ;
259+ CancellationContext context;
260+ const auto commit_offsets_response = StreamApi::CommitOffsets (
261+ olp_client_, kLayerId , stream_offsets, kSubscriptionId , kParallelMode ,
262+ context, x_correlation_id);
263+
264+ EXPECT_TRUE (commit_offsets_response.IsSuccessful ())
265+ << ApiErrorToString (commit_offsets_response.GetError ());
266+ EXPECT_EQ (commit_offsets_response.GetResult (), http::HttpStatusCode::OK);
267+
268+ Mock::VerifyAndClearExpectations (network_mock_.get ());
269+ }
270+ {
271+ SCOPED_TRACE (" CommitOffsets fails" );
272+
273+ EXPECT_CALL (*network_mock_,
274+ Send (AllOf (IsPutRequest (kUrlCommitOffsetsNoQueryParams ),
275+ HeadersContain (kCorrelationIdHeader ),
276+ BodyEq (kHttpRequestBodyWithStreamOffsets )),
277+ _, _, _, _))
278+ .WillOnce (ReturnHttpResponse (
279+ http::NetworkResponse ().WithStatus (http::HttpStatusCode::CONFLICT),
280+ kHttpResponseCommitOffsetsFails ));
281+
282+ olp_client_.SetBaseUrl (kNodeBaseUrl );
283+ std::string x_correlation_id = kCorrelationId ;
284+ CancellationContext context;
285+ const auto commit_offsets_response = StreamApi::CommitOffsets (
286+ olp_client_, kLayerId , stream_offsets, boost::none, boost::none,
287+ context, x_correlation_id);
288+
289+ EXPECT_FALSE (commit_offsets_response.IsSuccessful ());
290+ EXPECT_EQ (commit_offsets_response.GetError ().GetHttpStatusCode (),
291+ http::HttpStatusCode::CONFLICT);
292+ EXPECT_EQ (commit_offsets_response.GetError ().GetMessage (),
293+ kHttpResponseCommitOffsetsFails );
294+
295+ Mock::VerifyAndClearExpectations (network_mock_.get ());
296+ }
297+ }
298+
197299TEST_F (StreamApiTest, DeleteSubscription) {
198300 {
199301 SCOPED_TRACE (" DeleteSubscription succeeds" );
200302
201303 EXPECT_CALL (*network_mock_,
202- Send (AllOf (IsDeleteRequest (kUrlStreamUnsubscribe ),
304+ Send (AllOf (IsDeleteRequest (kUrlUnsubscribe ),
203305 HeadersContain (kCorrelationIdHeader )),
204306 _, _, _, _))
205307 .WillOnce (ReturnHttpResponse (
@@ -221,7 +323,7 @@ TEST_F(StreamApiTest, DeleteSubscription) {
221323 SCOPED_TRACE (" DeleteSubscription fails" );
222324
223325 EXPECT_CALL (*network_mock_,
224- Send (AllOf (IsDeleteRequest (kUrlStreamUnsubscribe ),
326+ Send (AllOf (IsDeleteRequest (kUrlUnsubscribe ),
225327 HeadersContain (kCorrelationIdHeader )),
226328 _, _, _, _))
227329 .WillOnce (ReturnHttpResponse (http::NetworkResponse ().WithStatus (
0 commit comments