@@ -67,7 +67,8 @@ static rd_kafka_resp_err_t wait_fatal_error(rd_kafka_share_t *share_c,
6767 errstr [0 ] = '\0' ;
6868 while (test_clock () < deadline ) {
6969 rd_kafka_resp_err_t err = rd_kafka_fatal_error (
70- rd_kafka_share_consumer_get_rk (share_c ), errstr , errstr_size );
70+ rd_kafka_share_consumer_get_rk (share_c ), errstr ,
71+ errstr_size );
7172 if (err != RD_KAFKA_RESP_ERR_NO_ERROR )
7273 return err ;
7374 rd_usleep (100 * 1000 , 0 );
@@ -227,12 +228,12 @@ static void do_test_share_group_assignment_rebalance(void) {
227228 deadline = test_clock () + 15000 * 1000 ;
228229 while (test_clock () < deadline ) {
229230
230- TEST_CALL_ERR__ (
231- rd_kafka_assignment ( rd_kafka_share_consumer_get_rk (share_c1 ),
232- & share_c1_assignment ));
233- TEST_CALL_ERR__ (
234- rd_kafka_assignment ( rd_kafka_share_consumer_get_rk (share_c2 ),
235- & share_c2_assignment ));
231+ TEST_CALL_ERR__ (rd_kafka_assignment (
232+ rd_kafka_share_consumer_get_rk (share_c1 ),
233+ & share_c1_assignment ));
234+ TEST_CALL_ERR__ (rd_kafka_assignment (
235+ rd_kafka_share_consumer_get_rk (share_c2 ),
236+ & share_c2_assignment ));
236237
237238 if (share_c1_assignment -> cnt + share_c2_assignment -> cnt == 3 &&
238239 share_c1_assignment -> cnt > 0 &&
@@ -338,9 +339,11 @@ static void do_test_share_group_multi_topic_assignment(void) {
338339 deadline = test_clock () + 15000 * 1000 ;
339340 while (test_clock () < deadline ) {
340341 TEST_CALL_ERR__ (rd_kafka_assignment (
341- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
342+ rd_kafka_share_consumer_get_rk (share_c1 ),
343+ & share_c1_assign ));
342344 TEST_CALL_ERR__ (rd_kafka_assignment (
343- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
345+ rd_kafka_share_consumer_get_rk (share_c2 ),
346+ & share_c2_assign ));
344347
345348 total_orders =
346349 count_topic_partitions (share_c1_assign , topic_orders ) +
@@ -385,11 +388,14 @@ static void do_test_share_group_multi_topic_assignment(void) {
385388 while (test_clock () < deadline ) {
386389
387390 TEST_CALL_ERR__ (rd_kafka_assignment (
388- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
391+ rd_kafka_share_consumer_get_rk (share_c1 ),
392+ & share_c1_assign ));
389393 TEST_CALL_ERR__ (rd_kafka_assignment (
390- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
394+ rd_kafka_share_consumer_get_rk (share_c2 ),
395+ & share_c2_assign ));
391396 TEST_CALL_ERR__ (rd_kafka_assignment (
392- rd_kafka_share_consumer_get_rk (share_c3 ), & share_c3_assign ));
397+ rd_kafka_share_consumer_get_rk (share_c3 ),
398+ & share_c3_assign ));
393399
394400 total_orders =
395401 count_topic_partitions (share_c1_assign , topic_orders ) +
@@ -442,9 +448,11 @@ static void do_test_share_group_multi_topic_assignment(void) {
442448 deadline = test_clock () + 15000 * 1000 ;
443449 while (test_clock () < deadline ) {
444450 TEST_CALL_ERR__ (rd_kafka_assignment (
445- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
451+ rd_kafka_share_consumer_get_rk (share_c2 ),
452+ & share_c2_assign ));
446453 TEST_CALL_ERR__ (rd_kafka_assignment (
447- rd_kafka_share_consumer_get_rk (share_c3 ), & share_c3_assign ));
454+ rd_kafka_share_consumer_get_rk (share_c3 ),
455+ & share_c3_assign ));
448456
449457 if (count_topic_partitions (share_c2_assign , topic_orders ) ==
450458 4 &&
@@ -610,8 +618,8 @@ static void do_test_share_group_rtt_injection(void) {
610618 rd_usleep (500 * 1000 , 0 );
611619
612620 /* Verify initial assignment */
613- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
614- & assignment ));
621+ TEST_CALL_ERR__ (rd_kafka_assignment (
622+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
615623 TEST_ASSERT (assignment -> cnt == 3 ,
616624 "Expected 3 partitions initially, got %d" , assignment -> cnt );
617625 rd_kafka_topic_partition_list_destroy (assignment );
@@ -635,8 +643,8 @@ static void do_test_share_group_rtt_injection(void) {
635643 rd_usleep (500 * 1000 , 0 );
636644
637645 /* Verify consumer recovered and still has assignment */
638- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
639- & assignment ));
646+ TEST_CALL_ERR__ (rd_kafka_assignment (
647+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
640648 TEST_ASSERT (assignment -> cnt == 3 ,
641649 "Expected 3 partitions after timeout recovery, got %d" ,
642650 assignment -> cnt );
@@ -702,9 +710,11 @@ static void do_test_share_group_session_timeout(void) {
702710 dl = test_clock () + 15000 * 1000 ;
703711 while (test_clock () < dl ) {
704712 TEST_CALL_ERR__ (rd_kafka_assignment (
705- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
713+ rd_kafka_share_consumer_get_rk (share_c1 ),
714+ & share_c1_assign ));
706715 TEST_CALL_ERR__ (rd_kafka_assignment (
707- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
716+ rd_kafka_share_consumer_get_rk (share_c2 ),
717+ & share_c2_assign ));
708718 share_c1_initial = share_c1_assign -> cnt ;
709719 share_c2_initial = share_c2_assign -> cnt ;
710720 rd_kafka_topic_partition_list_destroy (share_c1_assign );
@@ -784,9 +794,11 @@ static void do_test_share_group_target_assignment(void) {
784794 dl = test_clock () + 15000 * 1000 ;
785795 while (test_clock () < dl ) {
786796 TEST_CALL_ERR__ (rd_kafka_assignment (
787- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
797+ rd_kafka_share_consumer_get_rk (share_c1 ),
798+ & share_c1_assign ));
788799 TEST_CALL_ERR__ (rd_kafka_assignment (
789- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
800+ rd_kafka_share_consumer_get_rk (share_c2 ),
801+ & share_c2_assign ));
790802 if (share_c1_assign -> cnt + share_c2_assign -> cnt == 4 &&
791803 share_c1_assign -> cnt > 0 && share_c2_assign -> cnt > 0 ) {
792804 rd_kafka_topic_partition_list_destroy (share_c1_assign );
@@ -840,9 +852,11 @@ static void do_test_share_group_target_assignment(void) {
840852 while (test_clock () < dl ) {
841853
842854 TEST_CALL_ERR__ (rd_kafka_assignment (
843- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
855+ rd_kafka_share_consumer_get_rk (share_c1 ),
856+ & share_c1_assign ));
844857 TEST_CALL_ERR__ (rd_kafka_assignment (
845- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
858+ rd_kafka_share_consumer_get_rk (share_c2 ),
859+ & share_c2_assign ));
846860
847861 if ((share_c1_assign -> cnt == 4 && share_c2_assign -> cnt == 0 ) ||
848862 (share_c1_assign -> cnt == 0 && share_c2_assign -> cnt == 4 )) {
@@ -1396,8 +1410,8 @@ static void do_test_member_rejoin_with_epoch_zero(void) {
13961410 rd_usleep (500 * 1000 , 0 );
13971411
13981412 /* Verify initial assignment (member is now in stable state) */
1399- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
1400- & assignment ));
1413+ TEST_CALL_ERR__ (rd_kafka_assignment (
1414+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
14011415 TEST_ASSERT (assignment -> cnt == 3 ,
14021416 "Expected 3 partitions initially, got %d" , assignment -> cnt );
14031417 rd_kafka_topic_partition_list_destroy (assignment );
@@ -1474,9 +1488,11 @@ static void do_test_leaving_member_bumps_group_epoch(void) {
14741488 dl = test_clock () + 15000 * 1000 ;
14751489 while (test_clock () < dl ) {
14761490 TEST_CALL_ERR__ (rd_kafka_assignment (
1477- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
1491+ rd_kafka_share_consumer_get_rk (share_c1 ),
1492+ & share_c1_assign ));
14781493 TEST_CALL_ERR__ (rd_kafka_assignment (
1479- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
1494+ rd_kafka_share_consumer_get_rk (share_c2 ),
1495+ & share_c2_assign ));
14801496 if (share_c1_assign -> cnt + share_c2_assign -> cnt == 4 &&
14811497 share_c1_assign -> cnt > 0 && share_c2_assign -> cnt > 0 ) {
14821498 rd_kafka_topic_partition_list_destroy (share_c1_assign );
@@ -1547,8 +1563,8 @@ static void do_test_partition_assignment_with_multiple_topics(void) {
15471563 "Expected 5 partitions (3+2)" );
15481564
15491565 /* Verify assignment includes partitions from both topics */
1550- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
1551- & assignment ));
1566+ TEST_CALL_ERR__ (rd_kafka_assignment (
1567+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
15521568
15531569 /* Count partitions per topic */
15541570 for (i = 0 ; i < assignment -> cnt ; i ++ ) {
@@ -1620,11 +1636,14 @@ static void do_test_multiple_members_partition_distribution(void) {
16201636 dl = test_clock () + 15000 * 1000 ;
16211637 while (test_clock () < dl ) {
16221638 TEST_CALL_ERR__ (rd_kafka_assignment (
1623- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
1639+ rd_kafka_share_consumer_get_rk (share_c1 ),
1640+ & share_c1_assign ));
16241641 TEST_CALL_ERR__ (rd_kafka_assignment (
1625- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
1642+ rd_kafka_share_consumer_get_rk (share_c2 ),
1643+ & share_c2_assign ));
16261644 TEST_CALL_ERR__ (rd_kafka_assignment (
1627- rd_kafka_share_consumer_get_rk (share_c3 ), & share_c3_assign ));
1645+ rd_kafka_share_consumer_get_rk (share_c3 ),
1646+ & share_c3_assign ));
16281647 total_partitions = share_c1_assign -> cnt + share_c2_assign -> cnt +
16291648 share_c3_assign -> cnt ;
16301649 if (share_c1_assign -> cnt >= 1 && share_c2_assign -> cnt >= 1 &&
@@ -1837,8 +1856,8 @@ static void do_test_subscription_change(void) {
18371856 "Expected 2 partitions from topicA" );
18381857
18391858 /* Verify assignment has topic A only */
1840- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
1841- & assignment ));
1859+ TEST_CALL_ERR__ (rd_kafka_assignment (
1860+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
18421861 for (i = 0 ; i < assignment -> cnt ; i ++ ) {
18431862 TEST_ASSERT (strcmp (assignment -> elems [i ].topic , topicA ) == 0 ,
18441863 "Expected topicA, got %s" ,
@@ -1943,8 +1962,8 @@ static void do_test_group_id_not_found_while_unsubscribed(void) {
19431962 rd_usleep (500 * 1000 , 0 );
19441963
19451964 /* Verify consumer is NOT in fatal state - error should be benign */
1946- fatal_err = rd_kafka_fatal_error (rd_kafka_share_consumer_get_rk ( share_c ),
1947- errstr , sizeof (errstr ));
1965+ fatal_err = rd_kafka_fatal_error (
1966+ rd_kafka_share_consumer_get_rk ( share_c ), errstr , sizeof (errstr ));
19481967 TEST_ASSERT (fatal_err == RD_KAFKA_RESP_ERR_NO_ERROR ,
19491968 "Expected no fatal error when GROUP_ID_NOT_FOUND arrives "
19501969 "while unsubscribed, but got: %s (%s)" ,
@@ -2017,7 +2036,8 @@ static void do_test_group_id_not_found_while_unsubscribed(void) {
20172036// rd_usleep(500 * 1000, 0);
20182037
20192038// /* Verify consumer entered fatal state */
2020- // fatal_err = rd_kafka_fatal_error(rd_kafka_share_consumer_get_rk(share_c),
2039+ // fatal_err =
2040+ // rd_kafka_fatal_error(rd_kafka_share_consumer_get_rk(share_c),
20212041// errstr, sizeof(errstr));
20222042// TEST_ASSERT(fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR,
20232043// "Expected consumer to be in fatal state after "
@@ -2255,8 +2275,8 @@ static void do_test_graceful_shutdown_stable_state(void) {
22552275 rd_usleep (500 * 1000 , 0 );
22562276
22572277 /* Verify initial assignment - member is in stable state */
2258- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
2259- & assignment ));
2278+ TEST_CALL_ERR__ (rd_kafka_assignment (
2279+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
22602280 TEST_ASSERT (assignment -> cnt == 3 ,
22612281 "Expected 3 partitions initially, got %d" , assignment -> cnt );
22622282 rd_kafka_topic_partition_list_destroy (assignment );
@@ -2316,8 +2336,8 @@ static void do_test_resubscribe_after_unsubscribe(void) {
23162336 wait_share_heartbeats (mcluster , 1 , 1000 );
23172337 rd_usleep (500 * 1000 , 0 );
23182338
2319- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
2320- & assignment ));
2339+ TEST_CALL_ERR__ (rd_kafka_assignment (
2340+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
23212341 TEST_ASSERT (assignment -> cnt == 3 ,
23222342 "Expected 3 partitions on first subscribe, got %d" ,
23232343 assignment -> cnt );
@@ -2329,8 +2349,8 @@ static void do_test_resubscribe_after_unsubscribe(void) {
23292349 rd_usleep (500 * 1000 , 0 );
23302350
23312351 /* Verify no assignment after unsubscribe */
2332- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
2333- & assignment ));
2352+ TEST_CALL_ERR__ (rd_kafka_assignment (
2353+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
23342354 TEST_ASSERT (assignment -> cnt == 0 ,
23352355 "Expected 0 partitions after unsubscribe, got %d" ,
23362356 assignment -> cnt );
@@ -2419,9 +2439,11 @@ static void do_test_consumer_leave_rebalance(void) {
24192439 while (test_clock () < dl ) {
24202440
24212441 TEST_CALL_ERR__ (rd_kafka_assignment (
2422- rd_kafka_share_consumer_get_rk (share_c1 ), & share_c1_assign ));
2442+ rd_kafka_share_consumer_get_rk (share_c1 ),
2443+ & share_c1_assign ));
24232444 TEST_CALL_ERR__ (rd_kafka_assignment (
2424- rd_kafka_share_consumer_get_rk (share_c2 ), & share_c2_assign ));
2445+ rd_kafka_share_consumer_get_rk (share_c2 ),
2446+ & share_c2_assign ));
24252447 final_total = share_c1_assign -> cnt + share_c2_assign -> cnt ;
24262448 rd_kafka_topic_partition_list_destroy (share_c1_assign );
24272449 rd_kafka_topic_partition_list_destroy (share_c2_assign );
@@ -2533,8 +2555,8 @@ static void do_test_empty_topic_subscription(void) {
25332555 /* Wait for assignment on empty topic */
25342556 rd_usleep (500 * 1000 , 0 );
25352557
2536- TEST_CALL_ERR__ (rd_kafka_assignment (rd_kafka_share_consumer_get_rk ( share_c ),
2537- & assignment ));
2558+ TEST_CALL_ERR__ (rd_kafka_assignment (
2559+ rd_kafka_share_consumer_get_rk ( share_c ), & assignment ));
25382560 TEST_SAY ("Empty topic: %d partitions\n" , assignment -> cnt );
25392561 TEST_ASSERT (assignment -> cnt == 3 , "Expected 3 partitions, got %d" ,
25402562 assignment -> cnt );
0 commit comments