Commit 3137946

[KIP-1102] Enable clients to rebootstrap based on timeout or error code (#4981)
1 parent 20ed352 commit 3137946

9 files changed: +100 −23 lines changed

CHANGELOG.md

Lines changed: 16 additions & 1 deletion
```diff
@@ -2,6 +2,7 @@
 
 librdkafka v2.11.0 is a feature release:
 
+* [KIP-1102](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code) Enable clients to rebootstrap based on timeout or error code (#4981).
 * Fix for poll ratio calculation in case the queues are forwarded (#5017).
 
 
@@ -124,8 +125,22 @@ librdkafka v2.10.0 is a feature release:
 > The [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) consumer is currently in **Preview** and should not be used in production environments. Implementation is feature complete but contract could have minor changes before General Availability.
 
 
+## Upgrade considerations
+
+
+Starting from this version, brokers not reported in Metadata RPC call are
+removed along with their threads. Brokers and their threads are added back
+when they appear in a Metadata RPC response again. When no brokers are left
+or they're not reachable, the client will start a re-bootstrap sequence
+by default. `metadata.recovery.strategy` controls this,
+which defaults to `rebootstrap`.
+Setting `metadata.recovery.strategy` to `none` avoids any re-bootstrapping and
+leaves only the broker received in last successful metadata response.
+
+
 ## Enhancements and Fixes
 
+* [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+producer+and+consumer+clients+to+rebootstrap) Allow producer and consumer clients to rebootstrap
 * Identify brokers only by broker id (#4557, @mfleming)
 * Remove unavailable brokers and their thread (#4557, @mfleming)
 * Commits during a cooperative incremental rebalance aren't causing
@@ -175,7 +190,7 @@ librdkafka v2.10.0 is a feature release:
   and connection.
   Happens since 1.x (#4557, @mfleming).
 * Issues: #4557
-  Remove brokers not reported in a metadata call, along with their thread.
+  Remove brokers not reported in a metadata call, along with their threads.
   Avoids that unavailable brokers are selected for a new connection when
   there's no one available. We cannot tell if a broker was removed
   temporarily or permanently so we always remove it and it'll be added back when
```

CONFIGURATION.md

Lines changed: 2 additions & 1 deletion
```diff
@@ -12,7 +12,8 @@ message.copy.max.bytes | * | 0 .. 1000000000 | 65535
 receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. <br>*Type: integer*
 max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
 max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
-metadata.recovery.strategy | * | none, rebootstrap | rebootstrap | low | Controls how the client recovers when none of the brokers known to it is available. If set to `none`, the client fails with a fatal error. If set to `rebootstrap`, the client repeats the bootstrap process using `bootstrap.servers` and brokers added through `rd_kafka_brokers_add()`. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. <br>*Type: enum value*
+metadata.recovery.strategy | * | none, rebootstrap | rebootstrap | low | Controls how the client recovers when none of the brokers known to it is available. If set to `none`, the client doesn't re-bootstrap. If set to `rebootstrap`, the client repeats the bootstrap process using `bootstrap.servers` and brokers added through `rd_kafka_brokers_add()`. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously or the client cannot refresh metadata within `metadata.recovery.rebootstrap.trigger.ms` or it's requested in a metadata response. <br>*Type: enum value*
+metadata.recovery.rebootstrap.trigger.ms | * | 0 .. 2147483647 | 300000 | low | If a client configured to rebootstrap using `metadata.recovery.strategy=rebootstrap` is unable to obtain metadata from any of the brokers for this interval, client repeats the bootstrap process using `bootstrap.servers` configuration and brokers added through `rd_kafka_brokers_add()`. <br>*Type: integer*
 topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
 metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
 topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 | low | When a topic loses its leader a new metadata request will be enqueued immediately and then with this initial interval, exponentially increasing upto `retry.backoff.max.ms`, until the topic metadata has been refreshed. If not set explicitly, it will be defaulted to `retry.backoff.ms`. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
```
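Both properties are ordinary string-settable configuration and can be set through the public `rd_kafka_conf_set()` API. A minimal configuration sketch, assuming a librdkafka version that ships this commit (`metadata.recovery.rebootstrap.trigger.ms` does not exist in earlier releases); the values shown are simply the documented defaults:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        char errstr[512];

        /* Re-bootstrap from bootstrap.servers (and rd_kafka_brokers_add()
         * brokers) if no metadata could be obtained for 5 minutes. */
        if (rd_kafka_conf_set(conf, "metadata.recovery.strategy", "rebootstrap",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "metadata.recovery.rebootstrap.trigger.ms",
                              "300000", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "config error: %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        /* conf would normally be passed to rd_kafka_new(), which takes
         * ownership; here we just destroy it. */
        rd_kafka_conf_destroy(conf);
        return 0;
}
```

On older librdkafka versions the second `rd_kafka_conf_set()` call fails with an "unknown property" error, which is a convenient runtime feature check.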

INTRODUCTION.md

Lines changed: 3 additions & 2 deletions
```diff
@@ -2132,10 +2132,11 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 | KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
 | KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
 | KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 | Supported |
-| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 4.0.0 | Preview |
+| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 4.0.0 | Preview |
 | KIP-899 - Allow producer and consumer clients to rebootstrap | 3.8.0 | Supported |
 | KIP-951 - Leader discovery optimisations for the client | 3.7.0 | Supported |
 | KIP-1082 - Require Client-Generated IDs over the ConsumerGroupHeartbeat | 4.0.0 | Supported |
+| KIP-1102 - Enable clients to rebootstrap based on timeout or error code | 4.0.0 | Supported |
 
 
 
@@ -2153,7 +2154,7 @@ release of librdkafka.
 | 0 | Produce | 12 | 10 |
 | 1 | Fetch | 17 | 16 |
 | 2 | ListOffsets | 10 | 7 |
-| 3 | Metadata | 13 | 12 |
+| 3 | Metadata | 13 | 13 |
 | 8 | OffsetCommit | 9 | 9 |
 | 9 | OffsetFetch | 9 | 9 |
 | 10 | FindCoordinator | 6 | 2 |
```

src/rdkafka.c

Lines changed: 26 additions & 8 deletions
```diff
@@ -720,7 +720,7 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
               "maximum size the broker will accept"),
     _ERR_DESC(RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED,
               "Broker: Client metadata is stale, "
-              "client should rebootstrap to obtain new metadata."),
+              "client should rebootstrap to obtain new metadata"),
     _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
 
 
@@ -2077,15 +2077,13 @@ static void rd_kafka_rebootstrap_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
                 /* Avoid re-bootstrapping while terminating */
                 return;
 
+        rd_dassert(rk->rk_conf.metadata_recovery_strategy !=
+                   RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE);
         if (rk->rk_conf.metadata_recovery_strategy ==
-            RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE) {
-                rd_kafka_set_fatal_error(
-                    rk, RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED, "%s",
-                    "Lost connection to broker(s) "
-                    "and metadata recovery with re-bootstrap "
-                    "is disabled");
+            RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
+                /* This function should not be called in this case.
+                 * this is just a fail-safe. */
                 return;
-        }
 
         rd_kafka_dbg(rk, ALL, "REBOOTSTRAP", "Starting re-bootstrap sequence");
 
@@ -2801,11 +2799,31 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
  * Schedules a rebootstrap of the cluster immediately.
  */
 void rd_kafka_rebootstrap(rd_kafka_t *rk) {
+        if (rk->rk_conf.metadata_recovery_strategy ==
+            RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
+                return;
+
         rd_kafka_timer_start_oneshot(&rk->rk_timers, &rk->rebootstrap_tmr,
                                      rd_true /*restart*/, 0,
                                      rd_kafka_rebootstrap_tmr_cb, NULL);
 }
 
+/**
+ * Restarts rebootstrap timer with the configured interval.
+ *
+ * @locks none
+ * @locality any
+ */
+void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk) {
+        if (rk->rk_conf.metadata_recovery_strategy ==
+            RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
+                return;
+
+        rd_kafka_timer_start_oneshot(
+            &rk->rk_timers, &rk->rebootstrap_tmr, rd_true /*restart*/,
+            rk->rk_conf.metadata_recovery_rebootstrap_trigger_ms * 1000LL,
+            rd_kafka_rebootstrap_tmr_cb, NULL);
+}
 
 /**
  * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
```

src/rdkafka_conf.c

Lines changed: 15 additions & 2 deletions
```diff
@@ -441,18 +441,31 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
     {_RK_GLOBAL, "metadata.recovery.strategy", _RK_C_S2I,
      _RK(metadata_recovery_strategy),
      "Controls how the client recovers when none of the brokers known to it "
-     "is available. If set to `none`, the client fails with a fatal error. "
+     "is available. If set to `none`, the client doesn't re-bootstrap. "
      "If set to `rebootstrap`, the client repeats the bootstrap process "
      "using `bootstrap.servers` and brokers added through "
      "`rd_kafka_brokers_add()`. Rebootstrapping is useful when a client "
      "communicates with brokers so infrequently that the set of brokers "
      "may change entirely before the client refreshes metadata. "
      "Metadata recovery is triggered when all last-known brokers appear "
-     "unavailable simultaneously.",
+     "unavailable simultaneously or the client cannot refresh metadata within "
+     "`metadata.recovery.rebootstrap.trigger.ms` or it's requested in a "
+     "metadata response.",
      .vdef = RD_KAFKA_METADATA_RECOVERY_STRATEGY_REBOOTSTRAP,
      .s2i = {{RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE, "none"},
              {RD_KAFKA_METADATA_RECOVERY_STRATEGY_REBOOTSTRAP, "rebootstrap"},
              {0, NULL}}},
+    {_RK_GLOBAL, "metadata.recovery.rebootstrap.trigger.ms", _RK_C_INT,
+     _RK(metadata_recovery_rebootstrap_trigger_ms),
+     "If a client configured to rebootstrap using "
+     "`metadata.recovery.strategy=rebootstrap` "
+     "is unable to obtain metadata from any "
+     "of the brokers for this interval, "
+     "client repeats the bootstrap process using "
+     "`bootstrap.servers` configuration "
+     "and brokers added through "
+     "`rd_kafka_brokers_add()`.",
+     0, INT_MAX, 300000},
     {_RK_GLOBAL | _RK_DEPRECATED | _RK_HIDDEN, "metadata.request.timeout.ms",
      _RK_C_INT, _RK(metadata_request_timeout_ms), "Not used.", 10, 900 * 1000,
      10},
```

src/rdkafka_conf.h

Lines changed: 1 addition & 0 deletions
```diff
@@ -207,6 +207,7 @@ struct rd_kafka_conf_s {
         int msg_copy_max_size;
         int recv_max_msg_size;
         int max_inflight;
+        int metadata_recovery_rebootstrap_trigger_ms;
         int metadata_request_timeout_ms;
         int metadata_refresh_interval_ms;
         int metadata_refresh_fast_cnt;
```

src/rdkafka_int.h

Lines changed: 11 additions & 4 deletions
```diff
@@ -1088,6 +1088,14 @@ int rd_kafka_set_fatal_error0(rd_kafka_t *rk,
 
 rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk);
 
+#define rd_kafka_producer_can_have_fatal_errors(rk)                            \
+        (rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence)
+
+#define rd_kafka_consumer_can_have_fatal_errors(rk)                            \
+        (rk->rk_type == RD_KAFKA_CONSUMER &&                                   \
+         (rk->rk_conf.group_instance_id ||                                     \
+          rk->rk_conf.group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER))
+
 static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
 rd_kafka_fatal_error_code(rd_kafka_t *rk) {
         /* This is an optimization to avoid an atomic read which are costly
@@ -1097,10 +1105,8 @@ rd_kafka_fatal_error_code(rd_kafka_t *rk) {
          * 2) static consumers (group.instance.id)
          * 3) Group using consumer protocol (Introduced in KIP-848). See exact
          *    errors in rd_kafka_cgrp_handle_ConsumerGroupHeartbeat() */
-        if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) ||
-            (rk->rk_type == RD_KAFKA_CONSUMER &&
-             (rk->rk_conf.group_instance_id ||
-              rk->rk_conf.group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER)))
+        if (rd_kafka_producer_can_have_fatal_errors(rk) ||
+            rd_kafka_consumer_can_have_fatal_errors(rk))
                 return rd_atomic32_get(&rk->rk_fatal.err);
 
         return RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -1241,5 +1247,6 @@ rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,
 
 void rd_kafka_rebootstrap(rd_kafka_t *rk);
 
+void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk);
 
 #endif /* _RDKAFKA_INT_H_ */
```

src/rdkafka_metadata.c

Lines changed: 15 additions & 0 deletions
```diff
@@ -572,6 +572,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
         int broker_changes            = 0;
         int cache_changes             = 0;
         int cgrp_subscription_version = -1;
+        int16_t ErrorCode             = 0;
 
         /* If client rack is present, the metadata cache (topic or full) needs
          * to contain the partition to rack map. */
@@ -872,8 +873,21 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
                     ClusterAuthorizedOperations;
         }
 
+        if (ApiVersion >= 13) {
+                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+        }
+
         rd_kafka_buf_skip_tags(rkbuf);
 
+        if (ErrorCode) {
+                rd_rkb_dbg(rkb, METADATA, "METADATA",
+                           "Metadata response: received top level "
+                           "error code %" PRId16 ": %s",
+                           ErrorCode, rd_kafka_err2str(ErrorCode));
+                err = ErrorCode;
+                goto err;
+        }
+
         /* Entire Metadata response now parsed without errors:
          * update our internal state according to the response. */
 
@@ -1025,6 +1039,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
         rd_kafka_wrlock(rkb->rkb_rk);
 
         rkb->rkb_rk->rk_ts_metadata = rd_clock();
+        rd_kafka_rebootstrap_tmr_restart(rkb->rkb_rk);
 
         /* Update cached cluster id. */
         if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&
```

src/rdkafka_request.c

Lines changed: 11 additions & 5 deletions
```diff
@@ -2638,12 +2638,18 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
         goto done;
 
 err:
-        actions = rd_kafka_err_action(rkb, err, request,
+        actions = rd_kafka_err_action(
+            rkb, err, request,
 
-                                      RD_KAFKA_ERR_ACTION_RETRY,
-                                      RD_KAFKA_RESP_ERR__PARTIAL,
+            RD_KAFKA_ERR_ACTION_SPECIAL, RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED,
 
-                                      RD_KAFKA_ERR_ACTION_END);
+            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__PARTIAL,
+
+            RD_KAFKA_ERR_ACTION_END);
+
+        if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) {
+                rd_kafka_rebootstrap(rk);
+        }
 
         if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                 /* In case it's a brokers full refresh call,
@@ -2765,7 +2771,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
         int *full_incr          = NULL;
         void *handler_arg       = NULL;
         rd_kafka_resp_cb_t *handler_cb = rd_kafka_handle_Metadata;
-        int16_t metadata_max_version = 12;
+        int16_t metadata_max_version = 13;
         rd_kafka_replyq_t use_replyq = replyq;
 
         /* In case we want cluster authorized operations in the Metadata
```
