Skip to content

Commit 0cbe525

Browse files
Merge concurrent API requests. (#911)
* Merge concurrent API requests. Exact same API requests are blocked and waits for the first one to complete. The concurrent QueryTreeIndex request are merged, even for different tile requests. Relates-To: OLPEDGE-2039, OLPEDGE-2040 Signed-off-by: Mykhailo Kuchma <[email protected]>
1 parent ea781c5 commit 0cbe525

File tree

6 files changed

+297
-79
lines changed

6 files changed

+297
-79
lines changed

olp-cpp-sdk-dataservice-read/src/repositories/DataRepository.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "CatalogRepository.h"
3131
#include "DataCacheRepository.h"
3232
#include "ExecuteOrSchedule.inl"
33+
#include "NamedMutex.h"
3334
#include "PartitionsCacheRepository.h"
3435
#include "PartitionsRepository.h"
3536
#include "generated/api/BlobApi.h"
@@ -149,6 +150,14 @@ DataResponse DataRepository::GetBlobData(
149150
return {{client::ErrorCode::PreconditionFailed, "Data handle is missing"}};
150151
}
151152

153+
NamedMutex mutex(catalog.ToString() + layer + *data_handle);
154+
std::unique_lock<NamedMutex> lock(mutex, std::defer_lock);
155+
156+
// If we are not planning to go online or access the cache, do not lock.
157+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
158+
lock.lock();
159+
}
160+
152161
repository::DataCacheRepository repository(catalog, settings.cache,
153162
settings.default_cache_expiration);
154163

olp-cpp-sdk-dataservice-read/src/repositories/PartitionsRepository.cpp

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "PartitionsCacheRepository.h"
2929
#include "generated/api/MetadataApi.h"
3030
#include "generated/api/QueryApi.h"
31+
#include "NamedMutex.h"
3132
#include "olp/dataservice/read/CatalogRequest.h"
3233
#include "olp/dataservice/read/CatalogVersionRequest.h"
3334
#include "olp/dataservice/read/DataRequest.h"
@@ -272,11 +273,22 @@ PartitionsResponse PartitionsRepository::GetPartitionById(
272273
return {{client::ErrorCode::PreconditionFailed, "Partition Id is missing"}};
273274
}
274275

276+
auto fetch_option = data_request.GetFetchOption();
277+
278+
const auto request_key = catalog.ToString() + data_request.CreateKey(layer);
279+
280+
NamedMutex mutex(request_key);
281+
std::unique_lock<repository::NamedMutex> lock(mutex, std::defer_lock);
282+
283+
// If we are not planning to go online or access the cache, do not lock.
284+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
285+
lock.lock();
286+
}
287+
275288
const auto& version = data_request.GetVersion();
276289
std::chrono::seconds timeout{settings.retry_settings.timeout};
277290
repository::PartitionsCacheRepository repository(
278291
catalog, settings.cache, settings.default_cache_expiration);
279-
auto fetch_option = data_request.GetFetchOption();
280292
const auto key = data_request.CreateKey(layer);
281293

282294
const std::vector<std::string> partitions{partition_id.value()};
@@ -355,6 +367,18 @@ PartitionResponse PartitionsRepository::GetAggregatedTile(
355367
const client::OlpClientSettings& settings) {
356368
const auto fetch_option = request.GetFetchOption();
357369
const auto& tile_key = request.GetTileKey();
370+
371+
const auto& root_tile_key = tile_key.ChangedLevelBy(-kAggregateQuadTreeDepth);
372+
const auto root_tile_here = root_tile_key.ToHereTile();
373+
374+
NamedMutex mutex(catalog.ToString() + layer + root_tile_here + "Index");
375+
std::unique_lock<NamedMutex> lock(mutex, std::defer_lock);
376+
377+
// If we are not planning to go online or access the cache, do not lock.
378+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
379+
lock.lock();
380+
}
381+
358382
repository::PartitionsCacheRepository repository(
359383
catalog, settings.cache, settings.default_cache_expiration);
360384

@@ -389,8 +413,6 @@ PartitionResponse PartitionsRepository::GetAggregatedTile(
389413
return query_api.GetError();
390414
}
391415

392-
const auto& root_tile_key = tile_key.ChangedLevelBy(-kAggregateQuadTreeDepth);
393-
const auto root_tile_here = root_tile_key.ToHereTile();
394416
auto quadtree_response = QueryApi::QuadTreeIndex(
395417
query_api.GetResult(), layer, root_tile_here, version,
396418
kAggregateQuadTreeDepth, boost::none, request.GetBillingTag(), context);
@@ -429,6 +451,19 @@ PartitionsResponse PartitionsRepository::GetPartitionForVersionedTile(
429451
const TileRequest& request, int64_t version,
430452
client::CancellationContext context,
431453
const client::OlpClientSettings& settings) {
454+
const auto fetch_option = request.GetFetchOption();
455+
456+
const auto request_key = catalog.ToString() + request.CreateKey(layer_id) +
457+
std::to_string(version);
458+
459+
NamedMutex mutex(request_key);
460+
std::unique_lock<repository::NamedMutex> lock(mutex, std::defer_lock);
461+
462+
// If we are not planning to go online or access the cache, do not lock.
463+
if (fetch_option != CacheOnly && fetch_option != OnlineOnly) {
464+
lock.lock();
465+
}
466+
432467
auto tile = request.GetTileKey().ToHereTile();
433468
auto cached_partitions =
434469
GetTileFromCache(catalog, layer_id, request, version, settings);
@@ -451,7 +486,7 @@ PartitionsResponse PartitionsRepository::GetPartitionForVersionedTile(
451486
return result;
452487
}
453488
}
454-
} else if (request.GetFetchOption() == CacheOnly) {
489+
} else if (fetch_option == CacheOnly) {
455490
OLP_SDK_LOG_INFO_F(
456491
kLogTag,
457492
"GetPartitionForVersionedTile not found in cache, hrn='%s', key='%s'",

tests/common/mocks/NetworkMock.cpp

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,31 +29,30 @@ NetworkMock::NetworkMock() = default;
2929

3030
NetworkMock::~NetworkMock() = default;
3131

32-
std::tuple<olp::http::RequestId, NetworkCallback, CancelCallback>
32+
std::tuple<http::RequestId, NetworkCallback, CancelCallback>
3333
GenerateNetworkMockActions(std::shared_ptr<std::promise<void>> pre_signal,
3434
std::shared_ptr<std::promise<void>> wait_for_signal,
3535
MockedResponseInformation response_information,
3636
std::shared_ptr<std::promise<void>> post_signal) {
37-
using namespace olp::http;
37+
static std::atomic<http::RequestId> s_request_id{
38+
static_cast<http::RequestId>(http::RequestIdConstants::RequestIdMin)};
3839

39-
static std::atomic<RequestId> s_request_id{
40-
static_cast<RequestId>(RequestIdConstants::RequestIdMin)};
41-
42-
olp::http::RequestId request_id = s_request_id.fetch_add(1);
40+
http::RequestId request_id = s_request_id.fetch_add(1);
4341

4442
auto completed = std::make_shared<std::atomic_bool>(false);
4543

4644
// callback is generated when the send method is executed, in order to receive
4745
// the cancel callback, we need to pass it to store it somewhere and share
4846
// with cancel mock.
49-
auto callback_placeholder = std::make_shared<olp::http::Network::Callback>();
47+
auto callback_placeholder = std::make_shared<http::Network::Callback>();
5048

5149
auto mocked_send =
5250
[request_id, completed, pre_signal, wait_for_signal, response_information,
5351
post_signal, callback_placeholder](
54-
NetworkRequest request, Network::Payload payload,
55-
Network::Callback callback, Network::HeaderCallback header_callback,
56-
Network::DataCallback /*data_callback*/) -> olp::http::SendOutcome {
52+
http::NetworkRequest request, http::Network::Payload payload,
53+
http::Network::Callback callback,
54+
http::Network::HeaderCallback header_callback,
55+
http::Network::DataCallback /*data_callback*/) -> http::SendOutcome {
5756
*callback_placeholder = callback;
5857

5958
auto mocked_network_block = [request, pre_signal, wait_for_signal,
@@ -75,7 +74,8 @@ GenerateNetworkMockActions(std::shared_ptr<std::promise<void>> pre_signal,
7574
for (const auto& header : response_information.headers) {
7675
header_callback(header.first, header.second);
7776
}
78-
callback(NetworkResponse().WithStatus(response_information.status));
77+
callback(
78+
http::NetworkResponse().WithStatus(response_information.status));
7979
}
8080

8181
// notify that request finished
@@ -85,15 +85,16 @@ GenerateNetworkMockActions(std::shared_ptr<std::promise<void>> pre_signal,
8585
// simulate that network code is actually running in the background.
8686
std::thread(std::move(mocked_network_block)).detach();
8787

88-
return SendOutcome(request_id);
88+
return http::SendOutcome(request_id);
8989
};
9090

9191
auto mocked_cancel = [completed,
92-
callback_placeholder](olp::http::RequestId /*id*/) {
92+
callback_placeholder](http::RequestId /*id*/) {
9393
if (!completed->exchange(true)) {
94-
auto cancel_code = static_cast<int>(ErrorCode::CANCELLED_ERROR);
95-
(*callback_placeholder)(
96-
NetworkResponse().WithError("Cancelled").WithStatus(cancel_code));
94+
auto cancel_code = static_cast<int>(http::ErrorCode::CANCELLED_ERROR);
95+
(*callback_placeholder)(http::NetworkResponse()
96+
.WithError("Cancelled")
97+
.WithStatus(cancel_code));
9798
}
9899
};
99100

@@ -105,27 +106,28 @@ GenerateNetworkMockActions(std::shared_ptr<std::promise<void>> pre_signal,
105106
/// NetworkMock Actions
106107
///
107108

108-
NetworkCallback ReturnHttpResponse(olp::http::NetworkResponse response,
109+
NetworkCallback ReturnHttpResponse(http::NetworkResponse response,
109110
const std::string& response_body,
110-
const http::Headers& headers) {
111-
return [=](olp::http::NetworkRequest /*request*/,
112-
olp::http::Network::Payload payload,
113-
olp::http::Network::Callback callback,
114-
olp::http::Network::HeaderCallback header_callback,
115-
olp::http::Network::DataCallback /*data_callback*/)
116-
-> olp::http::SendOutcome {
117-
std::thread([=]() {
118-
for (const auto& header : headers) {
119-
header_callback(header.first, header.second);
120-
}
121-
*payload << response_body;
122-
callback(response);
123-
})
124-
.detach();
125-
126-
constexpr auto unused_request_id = 5;
127-
return olp::http::SendOutcome(unused_request_id);
128-
};
111+
const http::Headers& headers,
112+
std::chrono::nanoseconds delay) {
113+
return
114+
[=](http::NetworkRequest /*request*/, http::Network::Payload payload,
115+
http::Network::Callback callback,
116+
http::Network::HeaderCallback header_callback,
117+
http::Network::DataCallback /*data_callback*/) -> http::SendOutcome {
118+
std::thread([=]() {
119+
for (const auto& header : headers) {
120+
header_callback(header.first, header.second);
121+
}
122+
*payload << response_body;
123+
std::this_thread::sleep_for(delay);
124+
callback(response);
125+
})
126+
.detach();
127+
128+
constexpr auto unused_request_id = 5;
129+
return http::SendOutcome(unused_request_id);
130+
};
129131
}
130132

131133
} // namespace common

tests/common/mocks/NetworkMock.h

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#pragma once
2121

22+
#include <chrono>
2223
#include <future>
2324

2425
#include <gmock/gmock.h>
@@ -28,28 +29,26 @@ namespace olp {
2829
namespace tests {
2930
namespace common {
3031

31-
using NetworkCallback = std::function<olp::http::SendOutcome(
32-
olp::http::NetworkRequest, olp::http::Network::Payload,
33-
olp::http::Network::Callback, olp::http::Network::HeaderCallback,
34-
olp::http::Network::DataCallback)>;
32+
using NetworkCallback = std::function<http::SendOutcome(
33+
http::NetworkRequest, http::Network::Payload, http::Network::Callback,
34+
http::Network::HeaderCallback, http::Network::DataCallback)>;
3535

36-
using CancelCallback = std::function<void(olp::http::RequestId)>;
36+
using CancelCallback = std::function<void(http::RequestId)>;
3737

38-
class NetworkMock : public olp::http::Network {
38+
class NetworkMock : public http::Network {
3939
public:
4040
NetworkMock();
4141

4242
~NetworkMock() override;
4343

44-
MOCK_METHOD(olp::http::SendOutcome, Send,
45-
(olp::http::NetworkRequest request,
46-
olp::http::Network::Payload payload,
47-
olp::http::Network::Callback callback,
48-
olp::http::Network::HeaderCallback header_callback,
49-
olp::http::Network::DataCallback data_callback),
44+
MOCK_METHOD(http::SendOutcome, Send,
45+
(http::NetworkRequest request, http::Network::Payload payload,
46+
http::Network::Callback callback,
47+
http::Network::HeaderCallback header_callback,
48+
http::Network::DataCallback data_callback),
5049
(override));
5150

52-
MOCK_METHOD(void, Cancel, (olp::http::RequestId id), (override));
51+
MOCK_METHOD(void, Cancel, (http::RequestId id), (override));
5352
};
5453

5554
/**
@@ -82,7 +81,7 @@ struct MockedResponseInformation {
8281
* @return Triple: RequestId; Action for method Send(); Action for method
8382
* Cancel();
8483
*/
85-
std::tuple<olp::http::RequestId, NetworkCallback, CancelCallback>
84+
std::tuple<http::RequestId, NetworkCallback, CancelCallback>
8685
GenerateNetworkMockActions(std::shared_ptr<std::promise<void>> pre_signal,
8786
std::shared_ptr<std::promise<void>> wait_for_signal,
8887
MockedResponseInformation response_information,
@@ -93,12 +92,25 @@ GenerateNetworkMockActions(std::shared_ptr<std::promise<void>> pre_signal,
9392
/// NetworkMock Actions
9493
///
9594

96-
NetworkCallback ReturnHttpResponse(olp::http::NetworkResponse response,
97-
const std::string& response_body,
98-
const http::Headers& headers = {});
95+
/**
96+
* @brief Helper function creates Network::Send mock function that returns a
97+
* specified response, body, headers after specified delay from a different
98+
* thread.
99+
*
100+
* @param response Network response.
101+
* @param response_body Response payload.
102+
* @param headers Response headers (optional).
103+
* @param delay Response delay (optional).
104+
*
105+
* @return A function that mimics Network::Send method.
106+
*/
107+
NetworkCallback ReturnHttpResponse(
108+
http::NetworkResponse response, const std::string& response_body,
109+
const http::Headers& headers = {},
110+
std::chrono::nanoseconds delay = std::chrono::nanoseconds(0));
99111

100-
inline olp::http::NetworkResponse GetResponse(int status) {
101-
return olp::http::NetworkResponse().WithStatus(status);
112+
inline http::NetworkResponse GetResponse(int status) {
113+
return http::NetworkResponse().WithStatus(status);
102114
}
103115

104116
} // namespace common

tests/integration/olp-cpp-sdk-dataservice-read/HttpResponses.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,12 @@
270270
#define URL_BLOB_DATA_VOLATILE_PREFETCH_7 \
271271
R"(https://volatile-blob-ireland.data.api.platform.here.com/blobstore/v1/catalogs/hereos-internal-test-v2/layers/hype-test-prefetch/data/95c5c703-e00e-4c38-841e-e419367474f1)"
272272

273-
#define URL_BLOB_AGGREGATE_DATA \
273+
#define URL_BLOB_AGGREGATE_DATA_23618364 \
274274
R"(https://blob-ireland.data.api.platform.here.com/blobstore/v1/catalogs/hereos-internal-test-v2/layers/testlayer/data/f9a9fd8e-eb1b-48e5-bfdb-4392b3826443)"
275275

276+
#define URL_BLOB_AGGREGATE_DATA_23618363 \
277+
R"(https://blob-ireland.data.api.platform.here.com/blobstore/v1/catalogs/hereos-internal-test-v2/layers/testlayer/data/95c5c703-e00e-4c38-841e-e419367474f1)"
278+
276279
#define URL_VOLATILE_BLOB_DATA \
277280
R"(https://volatile-blob-ireland.data.api.platform.here.com/blobstore/v1/catalogs/hereos-internal-test-v2/layers/testlayer_volatile/data/4eed6ed1-0d32-43b9-ae79-043cb4256410)"
278281

0 commit comments

Comments
 (0)