Skip to content

Commit da093f7

Browse files
Bohdan Kurylovychborzun
authored andcommitted
Added the FlushRequest class in StreamLayerClient's API
Added the model::FlushRequest class, which manages the parameters for the customization of StreamLayerClient::Flush logic. + re-enabled the StreamLayerClientCacheTest integration tests on PSV Relates to: OLPEDGE-826 Signed-off-by: Bohdan Kurylovych <[email protected]>
1 parent 3db5c8c commit da093f7

File tree

10 files changed

+119
-35
lines changed

10 files changed

+119
-35
lines changed

examples/dataservice-write/example.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ int RunExample() {
112112
}
113113

114114
// Flush and wait for uploading
115-
auto future_response = client->Flush();
115+
auto flush_request = FlushRequest();
116+
auto future_response = client->Flush(std::move(flush_request));
116117
auto responses = future_response.GetFuture().get();
117118
if (responses.empty()) {
118119
OLP_SDK_LOG_ERROR_F(kLogTag, "Error on Flush()");

olp-cpp-sdk-dataservice-write/include/olp/dataservice/write/StreamLayerClient.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
#include <olp/core/client/ApiError.h>
2525
#include <olp/core/client/ApiResponse.h>
2626
#include <olp/core/client/OlpClientSettings.h>
27-
2827
#include <olp/dataservice/write/DataServiceWriteApi.h>
2928
#include <olp/dataservice/write/FlushSettings.h>
3029
#include <olp/dataservice/write/generated/model/ResponseOk.h>
3130
#include <olp/dataservice/write/generated/model/ResponseOkSingle.h>
31+
#include <olp/dataservice/write/model/FlushRequest.h>
3232
#include <olp/dataservice/write/model/PublishDataRequest.h>
3333
#include <olp/dataservice/write/model/PublishSdiiRequest.h>
3434

@@ -119,17 +119,25 @@ class DATASERVICE_WRITE_API StreamLayerClient {
119119
/**
120120
* @brief Flush PublishDataRequests which have been queued via the Queue
121121
* API.
122+
* @param request \c FlushRequest object representing the parameters for
123+
* this \c Flush method call.
122124
* @return A CancellableFuture containing the FlushResponse.
123125
*/
124-
olp::client::CancellableFuture<FlushResponse> Flush();
126+
olp::client::CancellableFuture<FlushResponse> Flush(
127+
model::FlushRequest request);
125128

126129
/**
127130
* @brief Flush PublishDataRequests which have been queued via the Queue
128131
* API.
132+
* @param request \c FlushRequest object representing the parameters for
133+
* this \c Flush method call.
134+
* @param callback The callback which will be called when all the flush
135+
* results (see \c FlushResponse) will be available.
129136
* @return A CancellationToken which can be used to cancel the ongoing
130137
* request.
131138
*/
132-
olp::client::CancellationToken Flush(FlushCallback callback);
139+
olp::client::CancellationToken Flush(model::FlushRequest request,
140+
FlushCallback callback);
133141

134142
/**
135143
* @brief Call to send a list of SDII messages to an OLP Stream Layer.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (C) 2019 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#pragma once
21+
22+
#include <olp/dataservice/write/DataServiceWriteApi.h>
23+
24+
namespace olp {
25+
namespace dataservice {
26+
namespace write {
27+
namespace model {
28+
/**
29+
* @brief FlushRequest used to flush requests into an OLP Stream Layer.
30+
*/
31+
class DATASERVICE_WRITE_API FlushRequest {
32+
public:
33+
FlushRequest() = default;
34+
35+
/**
36+
* @return The number of \c PublishDataRequest requests, which will be
37+
* flushed by \c StreamLayerCLient. By default this value is initilized to 0,
38+
* thus, all queued \c StreamLayerClient's requests will be flushed.
39+
*/
40+
inline int GetNumberOfRequestsToFlush() const {
41+
return num_requests_per_flush_;
42+
}
43+
44+
/**
45+
* @param num_requests Maximum number of partitions (i.e. \c
46+
* PublishDataRequest) to be flushed by \c StreamLayerClient. If this value is
47+
* negative, nothing will be flushed. Set 0 to flush all requests.
48+
* @note Required.
49+
*/
50+
inline FlushRequest& WithNumberOfRequestsToFlush(int num_requests) {
51+
num_requests_per_flush_ = num_requests;
52+
return *this;
53+
}
54+
55+
private:
56+
/// Number of requests to flush
57+
int num_requests_per_flush_ = 0;
58+
};
59+
60+
} // namespace model
61+
} // namespace write
62+
} // namespace dataservice
63+
} // namespace olp

olp-cpp-sdk-dataservice-write/src/AutoFlushController.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,16 @@ class EnabledAutoFlushControllerImpl
167167
NotifyFlushEventStart();
168168
auto flush_thread = std::thread([self, impl_pointer]() {
169169
auto id = self->background_task_col_.AddTask();
170-
auto cancel_token =
171-
impl_pointer->Flush([self, id](FlushResponse results) {
170+
171+
const auto num_requests_to_flush =
172+
self->flush_settings_.events_per_single_flush
173+
? *(self->flush_settings_.events_per_single_flush)
174+
: 0;
175+
model::FlushRequest request =
176+
model::FlushRequest().WithNumberOfRequestsToFlush(
177+
num_requests_to_flush);
178+
auto cancel_token = impl_pointer->Flush(
179+
std::move(request), [self, id](FlushResponse results) {
172180
self->background_task_col_.ReleaseTask(id);
173181
self->RemoveCancelToken(id);
174182
self->NotifyFlushEventResults(results);

olp-cpp-sdk-dataservice-write/src/StreamLayerClient.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,13 @@ boost::optional<std::string> StreamLayerClient::Queue(
6161
}
6262

6363
olp::client::CancellableFuture<StreamLayerClient::FlushResponse>
64-
StreamLayerClient::Flush() {
65-
return impl_->Flush();
64+
StreamLayerClient::Flush(model::FlushRequest request) {
65+
return impl_->Flush(std::move(request));
6666
}
6767

6868
olp::client::CancellationToken StreamLayerClient::Flush(
69-
FlushCallback callback) {
70-
return impl_->Flush(std::move(callback));
69+
model::FlushRequest request, FlushCallback callback) {
70+
return impl_->Flush(std::move(request), std::move(callback));
7171
}
7272

7373
olp::client::CancellableFuture<PublishSdiiResponse>

olp-cpp-sdk-dataservice-write/src/StreamLayerClientImpl.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -358,34 +358,34 @@ StreamLayerClientImpl::PopFromQueue() {
358358
}
359359

360360
olp::client::CancellableFuture<StreamLayerClient::FlushResponse>
361-
StreamLayerClientImpl::Flush() {
361+
StreamLayerClientImpl::Flush(model::FlushRequest request) {
362362
auto promise =
363363
std::make_shared<std::promise<StreamLayerClient::FlushResponse> >();
364-
auto cancel_token =
365-
Flush([promise](StreamLayerClient::FlushResponse response) {
364+
auto cancel_token = Flush(
365+
std::move(request), [promise](StreamLayerClient::FlushResponse response) {
366366
promise->set_value(std::move(response));
367367
});
368368
return CancellableFuture<StreamLayerClient::FlushResponse>(cancel_token,
369369
promise);
370370
}
371371

372372
olp::client::CancellationToken StreamLayerClientImpl::Flush(
373-
StreamLayerClient::FlushCallback callback) {
373+
model::FlushRequest request, StreamLayerClient::FlushCallback callback) {
374374
CancellationContext cancel_context;
375375

376376
auto self = shared_from_this();
377377

378378
auto func = [=]() mutable {
379379
StreamLayerClient::FlushResponse response;
380380

381-
auto maximum_events_number = self->flush_settings_.events_per_single_flush;
382-
if (maximum_events_number && *maximum_events_number <= 0) {
381+
const auto maximum_events_number = request.GetNumberOfRequestsToFlush();
382+
if (maximum_events_number < 0) {
383383
callback(std::move(response));
384384
return;
385385
}
386386

387387
int counter = 0;
388-
while ((!maximum_events_number || counter < *maximum_events_number) &&
388+
while ((!maximum_events_number || counter < maximum_events_number) &&
389389
(self->QueueSize() > 0) && !cancel_context.IsCancelled()) {
390390
auto publish_request = self->PopFromQueue();
391391
if (publish_request == boost::none) {

olp-cpp-sdk-dataservice-write/src/StreamLayerClientImpl.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ class StreamLayerClientImpl
5555
const model::PublishDataRequest& request, PublishDataCallback callback);
5656

5757
boost::optional<std::string> Queue(const model::PublishDataRequest& request);
58-
olp::client::CancellableFuture<StreamLayerClient::FlushResponse> Flush();
58+
olp::client::CancellableFuture<StreamLayerClient::FlushResponse> Flush(
59+
model::FlushRequest request);
5960
olp::client::CancellationToken Flush(
60-
StreamLayerClient::FlushCallback callback);
61+
model::FlushRequest request, StreamLayerClient::FlushCallback callback);
6162
size_t QueueSize() const;
6263
boost::optional<model::PublishDataRequest> PopFromQueue();
6364

scripts/linux/psv/travis_test_psv.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ $CPP_TEST_SOURCE_DARASERVICE_WRITE/olp-cpp-sdk-dataservice-write-tests \
1919
--gtest_output="xml:olp-cpp-sdk-dataservice-write-tests-report.xml"
2020
echo ">>> Integration Test ... >>>"
2121
$CPP_TEST_SOURCE_INTEGRATION/olp-cpp-sdk-integration-tests \
22-
--gtest_output="xml:olp-cpp-sdk-integration-tests-report.xml" \
23-
--gtest_filter=-"StreamLayerClientCacheTest*"
22+
--gtest_output="xml:olp-cpp-sdk-integration-tests-report.xml"
2423

2524
bash <(curl -s https://codecov.io/bash)

tests/functional/olp-cpp-sdk-dataservice-write/DataserviceWriteStreamLayerClientCacheTest.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ TEST_F(DataserviceWriteStreamLayerClientCacheTest, FlushDataSingle) {
232232

233233
ASSERT_FALSE(error) << error.get();
234234

235-
auto response = client_->Flush().GetFuture().get();
235+
auto response = client_->Flush(model::FlushRequest()).GetFuture().get();
236236

237237
ASSERT_FALSE(response.empty());
238238
ASSERT_NO_FATAL_FAILURE(PublishDataSuccessAssertions(response[0]));
@@ -241,7 +241,7 @@ TEST_F(DataserviceWriteStreamLayerClientCacheTest, FlushDataSingle) {
241241
TEST_F(DataserviceWriteStreamLayerClientCacheTest, FlushDataMultiple) {
242242
ASSERT_NO_FATAL_FAILURE(QueueMultipleEvents(5));
243243

244-
auto response = client_->Flush().GetFuture().get();
244+
auto response = client_->Flush(model::FlushRequest()).GetFuture().get();
245245

246246
ASSERT_EQ(5, response.size());
247247
for (auto& single_response : response) {
@@ -257,8 +257,8 @@ TEST_F(DataserviceWriteStreamLayerClientCacheTest, FlushDataSingleAsync) {
257257

258258
std::promise<StreamLayerClient::FlushResponse> response_promise;
259259
bool call_is_async = true;
260-
auto cancel_token =
261-
client_->Flush([&](StreamLayerClient::FlushResponse response) {
260+
auto cancel_token = client_->Flush(
261+
model::FlushRequest(), [&](StreamLayerClient::FlushResponse response) {
262262
call_is_async = false;
263263
response_promise.set_value(response);
264264
});
@@ -280,8 +280,8 @@ TEST_F(DataserviceWriteStreamLayerClientCacheTest, FlushDataMultipleAsync) {
280280

281281
std::promise<StreamLayerClient::FlushResponse> response_promise;
282282
bool call_is_async = true;
283-
auto cancel_token =
284-
client_->Flush([&](StreamLayerClient::FlushResponse response) {
283+
auto cancel_token = client_->Flush(
284+
model::FlushRequest(), [&](StreamLayerClient::FlushResponse response) {
285285
call_is_async = false;
286286
response_promise.set_value(response);
287287
});
@@ -306,7 +306,7 @@ TEST_F(DataserviceWriteStreamLayerClientCacheTest, FlushDataCancel) {
306306

307307
ASSERT_FALSE(error) << error.get();
308308

309-
auto cancel_future = client_->Flush();
309+
auto cancel_future = client_->Flush(model::FlushRequest());
310310

311311
std::thread([cancel_future]() {
312312
std::this_thread::sleep_for(std::chrono::milliseconds(200));

tests/integration/olp-cpp-sdk-dataservice-write/StreamLayerClientCacheTest.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,13 @@ class StreamLayerClientCacheTest : public ::testing::Test {
226226
PublishDataRequest().WithData(data_).WithLayerId(GetTestLayer()));
227227
EXPECT_FALSE(error) << error.get();
228228
}
229-
auto response = client_->Flush().GetFuture().get();
229+
230+
const auto num_requests_to_flush =
231+
max_events_per_flush ? *max_events_per_flush : 0;
232+
auto flush_request = model::FlushRequest().WithNumberOfRequestsToFlush(
233+
num_requests_to_flush);
234+
235+
auto response = client_->Flush(std::move(flush_request)).GetFuture().get();
230236
if (!max_events_per_flush || *max_events_per_flush > 5) {
231237
EXPECT_EQ(5, response.size());
232238
} else if (*max_events_per_flush <= 0) {
@@ -302,7 +308,7 @@ TEST_F(StreamLayerClientCacheTest, FlushDataSingle) {
302308

303309
ASSERT_FALSE(error) << error.get();
304310

305-
auto response = client_->Flush().GetFuture().get();
311+
auto response = client_->Flush(model::FlushRequest()).GetFuture().get();
306312

307313
ASSERT_FALSE(response.empty());
308314
ASSERT_NO_FATAL_FAILURE(PublishDataSuccessAssertions(response[0]));
@@ -324,7 +330,7 @@ TEST_F(StreamLayerClientCacheTest, FlushDataMultiple) {
324330

325331
ASSERT_NO_FATAL_FAILURE(QueueMultipleEvents(5));
326332

327-
auto response = client_->Flush().GetFuture().get();
333+
auto response = client_->Flush(model::FlushRequest()).GetFuture().get();
328334

329335
ASSERT_EQ(5, response.size());
330336
for (auto& single_response : response) {
@@ -360,7 +366,7 @@ TEST_F(StreamLayerClientCacheTest, DISABLED_FlushDataCancel) {
360366

361367
ASSERT_FALSE(error) << error.get();
362368

363-
auto promise = client_->Flush();
369+
auto promise = client_->Flush(model::FlushRequest());
364370
wait_for_cancel->get_future().get();
365371
promise.GetCancellationToken().cancel();
366372
pause_for_cancel->set_value();
@@ -599,7 +605,6 @@ TEST_F(StreamLayerClientCacheTest, FlushDataMaxEventsDefaultSetting) {
599605
TEST_F(StreamLayerClientCacheTest, FlushDataMaxEventsValidCustomSetting) {
600606
const int max_events_per_flush = 3;
601607
disk_cache_->Close();
602-
flush_settings_.events_per_single_flush = max_events_per_flush;
603608
client_ = CreateStreamLayerClient();
604609
{
605610
testing::InSequence dummy;
@@ -621,7 +626,6 @@ TEST_F(StreamLayerClientCacheTest, FlushDataMaxEventsValidCustomSetting) {
621626
TEST_F(StreamLayerClientCacheTest, FlushDataMaxEventsInvalidCustomSetting) {
622627
const int max_events_per_flush = -3;
623628
disk_cache_->Close();
624-
flush_settings_.events_per_single_flush = max_events_per_flush;
625629
client_ = CreateStreamLayerClient();
626630
{
627631
testing::InSequence dummy;
@@ -693,7 +697,7 @@ TEST_F(StreamLayerClientCacheTest, FlushSettingsMaximumRequests) {
693697
}
694698

695699
QueueMultipleEvents(15);
696-
auto response = client_->Flush().GetFuture().get();
700+
auto response = client_->Flush(model::FlushRequest()).GetFuture().get();
697701

698702
ASSERT_EQ(15, response.size());
699703
for (auto& single_response : response) {

0 commit comments

Comments
 (0)