Skip to content

Commit be960e3

Browse files
Bohdan Kurylovychmykhailo-kuchma
authored andcommitted
Implemented the GetData with CancellableFuture in versioned client
Implemented the GetData method which returns CancellableFuture in the dataservice-read VersionedLayerClient class. Also, added integration tests. Relates to: OLPEDGE-958 Signed-off-by: Bohdan Kurylovych <[email protected]>
1 parent a8e6c69 commit be960e3

File tree

5 files changed

+163
-0
lines changed

5 files changed

+163
-0
lines changed

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/VersionedLayerClient.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,21 @@ class DATASERVICE_READ_API VersionedLayerClient final {
120120
client::CancellationToken GetData(DataRequest data_request,
121121
DataResponseCallback callback);
122122

123+
/**
124+
* @brief Fetches data for a partition or data handle asynchronously. If the
125+
* specified partition or data handle cannot be found in the layer, the
126+
* callback will be invoked with an empty DataResponse (nullptr for result and
127+
* error). If neither Partition Id or Data Handle were set in the request, the
128+
* callback will be invoked with an error with ErrorCode::InvalidRequest.
129+
* @param data_request contains the complete set of request parameters.
130+
* \note \c DataRequest's GetLayerId value will be ignored and the parameter
131+
* from the constructor will be used instead.
132+
* @return \c CancellableFuture of type \c DataResponse, which when
133+
* complete will contain the data or an error. Alternatively, the
134+
* \c CancellableFuture can be used to cancel this request.
135+
*/
136+
client::CancellableFuture<DataResponse> GetData(DataRequest data_request);
137+
123138
/**
124139
* @brief Fetches a list of partitions for the given generic layer
125140
* asynchronously.

olp-cpp-sdk-dataservice-read/src/VersionedLayerClient.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ client::CancellationToken VersionedLayerClient::GetData(
3939
return impl_->GetData(std::move(data_request), std::move(callback));
4040
}
4141

42+
client::CancellableFuture<DataResponse> VersionedLayerClient::GetData(
43+
DataRequest data_request) {
44+
return impl_->GetData(std::move(data_request));
45+
}
46+
4247
client::CancellationToken VersionedLayerClient::GetPartitions(
4348
PartitionsRequest partitions_request, PartitionsResponseCallback callback) {
4449
return impl_->GetPartitions(std::move(partitions_request),

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,17 @@ client::CancellationToken VersionedLayerClientImpl::GetData(
147147
}
148148
}
149149

150+
client::CancellableFuture<DataResponse> VersionedLayerClientImpl::GetData(
151+
DataRequest data_request) {
152+
auto promise = std::make_shared<std::promise<DataResponse>>();
153+
auto cancel_token =
154+
GetData(std::move(data_request), [promise](DataResponse response) {
155+
promise->set_value(std::move(response));
156+
});
157+
return client::CancellableFuture<DataResponse>(std::move(cancel_token),
158+
std::move(promise));
159+
}
160+
150161
client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
151162
PrefetchTilesRequest request, PrefetchTilesResponseCallback callback) {
152163
const int64_t request_key = pending_requests_->GenerateRequestPlaceholder();

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class VersionedLayerClientImpl {
5555
virtual client::CancellationToken GetData(DataRequest data_request,
5656
DataResponseCallback callback);
5757

58+
virtual client::CancellableFuture<DataResponse> GetData(
59+
DataRequest data_request);
60+
5861
virtual client::CancellationToken GetPartitions(
5962
PartitionsRequest partitions_request,
6063
PartitionsResponseCallback callback);

tests/integration/olp-cpp-sdk-dataservice-read/VersionedLayerClientTest.cpp

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,45 @@ TEST_F(DataserviceReadVersionedLayerClientTest, GetDataFromPartitionAsync) {
366366
ASSERT_NE(response.GetResult()->size(), 0u);
367367
}
368368

369+
TEST_F(DataserviceReadVersionedLayerClientTest,
370+
GetDataFromPartitionAsyncWithCancellableFuture) {
371+
EXPECT_CALL(*network_mock_, Send(_, _, _, _, _))
372+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
373+
olp::http::HttpStatusCode::OK),
374+
kHttpResponseLookupQuery))
375+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
376+
olp::http::HttpStatusCode::OK),
377+
kHttpResponsePartition_269))
378+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
379+
olp::http::HttpStatusCode::OK),
380+
kHttpResponseLookupBlob))
381+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
382+
olp::http::HttpStatusCode::OK),
383+
kHttpResponseBlobData_269));
384+
385+
auto catalog = olp::client::HRN::FromString(
386+
GetArgument("dataservice_read_test_catalog"));
387+
auto layer = GetArgument("dataservice_read_test_layer");
388+
auto version = std::stoi(GetArgument("dataservice_read_test_layer_version"));
389+
auto catalog_client =
390+
std::make_unique<olp::dataservice::read::VersionedLayerClient>(
391+
catalog, layer, *settings_);
392+
393+
auto partition = GetArgument("dataservice_read_test_partition");
394+
auto data_request = olp::dataservice::read::DataRequest()
395+
.WithVersion(version)
396+
.WithPartitionId(partition);
397+
auto cancellable_future = catalog_client->GetData(std::move(data_request));
398+
399+
auto raw_future = cancellable_future.GetFuture();
400+
ASSERT_NE(raw_future.wait_for(kWaitTimeout), std::future_status::timeout);
401+
DataResponse response = raw_future.get();
402+
403+
ASSERT_TRUE(response.IsSuccessful()) << response.GetError().GetMessage();
404+
ASSERT_NE(response.GetResult(), nullptr);
405+
ASSERT_NE(response.GetResult()->size(), 0u);
406+
}
407+
369408
TEST_F(DataserviceReadVersionedLayerClientTest, GetDataFromPartitionSync) {
370409
EXPECT_CALL(*network_mock_, Send(_, _, _, _, _))
371410
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
@@ -406,6 +445,49 @@ TEST_F(DataserviceReadVersionedLayerClientTest, GetDataFromPartitionSync) {
406445
ASSERT_NE(response.GetResult()->size(), 0u);
407446
}
408447

448+
TEST_F(DataserviceReadVersionedLayerClientTest,
449+
GetDataFromPartitionSyncWithCancellableFuture) {
450+
EXPECT_CALL(*network_mock_, Send(_, _, _, _, _))
451+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
452+
olp::http::HttpStatusCode::OK),
453+
kHttpResponseLookupQuery))
454+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
455+
olp::http::HttpStatusCode::OK),
456+
kHttpResponsePartition_269))
457+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
458+
olp::http::HttpStatusCode::OK),
459+
kHttpResponseLookupBlob))
460+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
461+
olp::http::HttpStatusCode::OK),
462+
kHttpResponseBlobData_269));
463+
464+
auto catalog = olp::client::HRN::FromString(
465+
GetArgument("dataservice_read_test_catalog"));
466+
auto layer = GetArgument("dataservice_read_test_layer");
467+
auto version = 0;
468+
469+
auto sync_settings = *settings_;
470+
sync_settings.task_scheduler.reset();
471+
auto catalog_client =
472+
std::make_unique<olp::dataservice::read::VersionedLayerClient>(
473+
catalog, layer, sync_settings);
474+
ASSERT_TRUE(catalog_client);
475+
476+
auto partition = GetArgument("dataservice_read_test_partition");
477+
auto data_request = olp::dataservice::read::DataRequest()
478+
.WithVersion(version)
479+
.WithPartitionId(partition);
480+
auto cancellable_future = catalog_client->GetData(std::move(data_request));
481+
482+
auto raw_future = cancellable_future.GetFuture();
483+
ASSERT_NE(raw_future.wait_for(kWaitTimeout), std::future_status::timeout);
484+
DataResponse response = raw_future.get();
485+
486+
ASSERT_TRUE(response.IsSuccessful()) << response.GetError().GetMessage();
487+
ASSERT_NE(response.GetResult(), nullptr);
488+
ASSERT_NE(response.GetResult()->size(), 0u);
489+
}
490+
409491
TEST_F(DataserviceReadVersionedLayerClientTest,
410492
GetDataFromPartitionSyncLatestVersionOk) {
411493
EXPECT_CALL(*network_mock_, Send(_, _, _, _, _))
@@ -617,6 +699,53 @@ TEST_F(DataserviceReadVersionedLayerClientTest,
617699
ASSERT_TRUE(response.GetResult() == nullptr);
618700
}
619701

702+
TEST_F(DataserviceReadVersionedLayerClientTest,
703+
GetDataFromPartitionCancelLookupWithCancellableFuture) {
704+
auto wait_for_cancel = std::make_shared<std::promise<void>>();
705+
auto pause_for_cancel = std::make_shared<std::promise<void>>();
706+
707+
olp::http::RequestId request_id;
708+
NetworkCallback send_mock;
709+
CancelCallback cancel_mock;
710+
std::tie(request_id, send_mock, cancel_mock) = GenerateNetworkMockActions(
711+
wait_for_cancel, pause_for_cancel,
712+
{olp::http::HttpStatusCode::OK, kHttpResponseLookupQuery});
713+
714+
EXPECT_CALL(*network_mock_, Send(_, _, _, _, _))
715+
.WillOnce(testing::Invoke(std::move(send_mock)));
716+
717+
EXPECT_CALL(*network_mock_, Cancel(_))
718+
.WillOnce(testing::Invoke(std::move(cancel_mock)));
719+
720+
auto catalog = olp::client::HRN::FromString(
721+
GetArgument("dataservice_read_test_catalog"));
722+
auto layer = GetArgument("dataservice_read_test_layer");
723+
auto version = std::stoi(GetArgument("dataservice_read_test_layer_version"));
724+
725+
auto catalog_client =
726+
std::make_unique<olp::dataservice::read::VersionedLayerClient>(
727+
catalog, layer, *settings_);
728+
ASSERT_TRUE(catalog_client);
729+
730+
auto partition = GetArgument("dataservice_read_test_partition");
731+
auto data_request = olp::dataservice::read::DataRequest()
732+
.WithVersion(version)
733+
.WithPartitionId(partition);
734+
735+
auto cancellable_future = catalog_client->GetData(std::move(data_request));
736+
737+
wait_for_cancel->get_future().get();
738+
cancellable_future.GetCancellationToken().cancel();
739+
pause_for_cancel->set_value();
740+
741+
auto raw_future = cancellable_future.GetFuture();
742+
ASSERT_NE(raw_future.wait_for(kWaitTimeout), std::future_status::timeout);
743+
DataResponse response = raw_future.get();
744+
745+
ASSERT_FALSE(response.IsSuccessful()) << response.GetError().GetMessage();
746+
ASSERT_TRUE(response.GetResult() == nullptr);
747+
}
748+
620749
TEST_F(DataserviceReadVersionedLayerClientTest,
621750
GetDataFromPartitionCancelPartition) {
622751
auto wait_for_cancel = std::make_shared<std::promise<void>>();

0 commit comments

Comments
 (0)