Skip to content

Commit b5eb7a2

Browse files
Mon 194544 broker process output (#3149)
* add event_script output * add UT * add robot test * fix tests * review comment
1 parent 2ed0c62 commit b5eb7a2

File tree

25 files changed

+1282
-12
lines changed

25 files changed

+1282
-12
lines changed

broker/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,7 @@ target_link_libraries(
420420
roker
421421
multiplexing
422422
centreon_common
423+
centreon_process
423424
-Wl,--no-whole-archive
424425
nlohmann_json::nlohmann_json
425426
-Wl,--whole-archive
@@ -469,6 +470,7 @@ else()
469470
add_broker_module(DUMP OFF)
470471
add_broker_module(GRPC ON)
471472
add_broker_module(VICTORIA_METRICS ON)
473+
add_broker_module(EVENT_SCRIPT ON)
472474
add_subdirectory(http_tsdb)
473475
endif()
474476

broker/core/src/config/parser.cc

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -574,17 +574,40 @@ void parser::_parse_endpoint(const json& elem,
574574
else // Output.
575575
member = &endpoint::write_filters;
576576
(e.*member).clear();
577-
if (it.value().is_object() && it.value()["category"].is_array())
578-
for (auto& cat : it.value()["category"])
579-
(e.*member).insert(cat.get<std::string>());
580-
else if (it.value().is_object() && it.value()["category"].is_string())
581-
(e.*member).insert(it.value()["category"].get<std::string>());
582-
else if (it.value().is_string() && it.value().get<std::string>() == "all")
577+
const auto& filter_obj = it.value();
578+
if (filter_obj.is_string() && filter_obj.get<std::string>() == "all") {
583579
(e.*member).insert("all");
584-
else
580+
} else if (!filter_obj.is_object()) {
585581
throw msg_fmt(
586582
"config parser: cannot parse key "
587-
"'filters': value is invalid");
583+
"'filters': value is not an object");
584+
}
585+
586+
if (filter_obj.contains("category")) {
587+
const auto& category = filter_obj["category"];
588+
if (category.is_array())
589+
for (auto& cat : category)
590+
(e.*member).insert(cat.get<std::string>());
591+
else if (category.is_string())
592+
(e.*member).insert(category.get<std::string>());
593+
else
594+
throw msg_fmt(
595+
"config parser: cannot parse key "
596+
"'filters': value is invalid");
597+
}
598+
if (filter_obj.contains("event")) {
599+
const auto& event = filter_obj["event"];
600+
if (event.is_array())
601+
for (auto& cat : event)
602+
(e.*member).insert(cat.get<std::string>());
603+
else if (event.is_string())
604+
(e.*member).insert(event.get<std::string>());
605+
else
606+
throw msg_fmt(
607+
"config parser: cannot parse key "
608+
"'filters': value is invalid");
609+
}
610+
588611
} else if (it.key() == "cache") {
589612
auto cc = check_and_read<bool>(elem, "cache");
590613
e.cache_enabled = cc ? cc.value() : false;
@@ -614,6 +637,8 @@ void parser::_parse_endpoint(const json& elem,
614637
module = "70-influxdb.so";
615638
else if (e.type == "victoria_metrics")
616639
module = "70-victoria_metrics.so";
640+
else if (e.type == "event_script")
641+
module = "80-event_script.so";
617642
else if (e.type == "grpc")
618643
module = "50-grpc.so";
619644
else if (e.type == "bbdo_server" || e.type == "bbdo_client") {

broker/event_script/CMakeLists.txt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#
2+
# Copyright 2026 Centreon
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
# use this file except in compliance with the License. You may obtain a copy of
6+
# the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations under
14+
# the License.
15+
#
16+
# For more information : contact@centreon.com
17+
#
18+
19+
# Global options.
20+
set(INC_DIR "${PROJECT_SOURCE_DIR}/event_script/inc")
21+
set(SRC_DIR "${PROJECT_SOURCE_DIR}/event_script/src")
22+
set(TEST_DIR "${PROJECT_SOURCE_DIR}/event_script/test")
23+
include_directories(
24+
${INC_DIR} ${PROJECT_SOURCE_DIR}/neb/inc ${CMAKE_SOURCE_DIR}/common/process/inc)
25+
26+
add_library(
27+
"80-event_script" SHARED
28+
# Sources.
29+
"${SRC_DIR}/connector.cc"
30+
"${SRC_DIR}/factory.cc"
31+
"${SRC_DIR}/main.cc"
32+
"${SRC_DIR}/stream.cc"
33+
)
34+
target_link_libraries(
35+
"80-event_script" PRIVATE fmt::fmt)
36+
37+
target_precompile_headers("80-event_script" PRIVATE precomp_inc/precomp.hpp)
38+
set_target_properties("80-event_script" PROPERTIES PREFIX ""
39+
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/broker/lib")
40+
41+
add_dependencies("80-event_script" process_stat)
42+
43+
#Testing.
44+
if(WITH_TESTING)
45+
set(TESTS_SOURCES
46+
${TESTS_SOURCES} ${TEST_DIR}/factory_test.cc ${TEST_DIR}/stream_test.cc
47+
PARENT_SCOPE)
48+
set(TESTS_LIBRARIES
49+
${TESTS_LIBRARIES} 80-event_script
50+
PARENT_SCOPE)
51+
endif(WITH_TESTING)
52+
53+
# Install rule.
54+
install(TARGETS "80-event_script" LIBRARY DESTINATION "${PREFIX_MODULES}")
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright 2026 Centreon
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* For more information : contact@centreon.com
17+
*/
18+
19+
#ifndef CCB_EVENT_SCRIPT_CONNECTOR_HH
20+
#define CCB_EVENT_SCRIPT_CONNECTOR_HH
21+
22+
#include "com/centreon/broker/io/endpoint.hh"
23+
24+
namespace com::centreon::broker::event_script {
25+
26+
/**
27+
* @class connector connector.hh "com/centreon/broker/event_script/connector.hh"
28+
* @brief Crate an event_script stream.
29+
*/
30+
class connector : public io::endpoint {
31+
std::string _script_path;
32+
std::chrono::system_clock::duration _managed_event_ttl;
33+
std::chrono::system_clock::duration _timeout;
34+
35+
public:
36+
connector();
37+
~connector() noexcept = default;
38+
connector(const connector&) = delete;
39+
connector& operator=(const connector&) = delete;
40+
void connect_to(const std::string_view& script_path,
41+
const std::chrono::system_clock::duration& managed_event_ttl,
42+
const std::chrono::system_clock::duration& timeout);
43+
std::shared_ptr<io::stream> open() override;
44+
};
45+
46+
} // namespace com::centreon::broker::event_script
47+
48+
#endif // !CCB_INFLUXDB_CONNECTOR_HH
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2026 Centreon
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* For more information : contact@centreon.com
17+
*/
18+
19+
#ifndef CCB_EVENT_SCRIPT_FACTORY_HH
20+
#define CCB_EVENT_SCRIPT_FACTORY_HH
21+
22+
#include "com/centreon/broker/io/factory.hh"
23+
24+
namespace com::centreon::broker::event_script {
25+
26+
/**
27+
* @class factory factory.hh "com/centreon/broker/influxdb/factory.hh"
28+
* @brief event_script layer factory.
29+
*
30+
* Build event_script layer objects.
31+
*/
32+
class factory : public io::factory {
33+
public:
34+
factory() = default;
35+
factory(factory const&) = delete;
36+
~factory() = default;
37+
factory& operator=(factory const& other) = delete;
38+
bool has_endpoint(const config::endpoint& cfg,
39+
io::extension* ext) const override;
40+
io::endpoint* new_endpoint(
41+
config::endpoint& cfg,
42+
const std::map<std::string, std::string>& global_params,
43+
bool& is_acceptor,
44+
std::shared_ptr<persistent_cache> cache) const override;
45+
};
46+
47+
} // namespace com::centreon::broker::event_script
48+
49+
#endif // !CCB_INFLUXDB_FACTORY_HH
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* Copyright 2026 Centreon
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* For more information : contact@centreon.com
17+
*/
18+
19+
#ifndef CCB_EVENT_SCRIPT_STREAM_HH
20+
#define CCB_EVENT_SCRIPT_STREAM_HH
21+
22+
#include "com/centreon/broker/io/protobuf.hh"
23+
#include "com/centreon/broker/io/stream.hh"
24+
#include "com/centreon/common/process/process.hh"
25+
26+
namespace com::centreon::broker::event_script {
27+
28+
/**
29+
* @brief this struct is used to create unordered containers of protobuf
30+
* events Caution, it must not be used with ordered containers (,) operator
31+
* only test equality
32+
*
33+
*/
34+
struct io_data_compare {
35+
using is_transparent = void;
36+
bool operator()(const std::shared_ptr<io::protobuf_base>& left,
37+
const std::shared_ptr<io::protobuf_base>& right) const;
38+
size_t operator()(const std::shared_ptr<io::protobuf_base>& to_hash) const;
39+
};
40+
41+
/**
42+
* @brief This stream executes a script on each event received from muxer
43+
*
44+
*/
45+
class stream : public io::stream, public std::enable_shared_from_this<stream> {
46+
std::chrono::system_clock::duration _managed_event_ttl;
47+
std::chrono::system_clock::duration _timeout;
48+
49+
/**
50+
* @brief we store event with insertion timestamp
51+
* So we are able to not deal several times the same event object over a
52+
* defined period
53+
*
54+
*/
55+
struct event_with_time {
56+
std::shared_ptr<io::protobuf_base> evt;
57+
std::chrono::system_clock::time_point inserted;
58+
};
59+
60+
using event_cont = boost::multi_index::multi_index_container<
61+
event_with_time,
62+
boost::multi_index::indexed_by<
63+
boost::multi_index::hashed_unique<
64+
BOOST_MULTI_INDEX_MEMBER(event_with_time,
65+
std::shared_ptr<io::protobuf_base>,
66+
evt),
67+
io_data_compare,
68+
io_data_compare>,
69+
boost::multi_index::ordered_non_unique<BOOST_MULTI_INDEX_MEMBER(
70+
event_with_time,
71+
std::chrono::system_clock::time_point,
72+
inserted)>>>;
73+
74+
event_cont _events;
75+
76+
std::shared_ptr<spdlog::logger> _logger;
77+
78+
std::atomic_uint _to_ack;
79+
bool _writing ABSL_GUARDED_BY(_write_queue_m);
80+
81+
std::queue<std::shared_ptr<io::protobuf_base>> _write_queue
82+
ABSL_GUARDED_BY(_write_queue_m);
83+
absl::Mutex _write_queue_m;
84+
85+
const com::centreon::common::process_args::pointer _script_cmdline;
86+
87+
void _write(const std::shared_ptr<io::protobuf_base>& event);
88+
void _write_completion(const common::process<true>& proc,
89+
int exit_code,
90+
common::e_exit_status exit_status,
91+
const std::string& std_err);
92+
93+
public:
94+
stream(const std::string_view& script_path,
95+
const std::chrono::system_clock::duration& managed_event_ttl,
96+
const std::chrono::system_clock::duration& timeout);
97+
98+
bool read(std::shared_ptr<io::data>& d, time_t deadline) override;
99+
int write(const std::shared_ptr<io::data>& d) override;
100+
int32_t stop() override;
101+
};
102+
103+
} // namespace com::centreon::broker::event_script
104+
105+
#endif
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright 2026 Centreon
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* For more information : contact@centreon.com
17+
*/
18+
19+
#ifndef CC_EVENT_SCRIPT_PRECOMP_HH
20+
#define CC_EVENT_SCRIPT_PRECOMP_HH
21+
22+
#include <atomic>
23+
#include <condition_variable>
24+
#include <deque>
25+
#include <future>
26+
#include <limits>
27+
#include <list>
28+
#include <queue>
29+
#include <set>
30+
#include <string>
31+
#include <thread>
32+
#include <unordered_map>
33+
#include <vector>
34+
35+
#include <absl/base/thread_annotations.h>
36+
#include <absl/container/btree_map.h>
37+
#include <absl/container/btree_set.h>
38+
#include <absl/container/flat_hash_map.h>
39+
#include <absl/container/flat_hash_set.h>
40+
#include <absl/synchronization/mutex.h>
41+
42+
#include <boost/asio.hpp>
43+
// with this define boost::interprocess doesn't need Boost.DataTime
44+
#define BOOST_DATE_TIME_NO_LIB 1
45+
#include <boost/interprocess/containers/string.hpp>
46+
#include <boost/interprocess/managed_mapped_file.hpp>
47+
#include <boost/multi_index/hashed_index.hpp>
48+
#include <boost/multi_index/member.hpp>
49+
#include <boost/multi_index/ordered_index.hpp>
50+
#include <boost/multi_index_container.hpp>
51+
52+
namespace asio = boost::asio;
53+
54+
#include <spdlog/fmt/ostr.h>
55+
#include <spdlog/fmt/ranges.h>
56+
#include <spdlog/spdlog.h>
57+
58+
#endif

0 commit comments

Comments
 (0)