Skip to content

Commit d7bd376

Browse files
Refactor the NamedMutex class (#1208)
Refactor the NamedMutex class to avoid using the static global mutex and storage. Instead introduce a NamedMutexStorage class as a temporary solution to avoid crash and secure the API requests merge functionality, until the proper implementation is enabled. The storage is initialized per VersionedLayerClient and VolatileLayerClient. Resolves: OLPSUP-14327 Signed-off-by: Mykhailo Kuchma <[email protected]>
1 parent ae94452 commit d7bd376

15 files changed

+132
-108
lines changed

olp-cpp-sdk-dataservice-read/src/ApiClientLookup.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
#include "generated/api/ResourcesApi.h"
3333
#include "repositories/ApiCacheRepository.h"
3434

35-
#include "repositories/NamedMutex.h"
36-
3735
namespace olp {
3836
namespace dataservice {
3937
namespace read {
@@ -59,9 +57,9 @@ ApiClientLookup::ApiClientResponse ApiClientLookup::LookupApi(
5957
const client::HRN& catalog,
6058
client::CancellationContext cancellation_context, std::string service,
6159
std::string service_version, FetchOptions options,
62-
client::OlpClientSettings settings) {
60+
client::OlpClientSettings settings, repository::NamedMutexStorage storage) {
6361
// This mutex is required to avoid concurrent requests to online.
64-
repository::NamedMutex mutex(catalog.ToString());
62+
repository::NamedMutex mutex(storage, catalog.ToString());
6563
std::unique_lock<repository::NamedMutex> lock(mutex, std::defer_lock);
6664

6765
// If we are not planning to go online or access the cache, do not lock.

olp-cpp-sdk-dataservice-read/src/ApiClientLookup.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <olp/core/client/OlpClient.h>
2929
#include "generated/model/Api.h"
3030
#include "olp/dataservice/read/FetchOptions.h"
31+
#include "repositories/NamedMutex.h"
3132

3233
namespace olp {
3334
namespace dataservice {
@@ -45,7 +46,8 @@ class ApiClientLookup {
4546
const client::HRN& catalog,
4647
client::CancellationContext cancellation_context, std::string service,
4748
std::string service_version, FetchOptions options,
48-
client::OlpClientSettings settings);
49+
client::OlpClientSettings settings,
50+
repository::NamedMutexStorage storage = repository::NamedMutexStorage());
4951
};
5052

5153
} // namespace read

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ client::CancellationToken VersionedLayerClientImpl::GetPartitions(
101101
const auto version = version_response.GetResult().GetVersion();
102102

103103
repository::PartitionsRepository repository(catalog_, layer_id_, settings_,
104-
lookup_client_);
104+
lookup_client_, mutex_storage_);
105105
return repository.GetVersionedPartitionsExtendedResponse(
106106
std::move(partitions_request), version, context);
107107
};
@@ -124,11 +124,6 @@ VersionedLayerClientImpl::GetPartitions(PartitionsRequest partitions_request) {
124124

125125
client::CancellationToken VersionedLayerClientImpl::GetData(
126126
DataRequest request, DataResponseCallback callback) {
127-
auto catalog = catalog_;
128-
auto layer_id = layer_id_;
129-
auto settings = settings_;
130-
auto lookup_client = lookup_client_;
131-
132127
auto data_task =
133128
[=](client::CancellationContext context) mutable -> DataResponse {
134129
if (request.GetFetchOption() == CacheWithUpdate) {
@@ -147,9 +142,9 @@ client::CancellationToken VersionedLayerClientImpl::GetData(
147142
version = version_response.GetResult().GetVersion();
148143
}
149144

150-
repository::DataRepository repository(
151-
std::move(catalog), std::move(settings), std::move(lookup_client));
152-
return repository.GetVersionedData(layer_id, request, version, context);
145+
repository::DataRepository repository(catalog_, settings_, lookup_client_,
146+
mutex_storage_);
147+
return repository.GetVersionedData(layer_id_, request, version, context);
153148
};
154149

155150
return task_sink_.AddTask(std::move(data_task), std::move(callback),
@@ -212,7 +207,7 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchPartitions(
212207
catalog_.ToCatalogHRNString().c_str(), key.c_str());
213208

214209
repository::PartitionsRepository repository(catalog_, layer_id_, settings_,
215-
lookup_client_);
210+
lookup_client_, mutex_storage_);
216211

217212
auto query = [=](std::vector<std::string> partitions,
218213
client::CancellationContext inner_context) mutable
@@ -256,8 +251,8 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchPartitions(
256251
return BlobApi::DataResponse(nullptr);
257252
}
258253

259-
repository::DataRepository repository(catalog_, settings_,
260-
lookup_client_);
254+
repository::DataRepository repository(catalog_, settings_, lookup_client_,
255+
mutex_storage_);
261256
// Fetch from online
262257
return repository.GetVersionedData(
263258
layer_id_,
@@ -367,7 +362,7 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
367362

368363
repository::PrefetchTilesRepository repository(
369364
catalog_, layer_id_, settings_, lookup_client_,
370-
request.GetBillingTag());
365+
request.GetBillingTag(), mutex_storage_);
371366

372367
auto sliced_tiles = repository.GetSlicedTiles(request.GetTileKeys(),
373368
min_level, max_level);
@@ -427,7 +422,7 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
427422
}
428423

429424
repository::DataRepository repository(catalog_, settings_,
430-
lookup_client_);
425+
lookup_client_, mutex_storage_);
431426
// Fetch from online
432427
return repository.GetVersionedData(layer_id_,
433428
DataRequest()
@@ -514,11 +509,6 @@ CatalogVersionResponse VersionedLayerClientImpl::GetVersion(
514509

515510
client::CancellationToken VersionedLayerClientImpl::GetData(
516511
TileRequest request, DataResponseCallback callback) {
517-
auto catalog = catalog_;
518-
auto layer_id = layer_id_;
519-
auto settings = settings_;
520-
auto lookup_client = lookup_client_;
521-
522512
auto data_task = [=](client::CancellationContext context) -> DataResponse {
523513
if (request.GetFetchOption() == CacheWithUpdate) {
524514
return {{client::ErrorCode::InvalidArgument,
@@ -535,10 +525,10 @@ client::CancellationToken VersionedLayerClientImpl::GetData(
535525
return version_response.GetError();
536526
}
537527

538-
repository::DataRepository repository(
539-
std::move(catalog), std::move(settings), std::move(lookup_client));
528+
repository::DataRepository repository(catalog_, settings_, lookup_client_,
529+
mutex_storage_);
540530
return repository.GetVersionedTile(
541-
layer_id, request, version_response.GetResult().GetVersion(), context);
531+
layer_id_, request, version_response.GetResult().GetVersion(), context);
542532
};
543533

544534
return task_sink_.AddTask(std::move(data_task), std::move(callback),
@@ -667,11 +657,6 @@ bool VersionedLayerClientImpl::IsCached(const geo::TileKey& tile,
667657

668658
client::CancellationToken VersionedLayerClientImpl::GetAggregatedData(
669659
TileRequest request, AggregatedDataResponseCallback callback) {
670-
auto catalog = catalog_;
671-
auto layer_id = layer_id_;
672-
auto settings = settings_;
673-
auto lookup_client = lookup_client_;
674-
675660
auto data_task =
676661
[=](client::CancellationContext context) -> AggregatedDataResponse {
677662
const auto fetch_option = request.GetFetchOption();
@@ -693,8 +678,8 @@ client::CancellationToken VersionedLayerClientImpl::GetAggregatedData(
693678
}
694679

695680
auto version = version_response.GetResult().GetVersion();
696-
repository::PartitionsRepository repository(catalog_, layer_id, settings_,
697-
lookup_client_);
681+
repository::PartitionsRepository repository(catalog_, layer_id_, settings_,
682+
lookup_client_, mutex_storage_);
698683
auto partition_response =
699684
repository.GetAggregatedTile(std::move(request), version, context);
700685
if (!partition_response.IsSuccessful()) {
@@ -710,10 +695,10 @@ client::CancellationToken VersionedLayerClientImpl::GetAggregatedData(
710695
.WithFetchOption(fetch_option)
711696
.WithBillingTag(billing_tag);
712697

713-
repository::DataRepository data_repository(
714-
std::move(catalog), std::move(settings), std::move(lookup_client));
698+
repository::DataRepository data_repository(catalog_, settings_,
699+
lookup_client_, mutex_storage_);
715700
auto data_response = data_repository.GetVersionedData(
716-
layer_id, data_request, version, context);
701+
layer_id_, data_request, version, context);
717702

718703
if (!data_response.IsSuccessful()) {
719704
OLP_SDK_LOG_WARNING_F(

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include <olp/dataservice/read/Types.h>
3737
#include <boost/optional.hpp>
3838
#include "TaskSink.h"
39+
#include "repositories/NamedMutex.h"
3940

4041
namespace olp {
4142
namespace thread {
@@ -124,6 +125,7 @@ class VersionedLayerClientImpl {
124125
std::atomic<int64_t> catalog_version_;
125126
client::ApiLookupClient lookup_client_;
126127
TaskSink task_sink_;
128+
repository::NamedMutexStorage mutex_storage_;
127129
};
128130

129131
} // namespace read

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.cpp

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,9 @@ client::CancellationToken VolatileLayerClientImpl::GetPartitions(
7878
PartitionsRequest request, PartitionsResponseCallback callback) {
7979
auto schedule_get_partitions = [&](PartitionsRequest request,
8080
PartitionsResponseCallback callback) {
81-
auto catalog = catalog_;
82-
auto layer_id = layer_id_;
83-
auto settings = settings_;
84-
auto lookup_client = lookup_client_;
85-
8681
auto data_task = [=](client::CancellationContext context) {
8782
repository::PartitionsRepository repository(
88-
std::move(catalog), std::move(layer_id), std::move(settings),
89-
std::move(lookup_client));
83+
catalog_, layer_id_, settings_, lookup_client_, mutex_storage_);
9084
return repository.GetVolatilePartitions(request, std::move(context));
9185
};
9286

@@ -110,15 +104,10 @@ VolatileLayerClientImpl::GetPartitions(PartitionsRequest request) {
110104

111105
client::CancellationToken VolatileLayerClientImpl::GetData(
112106
DataRequest request, DataResponseCallback callback) {
113-
auto catalog = catalog_;
114-
auto layer_id = layer_id_;
115-
auto settings = settings_;
116-
auto lookup_client = lookup_client_;
117-
118107
auto task = [=](client::CancellationContext context) {
119-
repository::DataRepository repository(
120-
std::move(catalog), std::move(settings), std::move(lookup_client));
121-
return repository.GetVolatileData(layer_id, request, context);
108+
repository::DataRepository repository(catalog_, settings_, lookup_client_,
109+
mutex_storage_);
110+
return repository.GetVolatileData(layer_id_, request, context);
122111
};
123112

124113
return task_sink_.AddTask(std::move(task), std::move(callback),
@@ -198,7 +187,9 @@ client::CancellationToken VolatileLayerClientImpl::PrefetchTiles(
198187
: request.GetMaxLevel());
199188

200189
repository::PrefetchTilesRepository repository(
201-
catalog_, layer_id_, settings_, lookup_client_);
190+
catalog_, layer_id_, settings_, lookup_client_,
191+
request.GetBillingTag(), mutex_storage_);
192+
202193
auto sliced_tiles =
203194
repository.GetSlicedTiles(tile_keys, min_level, max_level);
204195

@@ -242,7 +233,7 @@ client::CancellationToken VolatileLayerClientImpl::PrefetchTiles(
242233
}
243234

244235
repository::DataRepository repository(catalog_, settings_,
245-
lookup_client_);
236+
lookup_client_, mutex_storage_);
246237
// Fetch from online
247238
return repository.GetVolatileData(layer_id_,
248239
DataRequest()

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
#include <olp/dataservice/read/PartitionsRequest.h>
2929
#include <olp/dataservice/read/PrefetchTilesRequest.h>
3030
#include <olp/dataservice/read/Types.h>
31-
3231
#include "TaskSink.h"
32+
#include "repositories/NamedMutex.h"
3333

3434
namespace olp {
3535

@@ -80,6 +80,7 @@ class VolatileLayerClientImpl {
8080
client::OlpClientSettings settings_;
8181
client::ApiLookupClient lookup_client_;
8282
TaskSink task_sink_;
83+
repository::NamedMutexStorage mutex_storage_;
8384
};
8485

8586
} // namespace read

olp-cpp-sdk-dataservice-read/src/repositories/DataRepository.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include <olp/core/logging/Log.h>
2929
#include "CatalogRepository.h"
3030
#include "DataCacheRepository.h"
31-
#include "NamedMutex.h"
3231
#include "PartitionsCacheRepository.h"
3332
#include "PartitionsRepository.h"
3433
#include "generated/api/BlobApi.h"
@@ -51,16 +50,18 @@ constexpr auto kVolatileBlobService = "volatile-blob";
5150

5251
DataRepository::DataRepository(client::HRN catalog,
5352
client::OlpClientSettings settings,
54-
client::ApiLookupClient client)
53+
client::ApiLookupClient client,
54+
NamedMutexStorage storage)
5555
: catalog_(std::move(catalog)),
56-
settings_(settings),
57-
lookup_client_(std::move(client)) {}
56+
settings_(std::move(settings)),
57+
lookup_client_(std::move(client)),
58+
storage_(std::move(storage)) {}
5859

5960
DataResponse DataRepository::GetVersionedTile(
6061
const std::string& layer_id, const TileRequest& request, int64_t version,
6162
client::CancellationContext context) {
62-
PartitionsRepository repository(catalog_, layer_id, settings_,
63-
lookup_client_);
63+
PartitionsRepository repository(catalog_, layer_id, settings_, lookup_client_,
64+
storage_);
6465
auto response = repository.GetTile(request, version, context);
6566

6667
if (!response.IsSuccessful()) {
@@ -93,7 +94,7 @@ BlobApi::DataResponse DataRepository::GetVersionedData(
9394
if (!request.GetDataHandle()) {
9495
// get data handle for a partition to be queried
9596
PartitionsRepository repository(catalog_, layer_id, settings_,
96-
lookup_client_);
97+
lookup_client_, storage_);
9798
auto partitions_response =
9899
repository.GetPartitionById(request, version, context);
99100

@@ -132,7 +133,7 @@ BlobApi::DataResponse DataRepository::GetBlobData(
132133
return {{client::ErrorCode::PreconditionFailed, "Data handle is missing"}};
133134
}
134135

135-
NamedMutex mutex(catalog_.ToString() + layer + *data_handle);
136+
NamedMutex mutex(storage_, catalog_.ToString() + layer + *data_handle);
136137
std::unique_lock<NamedMutex> lock(mutex, std::defer_lock);
137138

138139
// If we are not planning to go online or access the cache, do not lock.
@@ -208,7 +209,7 @@ BlobApi::DataResponse DataRepository::GetVolatileData(
208209
auto blob_request = request;
209210
if (!request.GetDataHandle()) {
210211
PartitionsRepository repository(catalog_, layer_id, settings_,
211-
lookup_client_);
212+
lookup_client_, storage_);
212213
auto partitions_response =
213214
repository.GetPartitionById(request, boost::none, context);
214215

olp-cpp-sdk-dataservice-read/src/repositories/DataRepository.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "olp/dataservice/read/DataRequest.h"
2828
#include "olp/dataservice/read/Types.h"
2929

30+
#include "NamedMutex.h"
3031
#include "generated/api/BlobApi.h"
3132

3233
namespace olp {
@@ -38,7 +39,8 @@ namespace repository {
3839
class DataRepository final {
3940
public:
4041
DataRepository(client::HRN catalog, client::OlpClientSettings settings,
41-
client::ApiLookupClient client);
42+
client::ApiLookupClient client,
43+
NamedMutexStorage storage = NamedMutexStorage());
4244

4345
DataResponse GetVersionedTile(const std::string& layer_id,
4446
const TileRequest& request, int64_t version,
@@ -62,6 +64,7 @@ class DataRepository final {
6264
client::HRN catalog_;
6365
client::OlpClientSettings settings_;
6466
client::ApiLookupClient lookup_client_;
67+
NamedMutexStorage storage_;
6568
};
6669

6770
} // namespace repository

0 commit comments

Comments
 (0)