Skip to content

Commit 3ff82f0

Browse files
committed
feat: add new operators
1 parent 0d54f3f commit 3ff82f0

20 files changed

+807
-0
lines changed

libs/api/include/rtbot/OperatorJson.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "rtbot/std/MovingAverage.h"
3434
#include "rtbot/std/MovingSum.h"
3535
#include "rtbot/std/PeakDetector.h"
36+
#include "rtbot/std/PctChange.h"
3637
#include "rtbot/std/Replace.h"
3738
#include "rtbot/std/ResamplerConstant.h"
3839
#include "rtbot/std/ResamplerHermite.h"
@@ -186,6 +187,8 @@ class OperatorJson {
186187
return make_resampler_constant(id, parsed["interval"].get<int>());
187188
} else if (type == "ResamplerHermite") {
188189
return make_resampler_hermite(id, parsed["interval"].get<int>());
190+
} else if (type == "PctChange") {
191+
return make_pct_change(id);
189192
} else if (type == "Pipeline") {
190193
// Validate port types
191194
auto input_types = parsed["input_port_types"].get<std::vector<std::string>>();
@@ -337,6 +340,8 @@ class OperatorJson {
337340
}
338341
} else if (type == "ResamplerHermite") {
339342
j["interval"] = std::dynamic_pointer_cast<ResamplerHermite>(op)->get_interval();
343+
} else if (type == "PctChange") {
344+
// PctChange has no additional parameters to serialize
340345
} else if (type == "Pipeline") {
341346
auto pipeline = std::dynamic_pointer_cast<Pipeline>(op);
342347
j["type"] = "Pipeline";
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#include <catch2/catch.hpp>
2+
3+
#include "rtbot/OperatorJson.h"
4+
5+
using namespace rtbot;
6+
7+
TEST_CASE("OperatorJson creates PctChange operator", "[OperatorJson][PctChange]") {
8+
const std::string json = R"({
9+
"type": "PctChange",
10+
"id": "pct_op"
11+
})";
12+
13+
auto op = OperatorJson::read_op(json);
14+
REQUIRE(op->type_name() == "PctChange");
15+
REQUIRE(op->id() == "pct_op");
16+
17+
const auto serialized = OperatorJson::write_op(op);
18+
auto round_trip = OperatorJson::read_op(serialized);
19+
REQUIRE(round_trip->type_name() == "PctChange");
20+
REQUIRE(round_trip->id() == "pct_op");
21+
}

libs/std/include/rtbot/std/Clip.h

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
#ifndef CLIP_H
2+
#define CLIP_H
3+
4+
#include <algorithm>
5+
#include <optional>
6+
7+
#include "rtbot/Message.h"
8+
#include "rtbot/Operator.h"
9+
#include "rtbot/PortType.h"
10+
11+
namespace rtbot {
12+
13+
class Clip : public Operator {
14+
public:
15+
Clip(std::string id, std::optional<double> lower = std::nullopt, std::optional<double> upper = std::nullopt)
16+
: Operator(std::move(id)), lower_(lower), upper_(upper) {
17+
add_data_port<NumberData>();
18+
add_output_port<NumberData>();
19+
}
20+
21+
void reset() override { Operator::reset(); }
22+
23+
std::string type_name() const override { return "Clip"; }
24+
25+
std::optional<double> lower() const { return lower_; }
26+
std::optional<double> upper() const { return upper_; }
27+
28+
Bytes collect() override {
29+
Bytes bytes = Operator::collect();
30+
31+
bool has_lower = lower_.has_value();
32+
bool has_upper = upper_.has_value();
33+
bytes.push_back(static_cast<uint8_t>(has_lower));
34+
if (has_lower) {
35+
bytes.insert(bytes.end(), reinterpret_cast<const uint8_t*>(&(*lower_)),
36+
reinterpret_cast<const uint8_t*>(&(*lower_)) + sizeof(double));
37+
}
38+
39+
bytes.push_back(static_cast<uint8_t>(has_upper));
40+
if (has_upper) {
41+
bytes.insert(bytes.end(), reinterpret_cast<const uint8_t*>(&(*upper_)),
42+
reinterpret_cast<const uint8_t*>(&(*upper_)) + sizeof(double));
43+
}
44+
45+
return bytes;
46+
}
47+
48+
void restore(Bytes::const_iterator& it) override {
49+
Operator::restore(it);
50+
51+
bool has_lower = static_cast<bool>(*it++);
52+
if (has_lower) {
53+
lower_ = *reinterpret_cast<const double*>(&(*it));
54+
it += sizeof(double);
55+
} else {
56+
lower_.reset();
57+
}
58+
59+
bool has_upper = static_cast<bool>(*it++);
60+
if (has_upper) {
61+
upper_ = *reinterpret_cast<const double*>(&(*it));
62+
it += sizeof(double);
63+
} else {
64+
upper_.reset();
65+
}
66+
}
67+
68+
protected:
69+
void process_data() override {
70+
auto& input_queue = get_data_queue(0);
71+
auto& output_queue = get_output_queue(0);
72+
73+
while (!input_queue.empty()) {
74+
const auto* msg = dynamic_cast<const Message<NumberData>*>(input_queue.front().get());
75+
if (!msg) {
76+
throw std::runtime_error("Invalid message type in Clip operator.");
77+
}
78+
79+
double value = msg->data.value;
80+
if (lower_) {
81+
value = std::max(value, *lower_);
82+
}
83+
if (upper_) {
84+
value = std::min(value, *upper_);
85+
}
86+
87+
output_queue.push_back(create_message<NumberData>(msg->time, NumberData{value}));
88+
input_queue.pop_front();
89+
}
90+
}
91+
92+
private:
93+
std::optional<double> lower_;
94+
std::optional<double> upper_;
95+
};
96+
97+
inline std::shared_ptr<Operator> make_clip(const std::string& id, std::optional<double> lower = std::nullopt,
98+
std::optional<double> upper = std::nullopt) {
99+
return std::make_shared<Clip>(id, lower, upper);
100+
}
101+
102+
} // namespace rtbot
103+
104+
#endif // CLIP_H

libs/std/include/rtbot/std/Clip.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Clip Operator
2+
3+
The `Clip` operator constrains numeric values to a closed interval. Both lower and upper bounds are optional.
4+
5+
## Parameters
6+
- `lower` (number, optional): Minimum value. If omitted the input is not bounded below.
7+
- `upper` (number, optional): Maximum value. If omitted the input is not bounded above.
8+
9+
## Ports
10+
- Input `NumberData`
11+
- Output `NumberData`
12+
13+
## Behaviour
14+
- Each incoming sample is clamped between `lower` and `upper` if the bounds are provided.
15+
- Bounds may be given independently (only lower or only upper).
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#ifndef CUMULATIVE_PRODUCT_H
2+
#define CUMULATIVE_PRODUCT_H
3+
4+
#include "rtbot/Message.h"
5+
#include "rtbot/Operator.h"
6+
#include "rtbot/PortType.h"
7+
8+
namespace rtbot {
9+
10+
class CumulativeProduct : public Operator {
11+
public:
12+
explicit CumulativeProduct(std::string id) : Operator(std::move(id)), product_(1.0) {
13+
add_data_port<NumberData>();
14+
add_output_port<NumberData>();
15+
}
16+
17+
void reset() override {
18+
Operator::reset();
19+
product_ = 1.0;
20+
}
21+
22+
std::string type_name() const override { return "CumulativeProduct"; }
23+
24+
double current_product() const { return product_; }
25+
26+
Bytes collect() override {
27+
Bytes bytes = Operator::collect();
28+
bytes.insert(bytes.end(), reinterpret_cast<const uint8_t*>(&product_),
29+
reinterpret_cast<const uint8_t*>(&product_) + sizeof(product_));
30+
return bytes;
31+
}
32+
33+
void restore(Bytes::const_iterator& it) override {
34+
Operator::restore(it);
35+
product_ = *reinterpret_cast<const double*>(&(*it));
36+
it += sizeof(double);
37+
}
38+
39+
protected:
40+
void process_data() override {
41+
auto& input_queue = get_data_queue(0);
42+
auto& output_queue = get_output_queue(0);
43+
44+
while (!input_queue.empty()) {
45+
const auto* msg = dynamic_cast<const Message<NumberData>*>(input_queue.front().get());
46+
if (!msg) {
47+
throw std::runtime_error("Invalid message type in CumulativeProduct");
48+
}
49+
50+
product_ *= msg->data.value;
51+
output_queue.push_back(create_message<NumberData>(msg->time, NumberData{product_}));
52+
53+
input_queue.pop_front();
54+
}
55+
}
56+
57+
private:
58+
double product_;
59+
};
60+
61+
inline std::shared_ptr<Operator> make_cumulative_product(const std::string& id) {
62+
return std::make_shared<CumulativeProduct>(id);
63+
}
64+
65+
} // namespace rtbot
66+
67+
#endif // CUMULATIVE_PRODUCT_H
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# CumulativeProduct Operator
2+
3+
`CumulativeProduct` maintains the running product of all received numeric samples.
4+
5+
## Ports
6+
- Input `NumberData`
7+
- Output `NumberData`
8+
9+
## Behaviour
10+
- Multiplies the incoming value into the internal state and emits the updated product for each message.
11+
- The running product resets to `1.0` when the operator is reset.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#ifndef MOVING_MAX_H
2+
#define MOVING_MAX_H
3+
4+
#include <algorithm>
5+
#include <vector>
6+
7+
#include "rtbot/Buffer.h"
8+
#include "rtbot/Message.h"
9+
#include "rtbot/Operator.h"
10+
11+
namespace rtbot {
12+
13+
struct MovingMaxFeatures {
14+
static constexpr bool TRACK_SUM = false;
15+
static constexpr bool TRACK_VARIANCE = false;
16+
};
17+
18+
class MovingMax : public Buffer<NumberData, MovingMaxFeatures> {
19+
public:
20+
MovingMax(std::string id, size_t window_size)
21+
: Buffer<NumberData, MovingMaxFeatures>(std::move(id), window_size) {}
22+
23+
std::string type_name() const override { return "MovingMax"; }
24+
25+
protected:
26+
std::vector<std::unique_ptr<Message<NumberData>>> process_message(const Message<NumberData>* msg) override {
27+
if (!this->buffer_full()) {
28+
return {};
29+
}
30+
31+
double max_value = this->buffer().front()->data.value;
32+
for (const auto& entry : this->buffer()) {
33+
max_value = std::max(max_value, entry->data.value);
34+
}
35+
36+
std::vector<std::unique_ptr<Message<NumberData>>> output;
37+
output.push_back(create_message<NumberData>(msg->time, NumberData{max_value}));
38+
return output;
39+
}
40+
};
41+
42+
inline std::shared_ptr<Operator> make_moving_max(const std::string& id, size_t window_size) {
43+
return std::make_shared<MovingMax>(id, window_size);
44+
}
45+
46+
} // namespace rtbot
47+
48+
#endif // MOVING_MAX_H
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# MovingMax Operator
2+
3+
The `MovingMax` operator computes the maximum of the latest `window_size` samples. It behaves similarly to a pandas
4+
rolling maximum with `min_periods=window_size`.
5+
6+
## Parameters
7+
- `window_size` (integer, required): Number of samples in the sliding window.
8+
9+
## Ports
10+
- Input `NumberData`
11+
- Output `NumberData`
12+
13+
## Behaviour
14+
- Emits no data until the buffer contains `window_size` samples.
15+
- Afterwards emits the maximum of the current window for each new input sample.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#ifndef MOVING_MIN_H
2+
#define MOVING_MIN_H
3+
4+
#include <algorithm>
5+
#include <vector>
6+
7+
#include "rtbot/Buffer.h"
8+
#include "rtbot/Message.h"
9+
#include "rtbot/Operator.h"
10+
11+
namespace rtbot {
12+
13+
struct MovingMinFeatures {
14+
static constexpr bool TRACK_SUM = false;
15+
static constexpr bool TRACK_VARIANCE = false;
16+
};
17+
18+
class MovingMin : public Buffer<NumberData, MovingMinFeatures> {
19+
public:
20+
MovingMin(std::string id, size_t window_size)
21+
: Buffer<NumberData, MovingMinFeatures>(std::move(id), window_size) {}
22+
23+
std::string type_name() const override { return "MovingMin"; }
24+
25+
protected:
26+
std::vector<std::unique_ptr<Message<NumberData>>> process_message(const Message<NumberData>* msg) override {
27+
if (!this->buffer_full()) {
28+
return {};
29+
}
30+
31+
double min_value = this->buffer().front()->data.value;
32+
for (const auto& entry : this->buffer()) {
33+
min_value = std::min(min_value, entry->data.value);
34+
}
35+
36+
std::vector<std::unique_ptr<Message<NumberData>>> output;
37+
output.push_back(create_message<NumberData>(msg->time, NumberData{min_value}));
38+
return output;
39+
}
40+
};
41+
42+
inline std::shared_ptr<Operator> make_moving_min(const std::string& id, size_t window_size) {
43+
return std::make_shared<MovingMin>(id, window_size);
44+
}
45+
46+
} // namespace rtbot
47+
48+
#endif // MOVING_MIN_H
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# MovingMin Operator
2+
3+
The `MovingMin` operator computes the minimum of the most recent `window_size` values. It emits a value once the
4+
internal buffer is full and thereafter slides forward one sample at a time.
5+
6+
## Parameters
7+
- `window_size` (integer, required): Number of samples inside the moving window.
8+
9+
## Ports
10+
- Input `NumberData`
11+
- Output `NumberData`
12+
13+
## Behaviour
14+
- Produces no output until `window_size` samples have been observed.
15+
- After the buffer is full, outputs the minimum of the current window for each new sample.

0 commit comments

Comments
 (0)