Skip to content

Commit dbba360

Browse files
author
Liubov Didkivska
authored
Add PrefetchPartitions implementation (#1074)
Add implementation to prefetch partitions. Split partitions requests for blocks by 100. Query data handles for partitions and download data. Add unit test to check if after prefetch data is avialible in cache. Add tests with failed responces. Relates-To: OLPEDGE-1902 Signed-off-by: Liubov Didkivska <[email protected]>
1 parent a8def22 commit dbba360

File tree

10 files changed

+1048
-142
lines changed

10 files changed

+1048
-142
lines changed

olp-cpp-sdk-dataservice-read/src/DownloadItemsJob.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <olp/core/client/CancellationContext.h>
2323
#include <olp/core/logging/Log.h>
2424
#include <olp/dataservice/read/Types.h>
25-
2625
#include "Common.h"
2726
#include "ExtendedApiResponse.h"
2827
#include "ExtendedApiResponseHelpers.h"
@@ -43,13 +42,18 @@ using AppendResultFunc =
4342
std::function<void(ExtendedDataResponse response, ItemType item,
4443
PrefetchResult& prefetch_result)>;
4544

46-
template <typename ItemType, typename PrefetchResult>
45+
template <typename PrefetchStatusType>
46+
using PrefetchStatusCallbackType = std::function<void(PrefetchStatusType)>;
47+
48+
template <typename ItemType, typename PrefetchResult,
49+
typename PrefetchStatusType>
4750
class DownloadItemsJob {
4851
public:
49-
DownloadItemsJob(DownloadFunc download,
50-
AppendResultFunc<ItemType, PrefetchResult> append_result,
51-
Callback<PrefetchResult> user_callback,
52-
PrefetchStatusCallback status_callback)
52+
DownloadItemsJob(
53+
DownloadFunc download,
54+
AppendResultFunc<ItemType, PrefetchResult> append_result,
55+
Callback<PrefetchResult> user_callback,
56+
PrefetchStatusCallbackType<PrefetchStatusType> status_callback)
5357
: download_(std::move(download)),
5458
append_result_(std::move(append_result)),
5559
user_callback_(std::move(user_callback)),
@@ -86,7 +90,7 @@ class DownloadItemsJob {
8690
append_result_(response, item, prefetch_result_);
8791

8892
if (status_callback_) {
89-
status_callback_(PrefetchStatus{
93+
status_callback_(PrefetchStatusType{
9094
requests_succeeded_ + requests_failed_, total_download_task_count_,
9195
GetAccumulatedBytes(accumulated_statistics_)});
9296
}
@@ -109,7 +113,7 @@ class DownloadItemsJob {
109113
DownloadFunc download_;
110114
AppendResultFunc<ItemType, PrefetchResult> append_result_;
111115
Callback<PrefetchResult> user_callback_;
112-
PrefetchStatusCallback status_callback_;
116+
PrefetchStatusCallbackType<PrefetchStatusType> status_callback_;
113117
size_t download_task_count_{0};
114118
size_t total_download_task_count_{0};
115119
size_t requests_succeeded_{0};
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (C) 2020 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#pragma once
21+
22+
#include <algorithm>
23+
#include <memory>
24+
#include <string>
25+
#include <utility>
26+
#include <vector>
27+
28+
#include <olp/core/client/CancellationContext.h>
29+
#include <olp/core/client/PendingRequests.h>
30+
#include <olp/core/logging/Log.h>
31+
#include <olp/dataservice/read/Types.h>
32+
#include "Common.h"
33+
#include "DownloadItemsJob.h"
34+
#include "ExtendedApiResponse.h"
35+
#include "ExtendedApiResponseHelpers.h"
36+
#include "QueryMetadataJob.h"
37+
#include "TaskSink.h"
38+
#include "repositories/PartitionsRepository.h"
39+
40+
namespace olp {
41+
namespace dataservice {
42+
namespace read {
43+
44+
using PartitionDataHandleResult =
45+
std::vector<std::pair<std::string, std::string>>;
46+
using PartitionsDataHandleExtendedResponse =
47+
ExtendedApiResponse<PartitionDataHandleResult, client::ApiError,
48+
client::NetworkStatistics>;
49+
50+
class PrefetchPartitionsHelper {
51+
public:
52+
using DownloadJob = DownloadItemsJob<std::string, PrefetchPartitionsResult,
53+
PrefetchPartitionsStatus>;
54+
using QueryFunc = QueryItemsFunc<std::string, std::vector<std::string>,
55+
PartitionsDataHandleExtendedResponse>;
56+
57+
static client::CancellationToken Prefetch(
58+
std::shared_ptr<DownloadJob> download_job,
59+
const std::vector<std::string>& roots, QueryFunc query,
60+
TaskSink& task_sink, size_t query_max_size, uint32_t priority) {
61+
client::CancellationContext execution_context;
62+
63+
auto query_job = std::make_shared<QueryMetadataJob<
64+
std::string, std::vector<std::string>, PrefetchPartitionsResult,
65+
PartitionsDataHandleExtendedResponse, PrefetchPartitionsStatus>>(
66+
std::move(query), nullptr, download_job, task_sink, execution_context,
67+
priority);
68+
69+
auto query_size = roots.size() / query_max_size;
70+
query_size += (roots.size() % query_max_size > 0) ? 1 : 0;
71+
72+
query_job->Initialize(query_size);
73+
74+
OLP_SDK_LOG_DEBUG_F("PrefetchJob", "Starting queries, requests=%zu",
75+
roots.size());
76+
77+
execution_context.ExecuteOrCancelled([&]() {
78+
VectorOfTokens tokens;
79+
tokens.reserve(query_size);
80+
81+
// split items to blocks
82+
auto size_left = roots.size();
83+
auto start = 0u;
84+
while (size_left > start) {
85+
auto size = std::min(query_max_size, size_left - start);
86+
auto query_element = std::vector<std::string>(
87+
roots.begin() + start, roots.begin() + start + size);
88+
89+
tokens.emplace_back(task_sink.AddTask(
90+
[query_element, query_job](client::CancellationContext context) {
91+
return query_job->Query(std::move(query_element), context);
92+
},
93+
[query_job](PartitionsDataHandleExtendedResponse response) {
94+
query_job->CompleteQuery(std::move(response));
95+
},
96+
priority));
97+
98+
start += size;
99+
}
100+
101+
return CreateToken(std::move(tokens));
102+
});
103+
104+
return client::CancellationToken(
105+
[execution_context]() mutable { execution_context.CancelOperation(); });
106+
}
107+
};
108+
109+
} // namespace read
110+
} // namespace dataservice
111+
} // namespace olp

olp-cpp-sdk-dataservice-read/src/PrefetchHelper.h renamed to olp-cpp-sdk-dataservice-read/src/PrefetchTilesHelper.h

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,47 +19,45 @@
1919

2020
#pragma once
2121

22+
#include <memory>
23+
#include <utility>
24+
#include <vector>
25+
2226
#include <olp/core/client/CancellationContext.h>
2327
#include <olp/core/client/PendingRequests.h>
2428
#include <olp/core/logging/Log.h>
2529
#include <olp/dataservice/read/Types.h>
26-
2730
#include "Common.h"
2831
#include "DownloadItemsJob.h"
2932
#include "ExtendedApiResponse.h"
3033
#include "ExtendedApiResponseHelpers.h"
3134
#include "QueryMetadataJob.h"
3235
#include "TaskSink.h"
36+
#include "repositories/PrefetchTilesRepository.h"
3337

3438
namespace olp {
3539
namespace dataservice {
3640
namespace read {
3741

38-
template <typename PrefetchItemsResult>
39-
using PrefetchItemsResponseCallback = Callback<PrefetchItemsResult>;
40-
41-
class PrefetchHelper {
42+
class PrefetchTilesHelper {
4243
public:
43-
template <typename ItemType, typename QueryType, typename PrefetchResult>
44+
using DownloadJob =
45+
DownloadItemsJob<geo::TileKey, PrefetchTilesResult, PrefetchStatus>;
46+
using QueryFunc =
47+
QueryItemsFunc<geo::TileKey, geo::TileKey, repository::SubQuadsResponse>;
48+
4449
static client::CancellationToken Prefetch(
45-
const std::vector<QueryType>& roots,
46-
QueryItemsFunc<ItemType, QueryType> query,
47-
FilterItemsFunc<ItemType> filter, DownloadFunc download,
48-
AppendResultFunc<ItemType, PrefetchResult> append_result,
49-
Callback<PrefetchResult> user_callback,
50-
PrefetchStatusCallback status_callback, TaskSink& task_sink,
50+
std::shared_ptr<DownloadJob> download_job,
51+
const std::vector<geo::TileKey>& roots, QueryFunc query,
52+
FilterItemsFunc<repository::SubQuadsResult> filter, TaskSink& task_sink,
5153
uint32_t priority) {
5254
client::CancellationContext execution_context;
5355

54-
auto download_job =
55-
std::make_shared<DownloadItemsJob<ItemType, PrefetchResult>>(
56-
std::move(download), std::move(append_result),
57-
std::move(user_callback), std::move(status_callback));
58-
59-
auto query_job =
60-
std::make_shared<QueryMetadataJob<ItemType, QueryType, PrefetchResult>>(
61-
std::move(query), std::move(filter), download_job, task_sink,
62-
execution_context, priority);
56+
auto query_job = std::make_shared<
57+
QueryMetadataJob<geo::TileKey, geo::TileKey, PrefetchTilesResult,
58+
repository::SubQuadsResponse, PrefetchStatus>>(
59+
std::move(query), std::move(filter), download_job, task_sink,
60+
execution_context, priority);
6361

6462
query_job->Initialize(roots.size());
6563

@@ -69,12 +67,12 @@ class PrefetchHelper {
6967
execution_context.ExecuteOrCancelled([&]() {
7068
VectorOfTokens tokens;
7169
std::transform(std::begin(roots), std::end(roots),
72-
std::back_inserter(tokens), [&](QueryType root) {
70+
std::back_inserter(tokens), [&](geo::TileKey root) {
7371
return task_sink.AddTask(
7472
[=](client::CancellationContext context) {
7573
return query_job->Query(root, context);
7674
},
77-
[=](QueryItemsResponse<ItemType> response) {
75+
[=](repository::SubQuadsResponse response) {
7876
query_job->CompleteQuery(std::move(response));
7977
},
8078
priority);

olp-cpp-sdk-dataservice-read/src/QueryMetadataJob.h

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919

2020
#pragma once
2121

22+
#include <iterator>
23+
#include <string>
24+
#include <utility>
25+
#include <vector>
26+
2227
#include <olp/core/client/CancellationContext.h>
2328
#include <olp/core/logging/Log.h>
2429
#include <olp/dataservice/read/Types.h>
25-
2630
#include "Common.h"
2731
#include "ExtendedApiResponse.h"
2832
#include "TaskSink.h"
@@ -31,21 +35,12 @@ namespace olp {
3135
namespace dataservice {
3236
namespace read {
3337

34-
template <typename ItemType>
35-
using QueryItemsResult = std::map<ItemType, std::string>;
36-
37-
template <typename ItemType>
38-
using QueryItemsResponse =
39-
ExtendedApiResponse<QueryItemsResult<ItemType>, client::ApiError,
40-
client::NetworkStatistics>;
41-
42-
template <typename ItemType, typename QueryType>
43-
using QueryItemsFunc = std::function<QueryItemsResponse<ItemType>(
44-
QueryType, client::CancellationContext)>;
38+
template <typename ItemType, typename QueryType, typename QueryResponseType>
39+
using QueryItemsFunc =
40+
std::function<QueryResponseType(QueryType, client::CancellationContext)>;
4541

46-
template <typename ItemType>
47-
using FilterItemsFunc =
48-
std::function<QueryItemsResult<ItemType>(QueryItemsResult<ItemType>)>;
42+
template <typename QueryResponseType>
43+
using FilterItemsFunc = std::function<QueryResponseType(QueryResponseType)>;
4944

5045
using VectorOfTokens = std::vector<olp::client::CancellationToken>;
5146

@@ -60,13 +55,16 @@ static olp::client::CancellationToken CreateToken(VectorOfTokens tokens) {
6055
std::move(tokens)));
6156
}
6257

63-
template <typename ItemType, typename QueryType, typename PrefetchResult>
58+
template <typename ItemType, typename QueryType, typename PrefetchResult,
59+
typename QueryResponseType, typename PrefetchStatusType>
6460
class QueryMetadataJob {
6561
public:
6662
QueryMetadataJob(
67-
QueryItemsFunc<ItemType, QueryType> query,
68-
FilterItemsFunc<ItemType> filter,
69-
std::shared_ptr<DownloadItemsJob<ItemType, PrefetchResult>> download_job,
63+
QueryItemsFunc<ItemType, QueryType, QueryResponseType> query,
64+
FilterItemsFunc<typename QueryResponseType::ResultType> filter,
65+
std::shared_ptr<
66+
DownloadItemsJob<ItemType, PrefetchResult, PrefetchStatusType>>
67+
download_job,
7068
TaskSink& task_sink, client::CancellationContext execution_context,
7169
uint32_t priority)
7270
: query_(std::move(query)),
@@ -78,20 +76,19 @@ class QueryMetadataJob {
7876

7977
void Initialize(size_t query_count) { query_count_ = query_count; }
8078

81-
QueryItemsResponse<ItemType> Query(QueryType root,
82-
client::CancellationContext context) {
79+
QueryResponseType Query(QueryType root, client::CancellationContext context) {
8380
return query_(root, context);
8481
}
8582

86-
void CompleteQuery(QueryItemsResponse<ItemType> response) {
83+
void CompleteQuery(QueryResponseType response) {
8784
std::lock_guard<std::mutex> lock(mutex_);
8885

8986
accumulated_statistics_ += GetNetworkStatistics(response);
9087

9188
if (response.IsSuccessful()) {
9289
auto items = response.MoveResult();
93-
query_result_.insert(std::make_move_iterator(items.begin()),
94-
std::make_move_iterator(items.end()));
90+
std::move(items.begin(), items.end(),
91+
std::inserter(query_result_, query_result_.begin()));
9592
} else {
9693
const auto& error = response.GetError();
9794
if (error.GetErrorCode() == client::ErrorCode::Cancelled) {
@@ -138,7 +135,8 @@ class QueryMetadataJob {
138135
std::transform(
139136
std::begin(query_result_), std::end(query_result_),
140137
std::back_inserter(tokens),
141-
[&](const typename QueryItemsResult<ItemType>::value_type& item) {
138+
[&](const typename QueryResponseType::ResultType::value_type&
139+
item) {
142140
const std::string& data_handle = item.second;
143141
const auto& item_key = item.first;
144142

@@ -170,14 +168,16 @@ class QueryMetadataJob {
170168
}
171169

172170
private:
173-
QueryItemsFunc<ItemType, QueryType> query_;
174-
FilterItemsFunc<ItemType> filter_;
171+
QueryItemsFunc<ItemType, QueryType, QueryResponseType> query_;
172+
FilterItemsFunc<typename QueryResponseType::ResultType> filter_;
175173
size_t query_count_{0};
176174
bool canceled_{false};
177-
QueryItemsResult<ItemType> query_result_;
175+
typename QueryResponseType::ResultType query_result_;
178176
client::NetworkStatistics accumulated_statistics_;
179177
boost::optional<client::ApiError> query_error_;
180-
std::shared_ptr<DownloadItemsJob<ItemType, PrefetchResult>> download_job_;
178+
std::shared_ptr<
179+
DownloadItemsJob<ItemType, PrefetchResult, PrefetchStatusType>>
180+
download_job_;
181181
TaskSink& task_sink_;
182182
client::CancellationContext execution_context_;
183183
uint32_t priority_;

0 commit comments

Comments
 (0)