Skip to content

Commit 962f738

Browse files
committed
Implemented VolatileLayerClient::GetPartitions() method
Added functional tests for VolatileLayerClient::GetPartitions() Added integration tests for VolatileLayerClient::GetPartitions() Resolves: OLPEDGE-792 Signed-off-by: Serhii Lozynskyi <[email protected]>
1 parent e39ffbe commit 962f738

File tree

9 files changed

+1159
-11
lines changed

9 files changed

+1159
-11
lines changed

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/VolatileLayerClient.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
#include <olp/core/client/HRN.h>
2828
#include <olp/core/client/OlpClientSettings.h>
2929
#include <olp/dataservice/read/DataRequest.h>
30+
#include <olp/dataservice/read/PartitionsRequest.h>
3031
#include <olp/dataservice/read/model/Data.h>
32+
#include <olp/dataservice/read/model/Partitions.h>
3133

3234
namespace olp {
3335
namespace dataservice {
@@ -77,6 +79,11 @@ class DATASERVICE_READ_API VolatileLayerClient final {
7779
using DataResult = model::Data;
7880
/// CallbackResponse alias
7981
using DataResponse = client::ApiResponse<DataResult, client::ApiError>;
82+
/// PartitionResult alias
83+
using PartitionsResult = model::Partitions;
84+
/// CallbackResponse
85+
using PartitionsResponse =
86+
client::ApiResponse<PartitionsResult, client::ApiError>;
8087
/// Callback alias
8188
template <class Response>
8289
using Callback = std::function<void(Response response)>;
@@ -94,6 +101,16 @@ class DATASERVICE_READ_API VolatileLayerClient final {
94101

95102
~VolatileLayerClient();
96103

104+
/**
105+
* @brief Fetches a list partitions for given volatile layer asynchronously.
106+
* @param request contains the complete set of request parameters.
107+
* @param callback will be invoked once the list of partitions is available,
108+
* or an error is encountered.
109+
* @return A token that can be used to cancel this request
110+
*/
111+
client::CancellationToken GetPartitions(
112+
PartitionsRequest request, Callback<PartitionsResponse> callback);
113+
97114
/**
98115
* @brief GetData fetches data for a partition or data handle asynchronously.
99116
* If the specified partition or data handle cannot be found in the layer, the

olp-cpp-sdk-dataservice-read/src/VolatileLayerClient.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
* License-Filename: LICENSE
1818
*/
1919

20+
#include <olp/core/porting/make_unique.h>
2021
#include <olp/dataservice/read/VolatileLayerClient.h>
2122

22-
#include <olp/core/porting/make_unique.h>
2323
#include "VolatileLayerClientImpl.h"
2424

2525
namespace olp {
@@ -34,6 +34,11 @@ VolatileLayerClient::VolatileLayerClient(client::HRN catalog,
3434

3535
VolatileLayerClient::~VolatileLayerClient() = default;
3636

37+
client::CancellationToken VolatileLayerClient::GetPartitions(
38+
PartitionsRequest request, Callback<PartitionsResponse> callback) {
39+
return impl_->GetPartitions(std::move(request), std::move(callback));
40+
}
41+
3742
client::CancellationToken VolatileLayerClient::GetData(
3843
DataRequest request, Callback<DataResponse> callback) {
3944
return impl_->GetData(std::move(request), std::move(callback));

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
#include <olp/core/client/CancellationContext.h>
2424
#include <olp/core/client/OlpClientSettingsFactory.h>
2525
#include <olp/dataservice/read/PartitionsRequest.h>
26+
27+
#include "TaskContext.h"
28+
#include "repositories/ApiRepository.h"
29+
#include "repositories/CatalogRepository.h"
2630
#include "repositories/DataRepository.h"
2731
#include "repositories/ExecuteOrSchedule.inl"
2832
#include "repositories/PartitionsRepository.h"
2933

30-
#include "TaskContext.h"
31-
3234
namespace olp {
3335
namespace dataservice {
3436
namespace read {
@@ -42,26 +44,68 @@ VolatileLayerClientImpl::VolatileLayerClientImpl(
4244
client::OlpClientSettings settings)
4345
: catalog_(std::move(catalog)),
4446
layer_id_(std::move(layer_id)),
45-
settings_(std::move(settings)),
47+
settings_(
48+
std::make_shared<client::OlpClientSettings>(std::move(settings))),
4649
pending_requests_(std::make_shared<PendingRequests>()) {
47-
if (!settings_.cache) {
48-
settings_.cache = client::OlpClientSettingsFactory::CreateDefaultCache({});
50+
if (!settings_->cache) {
51+
settings_->cache = client::OlpClientSettingsFactory::CreateDefaultCache({});
4952
}
5053
// to avoid capturing task scheduler inside a task, we need a copy of settings
5154
// without the scheduler
52-
task_scheduler_ = std::move(settings_.task_scheduler);
55+
task_scheduler_ = std::move(settings_->task_scheduler);
56+
57+
auto cache = settings_->cache;
58+
59+
auto api_repo =
60+
std::make_shared<repository::ApiRepository>(catalog_, settings_, cache);
61+
62+
auto catalog_repo = std::make_shared<repository::CatalogRepository>(
63+
catalog_, api_repo, cache);
64+
65+
partition_repo_ = std::make_shared<repository::PartitionsRepository>(
66+
catalog_, api_repo, catalog_repo, cache);
5367
}
5468

5569
VolatileLayerClientImpl::~VolatileLayerClientImpl() {
5670
pending_requests_->CancelPendingRequests();
5771
}
5872

73+
client::CancellationToken VolatileLayerClientImpl::GetPartitions(
74+
PartitionsRequest request, Callback<PartitionsResponse> callback) {
75+
request.WithLayerId(layer_id_);
76+
client::CancellationToken token;
77+
int64_t request_key = pending_requests_->GenerateRequestPlaceholder();
78+
auto pending_requests = pending_requests_;
79+
auto request_callback = [pending_requests, request_key,
80+
callback](PartitionsResponse response) {
81+
if (pending_requests->Remove(request_key)) {
82+
callback(response);
83+
}
84+
};
85+
if (CacheWithUpdate == request.GetFetchOption()) {
86+
token = partition_repo_->GetPartitions(request.WithFetchOption(CacheOnly),
87+
request_callback);
88+
auto onlineKey = pending_requests_->GenerateRequestPlaceholder();
89+
pending_requests_->Insert(
90+
partition_repo_->GetPartitions(
91+
request.WithFetchOption(OnlineIfNotFound),
92+
[pending_requests, onlineKey](PartitionsResponse) {
93+
pending_requests->Remove(onlineKey);
94+
}),
95+
onlineKey);
96+
} else {
97+
token = partition_repo_->GetPartitions(request, request_callback);
98+
}
99+
pending_requests_->Insert(token, request_key);
100+
return token;
101+
}
102+
59103
client::CancellationToken VolatileLayerClientImpl::GetData(
60104
DataRequest request, Callback<DataResponse> callback) {
61105
auto add_task = [&](DataRequest& request, Callback<DataResponse> callback) {
62106
auto catalog = catalog_;
63107
auto layer_id = layer_id_;
64-
auto settings = settings_;
108+
auto settings = *settings_;
65109
auto pending_requests = pending_requests_;
66110

67111
auto data_task = [=](client::CancellationContext context) {

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.h

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,38 @@
1919

2020
#pragma once
2121

22-
#include <memory>
23-
2422
#include <olp/core/client/ApiError.h>
2523
#include <olp/core/client/ApiResponse.h>
2624
#include <olp/core/client/CancellationToken.h>
2725
#include <olp/core/client/HRN.h>
2826
#include <olp/core/client/OlpClientSettings.h>
2927
#include <olp/dataservice/read/DataRequest.h>
28+
#include <olp/dataservice/read/PartitionsRequest.h>
3029
#include <olp/dataservice/read/model/Data.h>
30+
#include <olp/dataservice/read/model/Partitions.h>
31+
3132
#include "PendingRequests.h"
3233

3334
namespace olp {
3435
namespace dataservice {
3536
namespace read {
3637

38+
namespace repository {
39+
class CatalogRepository;
40+
class PartitionsRepository;
41+
} // namespace repository
42+
3743
class VolatileLayerClientImpl {
3844
public:
3945
/// DataResult alias
4046
using DataResult = model::Data;
4147
/// CallbackResponse alias
4248
using DataResponse = client::ApiResponse<DataResult, client::ApiError>;
49+
/// PartitionResult alias
50+
using PartitionsResult = model::Partitions;
51+
/// CallbackResponse
52+
using PartitionsResponse =
53+
client::ApiResponse<PartitionsResult, client::ApiError>;
4354
/// Callback alias
4455
template <class Response>
4556
using Callback = std::function<void(Response response)>;
@@ -49,15 +60,19 @@ class VolatileLayerClientImpl {
4960

5061
~VolatileLayerClientImpl();
5162

63+
client::CancellationToken GetPartitions(
64+
PartitionsRequest request, Callback<PartitionsResponse> callback);
65+
5266
client::CancellationToken GetData(DataRequest request,
5367
Callback<DataResponse> callback);
5468

5569
private:
5670
client::HRN catalog_;
5771
std::string layer_id_;
58-
client::OlpClientSettings settings_;
72+
std::shared_ptr<client::OlpClientSettings> settings_;
5973
std::shared_ptr<thread::TaskScheduler> task_scheduler_;
6074
std::shared_ptr<PendingRequests> pending_requests_;
75+
std::shared_ptr<repository::PartitionsRepository> partition_repo_;
6176
};
6277

6378
} // namespace read

scripts/linux/fv/olp-cpp-sdk-functional-test.variables

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ export dataservice_read_test_versioned_prefetch_layer="hype-test-prefetch"
1717
export dataservice_read_test_versioned_prefetch_tile="5904591"
1818
export dataservice_read_test_versioned_prefetch_subpartition1="23618365"
1919
export dataservice_read_test_versioned_prefetch_subpartition2="1476147"
20+
21+
export dataservice_read_volatile_layer="testlayer"

tests/functional/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(OLP_SDK_FUNCTIONAL_TESTS_SOURCES
2323
./olp-cpp-sdk-dataservice-read/ApiTest.cpp
2424
./olp-cpp-sdk-dataservice-read/CatalogClientTest.cpp
2525
./olp-cpp-sdk-dataservice-read/DataserviceReadVersionedLayerClientTest.cpp
26+
./olp-cpp-sdk-dataservice-read/DataserviceReadVolatileLayerClientTest.cpp
2627
./olp-cpp-sdk-dataservice-write/DataserviceWriteIndexLayerClientTest.cpp
2728
./olp-cpp-sdk-dataservice-write/DataserviceWriteStreamLayerClientCacheTest.cpp
2829
./olp-cpp-sdk-dataservice-write/DataserviceWriteStreamLayerClientTest.cpp

0 commit comments

Comments
 (0)