Skip to content

Commit a5f5321

Browse files
authored
Fix feature version ranges, currently matching a single version (#5130)
Made the conditions for enabling the features future proof, allowing to remove RPC versions in a subsequent major Apache Kafka version without disabling features. The existing checks were matching a single version instead of a range and were failing if the older version was removed. Happening since 1.x
1 parent c56a3e6 commit a5f5321

11 files changed

+598
-95
lines changed

CHANGELOG.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,23 @@
1+
# librdkafka v2.11.1
2+
3+
librdkafka v2.11.1 is a maintenance release:
4+
5+
* Made the conditions for enabling the features future proof (#5130).
6+
7+
8+
## Fixes
9+
10+
### General fixes
11+
12+
* Issues: #4948, #4956.
13+
Made the conditions for enabling the features future proof, allowing to
14+
remove RPC versions in a subsequent Apache Kafka version without disabling
15+
features. The existing checks were matching a single version instead of
16+
a range and were failing if the older version was removed.
17+
Happening since 1.x (#5130).
18+
19+
20+
121
# librdkafka v2.11.0
222

323
librdkafka v2.11.0 is a feature release:
@@ -11,7 +31,7 @@ librdkafka v2.11.0 is a feature release:
1131
JoinGroup v0 anymore, missing in AK 4.0 and CP 8.0 (#5131).
1232
* Improve HTTPS CA certificates configuration by probing several paths
1333
when OpenSSL is statically linked and providing a way to customize their location
14-
or value (#).
34+
or value (#5133).
1535

1636

1737
## Fixes

src/rdkafka_broker.c

Lines changed: 111 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,24 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
310310
rkb, ApiKey, minver, maxver, featuresp, rd_true /* do_lock */);
311311
}
312312

313+
/**
314+
* @brief Check that at least a ApiVersion greater or equal to
315+
* \p minver exists for \p ApiKey.
316+
*
317+
* @returns `rd_true` if the broker supports \p ApiKey with
318+
* a version greater than or equal to \p minver, else `rd_false`.
319+
* @locks none
320+
* @locks_acquired rd_kafka_broker_lock()
321+
* @locality any
322+
*/
323+
rd_bool_t rd_kafka_broker_ApiVersion_at_least(rd_kafka_broker_t *rkb,
324+
int16_t ApiKey,
325+
int16_t minver) {
326+
return rd_kafka_broker_ApiVersion_supported0(
327+
rkb, ApiKey, minver, RD_KAFKAP_RPC_VERSION_MAX, NULL,
328+
rd_true /* do_lock */) != -1;
329+
}
330+
313331
/**
314332
* @brief Set broker state.
315333
*
@@ -2214,53 +2232,6 @@ rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t *rkb, rd_ts_t now) {
22142232
return (int)(remains / 1000);
22152233
}
22162234

2217-
2218-
/**
2219-
* @brief Unittest for reconnect.backoff.ms
2220-
*/
2221-
static int rd_ut_reconnect_backoff(void) {
2222-
rd_kafka_broker_t rkb = RD_ZERO_INIT;
2223-
rd_kafka_conf_t conf = {.reconnect_backoff_ms = 10,
2224-
.reconnect_backoff_max_ms = 90};
2225-
rd_ts_t now = 1000000;
2226-
int backoff;
2227-
2228-
rkb.rkb_reconnect_backoff_ms = conf.reconnect_backoff_ms;
2229-
2230-
/* broker's backoff is the initial reconnect.backoff.ms=10 */
2231-
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2232-
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2233-
RD_UT_ASSERT_RANGE(backoff, 7, 15, "%d");
2234-
2235-
/* .. 20 */
2236-
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2237-
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2238-
RD_UT_ASSERT_RANGE(backoff, 15, 30, "%d");
2239-
2240-
/* .. 40 */
2241-
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2242-
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2243-
RD_UT_ASSERT_RANGE(backoff, 30, 60, "%d");
2244-
2245-
/* .. 80, the jitter is capped at reconnect.backoff.max.ms=90 */
2246-
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2247-
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2248-
RD_UT_ASSERT_RANGE(backoff, 60, conf.reconnect_backoff_max_ms, "%d");
2249-
2250-
/* .. 90, capped by reconnect.backoff.max.ms */
2251-
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2252-
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2253-
RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
2254-
2255-
/* .. 90, should remain at capped value. */
2256-
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2257-
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2258-
RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
2259-
2260-
RD_UT_PASS();
2261-
}
2262-
2263-
22642235
/**
22652236
* @brief Initiate asynchronous connection attempt to the next address
22662237
* in the broker's address list.
@@ -6266,6 +6237,98 @@ void rd_kafka_broker_decommission(rd_kafka_t *rk,
62666237
rd_kafka_wrlock(rk);
62676238
}
62686239

6240+
/**
6241+
* @brief Unittest for reconnect.backoff.ms
6242+
*/
6243+
static int rd_ut_reconnect_backoff(void) {
6244+
rd_kafka_broker_t rkb = RD_ZERO_INIT;
6245+
rd_kafka_conf_t conf = {.reconnect_backoff_ms = 10,
6246+
.reconnect_backoff_max_ms = 90};
6247+
rd_ts_t now = 1000000;
6248+
int backoff;
6249+
6250+
rkb.rkb_reconnect_backoff_ms = conf.reconnect_backoff_ms;
6251+
6252+
/* broker's backoff is the initial reconnect.backoff.ms=10 */
6253+
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
6254+
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
6255+
RD_UT_ASSERT_RANGE(backoff, 7, 15, "%d");
6256+
6257+
/* .. 20 */
6258+
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
6259+
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
6260+
RD_UT_ASSERT_RANGE(backoff, 15, 30, "%d");
6261+
6262+
/* .. 40 */
6263+
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
6264+
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
6265+
RD_UT_ASSERT_RANGE(backoff, 30, 60, "%d");
6266+
6267+
/* .. 80, the jitter is capped at reconnect.backoff.max.ms=90 */
6268+
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
6269+
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
6270+
RD_UT_ASSERT_RANGE(backoff, 60, conf.reconnect_backoff_max_ms, "%d");
6271+
6272+
/* .. 90, capped by reconnect.backoff.max.ms */
6273+
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
6274+
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
6275+
RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
6276+
6277+
/* .. 90, should remain at capped value. */
6278+
rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
6279+
backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
6280+
RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
6281+
6282+
RD_UT_PASS();
6283+
}
6284+
6285+
/**
6286+
* @brief Unittest for reconnect.backoff.ms
6287+
*/
6288+
static int rd_ut_ApiVersion_at_least(void) {
6289+
rd_kafka_broker_t rkb = RD_ZERO_INIT;
6290+
mtx_init(&rkb.rkb_lock, mtx_plain);
6291+
6292+
struct rd_kafka_ApiVersion av = {
6293+
.ApiKey = RD_KAFKAP_Metadata,
6294+
.MinVer = 5,
6295+
.MaxVer = 10,
6296+
};
6297+
6298+
rkb.rkb_ApiVersions_cnt = 1;
6299+
rkb.rkb_ApiVersions = &av;
6300+
6301+
RD_UT_ASSERT(
6302+
rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 0),
6303+
"Metadata API version should be at least 0");
6304+
RD_UT_ASSERT(
6305+
rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 3),
6306+
"Metadata API version should be at least 3");
6307+
RD_UT_ASSERT(
6308+
rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 5),
6309+
"Metadata API version should be at least 5");
6310+
RD_UT_ASSERT(
6311+
rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 7),
6312+
"Metadata API version should be at least 7");
6313+
RD_UT_ASSERT(
6314+
rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 10),
6315+
"Metadata API version should be at least 10");
6316+
RD_UT_ASSERT(
6317+
!rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 11),
6318+
"Metadata API version shouldn't be at least 11");
6319+
6320+
rkb.rkb_ApiVersions_cnt = 0;
6321+
RD_UT_ASSERT(
6322+
!rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 0),
6323+
"Metadata API version shouldn't be at least 0");
6324+
RD_UT_ASSERT(
6325+
!rd_kafka_broker_ApiVersion_at_least(&rkb, RD_KAFKAP_Metadata, 3),
6326+
"Metadata API version shouldn't be at least 3");
6327+
6328+
mtx_destroy(&rkb.rkb_lock);
6329+
RD_UT_PASS();
6330+
}
6331+
62696332
/**
62706333
* @name Unit tests
62716334
* @{
@@ -6275,6 +6338,7 @@ int unittest_broker(void) {
62756338
int fails = 0;
62766339

62776340
fails += rd_ut_reconnect_backoff();
6341+
fails += rd_ut_ApiVersion_at_least();
62786342

62796343
return fails;
62806344
}

src/rdkafka_broker.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,10 @@ int16_t rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb,
477477
int *featuresp,
478478
rd_bool_t do_lock);
479479

480+
rd_bool_t rd_kafka_broker_ApiVersion_at_least(rd_kafka_broker_t *rkb,
481+
int16_t ApiKey,
482+
int16_t minver);
483+
480484
rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0_fl(const char *func,
481485
int line,
482486
rd_kafka_t *rk,

0 commit comments

Comments
 (0)