Skip to content

Commit 21db284

Browse files
authored
PrefetchTiles for VolatileLayerClientImpl. (#755)
Second part of prefetch update of VolatileLayerClient. This change contains impl update and unit tests. The method loads data for specified tile and levels. Resolves: OLPEDGE-794 Signed-off-by: Kostiantyn Zvieriev <[email protected]>
1 parent dd66e04 commit 21db284

File tree

6 files changed

+738
-20
lines changed

6 files changed

+738
-20
lines changed

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,7 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
221221
request.CreateKey(layer_id).c_str());
222222
return response.GetError();
223223
}
224-
225-
request.WithVersion(response.GetResult().GetVersion());
224+
auto version = response.GetResult().GetVersion();
226225

227226
const auto key = request.CreateKey(layer_id);
228227
OLP_SDK_LOG_INFO_F(kLogTag, "PrefetchTiles: using key=%s", key.c_str());
@@ -253,7 +252,8 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
253252
sliced_tiles.size(), key.c_str());
254253

255254
auto sub_tiles = repository::PrefetchTilesRepository::GetSubTiles(
256-
catalog, layer_id, request, sliced_tiles, context, settings);
255+
catalog, layer_id, request, version, sliced_tiles, context,
256+
settings);
257257

258258
if (!sub_tiles.IsSuccessful()) {
259259
return sub_tiles.GetError();

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.cpp

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include <olp/core/client/PendingRequests.h>
2626
#include <olp/core/client/TaskContext.h>
2727
#include <olp/core/logging/Log.h>
28+
#include <olp/dataservice/read/CatalogVersionRequest.h>
29+
#include <olp/dataservice/read/PrefetchTileResult.h>
2830

2931
#include "Common.h"
3032
#include "repositories/CatalogRepository.h"
@@ -33,13 +35,21 @@
3335
#include "repositories/ExecuteOrSchedule.inl"
3436
#include "repositories/PartitionsCacheRepository.h"
3537
#include "repositories/PartitionsRepository.h"
38+
#include "repositories/PrefetchTilesRepository.h"
3639

3740
namespace olp {
3841
namespace dataservice {
3942
namespace read {
4043

4144
namespace {
4245
constexpr auto kLogTag = "VolatileLayerClientImpl";
46+
47+
bool IsOnlyInputTiles(const PrefetchTilesRequest& request) {
48+
return !(request.GetMinLevel() > 0 &&
49+
request.GetMinLevel() < request.GetMaxLevel() &&
50+
request.GetMaxLevel() < geo::TileKey().Level() &&
51+
request.GetMinLevel() < geo::TileKey().Level());
52+
}
4353
} // namespace
4454

4555
VolatileLayerClientImpl::VolatileLayerClientImpl(
@@ -149,6 +159,198 @@ bool VolatileLayerClientImpl::RemoveFromCache(const geo::TileKey& tile) {
149159
return RemoveFromCache(partition_id);
150160
}
151161

162+
client::CancellationToken VolatileLayerClientImpl::PrefetchTiles(
163+
PrefetchTilesRequest request, PrefetchTilesResponseCallback callback) {
164+
// Used as empty response to be able to execute initial task
165+
using EmptyResponse = Response<PrefetchTileNoError>;
166+
using PrefetchResult = PrefetchTilesResult::value_type;
167+
using PrefetchResultFuture = std::future<PrefetchResult>;
168+
using PrefetchResultPromise = std::promise<PrefetchResult>;
169+
using client::CancellationContext;
170+
using client::ErrorCode;
171+
172+
auto catalog = catalog_;
173+
auto layer_id = layer_id_;
174+
auto settings = settings_;
175+
auto pending_requests = pending_requests_;
176+
177+
auto token = AddTask(
178+
settings.task_scheduler, pending_requests,
179+
[=](CancellationContext context) mutable -> EmptyResponse {
180+
const auto& tile_keys = request.GetTileKeys();
181+
if (tile_keys.empty()) {
182+
OLP_SDK_LOG_WARNING_F(kLogTag,
183+
"PrefetchTiles : invalid request, layer=%s",
184+
layer_id.c_str());
185+
return {{ErrorCode::InvalidArgument, "Empty tile key list"}};
186+
}
187+
188+
const auto key = request.CreateKey(layer_id);
189+
OLP_SDK_LOG_INFO_F(kLogTag, "PrefetchTiles: using key=%s", key.c_str());
190+
191+
// Calculate the minimal set of Tile keys and depth to
192+
// cover tree.
193+
bool request_only_input_tiles = IsOnlyInputTiles(request);
194+
unsigned int min_level =
195+
(request_only_input_tiles ? 0 : request.GetMinLevel());
196+
unsigned int max_level =
197+
(request_only_input_tiles ? 0 : request.GetMaxLevel());
198+
199+
auto sliced_tiles = repository::PrefetchTilesRepository::GetSlicedTiles(
200+
tile_keys, min_level, max_level);
201+
202+
if (sliced_tiles.empty()) {
203+
OLP_SDK_LOG_WARNING_F(kLogTag,
204+
"PrefetchTiles: tile/level mismatch, key=%s",
205+
key.c_str());
206+
return {{ErrorCode::InvalidArgument, "TileKeys/levels mismatch"}};
207+
}
208+
209+
OLP_SDK_LOG_DEBUG_F(kLogTag, "PrefetchTiles, subquads=%zu, key=%s",
210+
sliced_tiles.size(), key.c_str());
211+
212+
auto sub_tiles = repository::PrefetchTilesRepository::GetSubTiles(
213+
catalog, layer_id, request, boost::none, sliced_tiles, context,
214+
settings);
215+
216+
if (!sub_tiles.IsSuccessful()) {
217+
return sub_tiles.GetError();
218+
}
219+
220+
const auto& tiles_result = sub_tiles.GetResult();
221+
if (tiles_result.empty()) {
222+
OLP_SDK_LOG_WARNING_F(
223+
kLogTag, "PrefetchTiles: subtiles empty, key=%s", key.c_str());
224+
return {{ErrorCode::InvalidArgument, "Subquads retrieval failed"}};
225+
}
226+
227+
OLP_SDK_LOG_INFO_F(kLogTag, "Prefetch start, key=%s, tiles=%zu",
228+
key.c_str(), tiles_result.size());
229+
230+
// Once we have the data create for each subtile a task and push it
231+
// onto the TaskScheduler. One additional last task is added which
232+
// waits for all previous tasks to finish so that it may call the user
233+
// with the result.
234+
auto futures = std::make_shared<std::vector<PrefetchResultFuture>>();
235+
std::vector<CancellationContext> contexts;
236+
contexts.reserve(tiles_result.size() + 1u);
237+
auto it = tiles_result.begin();
238+
auto skip_tile = [&](const geo::TileKey& tile_key) {
239+
if (request_only_input_tiles) {
240+
return (std::find(tile_keys.begin(), tile_keys.end(), tile_key) ==
241+
tile_keys.end());
242+
}
243+
// skip tiles outside min/max segment
244+
return (tile_key.Level() < request.GetMinLevel() ||
245+
tile_key.Level() > request.GetMaxLevel());
246+
};
247+
248+
while (!context.IsCancelled() && it != tiles_result.end()) {
249+
auto const& tile = it->first;
250+
if (skip_tile(tile)) {
251+
it++;
252+
continue;
253+
}
254+
255+
auto const& handle = it->second;
256+
auto const& biling_tag = request.GetBillingTag();
257+
auto promise = std::make_shared<PrefetchResultPromise>();
258+
auto flag = std::make_shared<std::atomic_bool>(false);
259+
futures->emplace_back(promise->get_future());
260+
auto context_it = contexts.emplace(contexts.end());
261+
262+
AddTask(
263+
settings.task_scheduler, pending_requests,
264+
[=](CancellationContext inner_context) {
265+
auto data = repository::DataRepository::GetVolatileData(
266+
catalog, layer_id,
267+
DataRequest().WithDataHandle(handle).WithBillingTag(
268+
biling_tag),
269+
inner_context, settings);
270+
271+
if (!data.IsSuccessful()) {
272+
promise->set_value(std::make_shared<PrefetchTileResult>(
273+
tile, data.GetError()));
274+
} else {
275+
promise->set_value(std::make_shared<PrefetchTileResult>(
276+
tile, PrefetchTileNoError()));
277+
}
278+
279+
flag->exchange(true);
280+
return EmptyResponse(PrefetchTileNoError());
281+
},
282+
[=](EmptyResponse) {
283+
if (!flag->load()) {
284+
// If above task was cancelled we might need to set
285+
// promise else below task will wait forever
286+
promise->set_value(std::make_shared<PrefetchTileResult>(
287+
tile,
288+
client::ApiError(ErrorCode::Cancelled, "Cancelled")));
289+
}
290+
},
291+
*context_it);
292+
it++;
293+
}
294+
295+
// Task to wait for previously triggered data download to collect
296+
// responses and trigger user callback.
297+
AddTask(
298+
settings.task_scheduler, pending_requests,
299+
[=](CancellationContext inner_context) -> PrefetchTilesResponse {
300+
PrefetchTilesResult result;
301+
result.reserve(futures->size());
302+
303+
for (auto& future : *futures) {
304+
// Check if cancelled in between.
305+
if (inner_context.IsCancelled()) {
306+
return {{ErrorCode::Cancelled, "Cancelled"}};
307+
}
308+
309+
auto tile_result = future.get();
310+
result.emplace_back(std::move(tile_result));
311+
}
312+
313+
OLP_SDK_LOG_INFO_F(kLogTag, "Prefetch done, key=%s, tiles=%zu",
314+
key.c_str(), result.size());
315+
return PrefetchTilesResponse(std::move(result));
316+
},
317+
callback, *contexts.emplace(contexts.end()));
318+
319+
context.ExecuteOrCancelled([&]() {
320+
return client::CancellationToken([contexts]() {
321+
for (auto context : contexts) {
322+
context.CancelOperation();
323+
}
324+
});
325+
});
326+
327+
return EmptyResponse(PrefetchTileNoError());
328+
},
329+
// Because the handling of prefetch tiles responses is performed by the
330+
// inner-task, no need to set a callback here. Otherwise, the user would
331+
// be notified with empty results.
332+
// It is possible to not invoke inner task, when it was cancelled before
333+
// execution.
334+
[callback](EmptyResponse response) {
335+
// Inner task only generates successfull result
336+
if (!response.IsSuccessful()) {
337+
callback(response.GetError());
338+
}
339+
});
340+
341+
return token;
342+
}
343+
344+
client::CancellableFuture<PrefetchTilesResponse>
345+
VolatileLayerClientImpl::PrefetchTiles(PrefetchTilesRequest request) {
346+
auto promise = std::make_shared<std::promise<PrefetchTilesResponse>>();
347+
auto callback = [=](PrefetchTilesResponse resp) {
348+
promise->set_value(std::move(resp));
349+
};
350+
auto token = PrefetchTiles(std::move(request), std::move(callback));
351+
return client::CancellableFuture<PrefetchTilesResponse>(token, promise);
352+
}
353+
152354
} // namespace read
153355
} // namespace dataservice
154356
} // namespace olp

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <olp/core/geo/tiling/TileKey.h>
2626
#include <olp/dataservice/read/DataRequest.h>
2727
#include <olp/dataservice/read/PartitionsRequest.h>
28+
#include <olp/dataservice/read/PrefetchTilesRequest.h>
2829
#include <olp/dataservice/read/Types.h>
2930

3031
namespace olp {
@@ -64,6 +65,12 @@ class VolatileLayerClientImpl {
6465

6566
virtual bool RemoveFromCache(const geo::TileKey& tile);
6667

68+
virtual client::CancellationToken PrefetchTiles(
69+
PrefetchTilesRequest request, PrefetchTilesResponseCallback callback);
70+
71+
virtual client::CancellableFuture<PrefetchTilesResponse> PrefetchTiles(
72+
PrefetchTilesRequest request);
73+
6774
private:
6875
client::HRN catalog_;
6976
std::string layer_id_;

olp-cpp-sdk-dataservice-read/src/repositories/PrefetchTilesRepository.cpp

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,9 @@ RootTilesForRequest PrefetchTilesRepository::GetSlicedTiles(
126126

127127
SubTilesResponse PrefetchTilesRepository::GetSubTiles(
128128
const client::HRN& catalog, const std::string& layer_id,
129-
const PrefetchTilesRequest& request, const RootTilesForRequest& root_tiles,
130-
client::CancellationContext context,
129+
const PrefetchTilesRequest& request, boost::optional<std::int64_t> version,
130+
const RootTilesForRequest& root_tiles, client::CancellationContext context,
131131
const client::OlpClientSettings& settings) {
132-
// Version needs to be set, else we cannot move forward
133-
if (!request.GetVersion()) {
134-
OLP_SDK_LOG_WARNING_F(kLogTag,
135-
"GetSubTiles: catalog version missing, key=%s",
136-
request.CreateKey(layer_id).c_str());
137-
return ApiError{ErrorCode::InvalidArgument, "Catalog version invalid"};
138-
}
139132
SubTilesResult result;
140133
OLP_SDK_LOG_INFO_F(kLogTag, "GetSubTiles: hrn=%s, layer=%s, quads=%zu",
141134
catalog.ToString().c_str(), layer_id.c_str(),
@@ -148,8 +141,11 @@ SubTilesResponse PrefetchTilesRepository::GetSubTiles(
148141

149142
auto& tile = quad.first;
150143
auto& depth = quad.second;
151-
auto response =
152-
GetSubQuads(catalog, layer_id, request, tile, depth, settings, context);
144+
auto response = version
145+
? GetSubQuads(catalog, layer_id, request, version.get(),
146+
tile, depth, settings, context)
147+
: GetVolatileSubQuads(catalog, layer_id, request, tile,
148+
depth, settings, context);
153149
if (!response.IsSuccessful()) {
154150
// Just abort if something else then 404 Not Found is returned
155151
auto& error = response.GetError();
@@ -166,11 +162,11 @@ SubTilesResponse PrefetchTilesRepository::GetSubTiles(
166162

167163
SubQuadsResponse PrefetchTilesRepository::GetSubQuads(
168164
const HRN& catalog, const std::string& layer_id,
169-
const PrefetchTilesRequest& request, geo::TileKey tile, int32_t depth,
170-
const OlpClientSettings& settings, CancellationContext context) {
165+
const PrefetchTilesRequest& request, std::int64_t version,
166+
geo::TileKey tile, int32_t depth, const OlpClientSettings& settings,
167+
CancellationContext context) {
171168
OLP_SDK_LOG_TRACE_F(kLogTag, "GetSubQuads(%s, %" PRId64 ", %" PRId32 ")",
172-
tile.ToHereTile().c_str(), request.GetVersion().get(),
173-
depth);
169+
tile.ToHereTile().c_str(), version, depth);
174170

175171
auto query_api =
176172
ApiClientLookup::LookupApi(catalog, context, "query", "v1",
@@ -184,7 +180,6 @@ SubQuadsResponse PrefetchTilesRepository::GetSubQuads(
184180
using QuadTreeIndexPromise = std::promise<QuadTreeIndexResponse>;
185181
auto promise = std::make_shared<QuadTreeIndexPromise>();
186182
auto tile_key = tile.ToHereTile();
187-
auto version = request.GetVersion().get();
188183

189184
OLP_SDK_LOG_INFO_F(kLogTag,
190185
"GetSubQuads execute(%s, %" PRId64 ", %" PRId32 ")",
@@ -229,6 +224,65 @@ SubQuadsResponse PrefetchTilesRepository::GetSubQuads(
229224
return result;
230225
}
231226

227+
SubQuadsResponse PrefetchTilesRepository::GetVolatileSubQuads(
228+
const HRN& catalog, const std::string& layer_id,
229+
const PrefetchTilesRequest& request, geo::TileKey tile, int32_t depth,
230+
const OlpClientSettings& settings, CancellationContext context) {
231+
OLP_SDK_LOG_TRACE_F(kLogTag, "GetSubQuadsVolatile(%s, %" PRId32 ")",
232+
tile.ToHereTile().c_str(), depth);
233+
234+
auto query_api =
235+
ApiClientLookup::LookupApi(catalog, context, "query", "v1",
236+
FetchOptions::OnlineIfNotFound, settings);
237+
238+
if (!query_api.IsSuccessful()) {
239+
return query_api.GetError();
240+
}
241+
242+
using QuadTreeIndexResponse = QueryApi::QuadTreeIndexResponse;
243+
using QuadTreeIndexPromise = std::promise<QuadTreeIndexResponse>;
244+
auto promise = std::make_shared<QuadTreeIndexPromise>();
245+
auto tile_key = tile.ToHereTile();
246+
247+
OLP_SDK_LOG_INFO_F(kLogTag, "GetSubQuadsVolatile execute(%s, %" PRId32 ")",
248+
tile_key.c_str(), depth);
249+
auto quad_tree = QueryApi::QuadTreeIndexVolatile(
250+
query_api.GetResult(), layer_id, tile_key, depth, boost::none,
251+
request.GetBillingTag(), context);
252+
253+
if (!quad_tree.IsSuccessful()) {
254+
OLP_SDK_LOG_INFO_F(kLogTag, "GetSubQuadsVolatile failed(%s, %" PRId32 ")",
255+
tile_key.c_str(), depth);
256+
return quad_tree.GetError();
257+
}
258+
259+
SubQuadsResult result;
260+
model::Partitions partitions;
261+
262+
const auto& subquads = quad_tree.GetResult().GetSubQuads();
263+
partitions.GetMutablePartitions().reserve(subquads.size());
264+
265+
for (const auto& subquad : subquads) {
266+
auto subtile = tile.AddedSubHereTile(subquad->GetSubQuadKey());
267+
OLP_SDK_LOG_DEBUG_F(
268+
kLogTag, "GetSubQuadsVolatile key(%s, %s | %" PRId32 ")",
269+
subtile.ToHereTile().c_str(), subquad->GetDataHandle().c_str(), depth);
270+
// Add to result
271+
result.emplace(subtile, subquad->GetDataHandle());
272+
273+
// add to bulk partitions for cacheing
274+
partitions.GetMutablePartitions().emplace_back(
275+
PartitionsRepository::PartitionFromSubQuad(*subquad,
276+
subtile.ToHereTile()));
277+
}
278+
279+
// add to cache
280+
repository::PartitionsCacheRepository cache(catalog, settings.cache);
281+
cache.Put({}, partitions, layer_id, boost::none, false);
282+
283+
return result;
284+
}
285+
232286
PORTING_POP_WARNINGS()
233287
} // namespace repository
234288
} // namespace read

0 commit comments

Comments
 (0)