Skip to content

Commit e21e50d

Browse files
Add CancellableFuture methods in VolatileLayerClient
Add GetData and GetPartitions methods to VolatileLayerClient that return CancellableFuture. Add tests. Resolves: OLPEDGE-959 Signed-off-by: Diachenko Mykahilo <[email protected]>
1 parent 927c3e3 commit e21e50d

File tree

6 files changed

+214
-3
lines changed

6 files changed

+214
-3
lines changed

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/VolatileLayerClient.h

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,20 +111,43 @@ class DATASERVICE_READ_API VolatileLayerClient final {
111111
client::CancellationToken GetPartitions(
112112
PartitionsRequest request, Callback<PartitionsResponse> callback);
113113

114+
/**
115+
* @brief Fetches a list partitions for given volatile layer asynchronously.
116+
* @param request contains the complete set of request parameters.
117+
* @return CancellableFuture, which when complete will contain the
118+
* PartitionsResponse or an error. Alternatively, the CancellableFuture can be
119+
* used to cancel this request.
120+
*/
121+
olp::client::CancellableFuture<PartitionsResponse> GetPartitions(
122+
PartitionsRequest request);
123+
114124
/**
115125
* @brief GetData fetches data for a partition or data handle asynchronously.
116126
* If the specified partition or data handle cannot be found in the layer, the
117127
* callback is invoked with an empty DataResponse (a nullptr result and
118128
* error). If Partition Id or Data Handle are not set in the request, the
119129
* callback is invoked with the error ErrorCode::InvalidRequest.
120-
* @param request contains a complete set of request parameters.
130+
* @param request Contains a complete set of request parameters.
121131
* @param callback is invoked once the DataResult is available, or an
122132
* error is encountered.
123-
* @return A token that can be used to cancel this request.
133+
* @return Token that can be used to cancel this request.
124134
*/
125135
olp::client::CancellationToken GetData(DataRequest request,
126136
Callback<DataResponse> callback);
127137

138+
/**
139+
* @brief Fetches data for a partition or data handle asynchronously.
140+
* If the specified partition or data handle cannot be found in the layer, the
141+
* callback is invoked with an empty DataResponse (a nullptr result and
142+
* error). If Partition Id or Data Handle are not set in the request, the
143+
* callback is invoked with the error ErrorCode::InvalidRequest.
144+
* @param request contains a complete set of request parameters.
145+
* @return CancellableFuture, which when complete will contain the
146+
* PartitionsResponse or an error. Alternatively, the CancellableFuture can be
147+
* used to cancel this request.
148+
*/
149+
olp::client::CancellableFuture<DataResponse> GetData(DataRequest request);
150+
128151
private:
129152
std::unique_ptr<VolatileLayerClientImpl> impl_;
130153
};

olp-cpp-sdk-dataservice-read/src/VolatileLayerClient.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,20 @@ client::CancellationToken VolatileLayerClient::GetPartitions(
3939
return impl_->GetPartitions(std::move(request), std::move(callback));
4040
}
4141

42+
olp::client::CancellableFuture<VolatileLayerClient::PartitionsResponse>
43+
VolatileLayerClient::GetPartitions(PartitionsRequest request) {
44+
return impl_->GetPartitions(std::move(request));
45+
}
46+
4247
client::CancellationToken VolatileLayerClient::GetData(
4348
DataRequest request, Callback<DataResponse> callback) {
4449
return impl_->GetData(std::move(request), std::move(callback));
4550
}
51+
52+
olp::client::CancellableFuture<VolatileLayerClient::DataResponse>
53+
VolatileLayerClient::GetData(DataRequest request) {
54+
return impl_->GetData(std::move(request));
55+
}
4656
} // namespace read
4757
} // namespace dataservice
4858
} // namespace olp

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.cpp

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <olp/core/cache/DefaultCache.h>
2323
#include <olp/core/client/CancellationContext.h>
2424
#include <olp/core/client/OlpClientSettingsFactory.h>
25-
#include <olp/dataservice/read/PartitionsRequest.h>
2625

2726
#include "TaskContext.h"
2827
#include "repositories/ApiRepository.h"
@@ -100,6 +99,16 @@ client::CancellationToken VolatileLayerClientImpl::GetPartitions(
10099
return token;
101100
}
102101

102+
client::CancellableFuture<PartitionsResponse>
103+
VolatileLayerClientImpl::GetPartitions(PartitionsRequest request) {
104+
auto promise = std::make_shared<std::promise<PartitionsResponse> >();
105+
auto callback = [=](PartitionsResponse resp) {
106+
promise->set_value(std::move(resp));
107+
};
108+
auto token = GetPartitions(std::move(request), std::move(callback));
109+
return olp::client::CancellableFuture<PartitionsResponse>(token, promise);
110+
}
111+
103112
client::CancellationToken VolatileLayerClientImpl::GetData(
104113
DataRequest request, Callback<DataResponse> callback) {
105114
auto add_task = [&](DataRequest& request, Callback<DataResponse> callback) {
@@ -141,6 +150,16 @@ client::CancellationToken VolatileLayerClientImpl::GetData(
141150
}
142151
}
143152

153+
client::CancellableFuture<DataResponse> VolatileLayerClientImpl::GetData(
154+
DataRequest request) {
155+
auto promise = std::make_shared<std::promise<DataResponse> >();
156+
auto callback = [=](DataResponse resp) {
157+
promise->set_value(std::move(resp));
158+
};
159+
auto token = GetData(std::move(request), std::move(callback));
160+
return olp::client::CancellableFuture<DataResponse>(token, promise);
161+
}
162+
144163
} // namespace read
145164
} // namespace dataservice
146165
} // namespace olp

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,14 @@ class VolatileLayerClientImpl {
6363
client::CancellationToken GetPartitions(
6464
PartitionsRequest request, Callback<PartitionsResponse> callback);
6565

66+
client::CancellableFuture<PartitionsResponse> GetPartitions(
67+
PartitionsRequest request);
68+
6669
client::CancellationToken GetData(DataRequest request,
6770
Callback<DataResponse> callback);
6871

72+
client::CancellableFuture<DataResponse> GetData(DataRequest request);
73+
6974
private:
7075
client::HRN catalog_;
7176
std::string layer_id_;

olp-cpp-sdk-dataservice-read/tests/VolatileLayerClientImplTest.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,90 @@ TEST(VolatileLayerClientImplTest, GetData) {
168168
}
169169
}
170170

171+
TEST(VolatileLayerClientImplTest, GetDataCancellableFuture) {
172+
std::shared_ptr<NetworkMock> network_mock = std::make_shared<NetworkMock>();
173+
std::shared_ptr<CacheMock> cache_mock = std::make_shared<CacheMock>();
174+
olp::client::OlpClientSettings settings;
175+
settings.network_request_handler = network_mock;
176+
settings.cache = cache_mock;
177+
178+
VolatileLayerClientImpl client(kHRN, kLayerId, settings);
179+
180+
{
181+
SCOPED_TRACE("Get Data with DataHandle");
182+
183+
SetupNetworkExpectation(*network_mock, kUrlLookupVolatileBlob,
184+
kHttpResponseLookupVolatileBlob,
185+
olp::http::HttpStatusCode::OK);
186+
187+
SetupNetworkExpectation(*network_mock, kUrlVolatileBlobData, "someData",
188+
olp::http::HttpStatusCode::OK);
189+
190+
auto future = client.GetData(DataRequest().WithDataHandle(kBlobDataHandle))
191+
.GetFuture();
192+
193+
EXPECT_EQ(future.wait_for(kTimeout), std::future_status::ready);
194+
195+
const auto& response = future.get();
196+
ASSERT_TRUE(response.IsSuccessful());
197+
198+
Mock::VerifyAndClearExpectations(network_mock.get());
199+
}
200+
201+
{
202+
SCOPED_TRACE("Get Data with PartitionId");
203+
204+
SetupNetworkExpectation(*network_mock, kUrlLookupQuery,
205+
kHttpResponseLookupQuery,
206+
olp::http::HttpStatusCode::OK);
207+
208+
SetupNetworkExpectation(*network_mock, kUrlQueryPartition269,
209+
kHttpResponsePartition269,
210+
olp::http::HttpStatusCode::OK);
211+
212+
SetupNetworkExpectation(*network_mock, kUrlLookupVolatileBlob,
213+
kHttpResponseLookupVolatileBlob,
214+
olp::http::HttpStatusCode::OK);
215+
216+
SetupNetworkExpectation(*network_mock, kUrlVolatileBlobData, "someData",
217+
olp::http::HttpStatusCode::OK);
218+
219+
auto future =
220+
client.GetData(DataRequest().WithPartitionId(kPartitionId)).GetFuture();
221+
222+
EXPECT_EQ(future.wait_for(kTimeout), std::future_status::ready);
223+
224+
const auto& response = future.get();
225+
ASSERT_TRUE(response.IsSuccessful());
226+
227+
Mock::VerifyAndClearExpectations(network_mock.get());
228+
}
229+
230+
{
231+
SCOPED_TRACE("Get Data from non existent partition");
232+
233+
SetupNetworkExpectation(*network_mock, kUrlLookupQuery,
234+
kHttpResponseLookupQuery,
235+
olp::http::HttpStatusCode::OK);
236+
237+
SetupNetworkExpectation(*network_mock, kUrlQueryPartition269,
238+
kHttpResponseNoPartition,
239+
olp::http::HttpStatusCode::OK);
240+
241+
auto future =
242+
client.GetData(DataRequest().WithPartitionId(kPartitionId)).GetFuture();
243+
244+
EXPECT_EQ(future.wait_for(kTimeout), std::future_status::ready);
245+
246+
const auto& response = future.get();
247+
ASSERT_FALSE(response.IsSuccessful());
248+
EXPECT_EQ(response.GetError().GetErrorCode(),
249+
olp::client::ErrorCode::NotFound);
250+
251+
Mock::VerifyAndClearExpectations(network_mock.get());
252+
}
253+
}
254+
171255
TEST(VolatileLayerClientImplTest, GetDataCancelOnClientDestroy) {
172256
std::shared_ptr<NetworkMock> network_mock = std::make_shared<NetworkMock>();
173257
std::shared_ptr<CacheMock> cache_mock = std::make_shared<CacheMock>();
@@ -201,4 +285,28 @@ TEST(VolatileLayerClientImplTest, GetDataCancelOnClientDestroy) {
201285
}
202286
}
203287

288+
TEST(VolatileLayerClientImplTest, GetDataCancellableFutureCancel) {
289+
std::shared_ptr<NetworkMock> network_mock = std::make_shared<NetworkMock>();
290+
std::shared_ptr<CacheMock> cache_mock = std::make_shared<CacheMock>();
291+
olp::client::OlpClientSettings settings;
292+
settings.network_request_handler = network_mock;
293+
settings.cache = cache_mock;
294+
settings.task_scheduler =
295+
olp::client::OlpClientSettingsFactory::CreateDefaultTaskScheduler(1);
296+
VolatileLayerClientImpl client(kHRN, kLayerId, std::move(settings));
297+
298+
auto cancellable =
299+
client.GetData(DataRequest().WithPartitionId(kPartitionId));
300+
301+
auto data_future = cancellable.GetFuture();
302+
cancellable.GetCancellationToken().cancel();
303+
ASSERT_EQ(data_future.wait_for(kTimeout), std::future_status::ready);
304+
305+
auto data_response = data_future.get();
306+
307+
// Callback must be called during client destructor.
308+
EXPECT_FALSE(data_response.IsSuccessful());
309+
EXPECT_EQ(data_response.GetError().GetErrorCode(),
310+
olp::client::ErrorCode::Cancelled);
311+
}
204312
} // namespace

tests/integration/olp-cpp-sdk-dataservice-read/VolatileLayerClientTest.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,52 @@ TEST_F(DataserviceReadVolatileLayerClientTest, GetPartitions) {
295295
ASSERT_EQ(4u, partitions_response.GetResult().GetPartitions().size());
296296
}
297297

298+
TEST_F(DataserviceReadVolatileLayerClientTest, GetPartitionsCancellableFuture) {
299+
olp::client::HRN hrn(GetTestCatalog());
300+
301+
EXPECT_CALL(*network_mock_, Send(IsGetRequest(URL_CONFIG), _, _, _, _))
302+
.Times(1);
303+
304+
olp::dataservice::read::VolatileLayerClient client(hrn, "testlayer",
305+
settings_);
306+
307+
auto request = olp::dataservice::read::PartitionsRequest();
308+
auto cancellable = client.GetPartitions(request);
309+
auto future = cancellable.GetFuture();
310+
311+
ASSERT_EQ(std::future_status::ready, future.wait_for(kTimeout));
312+
313+
auto response = future.get();
314+
ASSERT_TRUE(response.IsSuccessful()) << ApiErrorToString(response.GetError());
315+
ASSERT_EQ(4u, response.GetResult().GetPartitions().size());
316+
}
317+
318+
TEST_F(DataserviceReadVolatileLayerClientTest,
319+
GetPartitionsCancellableFutureCancellation) {
320+
olp::client::HRN hrn(GetTestCatalog());
321+
322+
settings_.task_scheduler =
323+
olp::client::OlpClientSettingsFactory::CreateDefaultTaskScheduler(1);
324+
// Simulate a loaded queue
325+
settings_.task_scheduler->ScheduleTask(
326+
[]() { std::this_thread::sleep_for(std::chrono::seconds(1)); });
327+
328+
olp::dataservice::read::VolatileLayerClient client(hrn, "testlayer",
329+
settings_);
330+
331+
auto request = olp::dataservice::read::PartitionsRequest();
332+
auto cancellable = client.GetPartitions(request);
333+
auto future = cancellable.GetFuture();
334+
335+
cancellable.GetCancellationToken().cancel();
336+
ASSERT_EQ(std::future_status::ready, future.wait_for(kTimeout));
337+
338+
auto response = future.get();
339+
EXPECT_FALSE(response.IsSuccessful());
340+
EXPECT_EQ(response.GetError().GetErrorCode(),
341+
olp::client::ErrorCode::Cancelled);
342+
}
343+
298344
TEST_F(DataserviceReadVolatileLayerClientTest, GetEmptyPartitions) {
299345
olp::client::HRN hrn(GetTestCatalog());
300346

0 commit comments

Comments
 (0)