Skip to content

Commit b99d11f

Browse files
author
Liubov Didkivska
authored
Add stream layer read example. (#730)
* Add stream layer read example. Create stream layer client with user specified catalog, layer id and credential. Try to poll data till all messages are consumed. Check received messages from a stream layer. If received data is more than 1mb and data handle is returned, call GetData for each received message. Relates-To: OLPEDGE-1306 Signed-off-by: Liubov Didkivska <[email protected]>
1 parent d97dfcf commit b99d11f

File tree

5 files changed

+361
-41
lines changed

5 files changed

+361
-41
lines changed

examples/CMakeLists.txt

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,19 @@ set(OLP_SDK_DATASERVICE_EXAMPLE_TARGET dataservice-example)
1919
set(OLP_SDK_DATASERVICE_READ_EXAMPLE_TARGET dataservice-read-example)
2020
set(OLP_SDK_DATASERVICE_WRITE_EXAMPLE_TARGET dataservice-write-example)
2121
set(OLP_SDK_DATASERVICE_CACHE_EXAMPLE_TARGET dataservice-cache-example)
22+
set(OLP_SDK_DATASERVICE_READ_STREAM_LAYER_EXAMPLE_TARGET dataservice-read-stream-layer-example)
2223

2324
set(OLP_SDK_EXAMPLE_SUCCESS_STRING "Example has finished successfully")
2425
set(OLP_SDK_EXAMPLE_FAILURE_STRING "Example failed!")
2526
if (IOS)
26-
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/gen_ios_example.cmake.in)
27+
include(./cmake/gen_ios_example.cmake.in)
2728

2829

2930
gen_ios_example_application(${OLP_SDK_DATASERVICE_EXAMPLE_TARGET}
3031
${OLP_SDK_EXAMPLE_SUCCESS_STRING}
3132
${OLP_SDK_EXAMPLE_FAILURE_STRING})
3233
elseif(ANDROID)
33-
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/gen_android_example.cmake.in)
34+
include(./cmake/gen_android_example.cmake.in)
3435

3536
set(OLP_SDK_EXAMPLE_PACKAGE_NAME "dataservice_example")
3637
set(OLP_SDK_EXAMPLE_PACKAGE_JNI_NAME "dataservice_example")
@@ -44,40 +45,50 @@ else()
4445
project(${OLP_SDK_DATASERVICE_EXAMPLE_TARGET} VERSION 1.4.0)
4546

4647
add_library(${OLP_SDK_DATASERVICE_READ_EXAMPLE_TARGET}
47-
${CMAKE_CURRENT_SOURCE_DIR}/ReadExample.cpp
48-
${CMAKE_CURRENT_SOURCE_DIR}/Examples.h
49-
${CMAKE_CURRENT_SOURCE_DIR}/ReadExample.h)
48+
./ReadExample.cpp
49+
./Examples.h
50+
./ReadExample.h)
5051

5152
target_link_libraries(${OLP_SDK_DATASERVICE_READ_EXAMPLE_TARGET}
5253
olp-cpp-sdk-authentication
5354
olp-cpp-sdk-dataservice-read)
55+
56+
add_library(${OLP_SDK_DATASERVICE_READ_STREAM_LAYER_EXAMPLE_TARGET}
57+
./StreamLayerReadExample.cpp
58+
./Examples.h
59+
./StreamLayerReadExample.h)
60+
61+
target_link_libraries(${OLP_SDK_DATASERVICE_READ_STREAM_LAYER_EXAMPLE_TARGET}
62+
olp-cpp-sdk-authentication
63+
olp-cpp-sdk-dataservice-read)
5464

5565
add_library(${OLP_SDK_DATASERVICE_WRITE_EXAMPLE_TARGET}
56-
${CMAKE_CURRENT_SOURCE_DIR}/WriteExample.cpp
57-
${CMAKE_CURRENT_SOURCE_DIR}/Examples.h
58-
${CMAKE_CURRENT_SOURCE_DIR}/WriteExample.h)
66+
./WriteExample.cpp
67+
./Examples.h
68+
./WriteExample.h)
5969

6070
target_link_libraries(${OLP_SDK_DATASERVICE_WRITE_EXAMPLE_TARGET}
6171
olp-cpp-sdk-authentication
6272
olp-cpp-sdk-dataservice-write)
6373

6474
add_library(${OLP_SDK_DATASERVICE_CACHE_EXAMPLE_TARGET}
65-
${CMAKE_CURRENT_SOURCE_DIR}/ProtectedCacheExample.cpp
66-
${CMAKE_CURRENT_SOURCE_DIR}/Examples.h
67-
${CMAKE_CURRENT_SOURCE_DIR}/ProtectedCacheExample.h)
75+
./ProtectedCacheExample.cpp
76+
./Examples.h
77+
./ProtectedCacheExample.h)
6878

6979
target_link_libraries(${OLP_SDK_DATASERVICE_CACHE_EXAMPLE_TARGET}
7080
olp-cpp-sdk-authentication
7181
olp-cpp-sdk-dataservice-read)
7282

7383
add_executable(${OLP_SDK_DATASERVICE_EXAMPLE_TARGET}
74-
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
75-
${CMAKE_CURRENT_SOURCE_DIR}/Examples.h
76-
${CMAKE_CURRENT_SOURCE_DIR}/Options.h)
84+
./main.cpp
85+
./Examples.h
86+
./Options.h)
7787

7888
target_link_libraries(${OLP_SDK_DATASERVICE_EXAMPLE_TARGET}
7989
${OLP_SDK_DATASERVICE_READ_EXAMPLE_TARGET}
8090
${OLP_SDK_DATASERVICE_WRITE_EXAMPLE_TARGET}
81-
${OLP_SDK_DATASERVICE_CACHE_EXAMPLE_TARGET})
91+
${OLP_SDK_DATASERVICE_CACHE_EXAMPLE_TARGET}
92+
${OLP_SDK_DATASERVICE_READ_STREAM_LAYER_EXAMPLE_TARGET})
8293

8394
endif()

examples/Options.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,30 @@ struct Option {
2626
const Option kHelpOption{"-h", "--help", "Print the help message and exit."};
2727

2828
const Option kExampleOption{"-e", "--example",
29-
"Run example [read, write, cache]."};
29+
"Run example [=read|read_stream|write|cache]."};
3030

31-
const Option kKeyIdOption{"-i", "--key_id", "Here key ID to access OLP."};
31+
const Option kKeyIdOption{"-i", "--key-id", "Here key ID to access OLP."};
3232

33-
const Option kKeySecretOption{"-s", "--key_secret",
33+
const Option kKeySecretOption{"-s", "--key-secret",
3434
"Here secret key to access OLP."};
3535

3636
const Option kCatalogOption{"-c", "--catalog",
3737
"Catalog HRN (HERE Resource Name)."};
3838

3939
const Option kCatalogVersionOption{
40-
"-v", "--catalog_version",
40+
"-v", "--catalog-version",
4141
"The version of the catalog from which you wan to "
4242
"get data(used in read example, optional)."};
4343

44-
const Option kLayerIdOption{"-l", "--layer_id",
44+
const Option kLayerIdOption{"-l", "--layer-id",
4545
"The layer ID inside the catalog where you want to "
4646
"publish data to(required for write example)."};
4747

48+
const Option kSubscriptionTypeOption{
49+
"-t", "--type-of-subscription",
50+
"[Optional] Type of subscription [=serial|parallel] (used for read_stream "
51+
"example). The default option is serial."};
52+
4853
const Option kAllOption{"-a", "--all", "Run all examples."};
4954

5055
} // namespace tools
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright (C) 2020 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#include "StreamLayerReadExample.h"
21+
22+
#include <future>
23+
#include <iostream>
24+
25+
#include <olp/authentication/TokenProvider.h>
26+
#include <olp/core/client/HRN.h>
27+
#include <olp/core/client/OlpClientSettings.h>
28+
#include <olp/core/client/OlpClientSettingsFactory.h>
29+
#include <olp/core/logging/Log.h>
30+
#include <olp/core/utils/Base64.h>
31+
#include <olp/dataservice/read/StreamLayerClient.h>
32+
33+
using namespace olp::dataservice::read;
34+
namespace {
35+
constexpr auto kLogTag = "read-stream-layer-example";
36+
constexpr auto kNumberOfThreads = 2u;
37+
38+
bool CreateSubscription(StreamLayerClient& client,
39+
SubscribeRequest subscribe_request) {
40+
auto subscribe_future = client.Subscribe(subscribe_request);
41+
auto subscribe_response = subscribe_future.GetFuture().get();
42+
if (!subscribe_response.IsSuccessful()) {
43+
OLP_SDK_LOG_ERROR_F(
44+
kLogTag, "Failed to create subscription - HTTP Status: %d Message: %s",
45+
subscribe_response.GetError().GetHttpStatusCode(),
46+
subscribe_response.GetError().GetMessage().c_str());
47+
return false;
48+
}
49+
return true;
50+
}
51+
52+
int GetDataFromMessages(StreamLayerClient& client,
53+
const model::Messages& result) {
54+
const auto& messages = result.GetMessages();
55+
for (const auto& message : messages) {
56+
// If data is greater than 1 MB, the data handle is present. The data handle
57+
// is a unique identifier that is used to identify content and retrieve the
58+
// content using GetData.
59+
auto handle = message.GetMetaData().GetDataHandle();
60+
if (handle) {
61+
OLP_SDK_LOG_INFO_F(kLogTag, "Message data: handle - %s, size - %lu",
62+
handle.get().c_str(),
63+
message.GetMetaData().GetDataSize().get());
64+
// use GetData(const model::Message& message) with message instance to
65+
// request actual data with data handle.
66+
auto message_future = client.GetData(message);
67+
auto message_result = message_future.GetFuture().get();
68+
if (!message_result.IsSuccessful()) {
69+
OLP_SDK_LOG_WARNING_F(kLogTag,
70+
"Failed to get data for data handle %s - HTTP "
71+
"Status: %d Message: %s",
72+
handle.get().c_str(),
73+
message_result.GetError().GetHttpStatusCode(),
74+
message_result.GetError().GetMessage().c_str());
75+
continue;
76+
}
77+
auto message_data = message_result.MoveResult();
78+
OLP_SDK_LOG_INFO_F(kLogTag, "GetData for %s successful: size - %lu",
79+
handle.get().c_str(), message_data->size());
80+
} else {
81+
// If data is less than 1 MB, the data content published directly in the
82+
// metadata and encoded in Base64.
83+
OLP_SDK_LOG_INFO_F(kLogTag, "Message data: size - %lu",
84+
message.GetData()->size());
85+
}
86+
}
87+
return messages.size();
88+
}
89+
90+
void RunPoll(StreamLayerClient& client) {
91+
unsigned int total_messages_size = 0;
92+
// Get the messages, and commit offsets till all data is consumed, or max
93+
// times 5
94+
unsigned int max_times_to_poll = 5;
95+
while (max_times_to_poll-- > 0) {
96+
auto poll_future = client.Poll();
97+
auto poll_response = poll_future.GetFuture().get();
98+
if (!poll_response.IsSuccessful()) {
99+
OLP_SDK_LOG_WARNING_F(kLogTag,
100+
"Failed to poll data - HTTP Status: %d Message: %s",
101+
poll_response.GetError().GetHttpStatusCode(),
102+
poll_response.GetError().GetMessage().c_str());
103+
continue;
104+
}
105+
106+
auto result = poll_response.MoveResult();
107+
auto messages_size = GetDataFromMessages(client, result);
108+
total_messages_size += messages_size;
109+
if (!messages_size) {
110+
OLP_SDK_LOG_INFO(kLogTag, "No new messages is received");
111+
break;
112+
}
113+
}
114+
115+
if (total_messages_size > 0) {
116+
OLP_SDK_LOG_INFO_F(kLogTag, "Poll data - Success, messages size - %u",
117+
total_messages_size);
118+
} else {
119+
OLP_SDK_LOG_INFO(kLogTag, "No messages is received");
120+
}
121+
}
122+
123+
bool DeleteSubscription(StreamLayerClient& client) {
124+
auto unsubscribe_future = client.Unsubscribe();
125+
auto unsubscribe_response = unsubscribe_future.GetFuture().get();
126+
if (!unsubscribe_response.IsSuccessful()) {
127+
OLP_SDK_LOG_ERROR_F(kLogTag,
128+
"Failed to unsubscribe - HTTP Status: %d Message: %s",
129+
unsubscribe_response.GetError().GetHttpStatusCode(),
130+
unsubscribe_response.GetError().GetMessage().c_str());
131+
return false;
132+
}
133+
return true;
134+
}
135+
} // namespace
136+
137+
int RunStreamLayerExampleRead(
138+
const AccessKey& access_key, const std::string& catalog,
139+
const std::string& layer_id,
140+
SubscribeRequest::SubscriptionMode subscription_mode) {
141+
// Create a task scheduler instance
142+
std::shared_ptr<olp::thread::TaskScheduler> task_scheduler =
143+
olp::client::OlpClientSettingsFactory::CreateDefaultTaskScheduler();
144+
// Create a network client
145+
auto http_client = olp::client::OlpClientSettingsFactory::
146+
CreateDefaultNetworkRequestHandler();
147+
148+
// Initialize authentication settings
149+
olp::authentication::Settings settings({access_key.id, access_key.secret});
150+
settings.task_scheduler = task_scheduler;
151+
settings.network_request_handler = http_client;
152+
// Setup AuthenticationSettings with a default token provider that will
153+
// retrieve an OAuth 2.0 token from OLP.
154+
olp::client::AuthenticationSettings auth_settings;
155+
auth_settings.provider =
156+
olp::authentication::TokenProviderDefault(std::move(settings));
157+
158+
// Setup OlpClientSettings and provide it to the StreamLayerClient.
159+
olp::client::OlpClientSettings client_settings;
160+
client_settings.authentication_settings = auth_settings;
161+
client_settings.network_request_handler = std::move(http_client);
162+
client_settings.task_scheduler = task_scheduler;
163+
164+
// Set consumer configuration options. Other options ant its default values
165+
// described here:
166+
// https://developer.here.com/olp/documentation/data-api/api-reference-stream.html
167+
ConsumerOptions expected_options = {{"auto.offset.reset", "earliest"},
168+
{"enable.auto.commit", "false"},
169+
{"group.id", "group_id_1"}};
170+
171+
// Create subscription, used kSerial or kParallel subscription mode
172+
SubscribeRequest subscribe_request =
173+
SubscribeRequest()
174+
.WithSubscriptionMode(subscription_mode)
175+
.WithConsumerProperties(ConsumerProperties(expected_options));
176+
177+
// value accumulate result
178+
std::atomic<int> value(0);
179+
auto read_from_stream_layer = [&]() {
180+
// Create stream layer client with settings and catalog, layer specified
181+
StreamLayerClient client(olp::client::HRN{catalog}, layer_id,
182+
client_settings);
183+
if (!CreateSubscription(client, subscribe_request)) {
184+
value.store(-1);
185+
}
186+
187+
RunPoll(client);
188+
189+
if (!DeleteSubscription(client)) {
190+
value.store(-1);
191+
}
192+
};
193+
194+
if (subscription_mode == SubscribeRequest::SubscriptionMode::kSerial) {
195+
// With serial subscription you can read smaller volumes of data with a
196+
// single subscription.
197+
read_from_stream_layer();
198+
} else {
199+
// With parallel subscription you can read large volumes of data in a
200+
// parallel manner. The subscription and message reading workflow is similar
201+
// to serial subscription except that you are allowed to create multiple
202+
// subscriptions for the same HRN, layer and group.id using multiple
203+
// processes/threads. This allows you to read and commit messages for each
204+
// subscription in parallel.
205+
OLP_SDK_LOG_INFO_F(kLogTag,
206+
"Starting parallel subscription mode, threads=%u",
207+
kNumberOfThreads);
208+
std::vector<std::thread> threads;
209+
threads.reserve(kNumberOfThreads);
210+
for (unsigned int i = 0; i < kNumberOfThreads; i++) {
211+
threads.emplace_back(read_from_stream_layer);
212+
}
213+
214+
for (auto& thread : threads) {
215+
thread.join();
216+
}
217+
}
218+
return value.load();
219+
}

examples/StreamLayerReadExample.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (C) 2019-2020 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#pragma once
21+
22+
#include "Examples.h"
23+
24+
#include <olp/dataservice/read/SubscribeRequest.h>
25+
26+
/**
27+
* @brief Dataservice read from stream layer example. Authenticate client using
28+
* access key id and secret. If used parallel mode create threads, if serial use
29+
* current thread. Create a subscription. Poll data and then unsubscribe.
30+
* @param access_key here.access.key.id and here.access.key.secret.
31+
* @param catalog The HERE Resource Name (HRN) of the catalog from which you
32+
* want to read data.
33+
* @param layer_id The layer ID of the catalog.
34+
* @param subscription_mode The type of client subscription.
35+
* @return 0 on success, -1 otherwise.
36+
*/
37+
int RunStreamLayerExampleRead(
38+
const AccessKey& access_key, const std::string& catalog,
39+
const std::string& layer_id,
40+
olp::dataservice::read::SubscribeRequest::SubscriptionMode
41+
subscription_mode = olp::dataservice::read::SubscribeRequest::
42+
SubscriptionMode::kSerial);

0 commit comments

Comments
 (0)