Skip to content

Commit 3fd8e5d

Browse files
sergii-vostrikovmykhailo-kuchma
authored andcommitted
GetPartitions for VersionedLayerClient
The method introduced. Currently it has the same implementation as CatalogClient, but defines new interface. Test for VersionedLayerClient moved to integration test subsystem. Resolves: OLPEDGE-793 Signed-off-by: Sergii Vostrikov <[email protected]>
1 parent b0e746b commit 3fd8e5d

File tree

9 files changed

+1548
-595
lines changed

9 files changed

+1548
-595
lines changed

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/VersionedLayerClient.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
#include <olp/core/client/OlpClientSettings.h>
2929
#include <olp/dataservice/read/DataRequest.h>
3030
#include <olp/dataservice/read/DataServiceReadApi.h>
31+
#include <olp/dataservice/read/PartitionsRequest.h>
3132
#include <olp/dataservice/read/model/Data.h>
33+
#include <olp/dataservice/read/model/Partitions.h>
3234

3335
namespace olp {
3436
namespace dataservice {
@@ -47,6 +49,10 @@ class DATASERVICE_READ_API VersionedLayerClient final {
4749
/// Callback alias
4850
using Callback = std::function<void(CallbackResponse response)>;
4951

52+
using PartitionsResult = model::Partitions;
53+
using PartitionsResponse =
54+
client::ApiResponse<PartitionsResult, client::ApiError>;
55+
using PartitionsCallback = std::function<void(PartitionsResponse response)>;
5056
/**
5157
* @brief VersionedLayerClient constructor
5258
* @param catalog this versioned layer client uses during requests.
@@ -75,6 +81,18 @@ class DATASERVICE_READ_API VersionedLayerClient final {
7581
olp::client::CancellationToken GetData(DataRequest data_request,
7682
Callback callback);
7783

84+
/**
85+
* @brief fetches a list partitions for given generic layer asynchronously.
86+
* @param request contains the complete set of request parameters.
87+
* \note \c PartitionsRequest's GetLayerId value will be ignored and the
88+
* parameter from the constructor will be used instead.
89+
* @param callback will be invoked once the list of partitions is available,
90+
* or an error is encountered.
91+
* @return A token that can be used to cancel this request
92+
*/
93+
client::CancellationToken GetPartitions(PartitionsRequest partitions_request,
94+
PartitionsCallback callback) const;
95+
7896
private:
7997
std::unique_ptr<VersionedLayerClientImpl> impl_;
8098
};

olp-cpp-sdk-dataservice-read/src/VersionedLayerClient.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ olp::client::CancellationToken VersionedLayerClient::GetData(
4040
return impl_->GetData(std::move(data_request), std::move(callback));
4141
}
4242

43+
client::CancellationToken VersionedLayerClient::GetPartitions(
44+
PartitionsRequest partitions_request, PartitionsCallback callback) const {
45+
return impl_->GetPartitions(std::move(partitions_request),
46+
std::move(callback));
47+
}
48+
4349
} // namespace read
4450
} // namespace dataservice
4551
} // namespace olp

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <olp/core/context/Context.h>
2525
#include <olp/core/thread/TaskScheduler.h>
2626
#include "TaskContext.h"
27+
#include "repositories/ApiRepository.h"
2728
#include "repositories/CatalogRepository.h"
2829
#include "repositories/DataRepository.h"
2930
#include "repositories/ExecuteOrSchedule.inl"
@@ -38,21 +39,62 @@ VersionedLayerClientImpl::VersionedLayerClientImpl(
3839
olp::client::OlpClientSettings client_settings)
3940
: catalog_(std::move(catalog)),
4041
layer_id_(std::move(layer_id)),
41-
settings_(std::move(client_settings)),
42+
settings_(std::make_shared<olp::client::OlpClientSettings>(
43+
std::move(client_settings))),
4244
pending_requests_(std::make_shared<PendingRequests>()) {
43-
if (!settings_.cache) {
44-
settings_.cache =
45+
if (!settings_->cache) {
46+
settings_->cache =
4547
olp::client::OlpClientSettingsFactory::CreateDefaultCache({});
4648
}
4749
// to avoid capturing task scheduler inside a task, we need a copy of settings
4850
// without the scheduler
49-
task_scheduler_ = std::move(settings_.task_scheduler);
51+
task_scheduler_ = std::move(settings_->task_scheduler);
52+
53+
auto api_repo = std::make_shared<repository::ApiRepository>(
54+
catalog_, settings_, settings_->cache);
55+
56+
auto catalog_repo = std::make_shared<repository::CatalogRepository>(
57+
catalog_, api_repo, settings_->cache);
58+
59+
partition_repo_ = std::make_shared<repository::PartitionsRepository>(
60+
catalog_, api_repo, catalog_repo, settings_->cache);
5061
}
5162

5263
VersionedLayerClientImpl::~VersionedLayerClientImpl() {
5364
pending_requests_->CancelPendingRequests();
5465
}
5566

67+
client::CancellationToken VersionedLayerClientImpl::GetPartitions(
68+
PartitionsRequest partitions_request, PartitionsCallback callback) const {
69+
partitions_request.WithLayerId(layer_id_);
70+
olp::client::CancellationToken token;
71+
int64_t request_key = pending_requests_->GenerateRequestPlaceholder();
72+
auto pending_requests = pending_requests_;
73+
auto request_callback = [pending_requests, request_key,
74+
callback](PartitionsResponse response) {
75+
if (pending_requests->Remove(request_key)) {
76+
callback(response);
77+
}
78+
};
79+
if (CacheWithUpdate == partitions_request.GetFetchOption()) {
80+
token = partition_repo_->GetPartitions(
81+
partitions_request.WithFetchOption(CacheOnly), request_callback);
82+
auto onlineKey = pending_requests_->GenerateRequestPlaceholder();
83+
pending_requests_->Insert(
84+
partition_repo_->GetPartitions(
85+
partitions_request.WithFetchOption(OnlineIfNotFound),
86+
[pending_requests, onlineKey](PartitionsResponse) {
87+
pending_requests->Remove(onlineKey);
88+
}),
89+
onlineKey);
90+
} else {
91+
token =
92+
partition_repo_->GetPartitions(partitions_request, request_callback);
93+
}
94+
pending_requests_->Insert(token, request_key);
95+
return token;
96+
}
97+
5698
olp::client::CancellationToken VersionedLayerClientImpl::GetData(
5799
DataRequest data_request, Callback callback) const {
58100
auto fetch_option = data_request.GetFetchOption();
@@ -74,7 +116,7 @@ client::CancellationToken VersionedLayerClientImpl::AddGetDataTask(
74116
DataRequest request, Callback callback) const {
75117
auto catalog = catalog_;
76118
auto layer_id = layer_id_;
77-
auto settings = settings_;
119+
auto settings = *settings_;
78120
auto pending_requests = pending_requests_;
79121

80122
auto data_task = [=](client::CancellationContext context) {

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,19 @@
2929
#include <olp/core/client/HRN.h>
3030
#include <olp/core/client/OlpClientSettings.h>
3131
#include <olp/dataservice/read/DataRequest.h>
32+
#include <olp/dataservice/read/PartitionsRequest.h>
3233
#include <olp/dataservice/read/model/Data.h>
34+
#include <olp/dataservice/read/model/Partitions.h>
3335

3436
namespace olp {
3537
namespace thread {
3638
class TaskScheduler;
3739
}
38-
3940
namespace dataservice {
4041
namespace read {
42+
namespace repository {
43+
class PartitionsRepository;
44+
}
4145

4246
class VersionedLayerClientImpl {
4347
public:
@@ -48,6 +52,11 @@ class VersionedLayerClientImpl {
4852
/// Callback alias
4953
using Callback = std::function<void(CallbackResponse response)>;
5054

55+
using PartitionsResult = model::Partitions;
56+
using PartitionsResponse =
57+
client::ApiResponse<PartitionsResult, client::ApiError>;
58+
using PartitionsCallback = std::function<void(PartitionsResponse response)>;
59+
5160
VersionedLayerClientImpl(olp::client::HRN catalog, std::string layer_id,
5261
olp::client::OlpClientSettings client_settings);
5362

@@ -56,16 +65,20 @@ class VersionedLayerClientImpl {
5665
virtual olp::client::CancellationToken GetData(DataRequest data_request,
5766
Callback callback) const;
5867

68+
virtual client::CancellationToken GetPartitions(
69+
PartitionsRequest partitions_request, PartitionsCallback callback) const;
70+
5971
private:
6072
olp::client::CancellationToken AddGetDataTask(DataRequest data_request,
6173
Callback callback) const;
6274

6375
protected:
6476
olp::client::HRN catalog_;
6577
std::string layer_id_;
66-
olp::client::OlpClientSettings settings_;
78+
std::shared_ptr<olp::client::OlpClientSettings> settings_;
6779
std::shared_ptr<thread::TaskScheduler> task_scheduler_;
6880
std::shared_ptr<PendingRequests> pending_requests_;
81+
std::shared_ptr<repository::PartitionsRepository> partition_repo_;
6982
};
7083

7184
} // namespace read

olp-cpp-sdk-dataservice-read/tests/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ set(OLP_SDK_DATASERVICE_READ_TEST_SOURCES
3030
TaskContextTest.cpp
3131
VolatileLayerClientImplTest.cpp
3232
VolatileLayerClientTest.cpp
33-
VersionedLayerClientTest.cpp
3433
)
3534

3635
if (ANDROID OR IOS)

0 commit comments

Comments
 (0)