diff --git a/.github/workflows/clang-format.yml b/.github/workflows/clang-format.yml index 9d9efd5e..a8c88927 100644 --- a/.github/workflows/clang-format.yml +++ b/.github/workflows/clang-format.yml @@ -10,5 +10,6 @@ jobs: - uses: actions/checkout@v4 - name: Analyze run: | + clang-format --version clang-format --dry-run --Werror -style=file $(find ./ -name '*.cc' -print) clang-format --dry-run --Werror -style=file $(find ./ -name '*.hh' -print) diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..fcbcb7d6 --- /dev/null +++ b/Makefile @@ -0,0 +1,52 @@ +.PHONY: clean test coverage asan format format-check ci lf-test lib proto + +test: unit-test lf-test + +# Generate protobuf code +proto: + python3 external/nanopb/generator/nanopb_generator.py -Iexternal/nanopb/generator/proto/ -Iexternal/proto -L'#include "nanopb/%s"' -Dexternal/proto message.proto + +# Build reactor-uc as a static library +lib: + cmake -Bbuild + cmake --build build + make -C build + +# Build and run the unit tests +unit-test: + cmake -Bbuild -DBUILD_TESTS=ON + cmake --build build + make test -C build + +# Build and run lf tests +lf-test: + make -C test/lf + +# Get coverage data on unit tests +coverage: + cmake -Bbuild -DBUILD_TESTS=ON -DTEST_COVERAGE=ON + cmake --build build + make coverage -C build + +# Compile tests with AddressSanitizer and run them +asan: + cmake -Bbuild -DASAN=ON -DBUILD_TESTS=ON + cmake --build build + make test -C build + +# Format the code base +SRC_FILES := $(shell find ./lib -name '*.cc' -print) +HDR_FILES := $(shell find ./include -name '*.hh' -print) + +format: + clang-format -i -style=file $(SRC_FILES) $(HDR_FILES) + +# Check that the code base is formatted +format-check: + clang-format --dry-run --Werror -style=file $(SRC_FILES) $(HDR_FILES) + +# Run the entire CI flow +ci: clean test coverage format-check + +clean: + rm -rf build diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5f05c960..5b9bbf09 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -7,3 +7,4 @@ add_subdirectory(count) add_subdirectory(ports) add_subdirectory(hello) add_subdirectory(power_train) +add_subdirectory(multiport_mutation) diff --git a/examples/count/main.cc b/examples/count/main.cc index e0c6e92b..08012a2b 100644 --- a/examples/count/main.cc +++ b/examples/count/main.cc @@ -5,15 +5,15 @@ using namespace reactor; using namespace std::chrono_literals; -class Count : public Reactor { +class Count final : public Reactor { private: // actions Timer timer{"timer", this}; LogicalAction counter{"counter", this}; // reactions_ - Reaction r_init{"r_init", 1, this, [this]() { init(); }}; - Reaction r_counter{"r_counter", 2, this, [this]() { print_count(); }}; + Reaction r_init{"r_init", 1, this, [this]() { this->init(); }}; + Reaction r_counter{"r_counter", 2, this, [this]() { this->print_count(); }}; public: explicit Count(Environment* env) diff --git a/examples/hello/main.cc b/examples/hello/main.cc index 3b02fd89..2f6c88f8 100644 --- a/examples/hello/main.cc +++ b/examples/hello/main.cc @@ -6,7 +6,7 @@ using namespace reactor; using namespace std::chrono_literals; -class Hello : public Reactor { +class Hello final : public Reactor { private: // actions Timer timer{"timer", this, 1s, 2s}; diff --git a/examples/multiport_mutation/CMakeLists.txt b/examples/multiport_mutation/CMakeLists.txt new file mode 100644 index 00000000..b19dd98a --- /dev/null +++ b/examples/multiport_mutation/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(mutation_multiports main.cc) +target_link_libraries(mutation_multiports reactor-cpp) +add_dependencies(examples mutation_multiports) diff --git a/examples/multiport_mutation/consumer.hh b/examples/multiport_mutation/consumer.hh new file mode 100644 index 00000000..704d62f6 --- /dev/null +++ b/examples/multiport_mutation/consumer.hh @@ -0,0 +1,39 @@ +#ifndef CONSUMER_HH // NOLINT +#define CONSUMER_HH // NOLINT + +#include + +using namespace reactor; +using namespace std::chrono_literals; + +class Consumer final : public Reactor { // NOLINT + class Inner : public Scope { + Inner(Reactor* reactor, std::size_t index) + : Scope(reactor) + , index_(index) {} + std::size_t index_ = 0; + + void reaction_1(const Input& in) const { + std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n'; + } + + friend Consumer; + }; + + Inner _lf_inner; + Reaction handle{"handle", 1, this, [this]() { _lf_inner.reaction_1(this->in); }}; + +public: + Consumer(const std::string& name, Environment* env, std::size_t index) + : Reactor(name, env) + , _lf_inner(this, index) { + std::cout << "creating instance of consumer" << '\n'; + } + ~Consumer() override { std::cout << "Consumer Object is deleted" << '\n'; }; + + Input in{"in", this}; // NOLINT + + void assemble() override { handle.declare_trigger(&in); } +}; + +#endif // CONSUMER_HH diff --git a/examples/multiport_mutation/load_balancer.hh b/examples/multiport_mutation/load_balancer.hh new file mode 100644 index 00000000..db2e34ed --- /dev/null +++ b/examples/multiport_mutation/load_balancer.hh @@ -0,0 +1,81 @@ +#ifndef LOAD_BALANCER_HH // NOLINT +#define LOAD_BALANCER_HH // NOLINT + +#include + +#include "../../lib/mutation/multiport.cc" +#include "reactor-cpp/mutations/multiport.hh" + +using namespace reactor; +using namespace std::chrono_literals; + +class LoadBalancer final : public Reactor { // NOLINT + class Inner : public MutableScope { + explicit Inner(Reactor* reactor) + : MutableScope(reactor) {} + + // reaction bodies + static void reaction_1(const Input& inbound, LogicalAction& scale_action, + Multiport>& outbound) { + if (std::rand() % 30 == 0) { // NOLINT + scale_action.schedule(std::rand() % 20 + 1); // NOLINT + } + const unsigned sel = std::rand() % outbound.size(); // NOLINT + std::cout << "Sending out to:" << sel << '\n'; + outbound[sel].set(inbound.get()); + } + + void reaction_2(ModifableMultiport>& outbound, + [[maybe_unused]] const LogicalAction& scale, Output& scale_bank) { + ModifableMultiport>* temp = &outbound; + std::size_t new_size = *scale.get(); + + auto antideps = (outbound[0]).anti_dependencies(); + + const auto change_size = + std::make_shared>(temp, this->reactor_, antideps, new_size); + + add_to_transaction(change_size); + + commit_transaction(); + + scale_bank.set(new_size); + } + + friend LoadBalancer; + }; + + Inner _lf_inner; + Reaction process{"process", 2, this, [this]() { Inner::reaction_1(this->inbound, this->scale_action, this->out); }}; + Reaction scale{"scale", 1, this, [this]() { _lf_inner.reaction_2(this->out, this->scale_action, this->scale_bank); }}; + +public: + LoadBalancer(const std::string& name, Environment* env) + : Reactor(name, env) + , _lf_inner(this) { + std::cout << "creating instance of load balancer" << '\n'; + out.reserve(4); + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + std::string _lf_port_name = out.name() + "_" + std::to_string(_lf_idx); + out.emplace_back(_lf_port_name, this); + } + } + ~LoadBalancer() override = default; + + LogicalAction scale_action{"scale", this, 1us}; // NOLINT + ModifableMultiport> out{"out"}; // NOLINT + Input inbound{"inbound", this}; // NOLINT + Output scale_bank{"scale_bank", this}; // NOLINT + + void assemble() override { + std::cout << "assemble LoadBalancer\n"; + for (auto& _lf_port : out) { + process.declare_antidependency(&_lf_port); + } + process.declare_trigger(&inbound); + scale.declare_trigger(&scale_action); + scale.declare_antidependency(&scale_bank); + } +}; + +#endif // LOAD_BALANCER_HH diff --git a/examples/multiport_mutation/main.cc b/examples/multiport_mutation/main.cc new file mode 100644 index 00000000..cf59b29e --- /dev/null +++ b/examples/multiport_mutation/main.cc @@ -0,0 +1,95 @@ +#include + +#include +#include + +#include "../../lib/mutation/bank.cc" +#include "../../lib/mutation/connection.cc" +#include "./consumer.hh" +#include "./load_balancer.hh" +#include "./producer.hh" +#include + +class Deployment final : public Reactor { // NOLINT + class Inner : public MutableScope { + int state = 0; + + public: + Inner(Reactor* reactor) + : MutableScope(reactor) {} + void reaction_1(const Input& scale, std::vector>& reactor_bank, + ModifableMultiport>& load_balancer) { + std::size_t new_size = *scale.get(); + std::size_t old_size = reactor_bank.size(); + + std::function lambda = [](Reactor* reactor, std::size_t index) { + std::string _lf_inst_name = "consumer_" + std::to_string(index); + return std::make_unique(_lf_inst_name, reactor->environment(), index); + }; + + auto change_size = std::make_shared>>( + &reactor_bank, this->reactor_, new_size, lambda); + + add_to_transaction(change_size); + + commit_transaction(); + + if (old_size < new_size) { + for (auto i = 0; i < new_size; i++) { + auto add_conn = std::make_shared, Input>>( + &load_balancer[i], &reactor_bank[i].get()->in, reactor_); + add_to_transaction(add_conn); + } + } + + commit_transaction(true); + } + + friend LoadBalancer; + }; + + std::unique_ptr producer_; + std::unique_ptr load_balancer_; + std::vector> consumers_; + + Reaction scale_bank{"scale_bank", 1, this, + [this]() { this->_inner.reaction_1(this->scale, this->consumers_, load_balancer_->out); }}; + + Inner _inner; + +public: + Deployment(const std::string& name, Environment* env) + : Reactor(name, env) + , _inner(this) + , producer_(std::make_unique("producer", environment())) + , load_balancer_(std::make_unique("load_balancer", environment())) { + std::cout << "creating instance of deployment" << '\n'; + consumers_.reserve(4); + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + std::string _lf_inst_name = "consumer_" + std::to_string(_lf_idx); + consumers_.push_back(std::make_unique(_lf_inst_name, environment(), _lf_idx)); + } + } + ~Deployment() override = default; + + Input scale{"scale", this}; // NOLINT + + void assemble() override { + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + environment()->draw_connection(load_balancer_->out[_lf_idx], consumers_[_lf_idx]->in, ConnectionProperties{}); + environment()->draw_connection(producer_->value, load_balancer_->inbound, ConnectionProperties{}); + } + environment()->draw_connection(load_balancer_->scale_bank, scale, ConnectionProperties{}); + scale_bank.declare_trigger(&this->scale); + } +}; + +auto main() -> int { + Environment env{4, true}; + auto deployment = std::make_unique("c1", &env); + env.optimize(); + env.assemble(); + auto thread = env.startup(); + thread.join(); + return 0; +} diff --git a/examples/multiport_mutation/producer.hh b/examples/multiport_mutation/producer.hh new file mode 100644 index 00000000..d6416e9a --- /dev/null +++ b/examples/multiport_mutation/producer.hh @@ -0,0 +1,47 @@ +#ifndef PRODUCER_HH // NOLINT +#define PRODUCER_HH // NOLINT + +#include + +using namespace reactor; +using namespace std::chrono_literals; + +class Producer final : public Reactor { // NOLINT +private: + Timer timer{"timer", this, 1s, 1s}; + Reaction r_timer{"r_timer", 1, this, [this]() { _lf_inner.reaction_1(this->value); }}; + + class Inner : public Scope { + unsigned int counter_ = 0; + + void reaction_1([[maybe_unused]] Output& out) { + std::cout << "producing value:" << counter_ << "\n"; + out.set(counter_++); + } + + explicit Inner(Reactor* reactor) + : Scope(reactor) {} + + friend Producer; + }; + + Inner _lf_inner; + +public: + Producer(const std::string& name, Environment* env) + : Reactor(name, env) + , _lf_inner(this) { + std::cout << "creating instance of producer\n"; + } + Producer() = delete; + ~Producer() override = default; + + Output value{"value", this}; // NOLINT + + void assemble() override { + r_timer.declare_trigger(&timer); + r_timer.declare_antidependency(&value); + } +}; + +#endif // PRODUCER_HH diff --git a/examples/ports/main.cc b/examples/ports/main.cc index 671d89b7..cc164d85 100644 --- a/examples/ports/main.cc +++ b/examples/ports/main.cc @@ -5,7 +5,7 @@ using namespace reactor; using namespace std::chrono_literals; -class Trigger : public Reactor { +class Trigger final : public Reactor { private: Timer timer; Reaction r_timer{"r_timer", 1, this, [this]() { on_timer(); }}; @@ -25,7 +25,7 @@ class Trigger : public Reactor { void on_timer() { trigger.set(); } }; -class Counter : public Reactor { +class Counter final : public Reactor { private: int value_{0}; Reaction r_trigger{"r_trigger", 1, this, [this]() { on_trigger(); }}; @@ -49,9 +49,9 @@ class Counter : public Reactor { } }; -class Printer : public Reactor { +class Printer final : public Reactor { private: - Reaction r_value{"r_value", 1, this, [this]() { on_value(); }}; + Reaction r_value{"r_value", 1, this, [this]() { this->on_value(); }}; public: Input value{"value", this}; // NOLINT @@ -67,9 +67,9 @@ class Printer : public Reactor { void on_value() { std::cout << this->name() << ": " << *value.get() << '\n'; } }; -class Adder : public Reactor { +class Adder final : public Reactor { private: - Reaction r_add{"r_add", 1, this, [this]() { add(); }}; + Reaction r_add{"r_add", 1, this, [this]() { this->add(); }}; public: Input i1{"i1", this}; // NOLINT diff --git a/examples/power_train/main.cc b/examples/power_train/main.cc index 05efe57f..7badae84 100644 --- a/examples/power_train/main.cc +++ b/examples/power_train/main.cc @@ -4,7 +4,7 @@ using namespace reactor; -class LeftPedal : public Reactor { +class LeftPedal final : public Reactor { public: // ports Output angle{"angle", this}; // NOLINT @@ -30,7 +30,7 @@ class LeftPedal : public Reactor { } }; -class RightPedal : public Reactor { +class RightPedal final : public Reactor { public: // ports Output angle{"angle", this}; // NOLINT @@ -60,7 +60,7 @@ class RightPedal : public Reactor { } }; -class BrakeControl : public Reactor { +class BrakeControl final : public Reactor { public: // ports Input angle{"angle", this}; // NOLINT @@ -81,7 +81,7 @@ class BrakeControl : public Reactor { } }; -class EngineControl : public Reactor { +class EngineControl final : public Reactor { public: // ports Input angle{"angle", this}; // NOLINT @@ -118,7 +118,7 @@ class EngineControl : public Reactor { } }; -class Brake : public Reactor { +class Brake final : public Reactor { public: // ports Input force{"force", this}; // NOLINT @@ -136,7 +136,7 @@ class Brake : public Reactor { void assemble() override { r1.declare_trigger(&force); } }; -class Engine : public Reactor { +class Engine final : public Reactor { public: // ports Input torque{"torque", this}; // NOLINT diff --git a/include/reactor-cpp/environment.hh b/include/reactor-cpp/environment.hh index 825b3bba..8e416ba1 100644 --- a/include/reactor-cpp/environment.hh +++ b/include/reactor-cpp/environment.hh @@ -32,8 +32,9 @@ enum class Phase : std::uint8_t { Assembly = 1, Startup = 2, Execution = 3, - Shutdown = 4, - Deconstruction = 5 + Mutation = 4, + Shutdown = 5, + Deconstruction = 6, }; class Environment { @@ -74,25 +75,47 @@ private: Graph graph_{}; Graph optimized_graph_{}; - void build_dependency_graph(Reactor* reactor); - void calculate_indexes(); - std::mutex shutdown_mutex_{}; auto startup(const TimePoint& start_time) -> std::thread; public: + // TODO: fix visebility + void calculate_indexes(); + void build_dependency_graph(Reactor* reactor); + void clear_dependency_graph(); + explicit Environment(unsigned int num_workers, bool fast_fwd_execution = default_fast_fwd_execution, const Duration& timeout = Duration::max()); explicit Environment(const std::string& name, Environment* containing_environment); auto name() -> const std::string& { return name_; } + void start_mutation() { phase_ = Phase::Mutation; } + + void stop_mutation() { phase_ = Phase::Execution; } + // this method draw a connection between two graph elements with some properties template void draw_connection(Port& source, Port& sink, ConnectionProperties properties) { this->draw_connection(&source, &sink, properties); } + template void remove_connection(Port* source, Port* sink) { + if (top_environment_ == nullptr || top_environment_ == this) { + log::Debug() << "remove connection: " << source->fqn() << " -/-> " << sink->fqn(); + graph_.remove_edge(source, sink); + } else { + top_environment_->remove_connection(source, sink); + } + } + + void remove_top_level_reactor(Reactor* reactor) { + auto elements_erased = top_level_reactors_.erase(reactor); + if (elements_erased == 0) { + std::cout << "no elements erased" << '\n'; + } + } + template void draw_connection(Port* source, Port* sink, ConnectionProperties properties) { if (top_environment_ == nullptr || top_environment_ == this) { log::Debug() << "drawing connection: " << source->fqn() << " --> " << sink->fqn(); @@ -114,7 +137,7 @@ public: void export_dependency_graph(const std::string& path); - [[nodiscard]] auto top_level_reactors() const noexcept -> const auto& { return top_level_reactors_; } + [[nodiscard]] auto top_level_reactors() noexcept -> auto& { return top_level_reactors_; } [[nodiscard]] auto phase() const noexcept -> Phase { return phase_; } [[nodiscard]] auto scheduler() const noexcept -> const Scheduler* { return &scheduler_; } diff --git a/include/reactor-cpp/graph.hh b/include/reactor-cpp/graph.hh index 4009e169..739d8b33 100644 --- a/include/reactor-cpp/graph.hh +++ b/include/reactor-cpp/graph.hh @@ -9,6 +9,7 @@ #ifndef REACTOR_CPP_GRAPH_HH #define REACTOR_CPP_GRAPH_HH +#include #include #include #include @@ -54,7 +55,25 @@ public: std::vector> edges{std::make_pair(properties, destination)}; graph_[source] = edges; } else { - graph_[source].emplace_back(properties, destination); + auto& edges = graph_[source]; + auto duplicate = std::find_if(edges.begin(), edges.end(), [&](const std::pair& edge) { + return edge.first == properties && edge.second == destination; + }); + if (duplicate == edges.end()) { + graph_[source].emplace_back(properties, destination); + } + } + } + + auto remove_edge(E source, E destinations) noexcept { + if (graph_.find(source) == std::end(graph_)) { + return; + } + auto conns = std::find_if(std::begin(graph_[source]), std::end(graph_[source]), + [destinations](auto val) { return val.second == destinations; }); + + if (conns != std::end(graph_[source])) { + graph_[source].erase(conns); } } diff --git a/include/reactor-cpp/logging.hh b/include/reactor-cpp/logging.hh index 4ba08434..43da8212 100644 --- a/include/reactor-cpp/logging.hh +++ b/include/reactor-cpp/logging.hh @@ -10,13 +10,10 @@ #define REACTOR_CPP_LOGGING_HH #include "reactor-cpp/config.hh" -#include "reactor-cpp/time.hh" -#include #include #include #include #include -#include namespace reactor::log { @@ -30,7 +27,6 @@ private: // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) inline static std::mutex mutex_{}; - Lock lock_{}; public: diff --git a/include/reactor-cpp/multiport.hh b/include/reactor-cpp/multiport.hh index 3e66cee5..e2e42714 100644 --- a/include/reactor-cpp/multiport.hh +++ b/include/reactor-cpp/multiport.hh @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "assert.hh" @@ -23,14 +24,16 @@ namespace reactor { class BaseMultiport { // NOLINT cppcoreguidelines-special-member-functions,-warnings-as-errors private: - std::atomic size_{0}; + std::atomic present_ports_size_{0}; std::vector present_ports_{}; + std::string multiport_name_; + std::string fqn_; // record that the port with the given index has been set void set_present(std::size_t index); // reset the list of set port indexes - void reset() noexcept { size_.store(0, std::memory_order_relaxed); } + void reset() noexcept { present_ports_size_.store(0, std::memory_order_relaxed); } [[nodiscard]] auto get_set_callback(std::size_t index) noexcept -> PortCallback; const PortCallback clean_callback_{[this]([[maybe_unused]] const BasePort& port) { this->reset(); }}; @@ -39,22 +42,29 @@ private: protected: [[nodiscard]] auto present_ports() const -> const auto& { return present_ports_; } - [[nodiscard]] auto present_ports_size() const -> auto { return size_.load(); } + [[nodiscard]] auto present_ports_size() const -> auto { return present_ports_size_.load(); } void present_ports_reserve(size_t n) { present_ports_.reserve(n); } + void present_ports_pop_back() { present_ports_.pop_back(); } void register_port(BasePort& port, size_t idx); public: - BaseMultiport() = default; + explicit BaseMultiport(std::string name) + : multiport_name_(std::move(name)) {} ~BaseMultiport() = default; + [[nodiscard]] auto name() const -> std::string { return multiport_name_; } }; +template class MutationChangeMultiportSize; + template > class Multiport : public BaseMultiport { // NOLINT cppcoreguidelines-special-member-functions protected: std::vector ports_{}; // NOLINT cppcoreguidelines-non-private-member-variables-in-classes + friend MutationChangeMultiportSize; + public: using value_type = typename A::value_type; using size_type = typename A::size_type; @@ -62,7 +72,8 @@ public: using iterator = typename std::vector::iterator; using const_iterator = typename std::vector::const_iterator; - Multiport() noexcept = default; + explicit Multiport(const std::string& name) noexcept + : BaseMultiport(name) {} ~Multiport() noexcept = default; auto operator==(const Multiport& other) const noexcept -> bool { @@ -72,14 +83,14 @@ public: auto operator[](std::size_t index) noexcept -> T& { return ports_[index]; } auto operator[](std::size_t index) const noexcept -> const T& { return ports_[index]; } - auto begin() noexcept -> iterator { return ports_.begin(); }; - auto begin() const noexcept -> const_iterator { return ports_.begin(); }; - auto cbegin() const noexcept -> const_iterator { return ports_.cbegin(); }; - auto end() noexcept -> iterator { return ports_.end(); }; - auto end() const noexcept -> const_iterator { return ports_.end(); }; - auto cend() const noexcept -> const_iterator { return ports_.cend(); }; + [[nodiscard]] auto begin() noexcept -> iterator { return ports_.begin(); }; + [[nodiscard]] auto begin() const noexcept -> const_iterator { return ports_.begin(); }; + [[nodiscard]] auto cbegin() const noexcept -> const_iterator { return ports_.cbegin(); }; + [[nodiscard]] auto end() noexcept -> iterator { return ports_.end(); }; + [[nodiscard]] auto end() const noexcept -> const_iterator { return ports_.end(); }; + [[nodiscard]] auto cend() const noexcept -> const_iterator { return ports_.cend(); }; - auto size() const noexcept -> size_type { return ports_.size(); }; + [[nodiscard]] auto size() const noexcept -> size_type { return ports_.size(); }; [[nodiscard]] auto empty() const noexcept -> bool { return ports_.empty(); }; [[nodiscard]] auto present_indices_unsorted() const noexcept -> std::vector { @@ -95,16 +106,24 @@ public: template > class ModifableMultiport : public Multiport { public: + ModifableMultiport(const std::string& name) + : Multiport(name) {} + void reserve(std::size_t size) noexcept { this->ports_.reserve(size); this->present_ports_reserve(size); } - void push_back(const T& elem) noexcept { - this->ports_.push_back(elem); + void push_back(const T&& elem) noexcept { + this->ports_.push_back(std::move(elem)); this->register_port(this->ports_.back(), this->ports_.size() - 1); } + void pop_back() { + this->ports_.pop_back(); + this->present_ports_pop_back(); + } + template void emplace_back(Args&&... args) noexcept { this->ports_.emplace_back(std::forward(args)...); this->register_port(this->ports_.back(), this->ports_.size() - 1); diff --git a/include/reactor-cpp/mutations.hh b/include/reactor-cpp/mutations.hh new file mode 100644 index 00000000..3aaa1eee --- /dev/null +++ b/include/reactor-cpp/mutations.hh @@ -0,0 +1,30 @@ +#ifndef REACTOR_CPP_MUTATIONS_HH +#define REACTOR_CPP_MUTATIONS_HH + +#include + +namespace reactor { +class Reactor; +class Environment; + +enum MutationResult : std::int8_t { + Success = 0, + NotMatchingBankSize = 1, +}; + +class Mutation { +public: + Mutation() = default; + Mutation(const Mutation& other) = default; + Mutation(Mutation&& other) = default; + virtual ~Mutation() = default; + auto operator=(const Mutation& other) -> Mutation& = default; + auto operator=(Mutation&& other) -> Mutation& = default; + + virtual auto run() -> MutationResult = 0; + virtual auto rollback() -> MutationResult = 0; +}; + +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_HH diff --git a/include/reactor-cpp/mutations/bank.hh b/include/reactor-cpp/mutations/bank.hh new file mode 100644 index 00000000..f8cfc017 --- /dev/null +++ b/include/reactor-cpp/mutations/bank.hh @@ -0,0 +1,51 @@ +// +// Created by tanneberger on 11/18/24. +// + +#ifndef REACTOR_CPP_MUTATIONS_BANK_HH +#define REACTOR_CPP_MUTATIONS_BANK_HH + +#include + +#include "../mutations.hh" +#include "../reactor.hh" + +namespace reactor { +class Reactor; +class Environment; + +template class MutationChangeBankSize final : public Mutation { + std::vector* bank_ = nullptr; + std::size_t desired_size_ = 0; + std::size_t size_before_application_ = 0; + Reactor* reactor_ = nullptr; + std::function create_lambda_; + + void change_size(std::size_t new_size); + +public: + MutationChangeBankSize() = default; + MutationChangeBankSize(const MutationChangeBankSize& other) noexcept + : bank_(other.bank_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) + , reactor_(other.reactor_) + , create_lambda_(other.create_lambda_) {} + MutationChangeBankSize(MutationChangeBankSize&& other) noexcept + : bank_(other.bank_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) + , reactor_(other.reactor_) + , create_lambda_(other.create_lambda_) {} + explicit MutationChangeBankSize(std::vector* bank, Reactor* reactor, std::size_t size, + std::function create_lambda); + ~MutationChangeBankSize() override = default; + auto operator=(const MutationChangeBankSize& other) -> MutationChangeBankSize& = default; + auto operator=(MutationChangeBankSize&& other) -> MutationChangeBankSize& = default; + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_BANK_HH diff --git a/include/reactor-cpp/mutations/connection.hh b/include/reactor-cpp/mutations/connection.hh new file mode 100644 index 00000000..25bb9b37 --- /dev/null +++ b/include/reactor-cpp/mutations/connection.hh @@ -0,0 +1,43 @@ +// +// Created by tanneberger on 11/18/24. +// + +#ifndef REACTOR_CPP_MUTATIONS_CONNECTION_HH +#define REACTOR_CPP_MUTATIONS_CONNECTION_HH + +#include "../mutations.hh" + +namespace reactor { +class Reactor; +class Environment; + +template class MutationAddConnection : public Mutation { +private: + A* source_; + B* sink_; + bool connection_ = false; + Reactor* reactor_{}; + +public: + explicit MutationAddConnection(A* source, B* sink, Reactor* reactor); + MutationAddConnection(const MutationAddConnection& other) + : source_(other.source_) + , sink_(other.sink_) + , connection_(other.connection_) + , reactor_(other.reactor_) {} + MutationAddConnection(MutationAddConnection&& other) noexcept + : source_(other.source_) + , sink_(other.sink_) + , connection_(other.connection_) + , reactor_(other.reactor_) {} + MutationAddConnection() = default; + ~MutationAddConnection() override = default; + auto operator=(const MutationAddConnection& other) -> MutationAddConnection& = default; + auto operator=(MutationAddConnection&& other) -> MutationAddConnection& = default; + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_CONNECTION_HH diff --git a/include/reactor-cpp/mutations/multiport.hh b/include/reactor-cpp/mutations/multiport.hh new file mode 100644 index 00000000..268d6267 --- /dev/null +++ b/include/reactor-cpp/mutations/multiport.hh @@ -0,0 +1,49 @@ +// +// Created by tanneberger on 11/11/24. +// + +#ifndef REACTOR_CPP_MUTATIONS_MULTIPORT_HH +#define REACTOR_CPP_MUTATIONS_MULTIPORT_HH + +#include + +#include "../multiport.hh" +#include "../mutations.hh" +#include "../port.hh" + +namespace reactor { +class Reactor; +class Environment; + +template class MutationChangeOutputMultiportSize : public Mutation { +private: + ModifableMultiport>* multiport_ = nullptr; + std::set anti_dependencies_{}; + std::size_t desired_size_ = 0; + std::size_t size_before_application_ = 0; + Reactor* reactor_{}; + + void change_size(std::size_t new_size); + +public: + MutationChangeOutputMultiportSize(ModifableMultiport>* multiport, Reactor* reactor, + std::set& anti_dependencies, std::size_t size); + MutationChangeOutputMultiportSize() = default; + MutationChangeOutputMultiportSize(const MutationChangeOutputMultiportSize& other) + : multiport_(other.multiport_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) {} + MutationChangeOutputMultiportSize(MutationChangeOutputMultiportSize&& other) noexcept + : multiport_(other.multiport_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) {} + ~MutationChangeOutputMultiportSize() override = default; + auto operator=(const MutationChangeOutputMultiportSize& other) -> MutationChangeOutputMultiportSize& = default; + auto operator=(MutationChangeOutputMultiportSize&& other) -> MutationChangeOutputMultiportSize& = default; + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_MULTIPORT_HH diff --git a/include/reactor-cpp/port.hh b/include/reactor-cpp/port.hh index fd3048da..10627714 100644 --- a/include/reactor-cpp/port.hh +++ b/include/reactor-cpp/port.hh @@ -22,6 +22,8 @@ namespace reactor { +template class MutationChangeMultiportSize; + enum class PortType : std::uint8_t { Input, Output, Delay }; class BasePort : public ReactorElement { @@ -44,8 +46,6 @@ protected: : ReactorElement(name, match_port_enum(type), container) , type_(type) {} - void register_dependency(Reaction* reaction, bool is_trigger) noexcept; - void register_antidependency(Reaction* reaction) noexcept; virtual void cleanup() = 0; static auto match_port_enum(PortType type) noexcept -> ReactorElement::Type { @@ -72,7 +72,15 @@ protected: } public: - void set_inward_binding(BasePort* port) noexcept { inward_binding_ = port; } + void register_dependency(Reaction* reaction, bool is_trigger) noexcept; + void register_antidependency(Reaction* reaction) noexcept; + void set_inward_binding(BasePort* port) noexcept { + if (port != nullptr) { + std::cout << port->fqn() << "(" << port << ")" << " --> " << this->fqn() << "(" << this << ")" << '\n'; + } + + inward_binding_ = port; + } void add_outward_binding(BasePort* port) noexcept { outward_bindings_.insert(port); } virtual void instantiate_connection_to(const ConnectionProperties& properties, @@ -97,7 +105,7 @@ public: [[nodiscard]] auto triggers() const noexcept -> const auto& { return triggers_; } [[nodiscard]] auto dependencies() const noexcept -> const auto& { return dependencies_; } - [[nodiscard]] auto anti_dependencies() const noexcept -> const auto& { return anti_dependencies_; } + [[nodiscard]] auto anti_dependencies() noexcept -> auto& { return anti_dependencies_; } // TODO: make it const again [[nodiscard]] auto port_type() const noexcept -> PortType { return type_; } void register_set_callback(const PortCallback& callback); @@ -173,7 +181,9 @@ public: Input(const std::string& name, Reactor* container) : Port(name, PortType::Input, container) {} - Input(Input&&) noexcept = default; + Input(Input&&) = default; + + ~Input() override { std::cout << "Input port gets deallocated:" << this->fqn() << "\n"; } }; template class Output : public Port { // NOLINT(cppcoreguidelines-special-member-functions) @@ -182,6 +192,8 @@ public: : Port(name, PortType::Output, container) {} Output(Output&&) noexcept = default; + + ~Output() override { std::cout << "Output port gets deallocated: " << this->fqn() << "\n"; } }; } // namespace reactor diff --git a/include/reactor-cpp/reaction.hh b/include/reactor-cpp/reaction.hh index 84b8e539..e4cac716 100644 --- a/include/reactor-cpp/reaction.hh +++ b/include/reactor-cpp/reaction.hh @@ -26,8 +26,8 @@ private: std::set antidependencies_; std::set dependencies_; - const int priority_; - unsigned int index_{}; + const int priority_ = -1; + int index_ = -1; std::function body_{nullptr}; @@ -52,6 +52,7 @@ public: [[nodiscard]] auto port_triggers() const noexcept -> const auto& { return port_trigger_; } [[maybe_unused]] [[nodiscard]] auto antidependencies() const noexcept -> const auto& { return antidependencies_; } + [[maybe_unused]] void clear_antidependencies() noexcept { antidependencies_.clear(); } [[nodiscard]] auto dependencies() const noexcept -> const auto& { return dependencies_; } @@ -62,7 +63,7 @@ public: void startup() final {} void shutdown() final {} void trigger(); - void set_index(unsigned index); + void set_index(int index); template void set_deadline(Dur deadline, const std::function& handler) { set_deadline_impl(std::chrono::duration_cast(deadline), handler); diff --git a/include/reactor-cpp/reactor-cpp.hh b/include/reactor-cpp/reactor-cpp.hh index 420ed9e3..9f8ecf1c 100644 --- a/include/reactor-cpp/reactor-cpp.hh +++ b/include/reactor-cpp/reactor-cpp.hh @@ -20,6 +20,7 @@ #include "port.hh" #include "reaction.hh" #include "reactor.hh" +#include "scopes.hh" #include "time.hh" #endif // REACTOR_CPP_REACTOR_CPP_HH diff --git a/include/reactor-cpp/reactor.hh b/include/reactor-cpp/reactor.hh index 02b698dd..11e861ac 100644 --- a/include/reactor-cpp/reactor.hh +++ b/include/reactor-cpp/reactor.hh @@ -14,7 +14,6 @@ #include #include "action.hh" -#include "environment.hh" #include "logical_time.hh" #include "reactor_element.hh" @@ -23,10 +22,10 @@ class Reactor : public ReactorElement { // NOLINT(cppcoreguidelines-special-memb private: std::set actions_{}; - std::set inputs_{}; + std::vector inputs_{}; std::set outputs_{}; std::set reactions_{}; - std::set reactors_{}; + std::vector reactors_{}; std::set> connections_{}; void register_action(BaseAction* action); @@ -38,6 +37,7 @@ private: public: Reactor(const std::string& name, Reactor* container); Reactor(const std::string& name, Environment* environment); + Reactor() = delete; ~Reactor() override = default; void register_connection(std::unique_ptr&& connection); @@ -60,6 +60,9 @@ public: [[nodiscard]] auto get_elapsed_logical_time() const noexcept -> Duration; [[nodiscard]] auto get_elapsed_physical_time() const noexcept -> Duration; + void remove_inputs(BasePort* base_port); + void remove_child_reactor(const Reactor* base_reactor); + friend ReactorElement; }; diff --git a/include/reactor-cpp/reactor_element.hh b/include/reactor-cpp/reactor_element.hh index d78bfeff..d9f44013 100644 --- a/include/reactor-cpp/reactor_element.hh +++ b/include/reactor-cpp/reactor_element.hh @@ -12,7 +12,6 @@ #include #include -#include #include #include @@ -52,6 +51,12 @@ public: virtual void startup() = 0; virtual void shutdown() = 0; + + auto operator==(const ReactorElement& other) const -> bool { + // std::cout << other.name() << "==" << name_ << std::endl; + return name_ == other.name(); // && container_ == other.container() && environment_ == other.environment(); // && + // fqn_ == other.fqn() + } }; } // namespace reactor diff --git a/include/reactor-cpp/scopes.hh b/include/reactor-cpp/scopes.hh new file mode 100644 index 00000000..fe1de9f4 --- /dev/null +++ b/include/reactor-cpp/scopes.hh @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2024 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_SCOPES_HH +#define REACTOR_CPP_SCOPES_HH + +#include "environment.hh" +#include "logical_time.hh" +#include "reactor.hh" +#include "transaction.hh" + +namespace reactor { + +class Scope { +private: + Reactor* reactor_; + +public: + explicit Scope(Reactor* reactor) + : reactor_(reactor) {} + + [[nodiscard]] static auto get_physical_time() noexcept -> TimePoint { return Reactor::get_physical_time(); } + [[nodiscard]] auto get_tag() const noexcept -> Tag { return reactor_->get_tag(); } + [[nodiscard]] auto get_logical_time() const noexcept -> TimePoint { return reactor_->get_logical_time(); } + [[nodiscard]] auto get_microstep() const noexcept -> mstep_t { return reactor_->get_microstep(); } + [[nodiscard]] auto get_elapsed_logical_time() const noexcept -> Duration { + return reactor_->get_elapsed_logical_time(); + } + [[nodiscard]] auto get_elapsed_physical_time() const noexcept -> Duration { + return reactor_->get_elapsed_physical_time(); + } + [[nodiscard]] auto environment() const noexcept -> Environment* { return reactor_->environment(); } + void request_stop() const { environment()->sync_shutdown(); } +}; + +class MutableScope : public Scope { +public: + Transaction transaction_; + Reactor* reactor_; + Environment* env_ = nullptr; + + explicit MutableScope(Reactor* reactor) + : Scope(reactor) + , transaction_(reactor) + , reactor_(reactor) + , env_(reactor->environment()) {} + MutableScope(const MutableScope& other) + : Scope(other.reactor_) + , transaction_(other.transaction_) + , reactor_(other.reactor_) + , env_(other.env_) {} + MutableScope(MutableScope&& other) noexcept + : Scope(other.reactor_) + , transaction_(std::move(other.transaction_)) + , reactor_(other.reactor_) + , env_(other.env_) {} + ~MutableScope() = default; + auto operator=(const MutableScope& other) -> MutableScope& = default; + auto operator=(MutableScope&& other) -> MutableScope& = default; + + void commit_transaction(bool recalculate = false); + void add_to_transaction(const std::shared_ptr& mutation); +}; + +} // namespace reactor + +#endif // REACTOR_CPP_SCOPES_HH diff --git a/include/reactor-cpp/transaction.hh b/include/reactor-cpp/transaction.hh new file mode 100644 index 00000000..b0ce287c --- /dev/null +++ b/include/reactor-cpp/transaction.hh @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2024 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_TRANSACTION_HH +#define REACTOR_CPP_TRANSACTION_HH + +#include +#include + +#include "mutations.hh" +// #include "reactor.hh" + +namespace reactor { +class Reactor; +class Environment; + +class Transaction { +private: + Environment* environment_ = nullptr; + std::vector> mutations_{}; + +public: + explicit Transaction(Reactor* parent); + Transaction(const Transaction& other) = default; + Transaction(Transaction&& other) = default; + auto operator=(const Transaction& other) -> Transaction& = default; + auto operator=(Transaction&& other) -> Transaction& = default; + ~Transaction() = default; + + void push_back(const std::shared_ptr& mutation); + auto execute(bool recalculate = false) -> MutationResult; +}; +} // namespace reactor +#endif // REACTOR_CPP_TRANSACTION_HH diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 6f2ffd7d..2ade6076 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -7,13 +7,18 @@ set(SOURCE_FILES reaction.cc reactor.cc scheduler.cc + scopes.cc time.cc + transaction.cc multiport.cc reactor_element.cc + mutation/multiport.cc + mutation/bank.cc + mutation/connection.cc ) if(REACTOR_CPP_TRACE) - set(SOURCE_FILES ${SOURCE_FILES} trace.cc ) + set(SOURCE_FILES ${SOURCE_FILES} trace.cc) endif() if (DEFINED LF_REACTOR_CPP_SUFFIX) @@ -22,7 +27,7 @@ else() set(REACTOR_CPP_INCLUDE "include") endif() -add_library(${LIB_TARGET} SHARED ${SOURCE_FILES}) +add_library(${LIB_TARGET} STATIC ${SOURCE_FILES}) target_include_directories(${LIB_TARGET} PUBLIC "$" "$" diff --git a/lib/action.cc b/lib/action.cc index 04a52a41..ed8a43a7 100644 --- a/lib/action.cc +++ b/lib/action.cc @@ -29,7 +29,7 @@ void BaseAction::register_trigger(Reaction* reaction) { validate(this->container() == reaction->container(), "Action triggers must belong to the same reactor as the triggered " "reaction"); - [[maybe_unused]] bool result = triggers_.insert(reaction).second; + [[maybe_unused]] const bool result = triggers_.insert(reaction).second; reactor_assert(result); } @@ -40,7 +40,7 @@ void BaseAction::register_scheduler(Reaction* reaction) { // the reaction must belong to the same reactor as this action validate(this->container() == reaction->container(), "Scheduable actions must belong to the same reactor as the " "triggered reaction"); - [[maybe_unused]] bool result = schedulers_.insert(reaction).second; + [[maybe_unused]] const bool result = schedulers_.insert(reaction).second; reactor_assert(result); } @@ -62,8 +62,8 @@ void Timer::cleanup() noexcept { BaseAction::cleanup(); // schedule the timer again if (period_ != Duration::zero()) { - Tag now = Tag::from_logical_time(environment()->logical_time()); - Tag next = now.delay(period_); + const Tag now = Tag::from_logical_time(environment()->logical_time()); + const Tag next = now.delay(period_); environment()->scheduler()->schedule_sync(this, next); } } @@ -74,7 +74,7 @@ ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container) void ShutdownTrigger::setup() noexcept { BaseAction::setup(); } void ShutdownTrigger::shutdown() { - Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); + const Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); environment()->scheduler()->schedule_sync(this, tag); } diff --git a/lib/environment.cc b/lib/environment.cc index 52c0b6bf..ca08e670 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -41,15 +41,16 @@ Environment::Environment(const std::string& name, Environment* containing_enviro , top_environment_(containing_environment_->top_environment_) , scheduler_(this) , timeout_(containing_environment->timeout()) { - [[maybe_unused]] bool result = containing_environment->contained_environments_.insert(this).second; + [[maybe_unused]] const bool result = containing_environment->contained_environments_.insert(this).second; reactor_assert(result); } void Environment::register_reactor(Reactor* reactor) { reactor_assert(reactor != nullptr); - validate(this->phase() == Phase::Construction, "Reactors may only be registered during construction phase!"); + validate(this->phase() == Phase::Construction || this->phase() == Phase::Mutation, + "Reactors may only be registered during construction phase!"); validate(reactor->is_top_level(), "The environment may only contain top level reactors!"); - [[maybe_unused]] bool result = top_level_reactors_.insert(reactor).second; + [[maybe_unused]] const bool result = top_level_reactors_.insert(reactor).second; reactor_assert(result); } @@ -57,7 +58,7 @@ void Environment::register_input_action(BaseAction* action) { reactor_assert(action != nullptr); validate(this->phase() == Phase::Construction || this->phase() == Phase::Assembly, "Input actions may only be registered during construction or assembly phase!"); - [[maybe_unused]] bool result = input_actions_.insert(action).second; + [[maybe_unused]] const bool result = input_actions_.insert(action).second; reactor_assert(result); run_forever_ = true; } @@ -144,6 +145,11 @@ void Environment::assemble() { // NOLINT(readability-function-cognitive-complexi } } +void Environment::clear_dependency_graph() { + dependencies_.clear(); + reactions_.clear(); +} + void Environment::build_dependency_graph(Reactor* reactor) { // obtain dependencies from each contained reactor for (auto* sub_reactor : reactor->reactors()) { @@ -273,7 +279,7 @@ void Environment::calculate_indexes() { } log_.debug() << "Reactions sorted by index:"; - unsigned int index = 0; + int index = 0; while (!graph.empty()) { // find nodes with degree zero and assign index std::set degree_zero; diff --git a/lib/logical_time.cc b/lib/logical_time.cc index c17ca2f4..3e4ea73b 100644 --- a/lib/logical_time.cc +++ b/lib/logical_time.cc @@ -45,7 +45,8 @@ auto Tag::subtract(Duration offset) const noexcept -> Tag { auto Tag::decrement() const noexcept -> Tag { if (micro_step_ == 0) { - return {time_point_ - Duration{1}, std::numeric_limits::max()}; + // FIXME: return {time_point_ - Duration{1}, std::numeric_limits::max()}; + return {time_point_ - Duration{1}, 0}; } return {time_point_, micro_step_ - 1}; } diff --git a/lib/multiport.cc b/lib/multiport.cc index c6674cb1..ca9c5d82 100644 --- a/lib/multiport.cc +++ b/lib/multiport.cc @@ -21,7 +21,7 @@ auto reactor::BaseMultiport::get_set_callback(std::size_t index) noexcept -> rea } void reactor::BaseMultiport::set_present(std::size_t index) { - auto calculated_index = size_.fetch_add(1, std::memory_order_relaxed); + auto calculated_index = present_ports_size_.fetch_add(1, std::memory_order_relaxed); reactor_assert(calculated_index < present_ports_.size()); diff --git a/lib/mutation/bank.cc b/lib/mutation/bank.cc new file mode 100644 index 00000000..5ce7d635 --- /dev/null +++ b/lib/mutation/bank.cc @@ -0,0 +1,50 @@ +// +// Created by tanneberger on 11/11/24. +// + +#include "reactor-cpp/mutations/bank.hh" +#include "reactor-cpp/action.hh" + +template +reactor::MutationChangeBankSize::MutationChangeBankSize( + std::vector* bank, Reactor* reactor, std::size_t size, + std::function create_lambda) + : bank_(bank) + , reactor_(reactor) + , desired_size_(size) + , create_lambda_(std::move(create_lambda)) {} + +template void reactor::MutationChangeBankSize::change_size(std::size_t new_size) { + bank_->reserve(new_size); + auto current_size = bank_->size(); + std::cout << "scaling from: " << current_size << " to " << new_size << std::endl; + + if (current_size >= new_size) { + // downscale + + for (auto i = 0; i < current_size - new_size; i++) { + // TODO: consider saving the ports here here + std::unique_ptr lastElement = std::move(bank_->back()); + bank_->pop_back(); + reactor_->environment()->remove_top_level_reactor(lastElement.get()); + } + } else { + // upscale + + for (auto i = 0; i < new_size - current_size; i++) { + bank_->push_back(create_lambda_(reactor_, current_size + i)); + (*bank_)[bank_->size() - 1]->assemble(); + } + std::cout << "created new reactors" << '\n'; + } +} +template auto reactor::MutationChangeBankSize::run() -> MutationResult { + size_before_application_ = bank_->size(); + change_size(desired_size_); + return Success; +} + +template auto reactor::MutationChangeBankSize::rollback() -> MutationResult { + change_size(size_before_application_); + return Success; +} diff --git a/lib/mutation/connection.cc b/lib/mutation/connection.cc new file mode 100644 index 00000000..7bbbcd76 --- /dev/null +++ b/lib/mutation/connection.cc @@ -0,0 +1,27 @@ +// +// Created by tanneberger on 11/20/24. +// + +#include "reactor-cpp/mutations/connection.hh" +#include "reactor-cpp/reactor.hh" + +template +reactor::MutationAddConnection::MutationAddConnection(A* source, B* sink, Reactor* reactor) + : source_(source) + , sink_(sink) + , reactor_(reactor) {} + +template auto reactor::MutationAddConnection::run() -> MutationResult { + reactor_->environment()->draw_connection(source_, sink_, ConnectionProperties{}); + sink_->set_inward_binding(source_); + source_->add_outward_binding(sink_); + // std::cout << "from: " << source_->fqn() << "(" << source_ << ")" + // << " --> to: " << sink_->fqn() << "(" << sink_ << ")" << std::endl; + return Success; +} + +template auto reactor::MutationAddConnection::rollback() -> MutationResult { + reactor_->environment()->remove_connection(source_, sink_); + + return Success; +} diff --git a/lib/mutation/multiport.cc b/lib/mutation/multiport.cc new file mode 100644 index 00000000..486d6b6c --- /dev/null +++ b/lib/mutation/multiport.cc @@ -0,0 +1,64 @@ +// +// Created by tanneberger on 11/11/24. +// + +#include "reactor-cpp/mutations/multiport.hh" +#include "reactor-cpp/reaction.hh" + +template +reactor::MutationChangeOutputMultiportSize::MutationChangeOutputMultiportSize( + ModifableMultiport>* multiport, Reactor* reactor, std::set& anti_dependencies, + std::size_t size) + : multiport_(multiport) + , reactor_(reactor) + , desired_size_(size) + , anti_dependencies_(anti_dependencies) {} + +template void reactor::MutationChangeOutputMultiportSize::change_size(std::size_t new_size) { + auto current_size = multiport_->size(); + if (current_size >= new_size) { + // downscale + + for (auto i = 0; i < current_size - new_size; i++) { + // TODO: consider saving the ports here here + + std::string port_name_ = multiport_->name() + "_" + std::to_string(current_size - i - 1); + multiport_->pop_back(); + auto base_port = Output{port_name_, reactor_}; + reactor_->remove_inputs(&base_port); + } + + for (auto* anti_dep : anti_dependencies_) { + anti_dep->clear_antidependencies(); + for (auto i = 0; i < new_size; i++) { + + multiport_->operator[](i).anti_dependencies().clear(); + anti_dep->declare_antidependency(&multiport_->operator[](i)); + } + } + } else { + // upscale + + for (auto i = 0; i < new_size - current_size; i++) { + std::string port_name_ = multiport_->name() + "_" + std::to_string(current_size + i); + multiport_->emplace_back(port_name_, reactor_); + } + + /*for (auto* anti_dep : anti_dependencies_) { + anti_dep->clear_antidependencies(); + for (auto i = 0; i < new_size; i++) { + anti_dep->declare_antidependency(&multiport_->operator[](i)); + } + }*/ + } +} +template auto reactor::MutationChangeOutputMultiportSize::run() -> MutationResult { + size_before_application_ = multiport_->size(); + change_size(desired_size_); + return Success; +} + +template auto reactor::MutationChangeOutputMultiportSize::rollback() -> MutationResult { + change_size(size_before_application_); + return Success; +} diff --git a/lib/port.cc b/lib/port.cc index 7260c661..23af9ea3 100644 --- a/lib/port.cc +++ b/lib/port.cc @@ -20,7 +20,7 @@ void BasePort::register_dependency(Reaction* reaction, bool is_trigger) noexcept reactor_assert(reaction != nullptr); reactor_assert(this->environment() == reaction->environment()); validate(!this->has_outward_bindings(), "Dependencies may no be declared on ports with an outward binding!"); - assert_phase(this, Phase::Assembly); + // assert_phase(this, Phase::Assembly); if (this->is_input()) { validate(this->container() == reaction->container(), "Dependent input ports must belong to the same reactor as the " @@ -42,7 +42,7 @@ void BasePort::register_antidependency(Reaction* reaction) noexcept { reactor_assert(reaction != nullptr); reactor_assert(this->environment() == reaction->environment()); validate(!this->has_inward_binding(), "Antidependencies may no be declared on ports with an inward binding!"); - assert_phase(this, Phase::Assembly); + // TODO: assert_phase(this, Phase::Assembly); if (this->is_output()) { validate(this->container() == reaction->container(), diff --git a/lib/reaction.cc b/lib/reaction.cc index 697ab52b..20858170 100644 --- a/lib/reaction.cc +++ b/lib/reaction.cc @@ -51,7 +51,7 @@ void Reaction::declare_schedulable_action(BaseAction* action) { void Reaction::declare_trigger(BasePort* port) { reactor_assert(port != nullptr); reactor_assert(this->environment() == port->environment()); - assert_phase(this, Phase::Assembly); + // assert_phase(this, Phase::Assembly); if (port->is_input()) { validate(this->container() == port->container(), @@ -90,7 +90,7 @@ void Reaction::declare_dependency(BasePort* port) { void Reaction::declare_antidependency(BasePort* port) { reactor_assert(port != nullptr); reactor_assert(this->environment() == port->environment()); - assert_phase(this, Phase::Assembly); + // assert_phase(this, Phase::Assembly); if (port->is_output()) { validate(this->container() == port->container(), "Antidependent output ports must belong to the same reactor as " @@ -125,8 +125,9 @@ void Reaction::set_deadline_impl(Duration deadline, const std::functiondeadline_handler_ = handler; } -void Reaction::set_index(unsigned index) { - validate(this->environment()->phase() == Phase::Assembly, "Reaction indexes may only be set during assembly phase!"); +void Reaction::set_index(int index) { + validate(this->environment()->phase() == Phase::Assembly || this->environment()->phase() == Phase::Mutation, + "Reaction indexes may only be set during assembly phase!"); this->index_ = index; } diff --git a/lib/reactor.cc b/lib/reactor.cc index f42488b0..b73263de 100644 --- a/lib/reactor.cc +++ b/lib/reactor.cc @@ -18,7 +18,9 @@ namespace reactor { Reactor::Reactor(const std::string& name, Reactor* container) - : ReactorElement(name, ReactorElement::Type::Reactor, container) {} + : ReactorElement(name, ReactorElement::Type::Reactor, container) { + container->register_reactor(this); +} Reactor::Reactor(const std::string& name, Environment* environment) : ReactorElement(name, ReactorElement::Type::Reactor, environment) { environment->register_reactor(this); @@ -36,26 +38,31 @@ void Reactor::register_action([[maybe_unused]] BaseAction* action) { void Reactor::register_input(BasePort* port) { reactor_assert(port != nullptr); - reactor::validate(this->environment()->phase() == Phase::Construction, + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Mutation, "Ports can only be registered during construction phase!"); - [[maybe_unused]] bool result = inputs_.insert(port).second; - reactor_assert(result); + //[[maybe_unused]] bool result = inputs_.insert(port).second; + inputs_.push_back(port); + // reactor_assert(result); Statistics::increment_ports(); } void Reactor::register_output(BasePort* port) { + (void)port; reactor_assert(port != nullptr); - reactor::validate(this->environment()->phase() == Phase::Construction, + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Mutation, "Ports can only be registered during construction phase!"); - [[maybe_unused]] bool result = inputs_.insert(port).second; - reactor_assert(result); + //[[maybe_unused]] bool result = inputs_.insert(port).second; + // std::cout << "reactor port count:" << inputs_.size() << std::endl; + // TODO: reactor_assert(result); Statistics::increment_ports(); } void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) { reactor_assert(reaction != nullptr); - validate(this->environment()->phase() == Phase::Construction, + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Reactions can only be registered during construction phase!"); [[maybe_unused]] bool result = reactions_.insert(reaction).second; reactor_assert(result); @@ -64,10 +71,16 @@ void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) { void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) { reactor_assert(reactor != nullptr); - validate(this->environment()->phase() == Phase::Construction, + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Reactions can only be registered during construction phase!"); - [[maybe_unused]] bool result = reactors_.insert(reactor).second; - reactor_assert(result); + if (std::find(std::begin(reactors_), std::end(reactors_), reactor) == std::end(reactors_)) { + reactors_.push_back(reactor); + } else { + std::cout << "duplicate insertion!" << '\n'; + } + + //[[maybe_unused]] bool result = reactors_.insert(reactor).second; + // reactor_assert(result); Statistics::increment_reactor_instances(); } @@ -141,4 +154,22 @@ auto Reactor::get_elapsed_physical_time() const noexcept -> Duration { return get_physical_time() - environment()->start_tag().time_point(); } +void Reactor::remove_inputs(BasePort* base_port) { + auto index = std::find_if(std::begin(inputs_), std::end(inputs_), + [base_port](const BasePort* other) { return *other == *base_port; }); + + if (index != std::end(inputs_)) { + inputs_.erase(index); + } +}; + +void Reactor::remove_child_reactor(const Reactor* base_reactor) { + const auto index = std::find_if(std::begin(reactors_), std::end(reactors_), + [base_reactor](const Reactor* other) { return base_reactor == other; }); + + if (index != std::end(reactors_)) { + reactors_.erase(index); + } +} + } // namespace reactor diff --git a/lib/reactor_element.cc b/lib/reactor_element.cc index 32db319f..672fa5d7 100644 --- a/lib/reactor_element.cc +++ b/lib/reactor_element.cc @@ -24,7 +24,7 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ reactor_assert(container != nullptr); this->environment_ = container->environment(); reactor_assert(this->environment_ != nullptr); - validate(this->environment_->phase() == Phase::Construction || + validate(this->environment_->phase() == Phase::Construction || this->environment_->phase() == Phase::Mutation || (type == Type::Action && this->environment_->phase() == Phase::Assembly), "Reactor elements can only be created during construction phase!"); // We need a reinterpret_cast here as the derived class is not yet created @@ -71,7 +71,7 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ reactor_assert(environment != nullptr); validate(type == Type::Reactor || type == Type::Action, "Only reactors and actions can be owned by the environment!"); - validate(this->environment_->phase() == Phase::Construction || + validate(this->environment_->phase() == Phase::Construction || this->environment_->phase() == Phase::Mutation || (type == Type::Action && this->environment_->phase() == Phase::Assembly), "Reactor elements can only be created during construction phase!"); diff --git a/lib/scopes.cc b/lib/scopes.cc new file mode 100644 index 00000000..dee9ee79 --- /dev/null +++ b/lib/scopes.cc @@ -0,0 +1,12 @@ + + +#include "reactor-cpp/scopes.hh" + +void reactor::MutableScope::add_to_transaction(const std::shared_ptr& mutation) { + transaction_.push_back(mutation); +} + +void reactor::MutableScope::commit_transaction(bool recalculate) { + (void)recalculate; + transaction_.execute(recalculate); +} \ No newline at end of file diff --git a/lib/transaction.cc b/lib/transaction.cc new file mode 100644 index 00000000..decb77b9 --- /dev/null +++ b/lib/transaction.cc @@ -0,0 +1,34 @@ + + +#include "reactor-cpp/transaction.hh" +#include "reactor-cpp/environment.hh" +#include "reactor-cpp/reactor.hh" + +reactor::Transaction::Transaction(Reactor* parent) + : environment_(parent->environment()) {} + +auto reactor::Transaction::execute(bool recalculate) -> MutationResult { + + this->environment_->start_mutation(); + for (const auto& mutation : mutations_) { + mutation->run(); + } + + if (recalculate) { + this->environment_->clear_dependency_graph(); + for (auto* reactor : this->environment_->top_level_reactors()) { + this->environment_->build_dependency_graph(reactor); + } + + this->environment_->calculate_indexes(); + } + + this->environment_->stop_mutation(); + + mutations_.clear(); + return Success; +} + +void reactor::Transaction::push_back(const std::shared_ptr& mutation) { + mutations_.push_back(mutation); +} \ No newline at end of file