Skip to content

Commit fb86191

Browse files
Merge pull request #62 from PickNikRobotics/finish_queue_before_stop
add ability to finish queue then stop recording
2 parents b4c81df + 3bb5e80 commit fb86191

File tree

5 files changed

+90
-1
lines changed

5 files changed

+90
-1
lines changed

data_tamer_cpp/include/data_tamer/data_sink.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ class DataSinkBase
9696

9797
void stopThread();
9898

99+
void stopAcceptingSnapshots();
100+
101+
void processQueuedSnapshots();
102+
103+
void startAcceptingSnapshots();
104+
99105
private:
100106
struct Pimpl;
101107
std::unique_ptr<Pimpl> _p;

data_tamer_cpp/include/data_tamer/sinks/mcap_sink.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class MCAPSink : public DataSinkBase
5252
/// Stop recording and save the file
5353
void stopRecording();
5454

55+
/// Stop taking snapshots, finish the existing queue, then `stopRecording`
56+
/// will block for at least 250 us to ensure the queue is empty
57+
void finishQueueAndStop();
58+
5559
/**
5660
* @brief restartRecording saves the current file (unless we did it already,
5761
* calling stopRecording) and start recording into a new one.

data_tamer_cpp/src/data_sink.cpp

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ struct DataSinkBase::Pimpl
2929

3030
std::thread thread;
3131
std::atomic_bool run = true;
32+
std::atomic_bool accept_snapshots = true;
3233
moodycamel::ConcurrentQueue<Snapshot> queue;
3334
};
3435

@@ -41,7 +42,33 @@ DataSinkBase::~DataSinkBase()
4142

4243
bool DataSinkBase::pushSnapshot(const Snapshot& snapshot)
4344
{
44-
return _p->queue.enqueue(snapshot);
45+
if(_p->accept_snapshots)
46+
{
47+
return _p->queue.enqueue(snapshot);
48+
}
49+
else
50+
{
51+
return false;
52+
}
53+
}
54+
55+
void DataSinkBase::stopAcceptingSnapshots()
56+
{
57+
_p->accept_snapshots = false;
58+
}
59+
60+
void DataSinkBase::startAcceptingSnapshots()
61+
{
62+
_p->accept_snapshots = true;
63+
}
64+
65+
void DataSinkBase::processQueuedSnapshots()
66+
{
67+
Snapshot snapshot_copy;
68+
while(_p->queue.try_dequeue(snapshot_copy))
69+
{
70+
this->storeSnapshot(snapshot_copy);
71+
}
4572
}
4673

4774
void DataSinkBase::stopThread()

data_tamer_cpp/src/sinks/mcap_sink.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <sstream>
66
#include <mutex>
77
#include <string>
8+
#include <thread>
89

910
#ifndef USING_ROS2
1011
#define MCAP_IMPLEMENTATION
@@ -151,6 +152,22 @@ void MCAPSink::stopRecording()
151152
writer_.reset();
152153
}
153154

155+
void MCAPSink::finishQueueAndStop()
156+
{
157+
// stop accepting new snapshots
158+
stopAcceptingSnapshots();
159+
160+
// finish any that are queued
161+
processQueuedSnapshots();
162+
163+
// sleep and process any that were missed by previous processing
164+
std::this_thread::sleep_for(std::chrono::microseconds(250));
165+
processQueuedSnapshots();
166+
167+
// now stop the recording as normal
168+
stopRecording();
169+
}
170+
154171
void MCAPSink::restartRecording(const std::string& filepath, bool do_compression)
155172
{
156173
restartRecordingImpl(filepath, do_compression, true);
@@ -175,6 +192,9 @@ void MCAPSink::restartRecordingImpl(const std::string& filepath, bool do_compres
175192
{
176193
addChannel(name, schema);
177194
}
195+
196+
// start accepting snapshots again in case they were stopped
197+
startAcceptingSnapshots();
178198
}
179199

180200
} // namespace DataTamer

data_tamer_cpp/tests/dt_tests.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#include "data_tamer/data_tamer.hpp"
22
#include "data_tamer/sinks/dummy_sink.hpp"
3+
#include "data_tamer/sinks/mcap_sink.hpp"
34

45
#include <gtest/gtest.h>
56

7+
#include <filesystem>
68
#include <variant>
79
#include <string>
810
#include <thread>
@@ -291,3 +293,33 @@ TEST(DataTamerBasic, LockedPtr)
291293
// now expect that our assignment to the locked pointer took place
292294
EXPECT_EQ(logged_float->get(), val2);
293295
}
296+
297+
TEST(DataTamerBasic, FinishQueue)
298+
{
299+
auto channel = LogChannel::create("chan");
300+
auto const temp_path =
301+
std::filesystem::temp_directory_path() / std::filesystem::path("data_tamer_test."
302+
"mcap");
303+
auto sink = std::make_shared<MCAPSink>(temp_path.string());
304+
channel->addDataSink(sink);
305+
306+
double const value = 1.;
307+
auto id_value = channel->registerValue("value", &value);
308+
309+
EXPECT_TRUE(channel->takeSnapshot());
310+
311+
sink->finishQueueAndStop();
312+
313+
// now we shouldn't be able to take more snapshots
314+
EXPECT_FALSE(channel->takeSnapshot());
315+
316+
// restart the recording
317+
sink->restartRecording(temp_path);
318+
319+
EXPECT_TRUE(channel->takeSnapshot());
320+
321+
sink->stopRecording();
322+
323+
// since we just stopped recording but not snapshots, we'll still be able to take a snapshot (but it won't be written to disk)
324+
EXPECT_TRUE(channel->takeSnapshot());
325+
}

0 commit comments

Comments
 (0)