@@ -310,6 +310,24 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
310310            rkb , ApiKey , minver , maxver , featuresp , rd_true  /* do_lock */ );
311311}
312312
313+ /** 
314+  * @brief Check that at least a ApiVersion greater or equal to 
315+  *        \p minver exists for \p ApiKey. 
316+  * 
317+  * @returns `rd_true` if the broker supports \p ApiKey with 
318+  *          a version greater than or equal to \p minver, else `rd_false`. 
319+  * @locks none 
320+  * @locks_acquired rd_kafka_broker_lock() 
321+  * @locality any 
322+  */ 
323+ rd_bool_t  rd_kafka_broker_ApiVersion_at_least (rd_kafka_broker_t  * rkb ,
324+                                               int16_t  ApiKey ,
325+                                               int16_t  minver ) {
326+         return  rd_kafka_broker_ApiVersion_supported0 (
327+                    rkb , ApiKey , minver , RD_KAFKAP_RPC_VERSION_MAX , NULL ,
328+                    rd_true  /* do_lock */ ) !=  -1 ;
329+ }
330+ 
313331/** 
314332 * @brief Set broker state. 
315333 * 
@@ -2214,53 +2232,6 @@ rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t *rkb, rd_ts_t now) {
22142232        return  (int )(remains  / 1000 );
22152233}
22162234
2217- 
2218- /** 
2219-  * @brief Unittest for reconnect.backoff.ms 
2220-  */ 
2221- static  int  rd_ut_reconnect_backoff (void ) {
2222-         rd_kafka_broker_t  rkb  =  RD_ZERO_INIT ;
2223-         rd_kafka_conf_t  conf   =  {.reconnect_backoff_ms      =  10 ,
2224-                                  .reconnect_backoff_max_ms  =  90 };
2225-         rd_ts_t  now            =  1000000 ;
2226-         int  backoff ;
2227- 
2228-         rkb .rkb_reconnect_backoff_ms  =  conf .reconnect_backoff_ms ;
2229- 
2230-         /* broker's backoff is the initial reconnect.backoff.ms=10 */ 
2231-         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
2232-         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
2233-         RD_UT_ASSERT_RANGE (backoff , 7 , 15 , "%d" );
2234- 
2235-         /* .. 20 */ 
2236-         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
2237-         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
2238-         RD_UT_ASSERT_RANGE (backoff , 15 , 30 , "%d" );
2239- 
2240-         /* .. 40 */ 
2241-         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
2242-         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
2243-         RD_UT_ASSERT_RANGE (backoff , 30 , 60 , "%d" );
2244- 
2245-         /* .. 80, the jitter is capped at reconnect.backoff.max.ms=90  */ 
2246-         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
2247-         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
2248-         RD_UT_ASSERT_RANGE (backoff , 60 , conf .reconnect_backoff_max_ms , "%d" );
2249- 
2250-         /* .. 90, capped by reconnect.backoff.max.ms */ 
2251-         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
2252-         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
2253-         RD_UT_ASSERT_RANGE (backoff , 67 , conf .reconnect_backoff_max_ms , "%d" );
2254- 
2255-         /* .. 90, should remain at capped value. */ 
2256-         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
2257-         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
2258-         RD_UT_ASSERT_RANGE (backoff , 67 , conf .reconnect_backoff_max_ms , "%d" );
2259- 
2260-         RD_UT_PASS ();
2261- }
2262- 
2263- 
22642235/** 
22652236 * @brief Initiate asynchronous connection attempt to the next address 
22662237 *        in the broker's address list. 
@@ -6266,6 +6237,98 @@ void rd_kafka_broker_decommission(rd_kafka_t *rk,
62666237        rd_kafka_wrlock (rk );
62676238}
62686239
6240+ /** 
6241+  * @brief Unittest for reconnect.backoff.ms 
6242+  */ 
6243+ static  int  rd_ut_reconnect_backoff (void ) {
6244+         rd_kafka_broker_t  rkb  =  RD_ZERO_INIT ;
6245+         rd_kafka_conf_t  conf   =  {.reconnect_backoff_ms      =  10 ,
6246+                                  .reconnect_backoff_max_ms  =  90 };
6247+         rd_ts_t  now            =  1000000 ;
6248+         int  backoff ;
6249+ 
6250+         rkb .rkb_reconnect_backoff_ms  =  conf .reconnect_backoff_ms ;
6251+ 
6252+         /* broker's backoff is the initial reconnect.backoff.ms=10 */ 
6253+         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
6254+         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
6255+         RD_UT_ASSERT_RANGE (backoff , 7 , 15 , "%d" );
6256+ 
6257+         /* .. 20 */ 
6258+         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
6259+         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
6260+         RD_UT_ASSERT_RANGE (backoff , 15 , 30 , "%d" );
6261+ 
6262+         /* .. 40 */ 
6263+         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
6264+         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
6265+         RD_UT_ASSERT_RANGE (backoff , 30 , 60 , "%d" );
6266+ 
6267+         /* .. 80, the jitter is capped at reconnect.backoff.max.ms=90  */ 
6268+         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
6269+         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
6270+         RD_UT_ASSERT_RANGE (backoff , 60 , conf .reconnect_backoff_max_ms , "%d" );
6271+ 
6272+         /* .. 90, capped by reconnect.backoff.max.ms */ 
6273+         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
6274+         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
6275+         RD_UT_ASSERT_RANGE (backoff , 67 , conf .reconnect_backoff_max_ms , "%d" );
6276+ 
6277+         /* .. 90, should remain at capped value. */ 
6278+         rd_kafka_broker_update_reconnect_backoff (& rkb , & conf , now );
6279+         backoff  =  rd_kafka_broker_reconnect_backoff (& rkb , now );
6280+         RD_UT_ASSERT_RANGE (backoff , 67 , conf .reconnect_backoff_max_ms , "%d" );
6281+ 
6282+         RD_UT_PASS ();
6283+ }
6284+ 
6285+ /** 
6286+  * @brief Unittest for reconnect.backoff.ms 
6287+  */ 
6288+ static  int  rd_ut_ApiVersion_at_least (void ) {
6289+         rd_kafka_broker_t  rkb  =  RD_ZERO_INIT ;
6290+         mtx_init (& rkb .rkb_lock , mtx_plain );
6291+ 
6292+         struct  rd_kafka_ApiVersion  av  =  {
6293+             .ApiKey  =  RD_KAFKAP_Metadata ,
6294+             .MinVer  =  5 ,
6295+             .MaxVer  =  10 ,
6296+         };
6297+ 
6298+         rkb .rkb_ApiVersions_cnt  =  1 ;
6299+         rkb .rkb_ApiVersions      =  & av ;
6300+ 
6301+         RD_UT_ASSERT (
6302+             rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 0 ),
6303+             "Metadata API version should be at least 0" );
6304+         RD_UT_ASSERT (
6305+             rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 3 ),
6306+             "Metadata API version should be at least 3" );
6307+         RD_UT_ASSERT (
6308+             rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 5 ),
6309+             "Metadata API version should be at least 5" );
6310+         RD_UT_ASSERT (
6311+             rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 7 ),
6312+             "Metadata API version should be at least 7" );
6313+         RD_UT_ASSERT (
6314+             rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 10 ),
6315+             "Metadata API version should be at least 10" );
6316+         RD_UT_ASSERT (
6317+             !rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 11 ),
6318+             "Metadata API version shouldn't be at least 11" );
6319+ 
6320+         rkb .rkb_ApiVersions_cnt  =  0 ;
6321+         RD_UT_ASSERT (
6322+             !rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 0 ),
6323+             "Metadata API version shouldn't be at least 0" );
6324+         RD_UT_ASSERT (
6325+             !rd_kafka_broker_ApiVersion_at_least (& rkb , RD_KAFKAP_Metadata , 3 ),
6326+             "Metadata API version shouldn't be at least 3" );
6327+ 
6328+         mtx_destroy (& rkb .rkb_lock );
6329+         RD_UT_PASS ();
6330+ }
6331+ 
62696332/** 
62706333 * @name Unit tests 
62716334 * @{ 
@@ -6275,6 +6338,7 @@ int unittest_broker(void) {
62756338        int  fails  =  0 ;
62766339
62776340        fails  +=  rd_ut_reconnect_backoff ();
6341+         fails  +=  rd_ut_ApiVersion_at_least ();
62786342
62796343        return  fails ;
62806344}
0 commit comments