
Commit 5cd5e35

Avoid returning an all brokers down error on planned disconnections (#5126)
* Skip increasing the brokers-down count for planned disconnections, those that were logged as DEBUG, including idle disconnections. Only the first time the broker reaches the DOWN state is it skipped; further disconnections are counted.
* Avoid returning an all brokers down error on planned disconnections (see the sketch below). This is done by not counting planned disconnections, such as idle disconnections, broker host changes and similar, as events that can cause the client to reach the "all brokers down" state, returning an error and, since 2.10.0, possibly restarting a re-bootstrap sequence.
* Add more information to disconnection error messages.
* Skip mock test when using SSL.
* `rd_atomic32_set` and `rd_atomic64_set` return the previous value, so callers can check whether the atomic set actually changed the value.
* Fix for `rd_kafka_broker_set_error`.
* Regarding broker errors, only log warnings, and only report errors to the user for levels less than or equal to `ERR`.
* Reduce flakiness: test 0083.
* Reset the `rkb_down_reported` field when a broker reaches the `UP` state, so other brokers are checked again before reaching the `ALL_BROKERS_DOWN` state. Avoids repeated `ALL_BROKERS_DOWN`, possibly triggered by idle disconnections when only connecting to a single broker.
* Consider the disconnection after timeout as a planned disconnection that doesn't count towards reaching the all brokers down state.
* Reset any reported broker down state on re-bootstrap too.
* Revert `rd_kafka_connect_any` to the previous version. Given that the broker down reported state is reset, it's no longer necessary to prioritize the learned brokers, as all brokers will be tried again before starting a new re-bootstrap sequence.
* Add the `rk_rebootstrap_in_progress` field to prevent duplicated re-bootstrap starts if the initial re-bootstrap sequence isn't completed yet.
* Remove flakiness from test 0151 - `do_test_down_then_up_no_rebootstrap_loop` and make it consistent with the current changes.
* Fix test 0034: given that the down reported state is reset, a new ALL_BROKERS_DOWN error is issued after retrying on re-bootstrap. This happens after 1 second max (`rd_kafka_broker_reconnect_backoff`) and the test 0034 timeout is 5 seconds, so it doesn't time out and sets the broker up again. With an explicit timeout check the test is reliable.
* Fix memory leaks in tests.
* Fix flakiness in test 0075 when using valgrind, due to timing and multiple connect requests.
* Set a fixed max jitter of 2s.
* Remove flakiness from test 0086_purge_remote.
* Fix for a deadlock when calling `rd_kafka_filter_broker_by_GetTelemetrySubscription`, introduced with #5130 (not released).
1 parent a5f5321 commit 5cd5e35
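
The headline change, in rough outline: a broker's transition to DOWN only counts toward the "all brokers down" condition when the disconnection was unplanned. The following is a minimal illustrative sketch, not the actual librdkafka code; it ignores the first-time-only nuance noted above, and `broker_down()`, its `planned` parameter and `report_all_brokers_down()` are hypothetical names:

    static void broker_down(rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                            rd_bool_t planned) {
            if (planned)
                    return; /* idle timeout, host change, ...: not counted */
            /* rd_atomic32_set() returns the previous value, so each
             * broker is counted at most once until its flag is reset. */
            if (rd_atomic32_set(&rkb->rkb_down_reported, 1) == 1)
                    return;
            if (rd_atomic32_add(&rk->rk_broker_down_cnt, 1) ==
                rd_atomic32_get(&rk->rk_broker_cnt))
                    report_all_brokers_down(rk); /* hypothetical helper:
                                                  * raise the error and
                                                  * trigger re-bootstrap */
    }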

23 files changed, +346 −230 lines changed

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
@@ -3,6 +3,9 @@
 librdkafka v2.11.1 is a maintenance release:
 
 * Made the conditions for enabling the features future proof (#5130).
+* Avoid returning an all brokers down error on planned disconnections (#5126).
+* An "all brokers down" error isn't returned when we haven't tried to connect
+  to all brokers since last successful connection (#5126).
 
 
 ## Fixes
@@ -16,6 +19,20 @@ librdkafka v2.11.1 is a maintenance release:
   a range and were failing if the older version was removed.
   Happening since 1.x (#5130).
 
+* Issues: #5142.
+  Avoid returning an all brokers down error on planned disconnections.
+  This is done by avoiding to count planned disconnections, such as idle
+  disconnections, broker host change and similar as events that can cause
+  the client to reach the "all brokers down" state, returning an error and
+  since 2.10.0 possibly starting a re-bootstrap sequence.
+* Issues: #5142.
+  An "all brokers down" error isn't returned when we haven't tried to connect
+  to all brokers since last successful connection. It happened because the down
+  state is cached and can be stale when a connection isn't needed to that
+  particular broker. Solved by resetting the cached broker down state when any
+  broker successfully connects, so that broker needs to be tried again.
+  Happening since 1.x (#5126).
+
 
 
 # librdkafka v2.11.0
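
The second fix's mechanism, sketched under assumptions: when any broker reaches UP, the cached per-broker down state is cleared so that a later "all brokers down" verdict can only be reached after every broker has failed again. `rd_kafka_reset_any_broker_down_reported()` is the helper added in src/rdkafka.c below; the `on_broker_up()` wrapper and its call site are illustrative, not the commit's actual wiring:

    /* Illustrative: invoked when a connection reaches the UP state. */
    static void on_broker_up(rd_kafka_t *rk) {
            /* Cached DOWN states may be stale now that one broker is
             * reachable: clear them all so each broker must fail again
             * before ALL_BROKERS_DOWN can be raised. */
            rd_kafka_reset_any_broker_down_reported(rk);
    }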

CONFIGURATION.md

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ socket.max.fails | * | 0 .. 1000000 | 1
 broker.address.ttl | * | 0 .. 86400000 | 1000 | low | How long to cache the broker address resolving results (milliseconds). <br>*Type: integer*
 broker.address.family | * | any, v4, v6 | any | low | Allowed broker IP address families: any, v4, v6 <br>*Type: enum value*
 socket.connection.setup.timeout.ms | * | 1000 .. 2147483647 | 30000 | medium | Maximum time allowed for broker connection setup (TCP connection setup as well SSL and SASL handshake). If the connection to the broker is not fully functional after this the connection will be closed and retried. <br>*Type: integer*
-connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info). <br>*Type: integer*
+connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info). Actual value can be lower, up to 2s lower, only if `connections.max.idle.ms` >= 4s, as jitter is added to avoid disconnecting all brokers at the same time. <br>*Type: integer*
 reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`. <br>*Type: integer*
 reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately. <br>*Type: integer*
 reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed. <br>*Type: integer*
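
A sketch of the documented jitter rule, under assumptions: up to 2s is subtracted from the configured idle timeout, but only when it is at least 4s, so connections opened together don't all idle out at the same instant. `effective_idle_ms()` is a hypothetical name and `rand()` stands in for whatever jitter source the implementation actually uses:

    #include <stdlib.h>

    /* Effective idle-disconnect timeout with anti-synchronization
     * jitter (illustrative, not the exact librdkafka code). */
    static int effective_idle_ms(int conf_max_idle_ms) {
            if (conf_max_idle_ms >= 4000)
                    /* subtract 0..2000 ms of jitter */
                    return conf_max_idle_ms - (rand() % 2001);
            return conf_max_idle_ms; /* too small: use as configured */
    }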

src/rdatomic.h

Lines changed: 15 additions & 11 deletions
@@ -111,22 +111,25 @@ static RD_INLINE int32_t RD_UNUSED rd_atomic32_get(rd_atomic32_t *ra) {
 #endif
 }
 
+/**
+ * @brief Set the atomic value to \p v and return the previous value,
+ *        which can be used to determine if the value was changed.
+ */
 static RD_INLINE int32_t RD_UNUSED rd_atomic32_set(rd_atomic32_t *ra,
                                                    int32_t v) {
 #ifdef _WIN32
        return InterlockedExchange((LONG *)&ra->val, v);
 #elif !HAVE_ATOMICS_32
        int32_t r;
        mtx_lock(&ra->lock);
-       r = ra->val = v;
+       r       = ra->val;
+       ra->val = v;
        mtx_unlock(&ra->lock);
        return r;
 #elif HAVE_ATOMICS_32_ATOMIC
-       __atomic_store_n(&ra->val, v, __ATOMIC_SEQ_CST);
-       return v;
+       return __atomic_exchange_n(&ra->val, v, __ATOMIC_SEQ_CST);
 #elif HAVE_ATOMICS_32_SYNC
-       (void)__sync_lock_test_and_set(&ra->val, v);
-       return v;
+       return __sync_lock_test_and_set(&ra->val, v);
 #else
        return ra->val = v; // FIXME
 #endif
@@ -200,24 +203,25 @@ static RD_INLINE int64_t RD_UNUSED rd_atomic64_get(rd_atomic64_t *ra) {
 #endif
 }
 
-
+/**
+ * @brief Set the atomic value to \p v and return the previous value,
+ *        which can be used to determine if the value was changed.
+ */
 static RD_INLINE int64_t RD_UNUSED rd_atomic64_set(rd_atomic64_t *ra,
                                                    int64_t v) {
 #ifdef _WIN32
        return InterlockedExchange64(&ra->val, v);
 #elif !HAVE_ATOMICS_64
        int64_t r;
        mtx_lock(&ra->lock);
-       ra->val = v;
        r = ra->val;
+       ra->val = v;
        mtx_unlock(&ra->lock);
        return r;
 #elif HAVE_ATOMICS_64_ATOMIC
-       __atomic_store_n(&ra->val, v, __ATOMIC_SEQ_CST);
-       return v;
+       return __atomic_exchange_n(&ra->val, v, __ATOMIC_SEQ_CST);
 #elif HAVE_ATOMICS_64_SYNC
-       (void)__sync_lock_test_and_set(&ra->val, v);
-       return v;
+       return __sync_lock_test_and_set(&ra->val, v);
 #else
        return ra->val = v; // FIXME
 #endif
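
Why returning the previous value matters: it turns the set into an atomic exchange, so a caller can act exactly once on a state transition without a lock. This is how the commit itself uses it in src/rdkafka.c below, clearing a flag and decrementing a shared counter only on a real 1 -> 0 transition:

    /* Decrement the shared down count only if this broker's flag
     * actually changed from 1 to 0, never twice for the same broker
     * (taken from rd_kafka_reset_any_broker_down_reported() below). */
    if (rd_atomic32_set(&rkb->rkb_down_reported, 0) == 1)
            rd_atomic32_sub(&rk->rk_broker_down_cnt, 1);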

src/rdkafka.c

Lines changed: 35 additions & 4 deletions
@@ -2060,6 +2060,25 @@ static void rd_kafka_1s_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
         rd_kafka_coord_cache_expire(&rk->rk_coord_cache);
 }
 
+/**
+ * @brief Reset broker down reported flag for all brokers.
+ *        In case it was set to 1 it will be reset to 0 and
+ *        the broker down count will be decremented.
+ *
+ * @locks none
+ * @locks_acquired rd_kafka_rdlock()
+ * @locality any
+ */
+void rd_kafka_reset_any_broker_down_reported(rd_kafka_t *rk) {
+        rd_kafka_broker_t *rkb;
+        rd_kafka_rdlock(rk);
+        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+                if (rd_atomic32_set(&rkb->rkb_down_reported, 0) == 1)
+                        rd_atomic32_sub(&rk->rk_broker_down_cnt, 1);
+        }
+        rd_kafka_rdunlock(rk);
+}
+
 /**
  * @brief Re-bootstrap timer callback.
  *
@@ -2087,6 +2106,9 @@ static void rd_kafka_rebootstrap_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
 
         rd_kafka_dbg(rk, ALL, "REBOOTSTRAP", "Starting re-bootstrap sequence");
 
+        rd_atomic32_set(&rk->rk_rebootstrap_in_progress, 1);
+        rd_kafka_reset_any_broker_down_reported(rk);
+
         if (rk->rk_conf.brokerlist) {
                 rd_kafka_brokers_add0(
                     rk,
@@ -2098,7 +2120,7 @@
         rd_kafka_rdlock(rk);
         if (rd_list_cnt(&rk->additional_brokerlists) == 0) {
                 rd_kafka_rdunlock(rk);
-                return;
+                goto done;
         }
 
         rd_list_init_copy(&additional_brokerlists, &rk->additional_brokerlists);
@@ -2113,6 +2135,8 @@
                      * names even if requested */);
         }
         rd_list_destroy(&additional_brokerlists);
+done:
+        rd_atomic32_set(&rk->rk_rebootstrap_in_progress, 0);
 }
 
 static void rd_kafka_stats_emit_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
@@ -2377,6 +2401,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
         rd_atomic32_init(&rk->rk_logical_broker_cnt, 0);
         rd_atomic32_init(&rk->rk_broker_up_cnt, 0);
         rd_atomic32_init(&rk->rk_broker_down_cnt, 0);
+        rd_atomic32_init(&rk->rk_rebootstrap_in_progress, 0);
 
         rk->rk_rep = rd_kafka_q_new(rk);
         rk->rk_ops = rd_kafka_q_new(rk);
@@ -2814,9 +2839,15 @@ void rd_kafka_rebootstrap(rd_kafka_t *rk) {
             RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
                 return;
 
-        rd_kafka_timer_start_oneshot(&rk->rk_timers, &rk->rebootstrap_tmr,
-                                     rd_true /*restart*/, 0,
-                                     rd_kafka_rebootstrap_tmr_cb, NULL);
+        if (rd_atomic32_set(&rk->rk_rebootstrap_in_progress, 1) == 0) {
+                /* Only start when not already in progress (0 -> 1).
+                 * After setting down a learned broker it could reconnect and
+                 * disconnect again before the previous re-bootstrap completes,
+                 * causing a new re-bootstrap. */
+                rd_kafka_timer_start_oneshot(
+                    &rk->rk_timers, &rk->rebootstrap_tmr, rd_true /*restart*/,
+                    0, rd_kafka_rebootstrap_tmr_cb, NULL);
+        }
 }
 
 /**

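The `rk_rebootstrap_in_progress` guard above is a lock-free "start at most once" pattern built on the new exchange semantics of `rd_atomic32_set`. A self-contained sketch of the same idea, with illustrative names (`in_progress`, `maybe_start`, `sequence_done`):

    static rd_atomic32_t in_progress; /* 0 = idle, 1 = running;
                                       * assumed initialized to 0 via
                                       * rd_atomic32_init() at setup */

    static void maybe_start(void) {
            if (rd_atomic32_set(&in_progress, 1) == 0) {
                    /* Previous value was 0: this caller won the race
                     * and may arm the one-shot timer. Concurrent
                     * callers observe 1 and skip, so no duplicate
                     * sequence is started. */
            }
    }

    static void sequence_done(void) {
            rd_atomic32_set(&in_progress, 0); /* allow a new start */
    }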