Skip to content

Commit 77931b4

Browse files
Merge the parallel query partitions requests. (#1113)
The order of partition ids in the request matters. Worst case - two parallel downloads. This is a temporary workaround. Change the log in TaskSink to print the number of tasks pending. Add a new method to PendingRequests to query a number of task pending. Fix a typo. Resolves: OLPEDGE-2395 Signed-off-by: Mykhailo Kuchma <[email protected]>
1 parent 597c930 commit 77931b4

File tree

6 files changed

+69
-28
lines changed

6 files changed

+69
-28
lines changed

olp-cpp-sdk-core/include/olp/core/client/PendingRequests.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,17 @@ class CORE_API PendingRequests final {
6868
*/
6969
void Remove(TaskContext task_context);
7070

71+
/**
72+
* @brief Gets the number of tasks.
73+
*
74+
* @return The number of tasks pending.
75+
*/
76+
size_t GetTaskCount() const;
77+
7178
private:
7279
using ContextMap = std::unordered_set<TaskContext, TaskContextHash>;
7380
ContextMap task_contexts_;
74-
std::mutex task_contexts_lock_;
81+
mutable std::mutex task_contexts_lock_;
7582
};
7683

7784
} // namespace client

olp-cpp-sdk-core/src/client/ApiLookupClientImpl.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,13 +242,12 @@ boost::optional<OlpClient> ApiLookupClientImpl::GetCachedClient(
242242
std::lock_guard<std::mutex> lock(cached_clients_mutex_);
243243
const auto client_it = cached_clients_.find(key);
244244
if (client_it != cached_clients_.end()) {
245-
const ClientWithExpiration& client_with_expirtation = client_it->second;
246-
if (client_with_expirtation.expire_at >
247-
std::chrono::steady_clock::now()) {
245+
const ClientWithExpiration& client_with_expiration = client_it->second;
246+
if (client_with_expiration.expire_at > std::chrono::steady_clock::now()) {
248247
OLP_SDK_LOG_DEBUG_F(
249248
kLogTag, "LookupApi(%s/%s) found in client cache, hrn='%s'",
250249
service.c_str(), service_version.c_str(), catalog_string_.c_str());
251-
return client_with_expirtation.client;
250+
return client_with_expiration.client;
252251
}
253252
}
254253
}

olp-cpp-sdk-core/src/client/PendingRequests.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,10 @@ void PendingRequests::Remove(TaskContext task_context) {
7171
task_contexts_.erase(task_context);
7272
}
7373

74+
size_t PendingRequests::GetTaskCount() const {
75+
std::lock_guard<std::mutex> lock(task_contexts_lock_);
76+
return task_contexts_.size();
77+
}
78+
7479
} // namespace client
7580
} // namespace olp

olp-cpp-sdk-dataservice-read/src/TaskSink.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,14 @@ TaskSink::TaskSink(std::shared_ptr<thread::TaskScheduler> task_scheduler)
3434
closed_(false) {}
3535

3636
TaskSink::~TaskSink() {
37-
OLP_SDK_LOG_INFO(kLogTag, "Finishing, canceling all current tasks.");
3837
{
3938
std::lock_guard<std::mutex> lock(mutex_);
4039
closed_ = true;
40+
const auto task_count = pending_requests_->GetTaskCount();
41+
if (task_count > 0) {
42+
OLP_SDK_LOG_INFO_F(kLogTag, "Finishing, canceling %" PRIu64 " tasks.",
43+
static_cast<std::uint64_t>(task_count));
44+
}
4145
}
4246
// CancelAllAndWait method should be called without mutex, since potentially
4347
// there might be new added tasks, it may result in deadlock.

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,34 +83,33 @@ bool VersionedLayerClientImpl::CancelPendingRequests() {
8383

8484
client::CancellationToken VersionedLayerClientImpl::GetPartitions(
8585
PartitionsRequest request, PartitionsResponseCallback callback) {
86-
auto catalog = catalog_;
87-
auto layer_id = layer_id_;
88-
auto settings = settings_;
89-
auto lookup_client = lookup_client_;
90-
9186
auto partitions_task =
92-
[=](client::CancellationContext context) mutable -> PartitionsResponse {
93-
if (request.GetFetchOption() == CacheWithUpdate) {
94-
return {{client::ErrorCode::InvalidArgument,
95-
"CacheWithUpdate option can not be used for versioned "
96-
"layer"}};
87+
[this](PartitionsRequest partitions_request,
88+
client::CancellationContext context) -> PartitionsResponse {
89+
const auto fetch_option = partitions_request.GetFetchOption();
90+
if (fetch_option == CacheWithUpdate) {
91+
return client::ApiError(
92+
client::ErrorCode::InvalidArgument,
93+
"CacheWithUpdate option can not be used for versioned layer");
9794
}
9895

9996
auto version_response =
100-
GetVersion(request.GetBillingTag(), request.GetFetchOption(), context);
97+
GetVersion(partitions_request.GetBillingTag(), fetch_option, context);
10198
if (!version_response.IsSuccessful()) {
10299
return version_response.GetError();
103100
}
101+
104102
const auto version = version_response.GetResult().GetVersion();
105103

106-
repository::PartitionsRepository repository(std::move(catalog), layer_id,
107-
std::move(settings),
108-
std::move(lookup_client));
109-
return repository.GetVersionedPartitions(request, version, context);
104+
repository::PartitionsRepository repository(catalog_, layer_id_, settings_,
105+
lookup_client_);
106+
return repository.GetVersionedPartitionsExtendedResponse(
107+
std::move(partitions_request), version, context);
110108
};
111109

112-
return task_sink_.AddTask(std::move(partitions_task), std::move(callback),
113-
thread::NORMAL);
110+
return task_sink_.AddTask(
111+
std::bind(partitions_task, std::move(request), std::placeholders::_1),
112+
std::move(callback), thread::NORMAL);
114113
}
115114

116115
client::CancellableFuture<PartitionsResponse>

olp-cpp-sdk-dataservice-read/src/repositories/PartitionsRepository.cpp

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include <algorithm>
2323
#include <utility>
2424

25+
#include <boost/functional/hash.hpp>
26+
2527
#include <olp/core/client/Condition.h>
2628
#include <olp/core/logging/Log.h>
2729
#include "CatalogRepository.h"
@@ -91,6 +93,16 @@ repository::PartitionResponse FindPartition(
9193

9294
return std::move(aggregated_partition);
9395
}
96+
97+
std::string HashPartitions(
98+
const read::PartitionsRequest::PartitionIds& partitions) {
99+
size_t seed = 0;
100+
for (const auto& partition : partitions) {
101+
boost::hash_combine(seed, partition);
102+
}
103+
return std::to_string(seed);
104+
}
105+
94106
} // namespace
95107

96108
namespace olp {
@@ -152,24 +164,39 @@ PartitionsRepository::GetPartitionsExtendedResponse(
152164
auto fetch_option = request.GetFetchOption();
153165
const auto key = request.CreateKey(layer_id_);
154166

167+
const auto catalog_str = catalog_.ToCatalogHRNString();
168+
169+
const auto& partition_ids = request.GetPartitionIds();
170+
171+
// Temporary workaround for merging the same requests. Should be removed after
172+
// OlpClient could handle that.
173+
const auto detail =
174+
partition_ids.empty() ? "" : HashPartitions(partition_ids);
175+
NamedMutex mutex(catalog_str + layer_id_ + detail);
176+
std::unique_lock<NamedMutex> lock(mutex, std::defer_lock);
177+
178+
// If we are not planning to go online or access the cache, do not lock.
179+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
180+
lock.lock();
181+
}
182+
155183
if (fetch_option != OnlineOnly && fetch_option != CacheWithUpdate) {
156184
auto cached_partitions = cache_.Get(request, version);
157185
if (cached_partitions) {
158186
OLP_SDK_LOG_DEBUG_F(kLogTag,
159187
"GetPartitions found in cache, hrn='%s', key='%s'",
160-
catalog_.ToCatalogHRNString().c_str(), key.c_str());
188+
catalog_str.c_str(), key.c_str());
161189
return cached_partitions.get();
162190
} else if (fetch_option == CacheOnly) {
163191
OLP_SDK_LOG_INFO_F(kLogTag,
164192
"GetPartitions not found in cache, hrn='%s', key='%s'",
165-
catalog_.ToCatalogHRNString().c_str(), key.c_str());
193+
catalog_str.c_str(), key.c_str());
166194
return {{client::ErrorCode::NotFound,
167195
"CacheOnly: resource not found in cache"}};
168196
}
169197
}
170198

171199
QueryApi::PartitionsExtendedResponse response;
172-
const auto& partition_ids = request.GetPartitionIds();
173200

174201
if (partition_ids.empty()) {
175202
auto metadata_api = lookup_client_.LookupApi(
@@ -202,7 +229,7 @@ PartitionsRepository::GetPartitionsExtendedResponse(
202229
if (response.IsSuccessful() && fetch_option != OnlineOnly) {
203230
OLP_SDK_LOG_DEBUG_F(kLogTag,
204231
"GetPartitions put to cache, hrn='%s', key='%s'",
205-
catalog_.ToCatalogHRNString().c_str(), key.c_str());
232+
catalog_str.c_str(), key.c_str());
206233
cache_.Put(response.GetResult(), version, expiry, is_layer_metadata);
207234
}
208235
if (!response.IsSuccessful()) {
@@ -211,7 +238,7 @@ PartitionsRepository::GetPartitionsExtendedResponse(
211238
OLP_SDK_LOG_WARNING_F(
212239
kLogTag,
213240
"GetPartitions 403 received, remove from cache, hrn='%s', key='%s'",
214-
catalog_.ToCatalogHRNString().c_str(), key.c_str());
241+
catalog_str.c_str(), key.c_str());
215242
cache_.Clear();
216243
}
217244
}

0 commit comments

Comments
 (0)