Skip to content

Commit 55978d8

Browse files
Add AddTask method to remove duplicate code
Remove CatalogClientImpl::AsFuture, align with layer clients. Relates-To: OLPEDGE-1109 Signed-off-by: Mykhailo Kuchma <[email protected]>
1 parent 041207c commit 55978d8

File tree

6 files changed

+67
-108
lines changed

6 files changed

+67
-108
lines changed

olp-cpp-sdk-dataservice-read/src/CatalogClientImpl.cpp

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -71,25 +71,15 @@ CancellationToken CatalogClientImpl::GetCatalog(
7171
CatalogResponseCallback callback) {
7272
auto catalog = catalog_;
7373
auto settings = settings_;
74-
auto pending_requests = pending_requests_;
7574

76-
auto data_task = [=](client::CancellationContext context) {
75+
auto get_catalog_task = [=](client::CancellationContext context) {
7776
return repository::CatalogRepository::GetCatalog(
7877
std::move(catalog), std::move(context), std::move(request),
7978
std::move(settings));
8079
};
8180

82-
auto context =
83-
client::TaskContext::Create(std::move(data_task), std::move(callback));
84-
85-
pending_requests->Insert(context);
86-
87-
repository::ExecuteOrSchedule(task_scheduler_, [=]() {
88-
context.Execute();
89-
pending_requests->Remove(context);
90-
});
91-
92-
return context.CancelToken();
81+
return AddTask(task_scheduler_, pending_requests_,
82+
std::move(get_catalog_task), std::move(callback));
9383
};
9484

9585
return ScheduleFetch(std::move(schedule_get_catalog), std::move(request),
@@ -98,11 +88,13 @@ CancellationToken CatalogClientImpl::GetCatalog(
9888

9989
CancellableFuture<CatalogResponse> CatalogClientImpl::GetCatalog(
10090
CatalogRequest request) {
101-
return AsFuture<CatalogRequest, CatalogResponse>(
102-
std::move(request),
103-
static_cast<client::CancellationToken (CatalogClientImpl::*)(
104-
CatalogRequest, CatalogResponseCallback)>(
105-
&CatalogClientImpl::GetCatalog));
91+
auto promise = std::make_shared<std::promise<CatalogResponse>>();
92+
auto cancel_token =
93+
GetCatalog(std::move(request), [promise](CatalogResponse response) {
94+
promise->set_value(std::move(response));
95+
});
96+
return client::CancellableFuture<CatalogResponse>(std::move(cancel_token),
97+
std::move(promise));
10698
}
10799

108100
CancellationToken CatalogClientImpl::GetLatestVersion(
@@ -112,25 +104,15 @@ CancellationToken CatalogClientImpl::GetLatestVersion(
112104
CatalogVersionCallback callback) {
113105
auto catalog = catalog_;
114106
auto settings = settings_;
115-
auto pending_requests = pending_requests_;
116107

117-
auto data_task = [=](client::CancellationContext context) {
108+
auto get_latest_version_task = [=](client::CancellationContext context) {
118109
return repository::CatalogRepository::GetLatestVersion(
119110
std::move(catalog), std::move(context), std::move(request),
120111
std::move(settings));
121112
};
122113

123-
auto context =
124-
client::TaskContext::Create(std::move(data_task), std::move(callback));
125-
126-
pending_requests->Insert(context);
127-
128-
repository::ExecuteOrSchedule(task_scheduler_, [=]() {
129-
context.Execute();
130-
pending_requests->Remove(context);
131-
});
132-
133-
return context.CancelToken();
114+
return AddTask(task_scheduler_, pending_requests_,
115+
std::move(get_latest_version_task), std::move(callback));
134116
};
135117

136118
return ScheduleFetch(std::move(schedule_get_latest_version),
@@ -139,11 +121,13 @@ CancellationToken CatalogClientImpl::GetLatestVersion(
139121

140122
CancellableFuture<CatalogVersionResponse> CatalogClientImpl::GetLatestVersion(
141123
CatalogVersionRequest request) {
142-
return AsFuture<CatalogVersionRequest, CatalogVersionResponse>(
143-
std::move(request),
144-
static_cast<client::CancellationToken (CatalogClientImpl::*)(
145-
CatalogVersionRequest, CatalogVersionCallback)>(
146-
&CatalogClientImpl::GetLatestVersion));
124+
auto promise = std::make_shared<std::promise<CatalogVersionResponse>>();
125+
auto cancel_token = GetLatestVersion(
126+
std::move(request), [promise](CatalogVersionResponse response) {
127+
promise->set_value(std::move(response));
128+
});
129+
return client::CancellableFuture<CatalogVersionResponse>(
130+
std::move(cancel_token), std::move(promise));
147131
}
148132
} // namespace read
149133
} // namespace dataservice

olp-cpp-sdk-dataservice-read/src/CatalogClientImpl.h

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,21 +72,6 @@ class CatalogClientImpl final {
7272
client::OlpClientSettings settings_;
7373
std::shared_ptr<thread::TaskScheduler> task_scheduler_;
7474
std::shared_ptr<client::PendingRequests> pending_requests_;
75-
76-
template <typename Request, typename Response>
77-
client::CancellableFuture<Response> AsFuture(
78-
const Request& request, std::function<client::CancellationToken(
79-
CatalogClientImpl*, const Request&,
80-
const std::function<void(Response)>&)>
81-
func) {
82-
auto promise = std::make_shared<std::promise<Response> >();
83-
84-
auto cancel_token = func(this, request, [promise](Response response) {
85-
promise->set_value(response);
86-
});
87-
88-
return client::CancellableFuture<Response>(cancel_token, promise);
89-
}
9075
};
9176

9277
} // namespace read

olp-cpp-sdk-dataservice-read/src/Common.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
#pragma once
2121

2222
#include <olp/core/client/CancellationToken.h>
23+
#include <olp/core/client/PendingRequests.h>
24+
#include <olp/core/thread/TaskScheduler.h>
2325
#include <olp/dataservice/read/FetchOptions.h>
26+
#include "repositories/ExecuteOrSchedule.inl"
2427

2528
namespace olp {
2629
namespace dataservice {
@@ -58,6 +61,35 @@ inline client::CancellationToken ScheduleFetch(TaskScheduler&& schedule_task,
5861
std::forward<Callback>(callback));
5962
}
6063

64+
/*
65+
* @brief Common function used to wrap a lambda function and a callback that
66+
* consumes the function result with a TaskContext class and schedule this to a
67+
* task scheduler.
68+
* @param task_scheduler Task scheduler instance.
69+
* @param pending_requests PendingRequests instance that tracks current
70+
* requests.
71+
* @param task Function that will be executed.
72+
* @param callback Function that will consume task output.
73+
* @param args Additional agrs to pass to TaskContext.
74+
* @return CancellationToken used to cancel the operation.
75+
*/
76+
template <typename Function, typename Callback, typename... Args>
77+
inline client::CancellationToken AddTask(
78+
const std::shared_ptr<thread::TaskScheduler>& task_scheduler,
79+
const std::shared_ptr<client::PendingRequests>& pending_requests,
80+
Function task, Callback callback, Args&&... args) {
81+
auto context = client::TaskContext::Create(
82+
std::move(task), std::move(callback), std::forward<Args>(args)...);
83+
pending_requests->Insert(context);
84+
85+
repository::ExecuteOrSchedule(task_scheduler, [=] {
86+
context.Execute();
87+
pending_requests->Remove(context);
88+
});
89+
90+
return context.CancelToken();
91+
}
92+
6193
} // namespace read
6294
} // namespace dataservice
6395
} // namespace olp

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -96,24 +96,14 @@ client::CancellationToken VersionedLayerClientImpl::GetPartitions(
9696
auto catalog = catalog_;
9797
auto layer_id = layer_id_;
9898
auto settings = *settings_;
99-
auto pending_requests = pending_requests_;
10099

101100
auto partitions_task = [=](client::CancellationContext context) {
102101
return repository::PartitionsRepository::GetVersionedPartitions(
103102
catalog, layer_id, context, std::move(request), settings);
104103
};
105104

106-
auto context = client::TaskContext::Create(std::move(partitions_task),
107-
std::move(callback));
108-
109-
pending_requests->Insert(context);
110-
111-
repository::ExecuteOrSchedule(task_scheduler_, [=]() {
112-
context.Execute();
113-
pending_requests->Remove(context);
114-
});
115-
116-
return context.CancelToken();
105+
return AddTask(task_scheduler_, pending_requests_,
106+
std::move(partitions_task), std::move(callback));
117107
};
118108

119109
return ScheduleFetch(std::move(schedule_get_partitions), std::move(request),
@@ -138,25 +128,15 @@ client::CancellationToken VersionedLayerClientImpl::GetData(
138128
auto catalog = catalog_;
139129
auto layer_id = layer_id_;
140130
auto settings = *settings_;
141-
auto pending_requests = pending_requests_;
142-
131+
143132
auto data_task = [=](client::CancellationContext context) {
144133
return repository::DataRepository::GetVersionedData(
145134
std::move(catalog), std::move(layer_id), std::move(request), context,
146135
std::move(settings));
147136
};
148137

149-
auto context =
150-
client::TaskContext::Create(std::move(data_task), std::move(callback));
151-
152-
pending_requests->Insert(context);
153-
154-
repository::ExecuteOrSchedule(task_scheduler_, [=]() {
155-
context.Execute();
156-
pending_requests->Remove(context);
157-
});
158-
159-
return context.CancelToken();
138+
return AddTask(task_scheduler_, pending_requests_, std::move(data_task),
139+
std::move(callback));
160140
};
161141
return ScheduleFetch(std::move(schedule_get_data), std::move(request),
162142
std::move(callback));

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.cpp

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -78,25 +78,15 @@ client::CancellationToken VolatileLayerClientImpl::GetPartitions(
7878
auto catalog = catalog_;
7979
auto layer_id = layer_id_;
8080
auto settings = *settings_;
81-
auto pending_requests = pending_requests_;
8281

8382
auto data_task = [=](client::CancellationContext context) {
8483
return repository::PartitionsRepository::GetVolatilePartitions(
8584
std::move(catalog), std::move(layer_id), std::move(context),
8685
std::move(request), std::move(settings));
8786
};
8887

89-
auto context =
90-
client::TaskContext::Create(std::move(data_task), std::move(callback));
91-
92-
pending_requests->Insert(context);
93-
94-
repository::ExecuteOrSchedule(task_scheduler_, [=]() {
95-
context.Execute();
96-
pending_requests->Remove(context);
97-
});
98-
99-
return context.CancelToken();
88+
return AddTask(task_scheduler_, pending_requests_, std::move(data_task),
89+
std::move(callback));
10090
};
10191

10292
return ScheduleFetch(std::move(schedule_get_partitions), std::move(request),
@@ -120,24 +110,14 @@ client::CancellationToken VolatileLayerClientImpl::GetData(
120110
auto catalog = catalog_;
121111
auto layer_id = layer_id_;
122112
auto settings = *settings_;
123-
auto pending_requests = pending_requests_;
124113

125-
auto data_task = [=](client::CancellationContext context) {
114+
auto partitions_task = [=](client::CancellationContext context) {
126115
return repository::DataRepository::GetVolatileData(
127116
catalog, layer_id, request, context, settings);
128117
};
129118

130-
auto context =
131-
client::TaskContext::Create(std::move(data_task), std::move(callback));
132-
133-
pending_requests->Insert(context);
134-
135-
repository::ExecuteOrSchedule(task_scheduler_, [=]() {
136-
context.Execute();
137-
pending_requests->Remove(context);
138-
});
139-
140-
return context.CancelToken();
119+
return AddTask(task_scheduler_, pending_requests_,
120+
std::move(partitions_task), std::move(callback));
141121
};
142122

143123
return ScheduleFetch(std::move(schedule_get_data), std::move(request),

olp-cpp-sdk-dataservice-read/src/repositories/ExecuteOrSchedule.inl

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
* License-Filename: LICENSE
1818
*/
1919

20+
#pragma once
21+
2022
#include <olp/core/client/OlpClientSettings.h>
2123
#include <olp/core/thread/TaskScheduler.h>
2224

@@ -25,11 +27,9 @@ namespace dataservice {
2527
namespace read {
2628
namespace repository {
2729

28-
namespace {
29-
3030
using CallFuncType = thread::TaskScheduler::CallFuncType;
3131

32-
void ExecuteOrSchedule(
32+
inline void ExecuteOrSchedule(
3333
const std::shared_ptr<thread::TaskScheduler>& task_scheduler,
3434
CallFuncType&& func) {
3535
if (!task_scheduler) {
@@ -40,14 +40,12 @@ void ExecuteOrSchedule(
4040
}
4141
}
4242

43-
void ExecuteOrSchedule(const client::OlpClientSettings* settings,
44-
CallFuncType&& func) {
43+
inline void ExecuteOrSchedule(const client::OlpClientSettings* settings,
44+
CallFuncType&& func) {
4545
ExecuteOrSchedule(settings ? settings->task_scheduler : nullptr,
4646
std::move(func));
4747
}
4848

49-
} // namespace
50-
5149
} // namespace repository
5250
} // namespace read
5351
} // namespace dataservice

0 commit comments

Comments
 (0)