Skip to content

Commit c7649b8

Browse files
authored
Fix for KIP-1102 time-based re-bootstrap condition (#5177)
Re-bootstrap is now triggered only after metadata.recovery.rebootstrap.trigger.ms has passed since the first metadata refresh request following the last successful metadata response. Previously the interval was measured from the last successful metadata response itself, so it could overlap with the periodic topic.metadata.refresh.interval.ms and cause a re-bootstrap even when none was needed.
1 parent acec9a5 commit c7649b8

File tree

5 files changed

+75
-8
lines changed

5 files changed

+75
-8
lines changed

CHANGELOG.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,23 @@ librdkafka v2.12.0 is a feature release:
44

55
* Fix compression types read issue in GetTelemetrySubscriptions response
66
for big-endian architectures (#5183, @paravoid).
7+
* Fix for KIP-1102 time-based re-bootstrap condition (#5177).
8+
* Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672).
9+
* Fix for an error being raised after a commit due to an existing error in the topic partition (#4672).
710

811

912
## Fixes
1013

1114
### General fixes
12-
* Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672).
13-
* Fix for an error being raised after a commit due to an existing error in the topic partition (#4672).
15+
16+
* Issues: #5178.
17+
Fix for KIP-1102 time-based re-bootstrap condition.
18+
Re-bootstrap is now triggered only after `metadata.recovery.rebootstrap.trigger.ms`
19+
has passed since the first metadata refresh request following the last successful
20+
metadata response. Previously the interval was measured from the last successful metadata response itself,
21+
so it could overlap with the periodic `topic.metadata.refresh.interval.ms`
22+
and cause a re-bootstrap even if not needed.
23+
Happening since 2.11.0 (#5177).
1424

1525
### Telemetry fixes
1626

src/rdkafka.c

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2833,6 +2833,10 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
28332833

28342834
/**
28352835
* Schedules a rebootstrap of the cluster immediately.
2836+
*
2837+
* @locks none
2838+
* @locks_acquired rd_kafka_timers_lock()
2839+
* @locality any
28362840
*/
28372841
void rd_kafka_rebootstrap(rd_kafka_t *rk) {
28382842
if (rk->rk_conf.metadata_recovery_strategy ==
@@ -2851,22 +2855,42 @@ void rd_kafka_rebootstrap(rd_kafka_t *rk) {
28512855
}
28522856

28532857
/**
2854-
* Restarts rebootstrap timer with the configured interval.
2858+
* Starts rebootstrap timer with the configured interval. Only if not
2859+
* started or stopped.
28552860
*
28562861
* @locks none
2862+
* @locks_acquired rd_kafka_timers_lock()
28572863
* @locality any
28582864
*/
2859-
void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk) {
2865+
void rd_kafka_rebootstrap_tmr_start_maybe(rd_kafka_t *rk) {
28602866
if (rk->rk_conf.metadata_recovery_strategy ==
28612867
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
28622868
return;
28632869

28642870
rd_kafka_timer_start_oneshot(
2865-
&rk->rk_timers, &rk->rebootstrap_tmr, rd_true /*restart*/,
2871+
&rk->rk_timers, &rk->rebootstrap_tmr, rd_false /*don't restart*/,
28662872
rk->rk_conf.metadata_recovery_rebootstrap_trigger_ms * 1000LL,
28672873
rd_kafka_rebootstrap_tmr_cb, NULL);
28682874
}
28692875

2876+
/**
2877+
* Stops rebootstrap timer, for example after a successful metadata response.
2878+
*
2879+
* @return 1 if the timer was started (before being stopped), else 0.
2880+
*
2881+
* @locks none
2882+
* @locks_acquired rd_kafka_timers_lock()
2883+
* @locality any
2884+
*/
2885+
int rd_kafka_rebootstrap_tmr_stop(rd_kafka_t *rk) {
2886+
if (rk->rk_conf.metadata_recovery_strategy ==
2887+
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
2888+
return 0;
2889+
2890+
return rd_kafka_timer_stop(&rk->rk_timers, &rk->rebootstrap_tmr,
2891+
rd_true /* lock */);
2892+
}
2893+
28702894
/**
28712895
* Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
28722896
* friends) since it does not have an API for stopping the cgrp we will need to

src/rdkafka_int.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,9 @@ rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,
12511251

12521252
void rd_kafka_rebootstrap(rd_kafka_t *rk);
12531253

1254-
void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk);
1254+
void rd_kafka_rebootstrap_tmr_start_maybe(rd_kafka_t *rk);
1255+
1256+
int rd_kafka_rebootstrap_tmr_stop(rd_kafka_t *rk);
12551257

12561258
void rd_kafka_reset_any_broker_down_reported(rd_kafka_t *rk);
12571259

src/rdkafka_metadata.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
10331033
rd_kafka_wrlock(rkb->rkb_rk);
10341034

10351035
rkb->rkb_rk->rk_ts_metadata = rd_clock();
1036-
rd_kafka_rebootstrap_tmr_restart(rkb->rkb_rk);
10371036

10381037
/* Update cached cluster id. */
10391038
if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&

src/rdkafka_request.c

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2620,6 +2620,7 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
26202620
if (err)
26212621
goto err;
26222622

2623+
rd_kafka_rebootstrap_tmr_stop(rk);
26232624
if (rko && rko->rko_replyq.q) {
26242625
/* Reply to metadata requester, passing on the metadata.
26252626
* Reuse requesting rko for the reply. */
@@ -2744,6 +2745,18 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
27442745
*
27452746
* @sa rd_kafka_MetadataRequest().
27462747
* @sa rd_kafka_MetadataRequest_resp_cb().
2748+
*
2749+
* @locality any
2750+
* @locks none
2751+
* @locks_acquired
2752+
* - mtx_lock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock) in
2753+
* this function
2754+
* - rd_kafka_broker_lock() in
2755+
* rd_kafka_broker_ApiVersion_supported()
2756+
* - rd_kafka_timers_lock() in
2757+
* rd_kafka_rebootstrap_tmr_start_maybe()
2758+
* - mtx_lock(&rkq->rkq_lock) in
2759+
* rd_kafka_broker_buf_enq_replyq()
27472760
*/
27482761
static rd_kafka_resp_err_t
27492762
rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
@@ -2877,9 +2890,28 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
28772890
rkbuf->rkbuf_u.Metadata.decr = full_incr;
28782891
rkbuf->rkbuf_u.Metadata.decr_lock =
28792892
&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock;
2893+
} else if (!resp_cb) {
2894+
/*
2895+
* Conditions for starting the rebootstrap timer:
2896+
*
2897+
* !full_incr:
2898+
* In case it's a full request, forced or not, it won't be
2899+
* retried on the same broker to avoid blocking metadata
2900+
* requests, because of the lock, when that broker isn't
2901+
* available. We don't start the timer as we cannot ensure the
2902+
* request is retried for the duration of
2903+
* `metadata.recovery.rebootstrap.trigger.ms`.
2904+
*
2905+
* !resp_cb:
2906+
* Same reasoning applies when we use a custom callback, for the
2907+
* AdminClient requests for example.
2908+
*
2909+
* Start the rebootstrap timer only if it's the first
2910+
* metadata refresh request after last successful response,
2911+
* so the timer is not reset if already scheduled. */
2912+
rd_kafka_rebootstrap_tmr_start_maybe(rkb->rkb_rk);
28802913
}
28812914

2882-
28832915
if (topic_cnt > 0) {
28842916
char *topic;
28852917
int i;

0 commit comments

Comments
 (0)