Skip to content

Commit aef48d6

Browse files
committed
Request/response tests excluded and README typos corrected
Signed-off-by: zesk1999 <zesk1999@gmail.com>
1 parent eaee879 commit aef48d6

File tree

6 files changed

+51
-26
lines changed

6 files changed

+51
-26
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ AI developers from all experience levels can make use of the framework through i
4444

4545
### Project Architecture
4646

47-
The *SustainML Framework* is composed of different Software Modules, each one related to specific task, which are specialized in solving the different parts of the machine learning problem architecture definition, starting from the user’s problem description.
48-
Each of the modules conforms a Node.
47+
The *SustainML Framework* is composed of different Software Modules, each one related to a specific task, which are specialized in solving the different parts of the machine learning problem architecture definition, starting from the user’s problem description.
48+
Each of the modules forms a Node.
4949
These steps are basically:
5050

5151
1. Encode the problem and constraints defined by the user
@@ -64,7 +64,7 @@ The exchanged information between the modules is over DDS.
6464
### Framework
6565

6666
The Framework includes a *Graphical User Interface* (GUI) in which user interacts and introduces the ML problem definition.
67-
That GUI implements also the **Orchestrator** node, a key node that feds the remain modules with the information provided by the user, retrieves all the results, and display them to the user though this GUI.
67+
That GUI implements also the **Orchestrator** node, a key node that feeds the remaining modules with the information provided by the user, retrieves all the results, and display them to the user through this GUI.
6868

6969
This process can be iterative.
7070
So, based on a previous solution and the user's feedback, the framework provides new ML solutions.
@@ -77,9 +77,9 @@ This repository is divided in sub-packages with different targets:
7777

7878
* `sustainml_cpp`: Main definition and implementation of the project logic library. C++ API provided.
7979
* `sustainml_docs`: ReadTheDocs documentation project ([available here](https://sustainml.readthedocs.io/en/latest/))
80-
* `sustainml_modules`: Set of piped modules that use the Python API.
81-
* `sustainml_py`: Wrap of the project logic library. Python API.
82-
* `sustainml_swig`: Binding from the C++ API `sustainml_cpp` to the `sustainml_py` Python API.
80+
* `sustainml_modules`: Set of pipeline modules that use the Python API.
81+
* `sustainml_py`: Wrapper of the project logic library. Python API.
82+
* `sustainml_swig`: Bindings from the C++ API `sustainml_cpp` to the `sustainml_py` Python API.
8383

8484
## Getting Help
8585

sustainml_cpp/src/cpp/core/GenericServiceNodeImpl.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ class GenericServiceNodeImpl : public ServerBase
5656
const eprosima::fastdds::dds::rpc::RpcRequest& /*info*/,
5757
const std::string& configuration) override
5858
{
59+
if (node_.shutting_down())
60+
{
61+
throw ::InternalError("update_configuration: node is shutting down");
62+
}
63+
5964
std::cout << "[RPC SERVER/" << tag_ << "] update_configuration cfg='"
6065
<< configuration << "'\n";
6166

sustainml_cpp/src/cpp/core/NodeImpl.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,24 +118,26 @@ NodeImpl::NodeImpl(
118118

119119
NodeImpl::~NodeImpl()
120120
{
121+
shutting_down_.store(true, std::memory_order_release);
122+
121123
EPROSIMA_LOG_INFO(NODE, "Destroying Node");
122124

123-
// Stop RPC server before destroying the participant
124125
if (rpc_server_)
125126
{
126127
rpc_server_->stop();
127-
}
128128

129-
if (rpc_server_thread_.joinable())
130-
{
131-
rpc_server_thread_.join();
129+
if (rpc_server_thread_.joinable())
130+
{
131+
rpc_server_thread_.join();
132+
}
133+
rpc_server_.reset();
132134
}
133135

134-
if (nullptr != participant_)
136+
if (participant_)
135137
{
136138
participant_->delete_contained_entities();
137-
auto dpf = DomainParticipantFactory::get_instance();
138-
dpf->delete_participant(participant_);
139+
DomainParticipantFactory::get_instance()->delete_participant(participant_);
140+
participant_ = nullptr;
139141
}
140142

141143
dispatcher_->stop();

sustainml_cpp/src/cpp/core/NodeImpl.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ class NodeImpl
127127
return req_res_listener_;
128128
}
129129

130+
bool shutting_down() const noexcept
131+
{
132+
return shutting_down_.load(std::memory_order_acquire);
133+
}
134+
130135
protected:
131136

132137
/**
@@ -253,6 +258,8 @@ class NodeImpl
253258

254259
}
255260
control_listener_;
261+
262+
std::atomic<bool> shutting_down_{false};
256263
};
257264

258265
} // namespace core

sustainml_cpp/src/cpp/orchestrator/OrchestratorNode.cpp

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ template <typename ClientT>
6262
bool rpc_update_configuration(
6363
ClientT& client,
6464
const std::string& configuration,
65-
std::string& out_cfg)
65+
std::string& out_cfg,
66+
const std::atomic<bool>& terminate_flag)
6667
{
6768
auto future = client.update_configuration(configuration);
6869

@@ -73,6 +74,12 @@ bool rpc_update_configuration(
7374

7475
while (true)
7576
{
77+
if (terminate_flag.load(std::memory_order_acquire))
78+
{
79+
std::cerr << "[INFO] RPC aborted due to shutdown\n";
80+
return false;
81+
}
82+
7683
// Wait a bit, but don’t block forever
7784
auto status = future.wait_for(step);
7885

@@ -148,7 +155,7 @@ void OrchestratorNode::OrchestratorParticipantListener::on_participant_discovery
148155
});
149156
}
150157

151-
// create the proxy for this node
158+
// Create the proxy for this node
152159
NodeID node_id = common::get_node_id_from_name(participant_name);
153160

154161
std::cout << "[DEBUG Orchestrator] Participant discovered: name='"
@@ -159,8 +166,9 @@ void OrchestratorNode::OrchestratorParticipantListener::on_participant_discovery
159166

160167
std::lock_guard<std::mutex> lock(orchestrator_->proxies_mtx_);
161168

162-
// check if the node has been terminated
163-
if (!orchestrator_->terminated_.load())
169+
// Check if the node has been terminated
170+
if (!orchestrator_->terminate_.load(std::memory_order_acquire) &&
171+
!orchestrator_->terminated_.load(std::memory_order_acquire))
164172
{
165173
if (reason == eprosima::fastdds::rtps::ParticipantDiscoveryStatus::DISCOVERED_PARTICIPANT &&
166174
orchestrator_->node_proxies_[static_cast<uint32_t>(node_id)] == nullptr)
@@ -223,6 +231,8 @@ OrchestratorNode::~OrchestratorNode()
223231

224232
void OrchestratorNode::destroy()
225233
{
234+
terminate_.store(true, std::memory_order_release);
235+
226236
if (!terminated_.load())
227237
{
228238
{
@@ -379,7 +389,7 @@ bool OrchestratorNode::init()
379389

380390
holder->app_requirements_client = create_AppRequirementsServiceClient(
381391
*participant_,
382-
"AppRequirementsService", // must match server side
392+
"AppRequirementsService", // Must match server side
383393
rqos);
384394
std::cout << "[DEBUG Orchestrator] Created AppRequirementsServiceClient service='AppRequirementsService' ok="
385395
<< (holder->app_requirements_client ? "true" : "false")
@@ -481,7 +491,7 @@ std::pair<types::TaskId, types::UserInput*> OrchestratorNode::prepare_new_iterat
481491
std::lock_guard<std::mutex> lock(task_db_->get_mutex());
482492
task_db_->prepare_new_entry_nts(new_task_id, true);
483493
// Copy the UserInput from the previous iteration
484-
// it also updates the iteration_id in the data
494+
// It also updates the iteration_id in the data
485495
task_db_->copy_data_nts(old_task_id, new_task_id, {NodeID::ID_ORCHESTRATOR});
486496
task_db_->get_task_data_nts(new_task_id, output.second);
487497
}
@@ -715,7 +725,7 @@ types::ResponseType OrchestratorNode::configuration_request (
715725
{
716726
std::cout << "[RPC CLIENT] calling AppRequirementsService.update_configuration tx="
717727
<< req.transaction_id() << std::endl;
718-
if (!rpc_update_configuration(*holder->app_requirements_client, req.configuration(), cfg))
728+
if (!rpc_update_configuration(*holder->app_requirements_client, req.configuration(), cfg, terminate_))
719729
{
720730
return res; // Timeout or not ready
721731
}
@@ -725,7 +735,7 @@ types::ResponseType OrchestratorNode::configuration_request (
725735
{
726736
std::cout << "[RPC CLIENT] calling HWConstraintsService.update_configuration tx="
727737
<< req.transaction_id() << std::endl;
728-
if (!rpc_update_configuration(*holder->hw_constraints_client, req.configuration(), cfg))
738+
if (!rpc_update_configuration(*holder->hw_constraints_client, req.configuration(), cfg, terminate_))
729739
{
730740
return res;
731741
}
@@ -736,7 +746,7 @@ types::ResponseType OrchestratorNode::configuration_request (
736746
std::cout << "[RPC CLIENT] calling HWResourcesService.update_configuration tx="
737747
<< req.transaction_id() << std::endl;
738748

739-
if (!rpc_update_configuration(*holder->hw_resources_client, req.configuration(), cfg))
749+
if (!rpc_update_configuration(*holder->hw_resources_client, req.configuration(), cfg, terminate_))
740750
{
741751
return res;
742752
}
@@ -746,7 +756,7 @@ types::ResponseType OrchestratorNode::configuration_request (
746756
{
747757
std::cout << "[RPC CLIENT] calling CarbonFootprintService.update_configuration tx="
748758
<< req.transaction_id() << std::endl;
749-
if (!rpc_update_configuration(*holder->carbon_footprint_client, req.configuration(), cfg))
759+
if (!rpc_update_configuration(*holder->carbon_footprint_client, req.configuration(), cfg, terminate_))
750760
{
751761
return res;
752762
}
@@ -756,7 +766,7 @@ types::ResponseType OrchestratorNode::configuration_request (
756766
{
757767
std::cout << "[RPC CLIENT] calling MLModelMetadataService.update_configuration tx="
758768
<< req.transaction_id() << std::endl;
759-
if (!rpc_update_configuration(*holder->ml_model_metadata_client, req.configuration(), cfg))
769+
if (!rpc_update_configuration(*holder->ml_model_metadata_client, req.configuration(), cfg, terminate_))
760770
{
761771
return res;
762772
}
@@ -766,7 +776,7 @@ types::ResponseType OrchestratorNode::configuration_request (
766776
{
767777
std::cout << "[RPC CLIENT] calling MLModelService.update_configuration tx="
768778
<< req.transaction_id() << std::endl;
769-
if (!rpc_update_configuration(*holder->ml_model_client, req.configuration(), cfg))
779+
if (!rpc_update_configuration(*holder->ml_model_client, req.configuration(), cfg, terminate_))
770780
{
771781
return res;
772782
}

sustainml_cpp/test/blackbox/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515

1616
file(GLOB BLACKBOXTESTS_TEST_SOURCE "common/BlackboxTests*.cpp")
17+
list(FILTER BLACKBOXTESTS_TEST_SOURCE EXCLUDE REGEX ".*ResponseNodes.*\\.cpp$") # Old behavior tests
1718
set(BLACKBOXTESTS_SOURCE ${BLACKBOXTESTS_TEST_SOURCE}
1819
${PROJECT_SOURCE_DIR}/src/cpp/types/typesImplTypeObjectSupport.cxx
1920
${PROJECT_SOURCE_DIR}/src/cpp/types/typesImplPubSubTypes.cxx

0 commit comments

Comments
 (0)