Skip to content

Commit fb5796f

Browse files
MON-193976 failover can read endpoint config and increase failed write retries delay (#3160) (#3194)
* splitter read does not loop and only one splitter write error is logged per second
1 parent b5eb7a2 commit fb5796f

File tree

9 files changed

+202
-21
lines changed

9 files changed

+202
-21
lines changed

broker/core/inc/com/centreon/broker/file/splitter.hh

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,19 @@ class splitter : public fs_file {
7373
std::atomic_int _wid;
7474
long _woffset;
7575

76+
/**
77+
* @brief when disk is full, we will log all events write failures so we will
78+
* log only once by second
79+
*
80+
*/
81+
time_t _last_write_error_log;
82+
7683
void _open_read_file();
7784
bool _open_write_file();
7885

7986
public:
80-
splitter(const std::string& path, uint32_t max_file_size = 100000000u,
87+
splitter(const std::string& path,
88+
uint32_t max_file_size = 100000000u,
8189
bool auto_delete = false);
8290
~splitter();
8391
splitter(const splitter&) = delete;

broker/core/inc/com/centreon/broker/processing/failover.hh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
#ifndef CCB_PROCESSING_FAILOVER_HH
2020
#define CCB_PROCESSING_FAILOVER_HH
2121

22-
#include <climits>
22+
#include "com/centreon/broker/config/endpoint.hh"
2323
#include "com/centreon/broker/io/endpoint.hh"
2424
#include "com/centreon/broker/multiplexing/muxer.hh"
2525

@@ -53,13 +53,14 @@ class failover : public endpoint {
5353
_running_state _state ABSL_GUARDED_BY(_state_m);
5454
mutable absl::Mutex _state_m;
5555
std::shared_ptr<spdlog::logger> _logger;
56+
unsigned _max_retry_delay;
5657

5758
void _run();
5859

5960
public:
6061
failover(std::shared_ptr<io::endpoint> endp,
6162
std::shared_ptr<multiplexing::muxer> mux,
62-
std::string const& name);
63+
const config::endpoint& cfg);
6364
failover(failover const& other) = delete;
6465
failover& operator=(failover const& other) = delete;
6566
~failover();

broker/core/src/config/applier/endpoint.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ processing::failover* endpoint::_create_failover(
455455
}
456456

457457
// Return failover thread.
458-
auto fo{std::make_unique<processing::failover>(endp, mux, cfg.name)};
458+
auto fo{std::make_unique<processing::failover>(endp, mux, cfg)};
459459
fo->set_buffering_timeout(cfg.buffering_timeout);
460460
fo->set_retry_interval(cfg.retry_interval);
461461
fo->set_failover(failovr);

broker/core/src/file/splitter.cc

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ splitter::splitter(std::string const& path,
5757
_roffset{0},
5858
_write_m{},
5959
_wfile{},
60-
_woffset{0} {
60+
_woffset{0},
61+
_last_write_error_log(0) {
6162
// Get IDs of already existing file parts. File parts are suffixed
6263
// with their order number. A file named /var/lib/foo would have
6364
// parts named /var/lib/foo, /var/lib/foo1, /var/lib/foo2, ...
@@ -143,8 +144,23 @@ long splitter::read(void* buffer, long max_size) {
143144
/* No lock here, there is only one consumer. */
144145
if (!_rfile) {
145146
_open_read_file();
146-
if (!_rfile)
147+
if (!_rfile) {
148+
// open read failure => next file
149+
bool have_to_read_next_file = false;
150+
{
151+
std::lock_guard l(_write_m);
152+
if (_rid < _wid) {
153+
_rid++;
154+
have_to_read_next_file = true;
155+
} else {
156+
throw exceptions::shutdown("No more data to read");
157+
}
158+
}
159+
if (have_to_read_next_file) {
160+
return read(buffer, max_size);
161+
}
147162
return 0;
163+
}
148164
}
149165

150166
/* We introduce the locker, but don't lock if not necessary */
@@ -448,10 +464,14 @@ bool splitter::_open_write_file() {
448464
size_t size = disk_accessor::instance().fwrite(
449465
header.bytes, 1, sizeof(header), _wfile.get());
450466
if (size != sizeof(header)) {
451-
std::string wfile(get_file_path(_wid));
452-
char msg[1024];
453-
logger->critical("splitter: cannot write to file '{}': {}", wfile,
454-
strerror_r(errno, msg, sizeof(msg)));
467+
time_t now = time(nullptr);
468+
if (now > _last_write_error_log) {
469+
_last_write_error_log = now;
470+
std::string wfile(get_file_path(_wid));
471+
char msg[1024];
472+
logger->critical("splitter: cannot write to file '{}': {}", wfile,
473+
strerror_r(errno, msg, sizeof(msg)));
474+
}
455475
_wfile.reset();
456476
return false;
457477
}

broker/core/src/processing/failover.cc

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ using log_v2 = com::centreon::common::log_v2::log_v2;
3939
*/
4040
failover::failover(std::shared_ptr<io::endpoint> endp,
4141
std::shared_ptr<multiplexing::muxer> mux,
42-
const std::string& name)
43-
: endpoint(false, name),
42+
const config::endpoint& cfg)
43+
: endpoint(false, cfg.name),
4444
_should_exit(false),
4545
_state(not_started),
4646
_logger{log_v2::instance().get(log_v2::PROCESSING)},
47+
_max_retry_delay(30),
4748
_buffering_timeout(0),
4849
_endpoint(endp),
4950
_failover_launched(false),
@@ -52,6 +53,14 @@ failover::failover(std::shared_ptr<io::endpoint> endp,
5253
_muxer(mux),
5354
_update(false) {
5455
SPDLOG_LOGGER_TRACE(_logger, "failover '{}' construction.", _name);
56+
57+
auto search = cfg.params.find("max_retry_delay");
58+
if (search != cfg.params.end()) {
59+
if (!absl::SimpleAtoi(search->second, &_max_retry_delay)) {
60+
throw msg_fmt("max_retry_delay needs a numerical value and not {}",
61+
search->second);
62+
}
63+
}
5564
}
5665

5766
/**
@@ -158,6 +167,27 @@ void failover::_run() {
158167
return;
159168
}
160169

170+
unsigned error_retry_delay = 0;
171+
172+
auto increase_retry_delay_and_wait = [&]() {
173+
// in order to avoid a write infinite loop, we increase delay between two
174+
// failures
175+
if (!error_retry_delay) {
176+
error_retry_delay = 1;
177+
} else {
178+
error_retry_delay *= 2;
179+
}
180+
if (error_retry_delay > _max_retry_delay) {
181+
error_retry_delay = _max_retry_delay;
182+
}
183+
SPDLOG_LOGGER_ERROR(
184+
_logger, " {} Failed to send event to stream, we wait {s} before retry",
185+
_name, error_retry_delay);
186+
for (ssize_t i = 0; !should_exit() && i < error_retry_delay * 10; i++) {
187+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
188+
}
189+
};
190+
161191
auto on_exception_handler = [&]() {
162192
if (_stream) {
163193
int32_t ack_events;
@@ -173,6 +203,7 @@ void failover::_run() {
173203
}
174204
set_state("connecting");
175205
if (!should_exit()) {
206+
increase_retry_delay_and_wait();
176207
_launch_failover();
177208
_initialized = true;
178209
}
@@ -355,8 +386,16 @@ void failover::_run() {
355386
int we(0);
356387

357388
try {
358-
std::lock_guard<std::timed_mutex> stream_lock(_stream_m);
359-
we = _stream->write(d);
389+
{
390+
std::lock_guard<std::timed_mutex> stream_lock(_stream_m);
391+
we = _stream->write(d);
392+
}
393+
if (we < 0) { // stream write failure
394+
increase_retry_delay_and_wait();
395+
} else {
396+
// no exception, no error => reset error_retry_delay
397+
error_retry_delay = 0;
398+
}
360399
} catch (exceptions::shutdown const& e) {
361400
SPDLOG_LOGGER_DEBUG(
362401
_logger,
@@ -625,7 +664,8 @@ bool failover::should_exit() const {
625664
}
626665

627666
bool failover::wait_for_all_events_written(unsigned ms_timeout) {
628-
_logger->info("processing::failover::wait_for_all_events_written");
667+
SPDLOG_LOGGER_INFO(
668+
_logger, "{} processing::failover::wait_for_all_events_written", _name);
629669
std::lock_guard<std::timed_mutex> stream_lock(_stream_m);
630670
if (_stream) {
631671
return _stream->wait_for_all_events_written(ms_timeout);

broker/lua/src/luabinding.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,12 +362,12 @@ int luabinding::write(std::shared_ptr<io::data> const& data) noexcept {
362362
else
363363
SPDLOG_LOGGER_ERROR(
364364
_logger, "lua: unknown error while running function `filter()'");
365-
RETURN_AND_POP(0);
365+
RETURN_AND_POP(-1);
366366
}
367367

368368
if (!lua_isboolean(_L, -1)) {
369369
SPDLOG_LOGGER_ERROR(_logger, "lua: `filter' must return a boolean");
370-
RETURN_AND_POP(0);
370+
RETURN_AND_POP(-1);
371371
}
372372

373373
execute_write = lua_toboolean(_L, -1);
@@ -403,12 +403,14 @@ int luabinding::write(std::shared_ptr<io::data> const& data) noexcept {
403403
else
404404
SPDLOG_LOGGER_ERROR(_logger,
405405
"lua: unknown error running function `write'");
406-
RETURN_AND_POP(0);
406+
RETURN_AND_POP(-1);
407407
}
408408

409409
if (!lua_isboolean(_L, -1)) {
410-
SPDLOG_LOGGER_ERROR(_logger, "lua: `write' must return a boolean");
411-
RETURN_AND_POP(0);
410+
// despite the fact that it's an error, we log it as an info because some SC
411+
// use it to delay next event write
412+
SPDLOG_LOGGER_INFO(_logger, "lua: `write' must return a boolean");
413+
RETURN_AND_POP(-1);
412414
}
413415
int acknowledge = lua_toboolean(_L, -1);
414416

broker/lua/test/lua.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4871,7 +4871,7 @@ TEST_F(LuaTest, BadLua) {
48714871
s->service_id = 18;
48724872
s->output = "Bonjour";
48734873
std::shared_ptr<io::data> svc(s.release());
4874-
ASSERT_EQ(binding->write(svc), 0);
4874+
ASSERT_EQ(binding->write(svc), -1);
48754875
RemoveFile(filename);
48764876
}
48774877

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,3 +840,82 @@ LUA_CACHE_SAVE_SERVICE_GROUP
840840

841841

842842

843+
LUA_FAILURE
844+
[Documentation] Given an engine broker configured with a SC, we check the write retry intervals when the lua write function returns nothing
845+
[Tags] broker engine services lua MON-193976
846+
Ctn Clear Commands Status
847+
Ctn Clear Retention
848+
849+
Remove File /tmp/test-LUA.log
850+
Ctn Config Engine ${1} ${5} ${20}
851+
Ctn Config Broker central
852+
Ctn Config Broker module
853+
Ctn Config Broker rrd
854+
Ctn Broker Config Log central neb trace
855+
Ctn Broker Config Log central sql error
856+
Ctn Broker Config Log central lua debug
857+
Ctn Broker Config Log central processing debug
858+
Ctn Broker Config Source Log central 1
859+
Ctn Config Broker Sql Output central unified_sql
860+
861+
Remove File /tmp/test-LUA.log
862+
863+
${new_content} Catenate SEPARATOR=\n
864+
... function init(params)
865+
... broker_log:set_parameters(2, '/tmp/test-LUA.log')
866+
... broker_log:info(0, 'lua start test')
867+
... end
868+
...
869+
... function write(e)
870+
... local file = io.open("/tmp/test_lua_failure.txt", "r")
871+
... local now = os.time()
872+
... if file ~= nill then
873+
... broker_log:info(0, now .. ' lua write success')
874+
... file:close()
875+
... return true
876+
... else
877+
... broker_log:info(0, now .. ' lua write error')
878+
... return
879+
... end
880+
... end
881+
882+
# Create the initial LUA script file
883+
Create File /tmp/test-LUA.lua ${new_content}
884+
Create File /tmp/test_lua_failure.txt aa
885+
886+
Ctn Broker Config Add Lua Output central test-LUA /tmp/test-LUA.lua
887+
Ctn Broker Config Output Set central test-LUA max_retry_delay 16
888+
889+
${start} Get Current Date
890+
Ctn Start Broker
891+
Ctn Start Engine
892+
Ctn Wait For Engine To Be Ready ${start} ${1}
893+
894+
FOR ${index} IN RANGE 6
895+
${result} Grep File /tmp/test-LUA.log lua write success
896+
IF len("""${result}""") > 0 BREAK
897+
Sleep 5s
898+
END
899+
900+
Should Not Be Empty ${result} no write success found in logs
901+
902+
Append To File /tmp/test-LUA.log now failure\n
903+
#now, write will fail
904+
Remove File /tmp/test_lua_failure.txt
905+
906+
#generate many events
907+
Ctn Restart Engine
908+
909+
FOR ${index} IN RANGE 90
910+
${lua_output} Get File /tmp/test-LUA.log
911+
${fail_lines} Get Regexp Matches ${lua_output} ([0-9]+) lua write error
912+
${fail_count} Get Length ${fail_lines}
913+
914+
IF ${fail_count} >= 7 BREAK
915+
Sleep 1s
916+
END
917+
918+
Should Be True ${fail_count} >= 7 not enough write errors found in logs
919+
920+
${res} Ctn Broker Check Failover Lua Retry ${fail_lines} 16
921+
Should Be True ${res} delays between lua retry records are not as expected

tests/resources/Broker.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3287,3 +3287,34 @@ def ctn_check_acknowledgement_in_logs_table(date: int, timeout: int = TIMEOUT):
32873287
return True
32883288
time.sleep(2)
32893289
return False
3290+
3291+
3292+
def ctn_broker_check_failover_lua_retry(lua_log_lines, max_retry_delay: int):
3293+
"""
3294+
Check lines like '1770991869 lua write error'
3295+
We check that the first field (timestamp) obeys the failover retry-interval increase law
3296+
3297+
:param lua_log_lines: lines given by grep
3298+
:param max_retry_delay: max_retry_delay configured in broker output
3299+
"""
3300+
last_timestamp = 0
3301+
last_interval = 1
3302+
3303+
timestamp_search = re.compile(r"(\d+).*")
3304+
for line in lua_log_lines:
3305+
extract = timestamp_search.search(line)
3306+
if extract is not None:
3307+
new_ts = int(extract.group(1))
3308+
if last_timestamp == 0:
3309+
last_timestamp = new_ts
3310+
else:
3311+
if new_ts != last_timestamp + last_interval:
3312+
logger.console.log(
3313+
f"expected interval: {last_interval}, but interval found: {new_ts} - {last_timestamp} = {new_ts - last_timestamp}")
3314+
return False
3315+
else:
3316+
last_timestamp = new_ts
3317+
last_interval = last_interval * 2
3318+
if last_interval > max_retry_delay:
3319+
last_interval = max_retry_delay
3320+
return True

0 commit comments

Comments
 (0)