Skip to content

Commit e6e68c3

Browse files
RB-409 Fixing Pipeline reset (#93)
* fix(rtbot): reseting only internal operators Reseting only internal operators right after a single message comes out of the pipeline, the prior version was expecting to complete the input buffer execution although it is better to check on every iteration because only one message is required to produce output, so no need to wait for the whole input queue to be executed, the current behavior causes issues in cases where a resampler connected to the pipeline outputs more than one messages becuase a device restarted. * fix(rtbot): Fixing for the case of a message generator. Like in the case of including a resampler inside the Pipeline
1 parent 7966a8d commit e6e68c3

File tree

5 files changed

+198
-40
lines changed

5 files changed

+198
-40
lines changed

libs/api/test/test_program.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,114 @@ SCENARIO("Program handles Pipeline operators", "[program][pipeline]") {
333333
}
334334
}
335335

336+
SCENARIO("Program handles Pipeline operators and resets", "[program][pipeline]") {
337+
GIVEN("A program with a Pipeline") {
338+
std::string program_json = R"({
339+
"operators": [
340+
{"type": "Input", "id": "input", "portTypes": ["number"]},
341+
{
342+
"type": "Pipeline",
343+
"id": "pipeline",
344+
"input_port_types": ["number"],
345+
"output_port_types": ["number"],
346+
"operators": [
347+
{"type": "Input", "id": "pinput", "portTypes": ["number"]},
348+
{"type": "MovingAverage", "id": "ma", "window_size": 3}
349+
350+
],
351+
"connections": [
352+
{"from": "pinput", "to": "ma", "fromPort": "o1", "toPort": "i1"}
353+
],
354+
"entryOperator": "pinput",
355+
"outputMappings": {
356+
"ma": {"o1": "o1"}
357+
}
358+
},
359+
{"type": "Output", "id": "output", "portTypes": ["number"]}
360+
],
361+
"connections": [
362+
{"from": "input", "to": "pipeline", "fromPort": "o1", "toPort": "i1"},
363+
{"from": "pipeline", "to": "output", "fromPort": "o1", "toPort": "i1"}
364+
],
365+
"entryOperator": "input",
366+
"output": {
367+
"output": ["o1"]
368+
}
369+
})";
370+
371+
Program program(program_json);
372+
373+
WHEN("Processing messages") {
374+
// Need 5 messages:
375+
// First 3 messages to fill MA(3) buffer
376+
// Message 4 and 5 to emit MA values that will fill STD(3) buffer and produce output
377+
std::vector<Message<NumberData>> messages = {
378+
{1, NumberData{1.0}}, // MA collecting
379+
{2, NumberData{2.0}}, // MA collecting
380+
{3, NumberData{3.0}}, // MA emits value (3.0) -> pipeline resets
381+
{4, NumberData{4.0}}, // MA collecting
382+
{5, NumberData{5.0}}, // MA collecting
383+
{6, NumberData{6.0}}, // MA emits value (5.0) -> pipeline resets
384+
{7, NumberData{0.0}}, // MA collecting
385+
{8, NumberData{0.0}}, // MA collecting
386+
{9, NumberData{0.0}}, // MA emits value (0.0) -> pipeline resets
387+
};
388+
389+
ProgramMsgBatch final_batch;
390+
391+
program.receive(messages.at(0));
392+
program.receive(messages.at(1));
393+
final_batch = program.receive(messages.at(2));
394+
395+
THEN("Pipeline processes messages correctly and resets") {
396+
REQUIRE(final_batch.size() == 1);
397+
REQUIRE(final_batch["output"].count("o1") == 1);
398+
const auto* out_msg = dynamic_cast<const Message<NumberData>*>(final_batch["output"]["o1"].back().get());
399+
REQUIRE(out_msg != nullptr);
400+
REQUIRE(out_msg->time == 3);
401+
REQUIRE(out_msg->data.value == 2.0);
402+
403+
final_batch = program.receive(messages.at(3));
404+
405+
AND_THEN("Pipeline start all over, ma collects") {
406+
REQUIRE(final_batch.size() == 0);
407+
408+
final_batch = program.receive(messages.at(4));
409+
410+
AND_THEN("ma collects") {
411+
REQUIRE(final_batch.size() == 0);
412+
413+
final_batch = program.receive(messages.at(5));
414+
415+
AND_THEN("pipeline emits") {
416+
REQUIRE(final_batch.size() == 1);
417+
REQUIRE(final_batch["output"].count("o1") == 1);
418+
const auto* out_msg = dynamic_cast<const Message<NumberData>*>(final_batch["output"]["o1"].back().get());
419+
REQUIRE(out_msg != nullptr);
420+
REQUIRE(out_msg->time == 6);
421+
REQUIRE(out_msg->data.value == 5.0);
422+
423+
program.receive(messages.at(6));
424+
program.receive(messages.at(7));
425+
final_batch = program.receive(messages.at(8));
426+
427+
AND_THEN("pipeline emits") {
428+
REQUIRE(final_batch.size() == 1);
429+
REQUIRE(final_batch["output"].count("o1") == 1);
430+
const auto* out_msg =
431+
dynamic_cast<const Message<NumberData>*>(final_batch["output"]["o1"].back().get());
432+
REQUIRE(out_msg != nullptr);
433+
REQUIRE(out_msg->time == 9);
434+
REQUIRE(out_msg->data.value == 0.0);
435+
}
436+
}
437+
}
438+
}
439+
}
440+
}
441+
}
442+
}
443+
336444
SCENARIO("Program handles Pipeline serialization", "[program][pipeline]") {
337445
GIVEN("A program with a stateful Pipeline") {
338446
std::string program_json = R"({

libs/core/include/rtbot/Pipeline.h

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -98,30 +98,13 @@ class Pipeline : public Operator {
9898

9999
void reset() override {
100100
RTBOT_LOG_DEBUG("Resetting pipeline");
101-
// First reset our own state
102-
Operator::reset();
103-
104101
// Then reset all internal operators
105102
for (auto& [_, op] : operators_) {
106103
op->reset();
107104
}
108105
}
109106

110107
void clear_all_output_ports() override {
111-
// Check if we produced any output
112-
bool has_output = false;
113-
for (size_t i = 0; i < num_output_ports(); ++i) {
114-
if (!get_output_queue(i).empty()) {
115-
has_output = true;
116-
break;
117-
}
118-
}
119-
120-
// If we produced output, reset the pipeline for next iteration
121-
if (has_output) {
122-
reset();
123-
}
124-
125108
Operator::clear_all_output_ports();
126109
for (auto& [_, op] : operators_) {
127110
op->clear_all_output_ports();
@@ -145,26 +128,36 @@ class Pipeline : public Operator {
145128
entry_operator_->receive_data(msg->clone(), i);
146129
entry_operator_->execute();
147130
input_queue.pop_front();
148-
}
149-
}
150-
151-
// Process output mappings
152-
for (const auto& [op_id, mappings] : output_mappings_) {
153-
auto it = operators_.find(op_id);
154-
if (it != operators_.end()) {
155-
auto& op = it->second;
156-
for (const auto& [operator_port, pipeline_port] : mappings) {
157-
if (operator_port < op->num_output_ports() && pipeline_port < num_output_ports()) {
158-
const auto& source_queue = op->get_output_queue(operator_port);
159-
// Only forward if source operator has produced output on the mapped port
160-
if (!source_queue.empty()) {
161-
auto& target_queue = get_output_queue(pipeline_port);
162-
for (const auto& msg : source_queue) {
163-
RTBOT_LOG_DEBUG("Forwarding message ", msg->to_string(), " from ", op_id, " -> ", pipeline_port);
164-
target_queue.push_back(msg->clone());
131+
// Process output mappings
132+
bool was_reseted = false;
133+
for (const auto& [op_id, mappings] : output_mappings_) {
134+
auto it = operators_.find(op_id);
135+
if (it != operators_.end()) {
136+
auto& op = it->second;
137+
for (const auto& [operator_port, pipeline_port] : mappings) {
138+
if (operator_port < op->num_output_ports() && pipeline_port < num_output_ports()) {
139+
const auto& source_queue = op->get_output_queue(operator_port);
140+
// Only forward if source operator has produced output on the mapped port
141+
if (!source_queue.empty()) {
142+
was_reseted = false;
143+
auto& target_queue = get_output_queue(pipeline_port);
144+
for (const auto& msg : source_queue) {
145+
RTBOT_LOG_DEBUG("Forwarding message ", msg->to_string(), " from ", op_id, " -> ", pipeline_port);
146+
target_queue.push_back(msg->clone());
147+
reset();
148+
was_reseted = true;
149+
break;
150+
}
151+
}
152+
}
153+
if (was_reseted) {
154+
break;
165155
}
166156
}
167157
}
158+
if (was_reseted) {
159+
break;
160+
}
168161
}
169162
}
170163
}

libs/std/include/rtbot/std/ResamplerConstant.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,34 @@ class ResamplerConstant : public Operator {
2929

3030
std::string type_name() const override { return "ResamplerConstant"; }
3131

32+
Bytes collect() override {
33+
// First collect base state
34+
Bytes bytes = Operator::collect();
35+
36+
// Serialize next emission time
37+
bytes.insert(bytes.end(), reinterpret_cast<const uint8_t*>(&next_emit_),
38+
reinterpret_cast<const uint8_t*>(&next_emit_) + sizeof(next_emit_));
39+
40+
// Serialize initialization state
41+
bytes.insert(bytes.end(), reinterpret_cast<const uint8_t*>(&initialized_),
42+
reinterpret_cast<const uint8_t*>(&initialized_) + sizeof(initialized_));
43+
44+
return bytes;
45+
}
46+
47+
void restore(Bytes::const_iterator& it) override {
48+
// First restore base state
49+
Operator::restore(it);
50+
51+
// Restore next emission time
52+
next_emit_ = *reinterpret_cast<const timestamp_t*>(&(*it));
53+
it += sizeof(timestamp_t);
54+
55+
// Restore initialization state
56+
initialized_ = *reinterpret_cast<const bool*>(&(*it));
57+
it += sizeof(bool);
58+
}
59+
3260
timestamp_t get_interval() const { return dt_; }
3361
timestamp_t get_next_emission_time() const { return next_emit_; }
3462
std::optional<timestamp_t> get_t0() const { return t0_; }

libs/std/include/rtbot/std/ResamplerHermite.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class ResamplerHermite : public Buffer<NumberData, ResamplerFeatures> {
3030
Buffer<NumberData, ResamplerFeatures>::reset();
3131
initialized_ = false;
3232
next_emit_ = 0;
33-
pending_emissions_.clear();
3433
}
3534

3635
std::string type_name() const override { return "ResamplerHermite"; }
@@ -125,11 +124,10 @@ class ResamplerHermite : public Buffer<NumberData, ResamplerFeatures> {
125124
return h00 * y1 + h10 * m0 + h01 * y2 + h11 * m1;
126125
}
127126

128-
timestamp_t dt_; // Resampling interval
129-
std::optional<timestamp_t> t0_; // Optional start time
130-
timestamp_t next_emit_; // Next time to emit a sample
131-
bool initialized_; // Whether we've initialized next_emit_
132-
std::vector<std::unique_ptr<Message<NumberData>>> pending_emissions_; // Queue of pending emissions
127+
timestamp_t dt_; // Resampling interval
128+
std::optional<timestamp_t> t0_; // Optional start time
129+
timestamp_t next_emit_; // Next time to emit a sample
130+
bool initialized_; // Whether we've initialized next_emit_
133131
};
134132

135133
inline std::shared_ptr<ResamplerHermite> make_resampler_hermite(std::string id, timestamp_t interval,

libs/std/test/test_filter_scalar.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,37 @@ SCENARIO("FilterScalarOp derived classes handle basic filtering", "[filter_scala
6868
}
6969
}
7070

71+
SECTION("GreaterThan operator small value") {
72+
auto gt = make_greater_than("gt1", 0.5);
73+
74+
REQUIRE(gt->type_name() == "GreaterThan");
75+
REQUIRE(dynamic_cast<GreaterThan*>(gt.get())->get_threshold() == 0.5);
76+
77+
std::vector<std::pair<timestamp_t, double>> inputs = {
78+
{0, 0.3}, // Should be filtered
79+
{1, 1.0}, // Should pass
80+
{2, 4.0}, // Should pass
81+
{4, 0.2}, // Should be filtered
82+
{5, 0.5} // Should be filtered (not strictly greater than)
83+
};
84+
85+
std::vector<std::pair<timestamp_t, double>> expected = {{1, 1.0}, {2, 4.0}};
86+
87+
for (const auto& input : inputs) {
88+
gt->receive_data(create_message<NumberData>(input.first, NumberData{input.second}), 0);
89+
}
90+
gt->execute();
91+
92+
auto& output = gt->get_output_queue(0);
93+
REQUIRE(output.size() == expected.size());
94+
95+
for (size_t i = 0; i < output.size(); ++i) {
96+
auto* msg = dynamic_cast<const Message<NumberData>*>(output[i].get());
97+
REQUIRE(msg->time == expected[i].first);
98+
REQUIRE(msg->data.value == expected[i].second);
99+
}
100+
}
101+
71102
SECTION("EqualTo operator") {
72103
auto eq = make_equal_to("eq1", 3.0, 0.1);
73104

0 commit comments

Comments
 (0)