Skip to content

Commit 8ba8872

Browse files
peci1mergify[bot]
authored andcommitted
Better handling of large bag files (#178)
Signed-off-by: Martin Pecka <[email protected]> (cherry picked from commit c7d3efd)
1 parent 616c518 commit 8ba8872

File tree

4 files changed

+212
-75
lines changed

4 files changed

+212
-75
lines changed

rqt_bag/src/rqt_bag/bag_timeline.py

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import threading
3030
import time
31+
from typing import Callable, Iterable, Iterator, Optional, Tuple, Union
3132

3233
from python_qt_binding.QtCore import qDebug, Qt, QTimer, qWarning, Signal
3334
from python_qt_binding.QtWidgets import QGraphicsScene, QMessageBox
@@ -43,6 +44,7 @@
4344
from .message_loader_thread import MessageLoaderThread
4445
from .player import Player
4546
from .recorder import Recorder
47+
from .rosbag2 import Entry, Rosbag2
4648
from .timeline_frame import TimelineFrame
4749

4850

@@ -67,6 +69,7 @@ def __init__(self, context, publish_clock):
6769
super(BagTimeline, self).__init__()
6870
self._bags = []
6971
self._bag_lock = threading.RLock()
72+
self._bags_size = 0
7073

7174
self.background_task = None # Display string
7275
self.background_task_cancel = False
@@ -158,6 +161,7 @@ def add_bag(self, bag):
158161
:param bag: ros bag file, ''rosbag2.bag''
159162
"""
160163
self._bags.append(bag)
164+
self._bags_size += bag.size()
161165

162166
bag_topics = bag.get_topics()
163167
qDebug('Topics from this bag: {}'.format(bag_topics))
@@ -187,8 +191,7 @@ def add_bag(self, bag):
187191
self._timeline_frame.index_cache_cv.notify()
188192

189193
def file_size(self):
190-
with self._bag_lock:
191-
return sum(b.size() for b in self._bags)
194+
return self._bags_size
192195

193196
# TODO Rethink API and if these need to be visible
194197
def _get_start_stamp(self):
@@ -264,47 +267,37 @@ def get_datatype(self, topic):
264267
datatype = bag_datatype
265268
return datatype
266269

267-
def get_entries(self, topics, start_stamp, end_stamp):
270+
def get_entries(self, topics: Optional[Union[str, Iterable[str]]],
271+
start_stamp: Time, end_stamp: Time,
272+
progress_cb: Optional[Callable[[int], None]] = None) -> Iterator[Entry]:
268273
"""
269274
Get a generator for bag entries.
270275
271276
:param topics: list of topics to query, ''list(str)''
272277
:param start_stamp: stamp to start at, ''rclpy.time.Time''
273-
:param end_stamp: stamp to end at, ''rclpy.time,Time''
278+
:param end_stamp: stamp to end at, ''rclpy.time.Time''
279+
:param progress_cb: callback function to report progress, called once per each percent.
274280
:returns: entries the bag file, ''msg''
275281
"""
276-
with self._bag_lock:
277-
bag_entries = []
278-
for b in self._bags:
279-
bag_start_time = b.get_earliest_timestamp()
280-
if bag_start_time is not None and bag_start_time > end_stamp:
281-
continue
282+
for b, entry in self.get_entries_with_bags(topics, start_stamp, end_stamp, progress_cb):
283+
yield entry
284+
return None
282285

283-
bag_end_time = b.get_latest_timestamp()
284-
if bag_end_time is not None and bag_end_time < start_stamp:
285-
continue
286-
287-
# Get all of the entries for each topic. When opening multiple
288-
# bags, the requested topic may not be in a given bag database
289-
for topic in topics:
290-
entries = b.get_entries_in_range(start_stamp, end_stamp, topic)
291-
if entries is not None:
292-
bag_entries.extend(entries)
293-
294-
for entry in sorted(bag_entries, key=lambda entry: entry.timestamp):
295-
yield entry
296-
297-
def get_entries_with_bags(self, topic, start_stamp, end_stamp):
286+
def get_entries_with_bags(self, topics: Optional[Union[str, Iterable[str]]],
287+
start_stamp: Time, end_stamp: Time,
288+
progress_cb: Optional[Callable[[int], None]] = None) \
289+
-> Iterator[Tuple[Rosbag2, Entry]]:
298290
"""
299291
Get a generator of bag entries.
300292
301-
:param topics: list of topics to query, ''list(str)''
293+
:param topics: list of topics to query (if None, all topics are used), ''list(str)''
302294
:param start_stamp: stamp to start at, ''rclpy.time.Time''
303-
:param end_stamp: stamp to end at, ''rclpy.time,Time''
304-
:returns: tuple of (bag, entry) for the entries in the bag file, ''(rosbag2.bag, msg)''
295+
:param end_stamp: stamp to end at, ''rclpy.time.Time''
296+
:param progress_cb: callback function to report progress, called once per each percent.
297+
:returns: tuple of (bag, entry) for the entries in the bag file, ''(rosbag2.Rosbag2, msg)''
305298
"""
306299
with self._bag_lock:
307-
bag_entries = []
300+
relevant_bags = []
308301
for b in self._bags:
309302
bag_start_time = b.get_earliest_timestamp()
310303
if bag_start_time is not None and bag_start_time > end_stamp:
@@ -314,11 +307,61 @@ def get_entries_with_bags(self, topic, start_stamp, end_stamp):
314307
if bag_end_time is not None and bag_end_time < start_stamp:
315308
continue
316309

317-
for entry in b.get_entries_in_range(start_stamp, end_stamp):
318-
bag_entries.append((b, entry))
310+
relevant_bags.append(b)
311+
312+
generators = {}
313+
last_entries = {}
314+
for b in relevant_bags:
315+
generators[b] = b.entries_in_range_generator(start_stamp, end_stamp, topics)
316+
try:
317+
last_entries[b] = next(generators[b])
318+
except StopIteration:
319+
last_entries[b] = None
320+
321+
to_delete = []
322+
for b in last_entries:
323+
if last_entries[b] is None:
324+
to_delete.append(b)
325+
326+
for b in to_delete:
327+
del last_entries[b]
328+
del generators[b]
329+
relevant_bags.remove(b)
330+
331+
if progress_cb is not None:
332+
progress = 0
333+
num_entries = 0
334+
estimated_num_entries = 0
335+
for b in relevant_bags:
336+
estimated_num_entries += b.estimate_num_entries_in_range(
337+
start_stamp, end_stamp, topics)
338+
339+
while any(last_entries.values()):
340+
min_bag = None
341+
min_entry = None
342+
for b, entry in last_entries.items():
343+
if entry is not None:
344+
if min_entry is None or entry.timestamp < min_entry.timestamp:
345+
min_bag = b
346+
min_entry = entry
347+
if min_bag is None:
348+
return
349+
350+
if progress_cb is not None:
351+
num_entries += 1
352+
new_progress = int(100.0 * (float(num_entries) / estimated_num_entries))
353+
if new_progress != progress:
354+
progress_cb(new_progress)
355+
progress = new_progress
356+
357+
yield min_bag, min_entry
358+
359+
try:
360+
last_entries[min_bag] = next(generators[min_bag])
361+
except StopIteration:
362+
last_entries[min_bag] = None
319363

320-
for bag, entry in sorted(bag_entries, key=lambda item: item[1].timestamp):
321-
yield bag, entry
364+
return
322365

323366
def get_entry(self, t, topic):
324367
"""

rqt_bag/src/rqt_bag/index_cache_thread.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,30 +45,25 @@ def __init__(self, timeline):
4545
self.start()
4646

4747
def run(self):
48+
# Delay start of the indexing so that the basic UI has time to be loaded
49+
time.sleep(2.0)
50+
4851
while not self._stop_flag:
4952
with self.timeline.index_cache_cv:
5053
# Wait until the cache is dirty
5154
while len(self.timeline.invalidated_caches) == 0:
5255
self.timeline.index_cache_cv.wait()
5356
if self._stop_flag:
5457
return
55-
# Update the index for one topic
56-
total_topics = len(self.timeline.topics)
57-
update_step = max(1, total_topics / 100)
58-
topic_num = 1
59-
progress = 0
60-
updated = False
61-
for topic in self.timeline.topics:
62-
if topic in self.timeline.invalidated_caches:
63-
updated |= (self.timeline._update_index_cache(topic) > 0)
64-
if topic_num % update_step == 0 or topic_num == total_topics:
65-
new_progress = int(100.0 * (float(topic_num) / total_topics))
66-
if new_progress != progress:
67-
progress = new_progress
68-
if not self._stop_flag:
69-
self.timeline.scene().background_progress = progress
70-
self.timeline.scene().status_bar_changed_signal.emit()
71-
topic_num += 1
58+
59+
# Update the index for all invalidated topics
60+
def progress_cb(progress: int) -> None:
61+
if not self._stop_flag:
62+
self.timeline.scene().background_progress = progress
63+
self.timeline.scene().status_bar_changed_signal.emit()
64+
65+
topics = self.timeline.invalidated_caches.intersection(set(self.timeline.topics))
66+
updated = (self.timeline._update_index_cache(topics, progress_cb) > 0)
7267

7368
if updated:
7469
self.timeline.scene().background_progress = 0

rqt_bag/src/rqt_bag/rosbag2.py

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,21 @@
3131

3232
from collections import namedtuple
3333
import os
34+
from typing import Callable, Iterable, Iterator, List, Optional, Union
3435

3536
from rclpy import logging
3637
from rclpy.clock import Clock, ClockType
3738
from rclpy.duration import Duration
3839
from rclpy.serialization import deserialize_message
40+
from rclpy.time import Time
3941

4042
import rosbag2_py
4143
from rosbag2_py import get_default_storage_id, StorageFilter
4244

4345
from rosidl_runtime_py.utilities import get_message
4446

47+
from rqt_bag import bag_helper
48+
4549
WRITE_ONLY_MSG = 'open for writing only, returning None'
4650

4751
Entry = namedtuple('Entry', ['topic', 'data', 'timestamp'])
@@ -154,25 +158,111 @@ def get_entry_after(self, timestamp, topic=None):
154158
self.reader.reset_filter()
155159
return result
156160

157-
def get_entries_in_range(self, t_start, t_end, topic=None):
161+
def get_entries_in_range(self, t_start: Time, t_end: Time,
162+
topic: Optional[Union[str, Iterable[str]]] = None,
163+
progress_cb: Optional[Callable[[int], None]] = None) \
164+
-> Optional[List[Entry]]:
165+
"""
166+
Get a list of all entries in a given time range, sorted by receive stamp.
167+
168+
Do not use this function for large bags. It will load all entries into memory. Use
169+
entries_in_range_generator() instead and process the data as they are returned.
170+
171+
:param t_start: stamp to start at, ''rclpy.time.Time''
172+
:param t_end: stamp to end at, ''rclpy.time.Time''
173+
:param topic: topic or list of topics to query (if None, all topics are), ''list(str)''
174+
:param progress_cb: callback function to report progress, called once per each percent.
175+
:returns: entries in the bag file, ''list(Entry)''
176+
"""
158177
if not self.reader:
159178
self._logger.warn('get_entries_in_range - ' + WRITE_ONLY_MSG)
160179
return None
161180

181+
return list(self.entries_in_range_generator(t_start, t_end, topic, progress_cb))
182+
183+
def entries_in_range_generator(self, t_start: Time, t_end: Time,
184+
topic: Optional[Union[str, Iterable[str]]] = None,
185+
progress_cb: Optional[Callable[[int], None]] = None) \
186+
-> Iterator[Entry]:
187+
"""
188+
Get a generator of all entries in a given time range, sorted by receive stamp.
189+
190+
:param t_start: stamp to start at, ''rclpy.time.Time''
191+
:param t_end: stamp to end at, ''rclpy.time.Time''
192+
:param topic: topic or list of topics to query (if None, all topics are), ''list(str)''
193+
:param progress_cb: callback function to report progress, called once per each percent.
194+
:returns: generator of entries in the bag file, ''Generator(Entry)''
195+
"""
196+
if not self.reader:
197+
self._logger.warn('entries_in_range_generator - ' + WRITE_ONLY_MSG)
198+
return
199+
200+
if isinstance(topic, Iterable) and not isinstance(topic, str):
201+
topics = topic
202+
else:
203+
topics = [topic] if topic is not None else []
204+
162205
self.reader.set_read_order(rosbag2_py.ReadOrder(reverse=False))
163-
self.reader.set_filter(rosbag2_py.StorageFilter(topics=[topic] if topic else []))
206+
self.reader.set_filter(rosbag2_py.StorageFilter(topics=topics))
164207
self.reader.seek(t_start.nanoseconds)
165-
entries = []
208+
if progress_cb is not None:
209+
num_entries = 0
210+
progress = 0
211+
estimated_num_entries = self.estimate_num_entries_in_range(t_start, t_end, topic)
212+
166213
while self.reader.has_next():
167214
next_entry = self.read_next()
168215
if next_entry.timestamp <= t_end.nanoseconds:
169-
entries.append(next_entry)
216+
if progress_cb is not None:
217+
num_entries += 1
218+
new_progress = int(100.0 * (float(num_entries) / estimated_num_entries))
219+
if new_progress != progress:
220+
progress_cb(new_progress)
221+
progress = new_progress
222+
yield next_entry
170223
else:
171224
break
172225

173226
# No filter
174227
self.reader.reset_filter()
175-
return entries
228+
229+
if progress_cb is not None and progress != 100:
230+
progress_cb(100)
231+
232+
return
233+
234+
def estimate_num_entries_in_range(self, t_start: Time, t_end: Time,
235+
topic: Optional[Union[str, Iterable[str]]] = None) -> int:
236+
"""
237+
Estimate the number of entries in the given time range.
238+
239+
The computation is only approximate, based on the assumption that messages are distributed
240+
evenly across the whole bag on every topic.
241+
242+
:param t_start: stamp to start at, ''rclpy.time.Time''
243+
:param t_end: stamp to end at, ''rclpy.time.Time''
244+
:param topic: topic or list of topics to query (if None, all topics are), ''list(str)''
245+
:returns: the approximate number of entries, ''int''
246+
"""
247+
if not self.reader:
248+
self._logger.warn('estimate_num_entries_in_range - ' + WRITE_ONLY_MSG)
249+
return 0
250+
251+
if isinstance(topic, Iterable) and not isinstance(topic, str):
252+
topics = topic
253+
else:
254+
topics = [topic] if topic is not None else []
255+
256+
range_duration = t_end - t_start
257+
bag_duration = self.get_latest_timestamp() - self.get_earliest_timestamp()
258+
fraction = bag_helper.to_sec(range_duration) / bag_helper.to_sec(bag_duration)
259+
260+
num_messages = 0
261+
for t_info in self.metadata.topics_with_message_count:
262+
if t_info.topic_metadata.name in topics:
263+
num_messages += t_info.message_count
264+
265+
return int(fraction * num_messages)
176266

177267
def read_next(self):
178268
return Entry(*self.reader.read_next())

0 commit comments

Comments
 (0)