Skip to content

Commit 5528f89

Browse files
authored
Merge pull request #75 from lf-lang/mutations-config-and-scaling
Mutations for scaling banks and multiports and adding connections
2 parents 36dd4d2 + e13f68c commit 5528f89

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+999
-86
lines changed

.github/workflows/clang-format.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ jobs:
1010
- uses: actions/checkout@v4
1111
- name: Analyze
1212
run: |
13+
clang-format --version
1314
clang-format --dry-run --Werror -style=file $(find ./ -name '*.cc' -print)
1415
clang-format --dry-run --Werror -style=file $(find ./ -name '*.hh' -print)

Makefile

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
.PHONY: clean test coverage asan format format-check ci lf-test lib proto
2+
3+
test: unit-test lf-test
4+
5+
# Generate protobuf code
6+
proto:
7+
python3 external/nanopb/generator/nanopb_generator.py -Iexternal/nanopb/generator/proto/ -Iexternal/proto -L'#include "nanopb/%s"' -Dexternal/proto message.proto
8+
9+
# Build reactor-uc as a static library
10+
lib:
11+
cmake -Bbuild
12+
cmake --build build
13+
make -C build
14+
15+
# Build and run the unit tests
16+
unit-test:
17+
cmake -Bbuild -DBUILD_TESTS=ON
18+
cmake --build build
19+
make test -C build
20+
21+
# Build and run lf tests
22+
lf-test:
23+
make -C test/lf
24+
25+
# Get coverage data on unit tests
26+
coverage:
27+
cmake -Bbuild -DBUILD_TESTS=ON -DTEST_COVERAGE=ON
28+
cmake --build build
29+
make coverage -C build
30+
31+
# Compile tests with AddressSanitizer and run them
32+
asan:
33+
cmake -Bbuild -DASAN=ON -DBUILD_TESTS=ON
34+
cmake --build build
35+
make test -C build
36+
37+
# Format the code base
38+
SRC_FILES := $(shell find ./lib -name '*.cc' -print)
39+
HDR_FILES := $(shell find ./include -name '*.hh' -print)
40+
41+
format:
42+
clang-format -i -style=file $(SRC_FILES) $(HDR_FILES)
43+
44+
# Check that the code base is formatted
45+
format-check:
46+
clang-format --dry-run --Werror -style=file $(SRC_FILES) $(HDR_FILES)
47+
48+
# Run the entire CI flow
49+
ci: clean test coverage format-check
50+
51+
clean:
52+
rm -rf build

examples/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ add_subdirectory(count)
77
add_subdirectory(ports)
88
add_subdirectory(hello)
99
add_subdirectory(power_train)
10+
add_subdirectory(multiport_mutation)

examples/count/main.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
using namespace reactor;
66
using namespace std::chrono_literals;
77

8-
class Count : public Reactor {
8+
class Count final : public Reactor {
99
private:
1010
// actions
1111
Timer timer{"timer", this};
1212
LogicalAction<int> counter{"counter", this};
1313

1414
// reactions_
15-
Reaction r_init{"r_init", 1, this, [this]() { init(); }};
16-
Reaction r_counter{"r_counter", 2, this, [this]() { print_count(); }};
15+
Reaction r_init{"r_init", 1, this, [this]() { this->init(); }};
16+
Reaction r_counter{"r_counter", 2, this, [this]() { this->print_count(); }};
1717

1818
public:
1919
explicit Count(Environment* env)

examples/hello/main.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
using namespace reactor;
77
using namespace std::chrono_literals;
88

9-
class Hello : public Reactor {
9+
class Hello final : public Reactor {
1010
private:
1111
// actions
1212
Timer timer{"timer", this, 1s, 2s};
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
add_executable(mutation_multiports main.cc)
2+
target_link_libraries(mutation_multiports reactor-cpp)
3+
add_dependencies(examples mutation_multiports)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#ifndef CONSUMER_HH // NOLINT
2+
#define CONSUMER_HH // NOLINT
3+
4+
#include <reactor-cpp/reactor-cpp.hh>
5+
6+
using namespace reactor;
7+
using namespace std::chrono_literals;
8+
9+
class Consumer final : public Reactor { // NOLINT
10+
class Inner : public Scope {
11+
Inner(Reactor* reactor, std::size_t index)
12+
: Scope(reactor)
13+
, index_(index) {}
14+
std::size_t index_ = 0;
15+
16+
void reaction_1(const Input<unsigned>& in) const {
17+
std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n';
18+
}
19+
20+
friend Consumer;
21+
};
22+
23+
Inner _lf_inner;
24+
Reaction handle{"handle", 1, this, [this]() { _lf_inner.reaction_1(this->in); }};
25+
26+
public:
27+
Consumer(const std::string& name, Environment* env, std::size_t index)
28+
: Reactor(name, env)
29+
, _lf_inner(this, index) {
30+
std::cout << "creating instance of consumer" << '\n';
31+
}
32+
~Consumer() override { std::cout << "Consumer Object is deleted" << '\n'; };
33+
34+
Input<unsigned> in{"in", this}; // NOLINT
35+
36+
void assemble() override { handle.declare_trigger(&in); }
37+
};
38+
39+
#endif // CONSUMER_HH
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#ifndef LOAD_BALANCER_HH // NOLINT
2+
#define LOAD_BALANCER_HH // NOLINT
3+
4+
#include <reactor-cpp/reactor-cpp.hh>
5+
6+
#include "../../lib/mutation/multiport.cc"
7+
#include "reactor-cpp/mutations/multiport.hh"
8+
9+
using namespace reactor;
10+
using namespace std::chrono_literals;
11+
12+
class LoadBalancer final : public Reactor { // NOLINT
13+
class Inner : public MutableScope {
14+
explicit Inner(Reactor* reactor)
15+
: MutableScope(reactor) {}
16+
17+
// reaction bodies
18+
static void reaction_1(const Input<unsigned>& inbound, LogicalAction<unsigned>& scale_action,
19+
Multiport<Output<unsigned>>& outbound) {
20+
if (std::rand() % 30 == 0) { // NOLINT
21+
scale_action.schedule(std::rand() % 20 + 1); // NOLINT
22+
}
23+
const unsigned sel = std::rand() % outbound.size(); // NOLINT
24+
std::cout << "Sending out to:" << sel << '\n';
25+
outbound[sel].set(inbound.get());
26+
}
27+
28+
void reaction_2(ModifableMultiport<Output<unsigned>>& outbound,
29+
[[maybe_unused]] const LogicalAction<unsigned>& scale, Output<unsigned>& scale_bank) {
30+
ModifableMultiport<Output<unsigned>>* temp = &outbound;
31+
std::size_t new_size = *scale.get();
32+
33+
auto antideps = (outbound[0]).anti_dependencies();
34+
35+
const auto change_size =
36+
std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(temp, this->reactor_, antideps, new_size);
37+
38+
add_to_transaction(change_size);
39+
40+
commit_transaction();
41+
42+
scale_bank.set(new_size);
43+
}
44+
45+
friend LoadBalancer;
46+
};
47+
48+
Inner _lf_inner;
49+
Reaction process{"process", 2, this, [this]() { Inner::reaction_1(this->inbound, this->scale_action, this->out); }};
50+
Reaction scale{"scale", 1, this, [this]() { _lf_inner.reaction_2(this->out, this->scale_action, this->scale_bank); }};
51+
52+
public:
53+
LoadBalancer(const std::string& name, Environment* env)
54+
: Reactor(name, env)
55+
, _lf_inner(this) {
56+
std::cout << "creating instance of load balancer" << '\n';
57+
out.reserve(4);
58+
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
59+
std::string _lf_port_name = out.name() + "_" + std::to_string(_lf_idx);
60+
out.emplace_back(_lf_port_name, this);
61+
}
62+
}
63+
~LoadBalancer() override = default;
64+
65+
LogicalAction<unsigned> scale_action{"scale", this, 1us}; // NOLINT
66+
ModifableMultiport<Output<unsigned>> out{"out"}; // NOLINT
67+
Input<unsigned> inbound{"inbound", this}; // NOLINT
68+
Output<unsigned> scale_bank{"scale_bank", this}; // NOLINT
69+
70+
void assemble() override {
71+
std::cout << "assemble LoadBalancer\n";
72+
for (auto& _lf_port : out) {
73+
process.declare_antidependency(&_lf_port);
74+
}
75+
process.declare_trigger(&inbound);
76+
scale.declare_trigger(&scale_action);
77+
scale.declare_antidependency(&scale_bank);
78+
}
79+
};
80+
81+
#endif // LOAD_BALANCER_HH
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
#include <iostream>
2+
3+
#include <reactor-cpp/mutations/bank.hh>
4+
#include <reactor-cpp/mutations/connection.hh>
5+
6+
#include "../../lib/mutation/bank.cc"
7+
#include "../../lib/mutation/connection.cc"
8+
#include "./consumer.hh"
9+
#include "./load_balancer.hh"
10+
#include "./producer.hh"
11+
#include <reactor-cpp/reactor-cpp.hh>
12+
13+
class Deployment final : public Reactor { // NOLINT
14+
class Inner : public MutableScope {
15+
int state = 0;
16+
17+
public:
18+
Inner(Reactor* reactor)
19+
: MutableScope(reactor) {}
20+
void reaction_1(const Input<unsigned>& scale, std::vector<std::unique_ptr<Consumer>>& reactor_bank,
21+
ModifableMultiport<Output<unsigned>>& load_balancer) {
22+
std::size_t new_size = *scale.get();
23+
std::size_t old_size = reactor_bank.size();
24+
25+
std::function lambda = [](Reactor* reactor, std::size_t index) {
26+
std::string _lf_inst_name = "consumer_" + std::to_string(index);
27+
return std::make_unique<Consumer>(_lf_inst_name, reactor->environment(), index);
28+
};
29+
30+
auto change_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<Consumer>>>(
31+
&reactor_bank, this->reactor_, new_size, lambda);
32+
33+
add_to_transaction(change_size);
34+
35+
commit_transaction();
36+
37+
if (old_size < new_size) {
38+
for (auto i = 0; i < new_size; i++) {
39+
auto add_conn = std::make_shared<MutationAddConnection<Output<unsigned>, Input<unsigned>>>(
40+
&load_balancer[i], &reactor_bank[i].get()->in, reactor_);
41+
add_to_transaction(add_conn);
42+
}
43+
}
44+
45+
commit_transaction(true);
46+
}
47+
48+
friend LoadBalancer;
49+
};
50+
51+
std::unique_ptr<Producer> producer_;
52+
std::unique_ptr<LoadBalancer> load_balancer_;
53+
std::vector<std::unique_ptr<Consumer>> consumers_;
54+
55+
Reaction scale_bank{"scale_bank", 1, this,
56+
[this]() { this->_inner.reaction_1(this->scale, this->consumers_, load_balancer_->out); }};
57+
58+
Inner _inner;
59+
60+
public:
61+
Deployment(const std::string& name, Environment* env)
62+
: Reactor(name, env)
63+
, _inner(this)
64+
, producer_(std::make_unique<Producer>("producer", environment()))
65+
, load_balancer_(std::make_unique<LoadBalancer>("load_balancer", environment())) {
66+
std::cout << "creating instance of deployment" << '\n';
67+
consumers_.reserve(4);
68+
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
69+
std::string _lf_inst_name = "consumer_" + std::to_string(_lf_idx);
70+
consumers_.push_back(std::make_unique<Consumer>(_lf_inst_name, environment(), _lf_idx));
71+
}
72+
}
73+
~Deployment() override = default;
74+
75+
Input<unsigned> scale{"scale", this}; // NOLINT
76+
77+
void assemble() override {
78+
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
79+
environment()->draw_connection(load_balancer_->out[_lf_idx], consumers_[_lf_idx]->in, ConnectionProperties{});
80+
environment()->draw_connection(producer_->value, load_balancer_->inbound, ConnectionProperties{});
81+
}
82+
environment()->draw_connection(load_balancer_->scale_bank, scale, ConnectionProperties{});
83+
scale_bank.declare_trigger(&this->scale);
84+
}
85+
};
86+
87+
auto main() -> int {
88+
Environment env{4, true};
89+
auto deployment = std::make_unique<Deployment>("c1", &env);
90+
env.optimize();
91+
env.assemble();
92+
auto thread = env.startup();
93+
thread.join();
94+
return 0;
95+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#ifndef PRODUCER_HH // NOLINT
2+
#define PRODUCER_HH // NOLINT
3+
4+
#include <reactor-cpp/reactor-cpp.hh>
5+
6+
using namespace reactor;
7+
using namespace std::chrono_literals;
8+
9+
class Producer final : public Reactor { // NOLINT
10+
private:
11+
Timer timer{"timer", this, 1s, 1s};
12+
Reaction r_timer{"r_timer", 1, this, [this]() { _lf_inner.reaction_1(this->value); }};
13+
14+
class Inner : public Scope {
15+
unsigned int counter_ = 0;
16+
17+
void reaction_1([[maybe_unused]] Output<unsigned>& out) {
18+
std::cout << "producing value:" << counter_ << "\n";
19+
out.set(counter_++);
20+
}
21+
22+
explicit Inner(Reactor* reactor)
23+
: Scope(reactor) {}
24+
25+
friend Producer;
26+
};
27+
28+
Inner _lf_inner;
29+
30+
public:
31+
Producer(const std::string& name, Environment* env)
32+
: Reactor(name, env)
33+
, _lf_inner(this) {
34+
std::cout << "creating instance of producer\n";
35+
}
36+
Producer() = delete;
37+
~Producer() override = default;
38+
39+
Output<unsigned> value{"value", this}; // NOLINT
40+
41+
void assemble() override {
42+
r_timer.declare_trigger(&timer);
43+
r_timer.declare_antidependency(&value);
44+
}
45+
};
46+
47+
#endif // PRODUCER_HH

0 commit comments

Comments
 (0)