Skip to content

Commit 77bd63c

Browse files
brendangeckUbuntu
andauthored
RosBag odd padding parsing error fix (#23)
* Small reformat and adding gdb to yum install list to use with --compilation_mode dbg * Removing order-naive parsing of bagfiles in favor of a more informed algorithm. * Bumping version number * Adding unit tests to verify we support the 'oddly padded bag' edge case Co-authored-by: Ubuntu <ubuntu@ip-172-31-63-133.us-west-2.compute.internal>
1 parent 1c43ca2 commit 77bd63c

File tree

8 files changed

+159
-133
lines changed

8 files changed

+159
-133
lines changed

Dockerfile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
FROM quay.io/pypa/manylinux2014_x86_64
22

3-
RUN yum install npm git python-devel python2-pip -y -q && npm install -g @bazel/bazelisk && pip install wheel && pip install --upgrade "pip < 21.0"
3+
RUN yum install npm git python-devel python2-pip gdb -y -q && \
4+
npm install -g @bazel/bazelisk && \
5+
pip install wheel && \
6+
pip install --upgrade "pip < 21.0"
47

58
RUN mkdir -p /tmp/embag /tmp/pip_build /tmp/out
69
COPY WORKSPACE /tmp/embag

lib/embag.cc

Lines changed: 144 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -147,138 +147,152 @@ RosBagTypes::header_t Bag::readHeader(const RosBagTypes::record_t &record) {
147147

148148
template <typename T>
149149
bool Bag::readRecords(boost::iostreams::stream<T> &stream) {
150-
while (true) {
151-
const auto pos = stream.tellg();
152-
if (pos == -1 || pos == std::streampos(bag_bytes_size_)) {
153-
break;
150+
// Docs on the RosBag 2.0 format: http://wiki.ros.org/Bags/Format/2.0
151+
152+
/**
153+
* Read the BAG_HEADER record. So long as the file is not corrupted, this is guaranteed
154+
* to be the first record in the bag
155+
*/
156+
const auto pos = stream.tellg();
157+
uint32_t connection_count;
158+
uint32_t chunk_count;
159+
uint64_t index_pos;
160+
161+
const auto bag_header_record = readRecord(stream);
162+
const auto bag_header_header = readHeader(bag_header_record);
163+
164+
bag_header_header.getField("conn_count", connection_count);
165+
bag_header_header.getField("chunk_count", chunk_count);
166+
bag_header_header.getField("index_pos", index_pos);
167+
168+
// TODO: check these values are nonzero and index_pos is > 64
169+
connections_.resize(connection_count);
170+
chunk_infos_.reserve(chunk_count);
171+
chunks_.reserve(chunk_count);
172+
index_pos_ = index_pos;
173+
174+
/**
175+
* Read the CONNECTION records. As per the docs, these are located at the start of the "INDEX" section,
176+
* which is denoted by the index_pos field of the bag header
177+
*/
178+
stream.seekg(index_pos_, std::ios_base::beg);
179+
for (size_t i = 0; i < connection_count; i++) {
180+
const auto conn_record = readRecord(stream);
181+
const auto conn_header = readHeader(conn_record);
182+
183+
uint32_t connection_id;
184+
std::string topic;
185+
186+
conn_header.getField("conn", connection_id);
187+
conn_header.getField("topic", topic);
188+
189+
if (topic.empty())
190+
continue;
191+
192+
// TODO: check these variables along with md5sum
193+
RosBagTypes::connection_data_t connection_data;
194+
connection_data.topic = topic;
195+
196+
const auto fields = readFields(conn_record.data, conn_record.data_len);
197+
198+
connection_data.type = fields->at("type");
199+
const size_t slash_pos = connection_data.type.find_first_of('/');
200+
if (slash_pos != std::string::npos) {
201+
connection_data.scope = connection_data.type.substr(0, slash_pos);
154202
}
155-
const auto record = readRecord(stream);
156-
const auto header = readHeader(record);
157-
158-
const auto op = header.getOp();
159-
160-
switch (op) {
161-
case RosBagTypes::header_t::op::BAG_HEADER: {
162-
uint32_t connection_count;
163-
uint32_t chunk_count;
164-
uint64_t index_pos;
165-
166-
header.getField("conn_count", connection_count);
167-
header.getField("chunk_count", chunk_count);
168-
header.getField("index_pos", index_pos);
169-
170-
// TODO: check these values are nonzero and index_pos is > 64
171-
connections_.resize(connection_count);
172-
chunks_.reserve(chunk_count);
173-
index_pos_ = index_pos;
174-
175-
break;
176-
}
177-
case RosBagTypes::header_t::op::CHUNK: {
178-
RosBagTypes::chunk_t chunk{record};
179-
chunk.offset = pos;
180-
181-
header.getField("compression", chunk.compression);
182-
header.getField("size", chunk.uncompressed_size);
183-
184-
if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) {
185-
throw std::runtime_error("Unsupported compression type: " + chunk.compression);
186-
}
187-
188-
chunks_.emplace_back(chunk);
189-
190-
break;
191-
}
192-
case RosBagTypes::header_t::op::INDEX_DATA: {
193-
uint32_t version;
194-
uint32_t connection_id;
195-
uint32_t msg_count;
196-
197-
// TODO: check these values
198-
header.getField("ver", version);
199-
header.getField("conn", connection_id);
200-
header.getField("count", msg_count);
201-
202-
RosBagTypes::index_block_t index_block{};
203-
index_block.into_chunk = &chunks_.back();
204-
205-
connections_[connection_id].blocks.emplace_back(index_block);
206-
207-
break;
208-
}
209-
case RosBagTypes::header_t::op::CONNECTION: {
210-
uint32_t connection_id;
211-
std::string topic;
212-
213-
header.getField("conn", connection_id);
214-
header.getField("topic", topic);
215-
216-
if (topic.empty())
217-
continue;
218-
219-
// TODO: check these variables along with md5sum
220-
RosBagTypes::connection_data_t connection_data;
221-
connection_data.topic = topic;
222-
223-
const auto fields = readFields(record.data, record.data_len);
224-
225-
connection_data.type = fields->at("type");
226-
const size_t slash_pos = connection_data.type.find_first_of('/');
227-
if (slash_pos != std::string::npos) {
228-
connection_data.scope = connection_data.type.substr(0, slash_pos);
229-
}
230-
connection_data.md5sum = fields->at("md5sum");
231-
connection_data.message_definition = fields->at("message_definition");
232-
if (fields->find("callerid") != fields->end()) {
233-
connection_data.callerid = fields->at("callerid");
234-
}
235-
if (fields->find("latching") != fields->end()) {
236-
connection_data.latching = fields->at("latching") == "1";
237-
}
238-
239-
connections_[connection_id].id = connection_id;
240-
connections_[connection_id].topic = topic;
241-
connections_[connection_id].data = connection_data;
242-
topic_connection_map_[topic].emplace_back(&connections_[connection_id]);
243-
244-
break;
245-
}
246-
case RosBagTypes::header_t::op::MESSAGE_DATA: {
247-
// Message data is usually found in chunks
248-
break;
249-
}
250-
case RosBagTypes::header_t::op::CHUNK_INFO: {
251-
uint32_t ver;
252-
uint64_t chunk_pos;
253-
RosValue::ros_time_t start_time;
254-
RosValue::ros_time_t end_time;
255-
uint32_t count;
256-
257-
header.getField("ver", ver);
258-
header.getField("chunk_pos", chunk_pos);
259-
header.getField("start_time", start_time);
260-
header.getField("end_time", end_time);
261-
header.getField("count", count);
262-
263-
// TODO: It might make sense to save this data in a map or reverse the search.
264-
// At the moment there are only a few chunks so this doesn't really take long
265-
auto chunk_it = std::find_if(chunks_.begin(), chunks_.end(), [&chunk_pos](const RosBagTypes::chunk_t &c) {
266-
return c.offset == chunk_pos;
267-
});
268-
269-
if (chunk_it == chunks_.end()) {
270-
throw std::runtime_error("Unable to find chunk for chunk info at pos: " + std::to_string(chunk_pos));
271-
}
272-
273-
chunk_it->info.start_time = start_time;
274-
chunk_it->info.end_time = end_time;
275-
chunk_it->info.message_count = count;
276-
277-
break;
278-
}
279-
case RosBagTypes::header_t::op::UNSET:
280-
default:throw std::runtime_error("Unknown record operation: " + std::to_string(uint8_t(op)));
203+
connection_data.md5sum = fields->at("md5sum");
204+
connection_data.message_definition = fields->at("message_definition");
205+
if (fields->find("callerid") != fields->end()) {
206+
connection_data.callerid = fields->at("callerid");
281207
}
208+
if (fields->find("latching") != fields->end()) {
209+
connection_data.latching = fields->at("latching") == "1";
210+
}
211+
212+
connections_[connection_id].id = connection_id;
213+
connections_[connection_id].topic = topic;
214+
connections_[connection_id].data = connection_data;
215+
topic_connection_map_[topic].push_back(&connections_[connection_id]);
216+
}
217+
218+
/**
219+
* Read the CHUNK_INFO records. These are guaranteed to be immediately after the CONNECTION records,
220+
* so no need to seek the file pointer
221+
*/
222+
for (size_t i = 0; i < chunk_count; i++) {
223+
const auto chunk_info_record = readRecord(stream);
224+
const auto chunk_info_header = readHeader(chunk_info_record);
225+
226+
RosBagTypes::chunk_info_t chunk_info;
227+
228+
uint32_t ver;
229+
uint64_t chunk_pos;
230+
RosValue::ros_time_t start_time;
231+
RosValue::ros_time_t end_time;
232+
uint32_t count;
233+
234+
chunk_info_header.getField("ver", ver);
235+
chunk_info_header.getField("chunk_pos", chunk_pos);
236+
chunk_info_header.getField("start_time", start_time);
237+
chunk_info_header.getField("end_time", end_time);
238+
chunk_info_header.getField("count", count);
239+
240+
chunk_info.chunk_pos = chunk_pos;
241+
chunk_info.start_time = start_time;
242+
chunk_info.end_time = end_time;
243+
chunk_info.message_count = count;
244+
245+
chunk_infos_[i] = chunk_info;
246+
}
247+
248+
/**
249+
* Now that we have some chunk metadata from the CHUNK_INFO records, process the CHUNK records from
250+
* earlier in the file. Each CHUNK_INFO knows the position of its corresponding chunk.
251+
*/
252+
for (size_t i = 0; i < chunk_count; i++) {
253+
const auto info = chunk_infos_[i];
254+
255+
// TODO: The chunk infos are not necessarily Revisit this logic if seeking back and forth across the file causes a slowdown
256+
stream.seekg(info.chunk_pos, std::ios_base::beg);
257+
258+
const auto chunk_record = readRecord(stream);
259+
const auto chunk_header = readHeader(chunk_record);
260+
261+
RosBagTypes::chunk_t chunk{chunk_record};
262+
chunk.offset = stream.tellg();
263+
264+
chunk_header.getField("compression", chunk.compression);
265+
chunk_header.getField("size", chunk.uncompressed_size);
266+
267+
if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) {
268+
throw std::runtime_error("Unsupported compression type: " + chunk.compression);
269+
}
270+
271+
chunk.info = info;
272+
273+
chunks_.push_back(chunk);
274+
275+
// Each chunk is followed by an INDEX_DATA record, so parse that out here
276+
const auto index_data_record = readRecord(stream);
277+
const auto index_data_header = readHeader(index_data_record);
278+
279+
uint32_t version;
280+
uint32_t connection_id;
281+
uint32_t msg_count;
282+
283+
// TODO: check these values
284+
index_data_header.getField("ver", version);
285+
index_data_header.getField("conn", connection_id);
286+
index_data_header.getField("count", msg_count);
287+
288+
RosBagTypes::index_block_t index_block{};
289+
290+
// NOTE: It seems like it would be simpler to just do &chunk here right? WRONG.
291+
// C++ resuses the same memory location for the chunk variable for each loop, so
292+
// if you use &chunk, all `into_chunk` values will be exactly the same
293+
index_block.into_chunk = &chunks_[i];
294+
295+
connections_[connection_id].blocks.push_back(index_block);
282296
}
283297

284298
return true;

lib/embag.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class Bag {
124124
// Bag data
125125
std::vector<RosBagTypes::connection_record_t> connections_;
126126
std::unordered_map<std::string, std::vector<RosBagTypes::connection_record_t *>> topic_connection_map_;
127+
std::vector<RosBagTypes::chunk_info_t> chunk_infos_;
127128
std::vector<RosBagTypes::chunk_t> chunks_;
128129
uint64_t index_pos_ = 0;
129130
std::unordered_map<std::string, std::shared_ptr<RosMsgTypes::ros_msg_def>> message_schemata_;

lib/ros_bag_types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ struct RosBagTypes {
5656
};
5757

5858
struct chunk_info_t {
59+
uint64_t chunk_pos;
5960
RosValue::ros_time_t start_time;
6061
RosValue::ros_time_t end_time;
6162
uint32_t message_count = 0;

lib/version.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
EMBAG_VERSION = "0.0.34"
1+
EMBAG_VERSION = "0.0.35"

test/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ cc_test(
22
name = "embag_test",
33
srcs = ["embag_test.cc"],
44
copts = ["-Iexternal/gtest/include"],
5-
data = ["test.bag"],
5+
data = ["test.bag", "test_2.bag"],
66
deps = [
77
"//lib:embag",
88
"@gtest",

test/embag_test.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ TEST(EmbagTest, OpenCloseBag) {
1212
bag.close();
1313
}
1414

15+
TEST(EmbagTest, OpenCloseOddPaddingBag) {
16+
// In some very rare cases rosbags will contain padding between the CHUNK and INDEX section of the file.
17+
// This bagfile is one such case.
18+
Embag::Bag oddly_padded_bag{"test/test_2.bag"};
19+
oddly_padded_bag.close();
20+
}
21+
1522
class BagTest : public ::testing::Test {
1623
protected:
1724
Embag::Bag bag_{"test/test.bag"};

test/test_2.bag

5.22 KB
Binary file not shown.

0 commit comments

Comments
 (0)