Skip to content

Commit 761ea51

Browse files
Fix the veracode warning. (#1154)
Remove the query_max_size parameter, use constant directly instead. Optimize the PrefetchPartitionsHelper to produce less copies. Relates-To: OLPEDGE-2444 Signed-off-by: Mykhailo Kuchma <[email protected]>
1 parent 5fec077 commit 761ea51

File tree

3 files changed

+220
-180
lines changed

3 files changed

+220
-180
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (C) 2021 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#include "PrefetchPartitionsHelper.h"
21+
22+
#include <algorithm>
23+
#include <memory>
24+
#include <string>
25+
#include <utility>
26+
#include <vector>
27+
28+
#include <olp/core/client/CancellationContext.h>
29+
#include <olp/core/client/PendingRequests.h>
30+
#include <olp/core/logging/Log.h>
31+
#include <olp/dataservice/read/Types.h>
32+
#include "Common.h"
33+
#include "DownloadItemsJob.h"
34+
#include "ExtendedApiResponse.h"
35+
#include "ExtendedApiResponseHelpers.h"
36+
#include "QueryPartitionsJob.h"
37+
#include "TaskSink.h"
38+
#include "repositories/PartitionsRepository.h"
39+
40+
namespace olp {
41+
namespace dataservice {
42+
namespace read {
43+
namespace {
44+
const size_t kQueryPartitionsMaxSize = 100u;
45+
}
46+
47+
void PrefetchPartitionsHelper::Prefetch(
48+
std::shared_ptr<DownloadJob> download_job,
49+
std::vector<std::string> partitions, QueryFunc query, TaskSink& task_sink,
50+
uint32_t priority, client::CancellationContext execution_context) {
51+
auto query_job = std::make_shared<QueryPartitionsJob>(
52+
std::move(query), nullptr, download_job, task_sink, execution_context,
53+
priority);
54+
55+
size_t query_size = partitions.size() / kQueryPartitionsMaxSize;
56+
query_size += (partitions.size() % kQueryPartitionsMaxSize > 0) ? 1 : 0;
57+
58+
query_job->Initialize(query_size);
59+
60+
OLP_SDK_LOG_DEBUG_F("PrefetchJob", "Starting queries, requests=%zu",
61+
query_size);
62+
63+
execution_context.ExecuteOrCancelled(
64+
[&]() {
65+
VectorOfTokens tokens;
66+
tokens.reserve(query_size);
67+
68+
for (auto p_it = partitions.begin(); p_it != partitions.end();) {
69+
const size_t batch_size = std::min(
70+
kQueryPartitionsMaxSize,
71+
static_cast<size_t>(std::distance(p_it, partitions.end())));
72+
std::vector<std::string> elements(
73+
std::make_move_iterator(p_it),
74+
std::make_move_iterator(p_it + batch_size));
75+
76+
std::advance(p_it, batch_size);
77+
78+
auto query_partition_func =
79+
[query_job](client::CancellationContext context,
80+
std::vector<std::string>& partitions) {
81+
return query_job->Query(std::move(partitions), context);
82+
};
83+
84+
auto token = task_sink.AddTaskChecked(
85+
std::bind(std::move(query_partition_func), std::placeholders::_1,
86+
std::move(elements)),
87+
[query_job](PartitionsDataHandleExtendedResponse response) {
88+
query_job->CompleteQuery(std::move(response));
89+
},
90+
priority);
91+
92+
if (!token) {
93+
query_job->CompleteQuery(
94+
client::ApiError(client::ErrorCode::Cancelled, "Cancelled"));
95+
} else {
96+
tokens.emplace_back(*token);
97+
}
98+
}
99+
100+
return CreateToken(std::move(tokens));
101+
},
102+
[&]() {
103+
download_job->OnPrefetchCompleted(
104+
{{client::ErrorCode::Cancelled, "Cancelled"}});
105+
});
106+
}
107+
108+
} // namespace read
109+
} // namespace dataservice
110+
} // namespace olp

olp-cpp-sdk-dataservice-read/src/PrefetchPartitionsHelper.h

Lines changed: 5 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,18 @@
1919

2020
#pragma once
2121

22-
#include <algorithm>
23-
#include <memory>
24-
#include <string>
25-
#include <utility>
26-
#include <vector>
27-
2822
#include <olp/core/client/CancellationContext.h>
29-
#include <olp/core/client/PendingRequests.h>
30-
#include <olp/core/logging/Log.h>
3123
#include <olp/dataservice/read/Types.h>
3224
#include "Common.h"
3325
#include "DownloadItemsJob.h"
34-
#include "ExtendedApiResponse.h"
35-
#include "ExtendedApiResponseHelpers.h"
3626
#include "QueryPartitionsJob.h"
37-
#include "TaskSink.h"
38-
#include "repositories/PartitionsRepository.h"
3927

4028
namespace olp {
4129
namespace dataservice {
4230
namespace read {
4331

32+
class TaskSink;
33+
4434
class PrefetchPartitionsHelper {
4535
public:
4636
using DownloadJob = DownloadItemsJob<std::string, PrefetchPartitionsResult,
@@ -49,62 +39,9 @@ class PrefetchPartitionsHelper {
4939
PartitionsDataHandleExtendedResponse>;
5040

5141
static void Prefetch(std::shared_ptr<DownloadJob> download_job,
52-
const std::vector<std::string>& roots, QueryFunc query,
53-
TaskSink& task_sink, size_t query_max_size,
54-
uint32_t priority,
55-
client::CancellationContext execution_context) {
56-
auto query_job = std::make_shared<QueryPartitionsJob>(
57-
std::move(query), nullptr, download_job, task_sink, execution_context,
58-
priority);
59-
60-
auto query_size = roots.size() / query_max_size;
61-
query_size += (roots.size() % query_max_size > 0) ? 1 : 0;
62-
63-
query_job->Initialize(query_size);
64-
65-
OLP_SDK_LOG_DEBUG_F("PrefetchJob", "Starting queries, requests=%zu",
66-
roots.size());
67-
68-
execution_context.ExecuteOrCancelled(
69-
[&]() {
70-
VectorOfTokens tokens;
71-
tokens.reserve(query_size);
72-
73-
// split items to blocks
74-
auto size_left = roots.size();
75-
auto start = 0u;
76-
while (size_left > start) {
77-
auto size = std::min(query_max_size, size_left - start);
78-
auto query_element = std::vector<std::string>(
79-
roots.begin() + start, roots.begin() + start + size);
80-
81-
auto token = task_sink.AddTaskChecked(
82-
[query_element,
83-
query_job](client::CancellationContext context) {
84-
return query_job->Query(std::move(query_element), context);
85-
},
86-
[query_job](PartitionsDataHandleExtendedResponse response) {
87-
query_job->CompleteQuery(std::move(response));
88-
},
89-
priority);
90-
91-
if (!token) {
92-
query_job->CompleteQuery(
93-
client::ApiError(client::ErrorCode::Cancelled, "Cancelled"));
94-
} else {
95-
tokens.emplace_back(*token);
96-
}
97-
98-
start += size;
99-
}
100-
101-
return CreateToken(std::move(tokens));
102-
},
103-
[&]() {
104-
download_job->OnPrefetchCompleted(
105-
{{client::ErrorCode::Cancelled, "Cancelled"}});
106-
});
107-
}
42+
std::vector<std::string> partitions, QueryFunc query,
43+
TaskSink& task_sink, uint32_t priority,
44+
client::CancellationContext execution_context);
10845
};
10946

11047
} // namespace read

0 commit comments

Comments
 (0)