Skip to content

Commit ced8ef7

Browse files
committed
examples: add streamed parsing example
Example for parsing and processing an ULog file in real-time, without keeping the whole file in memory.
1 parent 280f180 commit ced8ef7

File tree

5 files changed

+197
-0
lines changed

5 files changed

+197
-0
lines changed

examples/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,8 @@ add_executable(ulog_writer ulog_writer.cpp)
1414
target_link_libraries(ulog_writer PUBLIC
1515
ulog_cpp::ulog_cpp
1616
)
17+
18+
add_executable(ulog_streamed_parsing ulog_streamed_parsing.cpp)
19+
target_link_libraries(ulog_streamed_parsing PUBLIC
20+
ulog_cpp::ulog_cpp
21+
)

examples/ulog_data.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
#include <ulog_cpp/data_container.hpp>
1010
#include <ulog_cpp/reader.hpp>
1111

12+
// Example of how to use the typed data API for accessing topic data
13+
1214
int main(int argc, char** argv)
1315
{
1416
if (argc < 2) {

examples/ulog_info.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include <ulog_cpp/reader.hpp>
1414
#include <variant>
1515

16+
// Example of how to access the different data messages (info, logging, parameters, subscriptions)
17+
1618
int main(int argc, char** argv)
1719
{
1820
if (argc < 2) {

examples/ulog_streamed_parsing.cpp

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/****************************************************************************
2+
* Copyright (c) 2025 PX4 Development Team.
3+
* SPDX-License-Identifier: BSD-3-Clause
4+
****************************************************************************/
5+
6+
#include <cinttypes>
7+
#include <fstream>
8+
#include <iostream>
9+
#include <string>
10+
#include <ulog_cpp/data_container.hpp>
11+
#include <ulog_cpp/reader.hpp>
12+
13+
// Example for parsing and processing an ULog file in real-time, without keeping the whole file
14+
// in memory
15+
16+
class TopicSubscription {
17+
public:
18+
virtual ~TopicSubscription() = default;
19+
20+
virtual void handleData(const ulog_cpp::TypedDataView& data) = 0;
21+
};
22+
23+
// A topic we are interested in
24+
class VehicleStatus : public TopicSubscription {
25+
public:
26+
explicit VehicleStatus(const std::shared_ptr<ulog_cpp::Subscription>& subscription)
27+
{
28+
_timestamp_field = subscription->field("timestamp");
29+
_nav_state_field = subscription->field("nav_state");
30+
// Optional field (e.g. when a message changes)
31+
if (subscription->fieldMap().find("armed_state") != subscription->fieldMap().end()) {
32+
_armed_state_field = subscription->field("armed_state");
33+
}
34+
}
35+
36+
void handleData(const ulog_cpp::TypedDataView& data) override
37+
{
38+
const auto timestamp = data[*_timestamp_field].as<std::uint64_t>();
39+
const auto nav_state = data[*_nav_state_field].as<std::uint32_t>();
40+
uint8_t armed_state = 0;
41+
if (_armed_state_field) {
42+
armed_state = data[*_nav_state_field].as<std::uint8_t>();
43+
}
44+
printf("vehicle_status: t: %" PRId64 ": nav_state: %" PRIu32 ", armed_state: %" PRId8 "\n",
45+
timestamp, nav_state, armed_state);
46+
}
47+
48+
private:
49+
std::shared_ptr<ulog_cpp::Field> _timestamp_field;
50+
std::shared_ptr<ulog_cpp::Field> _nav_state_field;
51+
std::shared_ptr<ulog_cpp::Field> _armed_state_field;
52+
};
53+
54+
class ULogDataHandler : public ulog_cpp::DataContainer {
55+
public:
56+
ULogDataHandler() : ulog_cpp::DataContainer(ulog_cpp::DataContainer::StorageConfig::Header) {}
57+
58+
void error(const std::string& msg, bool is_recoverable) override
59+
{
60+
printf("Parsing error: %s\n", msg.c_str());
61+
}
62+
63+
void headerComplete() override { ulog_cpp::DataContainer::headerComplete(); }
64+
65+
void messageInfo(const ulog_cpp::MessageInfo& message_info) override
66+
{
67+
DataContainer::messageInfo(message_info);
68+
if (message_info.isMulti()) {
69+
// Multi messages might be continued, but we only know with the next message, so we keep it
70+
// stored and append if needed. We assume that continued multi messages are not interleaved
71+
// with other messages.
72+
if (message_info.isContinued()) {
73+
if (_current_multi_message.field().name() == message_info.field().name()) {
74+
// Append to previous
75+
_current_multi_message.valueRaw().insert(_current_multi_message.valueRaw().end(),
76+
message_info.valueRaw().begin(),
77+
message_info.valueRaw().end());
78+
}
79+
} else {
80+
finishCurrentMultiMessage();
81+
_current_multi_message = message_info;
82+
}
83+
} else {
84+
finishCurrentMultiMessage();
85+
messageInfoComplete(message_info);
86+
}
87+
}
88+
void parameter(const ulog_cpp::Parameter& parameter) override
89+
{
90+
finishCurrentMultiMessage();
91+
DataContainer::parameter(parameter);
92+
}
93+
void addLoggedMessage(const ulog_cpp::AddLoggedMessage& add_logged_message) override
94+
{
95+
finishCurrentMultiMessage();
96+
DataContainer::addLoggedMessage(add_logged_message);
97+
if (_subscriptions_by_message_id.find(add_logged_message.msgId()) !=
98+
_subscriptions_by_message_id.end()) {
99+
throw ulog_cpp::ParsingException("Duplicate AddLoggedMessage message ID");
100+
}
101+
102+
auto format_iter = messageFormats().find(add_logged_message.messageName());
103+
if (format_iter == messageFormats().end()) {
104+
throw ulog_cpp::ParsingException("AddLoggedMessage message format not found");
105+
}
106+
107+
auto ulog_subscription = std::make_shared<ulog_cpp::Subscription>(
108+
add_logged_message, std::vector<ulog_cpp::Data>{}, format_iter->second);
109+
110+
if (add_logged_message.messageName() == "vehicle_status" && add_logged_message.multiId() == 0) {
111+
auto subscription = std::make_shared<VehicleStatus>(ulog_subscription);
112+
_subscriptions_by_message_id.insert(
113+
{add_logged_message.msgId(), SubscriptionData{ulog_subscription, subscription}});
114+
}
115+
}
116+
void logging(const ulog_cpp::Logging& logging) override
117+
{
118+
finishCurrentMultiMessage();
119+
DataContainer::logging(logging);
120+
}
121+
void data(const ulog_cpp::Data& data) override
122+
{
123+
finishCurrentMultiMessage();
124+
const auto iter = _subscriptions_by_message_id.find(data.msgId());
125+
if (iter != _subscriptions_by_message_id.end()) {
126+
const ulog_cpp::TypedDataView data_view(data, *iter->second.ulog_subscription->format());
127+
iter->second.subscription->handleData(data_view);
128+
}
129+
}
130+
131+
private:
132+
struct SubscriptionData {
133+
std::shared_ptr<ulog_cpp::Subscription> ulog_subscription;
134+
std::shared_ptr<TopicSubscription> subscription;
135+
};
136+
137+
void finishCurrentMultiMessage()
138+
{
139+
if (!_current_multi_message.field().name().empty()) {
140+
messageInfoComplete(_current_multi_message);
141+
_current_multi_message.field() = {};
142+
}
143+
}
144+
void messageInfoComplete(const ulog_cpp::MessageInfo& message_info)
145+
{
146+
if (message_info.field().definitionResolved()) {
147+
printf("Info message: %s\n", message_info.field().name().c_str());
148+
}
149+
}
150+
151+
std::map<uint16_t, SubscriptionData> _subscriptions_by_message_id;
152+
ulog_cpp::MessageInfo _current_multi_message{"",
153+
""}; ///< Keep this stored for continued messages
154+
};
155+
156+
int main(int argc, char** argv)
157+
{
158+
if (argc < 2) {
159+
printf("Usage: %s <file.ulg>\n", argv[0]);
160+
return -1;
161+
}
162+
FILE* file = fopen(argv[1], "rb");
163+
if (!file) {
164+
printf("opening file failed\n");
165+
return -1;
166+
}
167+
uint8_t buffer[4096];
168+
int bytes_read;
169+
const auto data_container = std::make_shared<ULogDataHandler>();
170+
ulog_cpp::Reader reader{data_container};
171+
while ((bytes_read = fread(buffer, 1, sizeof(buffer), file)) > 0) {
172+
try {
173+
reader.readChunk(buffer, bytes_read);
174+
} catch (const ulog_cpp::ExceptionBase& exception) {
175+
printf("Failed to parse ulog file: %s\n", exception.what());
176+
return -1;
177+
}
178+
if (data_container->hadFatalError()) {
179+
printf("Failed to parse ulog file\n");
180+
return -1;
181+
}
182+
}
183+
fclose(file);
184+
185+
return 0;
186+
}

examples/ulog_writer.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <thread>
99
#include <ulog_cpp/simple_writer.hpp>
1010

11+
// Example of how to create an ULog file with timeseries, printf messages and parameters
12+
1113
using namespace std::chrono_literals;
1214

1315
namespace {

0 commit comments

Comments
 (0)