Skip to content

Commit bb8d77d

Browse files
author
Liubov Didkivska
authored
Add api to prefetch partitions. (#1049)
Add public api to prefetch partitions for versioned layer client. Add integration tests to check prefetching, handling errors, canceling, request prioritization, request progress. Change query to return error only if all query partitions fail. Add test to check if all possible partitions prefetched, even if some queries fail, check if error returned if all queries fails. Relates-To: OLPEDGE-1901 Signed-off-by: Liubov Didkivska <[email protected]>
1 parent 30b0cdc commit bb8d77d

File tree

9 files changed

+662
-28
lines changed

9 files changed

+662
-28
lines changed

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/VersionedLayerClient.h

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <olp/dataservice/read/DataRequest.h>
3232
#include <olp/dataservice/read/DataServiceReadApi.h>
3333
#include <olp/dataservice/read/PartitionsRequest.h>
34+
#include <olp/dataservice/read/PrefetchPartitionsRequest.h>
3435
#include <olp/dataservice/read/PrefetchTileResult.h>
3536
#include <olp/dataservice/read/PrefetchTilesRequest.h>
3637
#include <olp/dataservice/read/TileRequest.h>
@@ -351,6 +352,58 @@ class DATASERVICE_READ_API VersionedLayerClient final {
351352
PrefetchTilesRequest request,
352353
PrefetchStatusCallback status_callback = nullptr);
353354

355+
/**
356+
* @brief Prefetches a set of partitions asynchronously.
357+
*
358+
* This method downloads all partitions listed in
359+
* `PrefetchPartitionsRequest`. Only partitions that are not already present
360+
* in the cache are downloaded. It helps reduce the network load.
361+
*
362+
* @note This method does not guarantee that all partitions are available
363+
* offline as the cache might overflow, and data might be evicted at any
364+
* point. Use `GetData(DataRequest)` to retrieve partitions loaded by
365+
* `PrefetchPartitions`.
366+
*
367+
* @param request The `PrefetchPartitionsRequest` instance that contains
368+
* a complete set of request parameters.
369+
* @param callback The `PrefetchPartitionsResponseCallback` object that is
370+
* invoked if the `PrefetchPartitionsResult` instance is available or an error
371+
* occurs.
372+
* @param status_callback The `PrefetchPartitionsStatusCallback` object that
373+
* is invoked every time a partition is fetched.
374+
*
375+
* @return A token that can be used to cancel this request.
376+
*/
377+
client::CancellationToken PrefetchPartitions(
378+
PrefetchPartitionsRequest request,
379+
PrefetchPartitionsResponseCallback callback,
380+
PrefetchPartitionsStatusCallback status_callback = nullptr);
381+
382+
/**
383+
* @brief Prefetches a set of partitions asynchronously.
384+
*
385+
* This method downloads all partitions listed in
386+
* `PrefetchPartitionsRequest`. Only partitions that are not already present
387+
* in the cache are downloaded. It helps reduce the network load.
388+
*
389+
* @note This method does not guarantee that all partitions are available
390+
* offline as the cache might overflow, and data might be evicted at any
391+
* point. Use `GetData(DataRequest)` to retrieve partitions loaded by
392+
* `PrefetchPartitions`.
393+
*
394+
* @param request The `PrefetchPartitionsRequest` instance that contains
395+
* a complete set of request parameters.
396+
* @param status_callback The `PrefetchPartitionsStatusCallback` object that
397+
* is invoked every time a partition is fetched.
398+
*
399+
* @return `CancellableFuture` that contains the `PrefetchPartitionsResponse`
400+
* instance with data or an error. You can also use `CancellableFuture` to
401+
* cancel this request.
402+
*/
403+
client::CancellableFuture<PrefetchPartitionsResponse> PrefetchPartitions(
404+
PrefetchPartitionsRequest request,
405+
PrefetchPartitionsStatusCallback status_callback = nullptr);
406+
354407
/**
355408
* @brief Removes the partition from the mutable disk cache.
356409
*

olp-cpp-sdk-dataservice-read/src/PrefetchPartitionsHelper.h

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,14 @@
3333
#include "DownloadItemsJob.h"
3434
#include "ExtendedApiResponse.h"
3535
#include "ExtendedApiResponseHelpers.h"
36-
#include "QueryMetadataJob.h"
36+
#include "QueryPartitionsJob.h"
3737
#include "TaskSink.h"
3838
#include "repositories/PartitionsRepository.h"
3939

4040
namespace olp {
4141
namespace dataservice {
4242
namespace read {
4343

44-
using PartitionDataHandleResult =
45-
std::vector<std::pair<std::string, std::string>>;
46-
using PartitionsDataHandleExtendedResponse =
47-
ExtendedApiResponse<PartitionDataHandleResult, client::ApiError,
48-
client::NetworkStatistics>;
49-
5044
class PrefetchPartitionsHelper {
5145
public:
5246
using DownloadJob = DownloadItemsJob<std::string, PrefetchPartitionsResult,
@@ -60,9 +54,7 @@ class PrefetchPartitionsHelper {
6054
TaskSink& task_sink, size_t query_max_size, uint32_t priority) {
6155
client::CancellationContext execution_context;
6256

63-
auto query_job = std::make_shared<QueryMetadataJob<
64-
std::string, std::vector<std::string>, PrefetchPartitionsResult,
65-
PartitionsDataHandleExtendedResponse, PrefetchPartitionsStatus>>(
57+
auto query_job = std::make_shared<QueryPartitionsJob>(
6658
std::move(query), nullptr, download_job, task_sink, execution_context,
6759
priority);
6860

olp-cpp-sdk-dataservice-read/src/QueryMetadataJob.h

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,18 @@ class QueryMetadataJob {
7474
execution_context_(execution_context),
7575
priority_(priority) {}
7676

77-
void Initialize(size_t query_count) { query_count_ = query_count; }
77+
virtual ~QueryMetadataJob() = default;
78+
79+
virtual bool CheckIfFail() {
80+
// The old behavior: when one of the query requests fails, we fail the
81+
// entire prefetch.
82+
return (!query_errors_.empty());
83+
}
84+
85+
void Initialize(size_t query_count) {
86+
query_count_ = query_count;
87+
query_size_ = query_count;
88+
}
7889

7990
QueryResponseType Query(QueryType root, client::CancellationContext context) {
8091
return query_(root, context);
@@ -94,15 +105,14 @@ class QueryMetadataJob {
94105
if (error.GetErrorCode() == client::ErrorCode::Cancelled) {
95106
canceled_ = true;
96107
} else {
97-
// The old behavior: when one of the query requests fails, we fail the
98-
// entire prefetch.
99-
query_error_ = error;
108+
// Collect all errors.
109+
query_errors_.push_back(error);
100110
}
101111
}
102112

103113
if (!--query_count_) {
104-
if (query_error_) {
105-
download_job_->OnPrefetchCompleted(query_error_.value());
114+
if (CheckIfFail()) {
115+
download_job_->OnPrefetchCompleted(query_errors_.front());
106116
return;
107117
}
108118

@@ -167,14 +177,15 @@ class QueryMetadataJob {
167177
}
168178
}
169179

170-
private:
180+
protected:
171181
QueryItemsFunc<ItemType, QueryType, QueryResponseType> query_;
172182
FilterItemsFunc<typename QueryResponseType::ResultType> filter_;
173183
size_t query_count_{0};
184+
size_t query_size_{0};
174185
bool canceled_{false};
175186
typename QueryResponseType::ResultType query_result_;
176187
client::NetworkStatistics accumulated_statistics_;
177-
boost::optional<client::ApiError> query_error_;
188+
std::vector<client::ApiError> query_errors_;
178189
std::shared_ptr<
179190
DownloadItemsJob<ItemType, PrefetchResult, PrefetchStatusType>>
180191
download_job_;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (C) 2020 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#pragma once
21+
22+
#include <iterator>
23+
#include <string>
24+
#include <utility>
25+
#include <vector>
26+
27+
#include <olp/core/client/CancellationContext.h>
28+
#include <olp/core/logging/Log.h>
29+
#include <olp/dataservice/read/Types.h>
30+
#include "Common.h"
31+
#include "ExtendedApiResponse.h"
32+
#include "QueryMetadataJob.h"
33+
#include "TaskSink.h"
34+
35+
namespace olp {
36+
namespace dataservice {
37+
namespace read {
38+
39+
using PartitionDataHandleResult =
40+
std::vector<std::pair<std::string, std::string>>;
41+
using PartitionsDataHandleExtendedResponse =
42+
ExtendedApiResponse<PartitionDataHandleResult, client::ApiError,
43+
client::NetworkStatistics>;
44+
45+
class QueryPartitionsJob
46+
: public QueryMetadataJob<
47+
std::string, std::vector<std::string>, PrefetchPartitionsResult,
48+
PartitionsDataHandleExtendedResponse, PrefetchPartitionsStatus> {
49+
public:
50+
QueryPartitionsJob(
51+
QueryItemsFunc<std::string, std::vector<std::string>,
52+
PartitionsDataHandleExtendedResponse>
53+
query,
54+
FilterItemsFunc<PartitionDataHandleResult> filter,
55+
std::shared_ptr<DownloadItemsJob<std::string, PrefetchPartitionsResult,
56+
PrefetchPartitionsStatus>>
57+
download_job,
58+
TaskSink& task_sink, client::CancellationContext execution_context,
59+
uint32_t priority)
60+
: QueryMetadataJob(std::move(query), std::move(filter),
61+
std::move(download_job), task_sink, execution_context,
62+
priority) {}
63+
64+
virtual bool CheckIfFail() {
65+
// Return error only if all fails
66+
return (query_errors_.size() == query_size_);
67+
}
68+
};
69+
70+
} // namespace read
71+
} // namespace dataservice
72+
} // namespace olp

olp-cpp-sdk-dataservice-read/src/VersionedLayerClient.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,22 @@ VersionedLayerClient::PrefetchTiles(PrefetchTilesRequest request,
8080
return impl_->PrefetchTiles(std::move(request), std::move(status_callback));
8181
}
8282

83+
client::CancellationToken VersionedLayerClient::PrefetchPartitions(
84+
PrefetchPartitionsRequest request,
85+
PrefetchPartitionsResponseCallback callback,
86+
PrefetchPartitionsStatusCallback status_callback) {
87+
return impl_->PrefetchPartitions(std::move(request), std::move(callback),
88+
std::move(status_callback));
89+
}
90+
91+
client::CancellableFuture<PrefetchPartitionsResponse>
92+
VersionedLayerClient::PrefetchPartitions(
93+
PrefetchPartitionsRequest request,
94+
PrefetchPartitionsStatusCallback status_callback) {
95+
return impl_->PrefetchPartitions(std::move(request),
96+
std::move(status_callback));
97+
}
98+
8399
client::CancellationToken VersionedLayerClient::GetData(
84100
TileRequest request, DataResponseCallback callback) {
85101
return impl_->GetData(std::move(request), std::move(callback));

tests/integration/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ set(OLP_SDK_INTEGRATIONAL_TESTS_SOURCES
3030
./olp-cpp-sdk-dataservice-read/HttpResponses.h
3131
./olp-cpp-sdk-dataservice-read/StreamLayerClientTest.cpp
3232
./olp-cpp-sdk-dataservice-read/VersionedLayerClientTest.cpp
33+
./olp-cpp-sdk-dataservice-read/VersionedLayerClientPrefetchPartitionsTest.cpp
3334
./olp-cpp-sdk-dataservice-read/VolatileLayerClientTest.cpp
3435
./olp-cpp-sdk-dataservice-write/HttpResponses.h
3536
./olp-cpp-sdk-dataservice-write/IndexLayerClientTest.cpp

0 commit comments

Comments
 (0)