Skip to content

Commit 6e8fa33

Browse files
Abort PrefetchTiles/Data with error on cache issue (#1207)
All prefetch operations are required to have properly working cache. Thus any cache related issue should led to the failed request Relates-To: OLPEDGE-2552 Signed-off-by: Andrey Kashcheev <[email protected]>
1 parent d7bd376 commit 6e8fa33

22 files changed

+503
-98
lines changed

olp-cpp-sdk-core/include/olp/core/client/ApiError.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ class CORE_API ApiError {
5656
return ApiError(ErrorCode::NetworkConnection, description);
5757
}
5858

59+
/**
60+
* @brief Creates the `ApiError` instance with the precondition failed error
61+
* code and description.
62+
* @param description The optional description.
63+
* @return The `ApiError` instance.
64+
*/
65+
static ApiError PreconditionFailed(
66+
const char* description = "Precondition failed") {
67+
return ApiError(ErrorCode::PreconditionFailed, description);
68+
}
69+
5970
ApiError() = default;
6071

6172
/**

olp-cpp-sdk-core/include/olp/core/client/ApiNoResult.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
#pragma once
2121

22+
#include <olp/core/client/ApiError.h>
23+
#include <olp/core/client/ApiResponse.h>
24+
2225
namespace olp {
2326
namespace client {
2427

@@ -29,5 +32,8 @@ namespace client {
2932
*/
3033
class ApiNoResult {};
3134

35+
/// The alias to the response without result.
36+
using ApiNoResponse = ApiResponse<ApiNoResult, ApiError>;
37+
3238
} // namespace client
3339
} // namespace olp

olp-cpp-sdk-core/include/olp/core/client/ErrorCode.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ enum class ErrorCode {
8585
/**
8686
* The network connection error.
8787
*/
88-
NetworkConnection
88+
NetworkConnection,
89+
90+
/**
91+
* A failed cache IO operation.
92+
*/
93+
CacheIO
8994
};
9095

9196
} // namespace client

olp-cpp-sdk-core/src/client/OlpClientSettingsFactory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ OlpClientSettingsFactory::CreateDefaultNetworkRequestHandler(
4949
std::unique_ptr<cache::KeyValueCache>
5050
OlpClientSettingsFactory::CreateDefaultCache(cache::CacheSettings settings) {
5151
#ifdef OLP_SDK_ENABLE_DEFAULT_CACHE
52-
auto cache = std::make_unique<cache::DefaultCache>(std::move(settings));
52+
auto cache = std::make_unique<cache::DefaultCache>(settings);
5353
auto error = cache->Open();
5454
if (error == cache::DefaultCache::StorageOpenResult::OpenDiskPathFailure) {
5555
OLP_SDK_LOG_FATAL_F(

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/PartitionsRequest.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class DATASERVICE_READ_API PartitionsRequest final {
135135
*/
136136
inline PartitionsRequest& WithBillingTag(
137137
boost::optional<std::string> billingTag) {
138-
billing_tag_ = billingTag;
138+
billing_tag_ = std::move(billingTag);
139139
return *this;
140140
}
141141

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/PrefetchPartitionsRequest.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class DATASERVICE_READ_API PrefetchPartitionsRequest final {
4747
*
4848
* When the list is empty, the GetPartitions method will download the whole
4949
* layer metadata. Additionally, a single request supports up to 100
50-
* partitions. If partitions list has more than 100, it will be splitted
50+
* partitions. If partitions list has more than 100, it will be split
5151
* internally to multiple requests.
5252
*
5353
* @param partitions The list of partitions to request.
@@ -92,7 +92,7 @@ class DATASERVICE_READ_API PrefetchPartitionsRequest final {
9292
*/
9393
inline PrefetchPartitionsRequest& WithBillingTag(
9494
boost::optional<std::string> billing_tag) {
95-
billing_tag_ = billing_tag;
95+
billing_tag_ = std::move(billing_tag);
9696
return *this;
9797
}
9898

@@ -147,7 +147,7 @@ class DATASERVICE_READ_API PrefetchPartitionsRequest final {
147147
out << "@" << version.get();
148148
}
149149
out << "^" << partition_ids_.size();
150-
if (partition_ids_.size() > 0) {
150+
if (!partition_ids_.empty()) {
151151
out << "[" << partition_ids_.front() << ", ...]";
152152
}
153153
if (GetBillingTag()) {

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/Types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <vector>
2525

2626
#include <olp/core/client/ApiError.h>
27+
#include <olp/core/client/ApiNoResult.h>
2728
#include <olp/core/client/ApiResponse.h>
2829

2930
#include <olp/dataservice/read/AggregatedDataResult.h>

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -174,22 +174,32 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchPartitions(
174174
auto task = [=](client::CancellationContext context,
175175
PrefetchPartitionsRequest& request) mutable {
176176
if (context.IsCancelled()) {
177-
callback(ApiError(ErrorCode::Cancelled, "Canceled"));
177+
callback(ApiError::Cancelled());
178+
return;
179+
}
180+
181+
const auto key = request.CreateKey(layer_id_);
182+
183+
if (!settings_.cache) {
184+
OLP_SDK_LOG_ERROR_F(
185+
kLogTag,
186+
"PrefetchPartitions: cache is missing, aborting, hrn=%s, key=%s",
187+
catalog_.ToCatalogHRNString().c_str(), key.c_str());
188+
callback(
189+
ApiError::PreconditionFailed("Unable to prefetch without a cache"));
178190
return;
179191
}
180192

181193
if (request.GetPartitionIds().empty()) {
182194
OLP_SDK_LOG_WARNING_F(
183-
kLogTag, "PrefetchPartitions : invalid request, catalog=%s, layer=%s",
184-
catalog_.ToCatalogHRNString().c_str(), layer_id_.c_str());
195+
kLogTag, "PrefetchPartitions: invalid request, catalog=%s, key=%s",
196+
catalog_.ToCatalogHRNString().c_str(), key.c_str());
185197
callback(ApiError(ErrorCode::InvalidArgument, "Empty partitions list"));
186198
return;
187199
}
188200

189201
auto billing_tag = request.GetBillingTag();
190202

191-
const auto key = request.CreateKey(layer_id_);
192-
193203
auto response = GetVersion(billing_tag, OnlineIfNotFound, context);
194204

195205
if (!response.IsSuccessful()) {
@@ -216,7 +226,7 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchPartitions(
216226
.WithPartitionIds(std::move(partitions))
217227
.WithBillingTag(billing_tag);
218228
auto response = repository.GetVersionedPartitionsExtendedResponse(
219-
partitions_request, version, inner_context);
229+
partitions_request, version, std::move(inner_context), true);
220230

221231
if (!response.IsSuccessful()) {
222232
OLP_SDK_LOG_WARNING_F(kLogTag,
@@ -256,8 +266,10 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchPartitions(
256266
// Fetch from online
257267
return repository.GetVersionedData(
258268
layer_id_,
259-
DataRequest().WithDataHandle(data_handle).WithBillingTag(billing_tag),
260-
version, inner_context);
269+
DataRequest()
270+
.WithDataHandle(std::move(data_handle))
271+
.WithBillingTag(billing_tag),
272+
version, std::move(inner_context), true);
261273
};
262274

263275
auto append_result = [](ExtendedDataResponse response, std::string item,
@@ -281,7 +293,7 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchPartitions(
281293
std::move(call_user_callback), std::move(status_callback));
282294
return PrefetchPartitionsHelper::Prefetch(
283295
std::move(download_job), request.GetPartitionIds(), std::move(query),
284-
task_sink_, request.GetPriority(), context);
296+
task_sink_, request.GetPriority(), std::move(context));
285297
};
286298
const auto priority = request.GetPriority();
287299
return task_sink_.AddTask(
@@ -315,20 +327,30 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
315327
return task_sink_.AddTask(
316328
[=](client::CancellationContext context) mutable -> void {
317329
if (context.IsCancelled()) {
318-
callback(ApiError(ErrorCode::Cancelled, "Canceled"));
330+
callback(ApiError::Cancelled());
331+
return;
332+
}
333+
334+
const auto key = request.CreateKey(layer_id_);
335+
336+
if (!settings_.cache) {
337+
OLP_SDK_LOG_ERROR_F(
338+
kLogTag,
339+
"PrefetchPartitions: cache is missing, aborting, hrn=%s, key=%s",
340+
catalog_.ToCatalogHRNString().c_str(), key.c_str());
341+
callback(ApiError::PreconditionFailed(
342+
"Unable to prefetch without a cache"));
319343
return;
320344
}
321345

322346
if (request.GetTileKeys().empty()) {
323347
OLP_SDK_LOG_WARNING_F(
324-
kLogTag, "PrefetchTiles : invalid request, catalog=%s, layer=%s",
325-
catalog_.ToCatalogHRNString().c_str(), layer_id_.c_str());
348+
kLogTag, "PrefetchTiles: invalid request, catalog=%s, key=%s",
349+
catalog_.ToCatalogHRNString().c_str(), key.c_str());
326350
callback(ApiError(ErrorCode::InvalidArgument, "Empty tile key list"));
327351
return;
328352
}
329353

330-
const auto key = request.CreateKey(layer_id_);
331-
332354
auto response =
333355
GetVersion(request.GetBillingTag(), OnlineIfNotFound, context);
334356

@@ -408,7 +430,7 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
408430
return response;
409431
};
410432

411-
auto billing_tag = request.GetBillingTag();
433+
auto& billing_tag = request.GetBillingTag();
412434
auto download = [=](std::string data_handle,
413435
client::CancellationContext inner_context) mutable {
414436
if (data_handle.empty()) {
@@ -424,11 +446,12 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
424446
repository::DataRepository repository(catalog_, settings_,
425447
lookup_client_, mutex_storage_);
426448
// Fetch from online
427-
return repository.GetVersionedData(layer_id_,
428-
DataRequest()
429-
.WithDataHandle(data_handle)
430-
.WithBillingTag(billing_tag),
431-
version, inner_context);
449+
return repository.GetVersionedData(
450+
layer_id_,
451+
DataRequest()
452+
.WithDataHandle(std::move(data_handle))
453+
.WithBillingTag(billing_tag),
454+
version, std::move(inner_context), true);
432455
};
433456

434457
std::vector<geo::TileKey> roots;
@@ -458,7 +481,8 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(
458481

459482
return PrefetchTilesHelper::Prefetch(
460483
std::move(download_job), std::move(roots), std::move(query),
461-
std::move(filter), task_sink_, request.GetPriority(), context);
484+
std::move(filter), task_sink_, request.GetPriority(),
485+
std::move(context));
462486
},
463487
request.GetPriority(), execution_context);
464488
}
@@ -487,7 +511,7 @@ CatalogVersionResponse VersionedLayerClientImpl::GetVersion(
487511
}
488512

489513
CatalogVersionRequest request;
490-
request.WithBillingTag(billing_tag);
514+
request.WithBillingTag(std::move(billing_tag));
491515
request.WithFetchOption(fetch_options);
492516

493517
repository::CatalogRepository repository(catalog_, settings_, lookup_client_);
@@ -528,7 +552,8 @@ client::CancellationToken VersionedLayerClientImpl::GetData(
528552
repository::DataRepository repository(catalog_, settings_, lookup_client_,
529553
mutex_storage_);
530554
return repository.GetVersionedTile(
531-
layer_id_, request, version_response.GetResult().GetVersion(), context);
555+
layer_id_, request, version_response.GetResult().GetVersion(),
556+
std::move(context));
532557
};
533558

534559
return task_sink_.AddTask(std::move(data_task), std::move(callback),

olp-cpp-sdk-dataservice-read/src/VolatileLayerClientImpl.cpp

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,21 +157,31 @@ client::CancellationToken VolatileLayerClientImpl::PrefetchTiles(
157157
return task_sink_.AddTask(
158158
[=](client::CancellationContext context) mutable -> void {
159159
if (context.IsCancelled()) {
160-
callback(ApiError(ErrorCode::Cancelled, "Canceled"));
160+
callback(ApiError::Cancelled());
161+
return;
162+
}
163+
164+
const auto key = request.CreateKey(layer_id_);
165+
166+
if (!settings_.cache) {
167+
OLP_SDK_LOG_ERROR_F(
168+
kLogTag,
169+
"PrefetchPartitions: cache is missing, aborting, hrn=%s, key=%s",
170+
catalog_.ToCatalogHRNString().c_str(), key.c_str());
171+
callback(ApiError::PreconditionFailed(
172+
"Unable to prefetch without a cache"));
161173
return;
162174
}
163175

164176
const auto& tile_keys = request.GetTileKeys();
165177
if (tile_keys.empty()) {
166-
OLP_SDK_LOG_WARNING_F(kLogTag,
167-
"PrefetchTiles : invalid request, layer=%s",
168-
layer_id_.c_str());
169-
178+
OLP_SDK_LOG_WARNING_F(
179+
kLogTag, "PrefetchTiles: invalid request, hrn=%s, key=%s",
180+
catalog_.ToCatalogHRNString().c_str(), key.c_str());
170181
callback(ApiError(ErrorCode::InvalidArgument, "Empty tile key list"));
171182
return;
172183
}
173184

174-
const auto key = request.CreateKey(layer_id_);
175185
OLP_SDK_LOG_INFO_F(kLogTag, "PrefetchTiles: using key=%s", key.c_str());
176186

177187
// Calculate the minimal set of Tile keys and depth to
@@ -235,11 +245,12 @@ client::CancellationToken VolatileLayerClientImpl::PrefetchTiles(
235245
repository::DataRepository repository(catalog_, settings_,
236246
lookup_client_, mutex_storage_);
237247
// Fetch from online
238-
return repository.GetVolatileData(layer_id_,
239-
DataRequest()
240-
.WithDataHandle(data_handle)
241-
.WithBillingTag(billing_tag),
242-
inner_context);
248+
return repository.GetVolatileData(
249+
layer_id_,
250+
DataRequest()
251+
.WithDataHandle(std::move(data_handle))
252+
.WithBillingTag(billing_tag),
253+
std::move(inner_context), true);
243254
};
244255

245256
std::vector<geo::TileKey> roots;

olp-cpp-sdk-dataservice-read/src/repositories/DataCacheRepository.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,22 @@ namespace repository {
4242
DataCacheRepository::DataCacheRepository(
4343
const client::HRN& hrn, std::shared_ptr<cache::KeyValueCache> cache,
4444
std::chrono::seconds default_expiry)
45-
: hrn_(hrn), cache_(cache), default_expiry_(ConvertTime(default_expiry)) {}
45+
: hrn_(hrn),
46+
cache_(std::move(cache)),
47+
default_expiry_(ConvertTime(default_expiry)) {}
4648

47-
void DataCacheRepository::Put(const model::Data& data,
48-
const std::string& layer_id,
49-
const std::string& data_handle) {
49+
client::ApiNoResponse DataCacheRepository::Put(const model::Data& data,
50+
const std::string& layer_id,
51+
const std::string& data_handle) {
5052
auto key = CreateKey(layer_id, data_handle);
5153
OLP_SDK_LOG_DEBUG_F(kLogTag, "Put -> '%s'", key.c_str());
5254

53-
cache_->Put(key, data, default_expiry_);
55+
if (!cache_->Put(key, data, default_expiry_)) {
56+
OLP_SDK_LOG_ERROR_F(kLogTag, "Failed to write -> '%s'", key.c_str());
57+
return {{client::ErrorCode::CacheIO, "Put to cache failed"}};
58+
}
59+
60+
return {client::ApiNoResult{}};
5461
}
5562

5663
boost::optional<model::Data> DataCacheRepository::Get(

0 commit comments

Comments
 (0)