Skip to content

Commit d9198f3

Browse files
authored
Revert NamedMutex removal (#1174)
Merging same requests inside OlpClient::CallApi does not prevent running a new request between end of the previous call & putting its result into a cache. For concurrent cases it leads to multiple parallel requests and network mock issues in tests. As a quick fix NamedMutexes are reverted for methods where they had been used before. Relates-To: OLPEDGE-2417 Signed-off-by: Iuliia Moroz <[email protected]>
1 parent 08d83f8 commit d9198f3

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

olp-cpp-sdk-dataservice-read/src/repositories/DataRepository.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <olp/core/logging/Log.h>
2929
#include "CatalogRepository.h"
3030
#include "DataCacheRepository.h"
31+
#include "NamedMutex.h"
3132
#include "PartitionsCacheRepository.h"
3233
#include "PartitionsRepository.h"
3334
#include "generated/api/BlobApi.h"
@@ -134,6 +135,14 @@ BlobApi::DataResponse DataRepository::GetBlobData(
134135
repository::DataCacheRepository repository(
135136
catalog_, settings_.cache, settings_.default_cache_expiration);
136137

138+
NamedMutex mutex(catalog_.ToString() + layer + *data_handle);
139+
std::unique_lock<NamedMutex> lock(mutex, std::defer_lock);
140+
141+
// If we are not planning to go online or access the cache, do not lock.
142+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
143+
lock.lock();
144+
}
145+
137146
if (fetch_option != OnlineOnly && fetch_option != CacheWithUpdate) {
138147
auto cached_data = repository.Get(layer, data_handle.value());
139148
if (cached_data) {

olp-cpp-sdk-dataservice-read/src/repositories/PartitionsRepository.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <olp/core/client/Condition.h>
2828
#include <olp/core/logging/Log.h>
2929
#include "CatalogRepository.h"
30+
#include "NamedMutex.h"
3031
#include "generated/api/MetadataApi.h"
3132
#include "generated/api/QueryApi.h"
3233
#include "olp/dataservice/read/CatalogRequest.h"
@@ -92,6 +93,15 @@ repository::PartitionResponse FindPartition(
9293

9394
return std::move(aggregated_partition);
9495
}
96+
97+
std::string HashPartitions(
98+
const read::PartitionsRequest::PartitionIds& partitions) {
99+
size_t seed = 0;
100+
for (const auto& partition : partitions) {
101+
boost::hash_combine(seed, partition);
102+
}
103+
return std::to_string(seed);
104+
}
95105
} // namespace
96106

97107
namespace olp {
@@ -157,6 +167,18 @@ PartitionsRepository::GetPartitionsExtendedResponse(
157167

158168
const auto& partition_ids = request.GetPartitionIds();
159169

170+
// Temporary workaround for merging the same requests. Should be removed after
171+
// OlpClient could handle that.
172+
const auto detail =
173+
partition_ids.empty() ? "" : HashPartitions(partition_ids);
174+
NamedMutex mutex(catalog_str + layer_id_ + detail);
175+
std::unique_lock<NamedMutex> lock(mutex, std::defer_lock);
176+
177+
// If we are not planning to go online or access the cache, do not lock.
178+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
179+
lock.lock();
180+
}
181+
160182
if (fetch_option != OnlineOnly && fetch_option != CacheWithUpdate) {
161183
auto cached_partitions = cache_.Get(request, version);
162184
if (cached_partitions) {
@@ -243,6 +265,14 @@ PartitionsResponse PartitionsRepository::GetPartitionById(
243265
const auto request_key =
244266
catalog_.ToString() + request.CreateKey(layer_id_, version);
245267

268+
NamedMutex mutex(request_key);
269+
std::unique_lock<repository::NamedMutex> lock(mutex, std::defer_lock);
270+
271+
// If we are not planning to go online or access the cache, do not lock.
272+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
273+
lock.lock();
274+
}
275+
246276
std::chrono::seconds timeout{settings_.retry_settings.timeout};
247277
const auto key = request.CreateKey(layer_id_, version);
248278

@@ -318,6 +348,14 @@ QuadTreeIndexResponse PartitionsRepository::GetQuadTreeIndexForTile(
318348
const auto& root_tile_key = tile_key.ChangedLevelBy(-kAggregateQuadTreeDepth);
319349
const auto root_tile_here = root_tile_key.ToHereTile();
320350

351+
NamedMutex mutex(catalog_.ToString() + layer_id_ + root_tile_here + "Index");
352+
std::unique_lock<NamedMutex> lock(mutex, std::defer_lock);
353+
354+
// If we are not planning to go online or access the cache, do not lock.
355+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
356+
lock.lock();
357+
}
358+
321359
// Look for QuadTree covering the tile in the cache
322360
if (fetch_option != OnlineOnly && fetch_option != CacheWithUpdate) {
323361
read::QuadTreeIndex cached_tree;

0 commit comments

Comments
 (0)