Skip to content

Commit 614ed1b

Browse files
Add StreamLayerPartitions API (#1427)
New API is using a SAX parser and a JSON byte stream to parse the layer partitions while they are downloading. Using this approach we can process large responses without any memory impact, since we do not wait for the full response, and parse it afterwards. Relates-To: OLPEDGE-2832 Signed-off-by: Mykhailo Kuchma <[email protected]>
1 parent e98b71c commit 614ed1b

File tree

18 files changed

+1055
-29
lines changed

18 files changed

+1055
-29
lines changed

olp-cpp-sdk-core/include/olp/core/client/OlpClient.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2020 HERE Europe B.V.
2+
* Copyright (C) 2019-2023 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@ class CORE_API OlpClient {
4848

4949
/**
5050
* @brief Creates the `OlpClient` instance.
51-
*
51+
*
5252
* @param settings The `OlpClientSettings` instance.
5353
* @param base_url The base URL to be used for all outgoing requests.
5454
*/
@@ -160,6 +160,33 @@ class CORE_API OlpClient {
160160
RequestBodyType post_body, std::string content_type,
161161
CancellationContext context) const;
162162

163+
/**
164+
* @brief Executes the HTTP request through the network stack in a blocking
165+
* way. The response content is consumed via data callback.
166+
*
167+
* @param path The path that is appended to the base URL.
168+
* @param method Select one of the following methods: `GET`, `POST`, `DELETE`,
169+
* or `PUT`.
170+
* @param query_params The parameters that are appended to the URL path.
171+
* @param header_params The headers used to customize the request.
172+
* @param data_callback The network data callback to retrieve content.
173+
* @param post_body For the `POST` request, populate `form_params` or
174+
* `post_body`, but not both. This data must not be modified until
175+
* the request is completed.
176+
* @param content_type The content type for the `post_body` or `form_params`.
177+
* @param context The `CancellationContext` instance that is used to cancel
178+
* the request.
179+
*
180+
* @return The `HttpResponse` instance.
181+
*/
182+
HttpResponse CallApiStream(std::string path, std::string method,
183+
ParametersType query_params,
184+
ParametersType header_params,
185+
http::Network::DataCallback data_callback,
186+
RequestBodyType post_body,
187+
std::string content_type,
188+
CancellationContext context) const;
189+
163190
private:
164191
class OlpClientImpl;
165192
std::shared_ptr<OlpClientImpl> impl_;

olp-cpp-sdk-core/src/client/OlpClient.cpp

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ http::NetworkRequest::HttpVerb GetHttpVerb(const std::string& verb) {
254254
}
255255

256256
HttpResponse SendRequest(const http::NetworkRequest& request,
257+
const http::Network::DataCallback& data_callback,
257258
const olp::client::OlpClientSettings& settings,
258259
const olp::client::RetrySettings& retry_settings,
259260
client::CancellationContext context) {
@@ -264,7 +265,11 @@ HttpResponse SendRequest(const http::NetworkRequest& request,
264265
};
265266

266267
auto response_data = std::make_shared<ResponseData>();
267-
auto response_body = std::make_shared<std::stringstream>();
268+
269+
// We dont need a response body in case we want a stream
270+
auto response_body =
271+
data_callback ? nullptr : std::make_shared<std::stringstream>();
272+
268273
http::SendOutcome outcome{http::ErrorCode::CANCELLED_ERROR};
269274
const auto timeout = std::chrono::seconds(retry_settings.timeout);
270275

@@ -279,7 +284,8 @@ HttpResponse SendRequest(const http::NetworkRequest& request,
279284
[response_data](std::string key, std::string value) {
280285
response_data->headers.emplace_back(std::move(key),
281286
std::move(value));
282-
});
287+
},
288+
data_callback);
283289

284290
if (!outcome.IsSuccessful()) {
285291
OLP_SDK_LOG_WARNING_F(kLogTag,
@@ -321,9 +327,12 @@ HttpResponse SendRequest(const http::NetworkRequest& request,
321327
const auto status = response_data->response.GetStatus();
322328
if (status < 0) {
323329
return HttpResponse{status, response_data->response.GetError()};
324-
} else {
330+
} else if (response_body) {
325331
return HttpResponse{status, std::move(*response_body),
326332
std::move(response_data->headers)};
333+
} else {
334+
return HttpResponse{status, std::stringstream(),
335+
std::move(response_data->headers)};
327336
}
328337
}();
329338

@@ -362,7 +371,8 @@ class OlpClient::OlpClientImpl {
362371

363372
HttpResponse CallApi(std::string path, std::string method,
364373
ParametersType query_params,
365-
ParametersType header_params, ParametersType form_params,
374+
ParametersType header_params,
375+
http::Network::DataCallback data_callback,
366376
RequestBodyType post_body, std::string content_type,
367377
CancellationContext context) const;
368378

@@ -595,7 +605,7 @@ HttpResponse OlpClient::OlpClientImpl::CallApi(
595605
std::string path, std::string method,
596606
OlpClient::ParametersType query_params,
597607
OlpClient::ParametersType header_params,
598-
OlpClient::ParametersType /*forms_params*/,
608+
http::Network::DataCallback data_callback,
599609
OlpClient::RequestBodyType post_body, std::string content_type,
600610
CancellationContext context) const {
601611
if (!settings_.network_request_handler) {
@@ -663,8 +673,8 @@ HttpResponse OlpClient::OlpClientImpl::CallApi(
663673
return {status, optional_error->GetMessage()};
664674
}
665675

666-
auto response =
667-
SendRequest(network_request, settings_, retry_settings, context);
676+
auto response = SendRequest(network_request, data_callback, settings_,
677+
retry_settings, context);
668678

669679
NetworkStatistics accumulated_statistics = response.GetNetworkStatistics();
670680

@@ -697,7 +707,8 @@ HttpResponse OlpClient::OlpClientImpl::CallApi(
697707
}
698708

699709
backdown_period = CalculateNextWaitTime(retry_settings, i);
700-
response = SendRequest(network_request, settings_, retry_settings, context);
710+
response = SendRequest(network_request, data_callback, settings_,
711+
retry_settings, context);
701712

702713
// In case we retry, accumulate the stats
703714
accumulated_statistics += response.GetNetworkStatistics();
@@ -744,13 +755,26 @@ CancellationToken OlpClient::CallApi(
744755
HttpResponse OlpClient::CallApi(std::string path, std::string method,
745756
ParametersType query_params,
746757
ParametersType header_params,
747-
ParametersType form_params,
758+
ParametersType /*form_params*/,
748759
RequestBodyType post_body,
749760
std::string content_type,
750761
CancellationContext context) const {
751762
return impl_->CallApi(std::move(path), std::move(method),
752763
std::move(query_params), std::move(header_params),
753-
std::move(form_params), std::move(post_body),
764+
nullptr, std::move(post_body), std::move(content_type),
765+
std::move(context));
766+
}
767+
768+
HttpResponse OlpClient::CallApiStream(std::string path, std::string method,
769+
ParametersType query_params,
770+
ParametersType header_params,
771+
http::Network::DataCallback data_callback,
772+
RequestBodyType post_body,
773+
std::string content_type,
774+
CancellationContext context) const {
775+
return impl_->CallApi(std::move(path), std::move(method),
776+
std::move(query_params), std::move(header_params),
777+
std::move(data_callback), std::move(post_body),
754778
std::move(content_type), std::move(context));
755779
}
756780

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/Types.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2022 HERE Europe B.V.
2+
* Copyright (C) 2019-2023 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -153,6 +153,12 @@ using CompatibleVersionsCallback = Callback<CompatibleVersionsResult>;
153153
/// The list of tile keys.
154154
using TileKeys = std::vector<geo::TileKey>;
155155

156+
/// A callback type for partitions stream.
157+
using PartitionsStreamCallback = std::function<void(model::Partition)>;
158+
159+
/// A type of callback that has no result, or an error.
160+
using CallbackNoResult = Callback<client::ApiNoResult>;
161+
156162
} // namespace read
157163
} // namespace dataservice
158164
} // namespace olp

olp-cpp-sdk-dataservice-read/include/olp/dataservice/read/VersionedLayerClient.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2022 HERE Europe B.V.
2+
* Copyright (C) 2019-2023 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -282,6 +282,28 @@ class DATASERVICE_READ_API VersionedLayerClient final {
282282
client::CancellationToken GetPartitions(PartitionsRequest partitions_request,
283283
PartitionsResponseCallback callback);
284284

285+
/**
286+
* @brief Fetches a list of partitions of the given generic layer
287+
* asynchronously. Client does not cache the partitions, instead every
288+
* partition is passed to the provided callback.
289+
*
290+
* @note API is considered experimental and a subject to change.
291+
*
292+
* @param partitions_request The `PartitionsRequest` instance that contains
293+
* a complete set of request parameters.
294+
* @note Fetch option and partition list are not supported.
295+
* @param partition_stream_callback The `PartitionsStreamCallback` that
296+
* receives every fetched partition.
297+
* @param callback The `CallbackNoResult` object that is invoked when
298+
* operation is complete or an error is encountered.
299+
*
300+
* @return A token that can be used to cancel this request.
301+
*/
302+
client::CancellationToken StreamLayerPartitions(
303+
PartitionsRequest partitions_request,
304+
PartitionsStreamCallback partition_stream_callback,
305+
CallbackNoResult callback);
306+
285307
/**
286308
* @brief Fetches a list of partitions of the given generic layer
287309
* asynchronously.

olp-cpp-sdk-dataservice-read/src/VersionedLayerClient.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2022 HERE Europe B.V.
2+
* Copyright (C) 2019-2023 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,6 +62,15 @@ client::CancellationToken VersionedLayerClient::GetPartitions(
6262
std::move(callback));
6363
}
6464

65+
client::CancellationToken VersionedLayerClient::StreamLayerPartitions(
66+
PartitionsRequest partitions_request,
67+
PartitionsStreamCallback partition_stream_callback,
68+
CallbackNoResult callback) {
69+
return impl_->StreamLayerPartitions(std::move(partitions_request),
70+
std::move(partition_stream_callback),
71+
std::move(callback));
72+
}
73+
6574
client::CancellableFuture<PartitionsResponse>
6675
VersionedLayerClient::GetPartitions(PartitionsRequest partitions_request) {
6776
return impl_->GetPartitions(std::move(partitions_request));

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2022 HERE Europe B.V.
2+
* Copyright (C) 2019-2023 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,9 @@
4848
#include "repositories/PartitionsRepository.h"
4949
#include "repositories/PrefetchTilesRepository.h"
5050

51+
#include "repositories/AsyncJsonStream.h"
52+
#include "repositories/PartitionsSaxHandler.h"
53+
5154
namespace olp {
5255
namespace dataservice {
5356
namespace read {
@@ -110,6 +113,53 @@ client::CancellationToken VersionedLayerClientImpl::GetPartitions(
110113
std::move(callback), thread::NORMAL);
111114
}
112115

116+
client::CancellationToken VersionedLayerClientImpl::StreamLayerPartitions(
117+
PartitionsRequest request,
118+
PartitionsStreamCallback partition_stream_callback,
119+
CallbackNoResult callback) {
120+
auto async_stream = std::make_shared<repository::AsyncJsonStream>();
121+
122+
auto request_task =
123+
[=](client::CancellationContext context) -> client::ApiNoResponse {
124+
auto version_response =
125+
GetVersion(boost::none, FetchOptions::OnlineIfNotFound, context);
126+
if (!version_response.IsSuccessful()) {
127+
return version_response.GetError();
128+
}
129+
130+
const auto version = version_response.GetResult().GetVersion();
131+
132+
repository::PartitionsRepository repository(catalog_, layer_id_, settings_,
133+
lookup_client_, mutex_storage_);
134+
135+
repository.StreamPartitions(async_stream, version,
136+
request.GetAdditionalFields(),
137+
request.GetBillingTag(), context);
138+
139+
return client::ApiNoResult{};
140+
};
141+
142+
auto request_task_token = task_sink_.AddTask(
143+
std::bind(request_task, std::placeholders::_1), nullptr, thread::NORMAL);
144+
145+
auto parse_task = [=](client::CancellationContext context) {
146+
repository::PartitionsRepository repository(catalog_, layer_id_, settings_,
147+
lookup_client_, mutex_storage_);
148+
149+
return repository.ParsePartitionsStream(async_stream,
150+
partition_stream_callback, context);
151+
};
152+
153+
auto parse_task_token =
154+
task_sink_.AddTask(std::bind(parse_task, std::placeholders::_1),
155+
std::move(callback), thread::NORMAL);
156+
157+
return client::CancellationToken([request_task_token, parse_task_token]() {
158+
request_task_token.Cancel();
159+
parse_task_token.Cancel();
160+
});
161+
}
162+
113163
client::CancellableFuture<PartitionsResponse>
114164
VersionedLayerClientImpl::GetPartitions(PartitionsRequest partitions_request) {
115165
auto promise = std::make_shared<std::promise<PartitionsResponse>>();

olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2022 HERE Europe B.V.
2+
* Copyright (C) 2019-2023 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,6 +74,11 @@ class VersionedLayerClientImpl {
7474
virtual client::CancellationToken GetPartitions(
7575
PartitionsRequest request, PartitionsResponseCallback callback);
7676

77+
virtual client::CancellationToken StreamLayerPartitions(
78+
PartitionsRequest request,
79+
PartitionsStreamCallback partition_stream_callback,
80+
CallbackNoResult callback);
81+
7782
virtual client::CancellableFuture<PartitionsResponse> GetPartitions(
7883
PartitionsRequest partitions_request);
7984

0 commit comments

Comments
 (0)