4242 */
4343
4444// Globals to track log sequence
45- static volatile int seen_heartbeat_req = 0 ;
45+ static volatile int seen_heartbeat_req = 0 ;
4646static volatile int seen_heartbeat_resp = 0 ;
47- static volatile int seen_metadata_log = 0 ;
47+ static volatile int seen_metadata_log = 0 ;
4848
4949static void reset_log_tracking (void ) {
50- seen_heartbeat_req = 0 ;
51- seen_heartbeat_resp = 0 ;
52- seen_metadata_log = 0 ;
50+ seen_heartbeat_req = 0 ;
51+ seen_heartbeat_resp = 0 ;
52+ seen_metadata_log = 0 ;
5353}
5454
5555static void wait_for_metadata_refresh_log (int timeout_ms ) {
56- int elapsed = 0 ;
57- while (elapsed < timeout_ms && !seen_metadata_log ) {
58- rd_usleep (500 * 1000 , NULL ); // 500 ms
59- elapsed += 500 ;
60- }
61- TEST_ASSERT (seen_heartbeat_req , "Expected HeartbeatRequest log not seen after partition creation" );
62- TEST_ASSERT (seen_heartbeat_resp , "Expected HeartbeatResponse log not seen after partition creation" );
63- TEST_ASSERT (seen_metadata_log , "Expected metadata refresh log not seen after partition creation and heartbeat" );
56+ int elapsed = 0 ;
57+ while (elapsed < timeout_ms && !seen_metadata_log ) {
58+ rd_usleep (500 * 1000 , NULL ); // 500 ms
59+ elapsed += 500 ;
60+ }
61+ TEST_ASSERT (
62+ seen_heartbeat_req ,
63+ "Expected HeartbeatRequest log not seen after partition creation" );
64+ TEST_ASSERT (
65+ seen_heartbeat_resp ,
66+ "Expected HeartbeatResponse log not seen after partition creation" );
67+ TEST_ASSERT (seen_metadata_log ,
68+ "Expected metadata refresh log not seen after partition "
69+ "creation and heartbeat" );
6470}
6571
6672// Custom log callback to capture and process librdkafka logs
67- static void test_metadata_log_cb (const rd_kafka_t * rk , int level , const char * fac , const char * buf ) {
68- if (strstr (buf , "Sent ConsumerGroupHeartbeatRequest" )) {
69- seen_heartbeat_req = 1 ;
70- }
71- if (seen_heartbeat_req && strstr (buf , "Received ConsumerGroupHeartbeatResponse" )) {
72- seen_heartbeat_resp = 1 ;
73- }
74- if (seen_heartbeat_resp && strstr (buf , "Partition assigned to this consumer is not present in cached metadata" )) {
75- seen_metadata_log = 1 ;
76- }
73+ static void test_metadata_log_cb (const rd_kafka_t * rk ,
74+ int level ,
75+ const char * fac ,
76+ const char * buf ) {
77+ if (strstr (buf , "Sent ConsumerGroupHeartbeatRequest" )) {
78+ seen_heartbeat_req = 1 ;
79+ }
80+ if (seen_heartbeat_req &&
81+ strstr (buf , "Received ConsumerGroupHeartbeatResponse" )) {
82+ seen_heartbeat_resp = 1 ;
83+ }
84+ if (seen_heartbeat_resp &&
85+ strstr (buf ,
86+ "Partition assigned to this consumer is not present in "
87+ "cached metadata" )) {
88+ seen_metadata_log = 1 ;
89+ }
7790}
7891
79- static rd_kafka_t * create_consumer (const char * topic , const char * group , void (* log_cb )(const rd_kafka_t * , int , const char * , const char * )) {
80- rd_kafka_conf_t * conf ;
81- test_conf_init (& conf , NULL , 60 );
82- test_conf_set (conf , "group.id" , group );
83- test_conf_set (conf , "auto.offset.reset" , "earliest" );
84- test_conf_set (conf , "debug" , "cgrp, protocol" );
85- rd_kafka_conf_set_log_cb (conf , test_metadata_log_cb );
86- rd_kafka_t * consumer = test_create_consumer (topic , NULL , conf , NULL );
87- return consumer ;
92+ static rd_kafka_t * create_consumer (
93+ const char * topic ,
94+ const char * group ,
95+ void (* log_cb )(const rd_kafka_t * , int , const char * , const char * )) {
96+ rd_kafka_conf_t * conf ;
97+ test_conf_init (& conf , NULL , 60 );
98+ test_conf_set (conf , "group.id" , group );
99+ test_conf_set (conf , "auto.offset.reset" , "earliest" );
100+ test_conf_set (conf , "debug" , "cgrp, protocol" );
101+ rd_kafka_conf_set_log_cb (conf , test_metadata_log_cb );
102+ rd_kafka_t * consumer = test_create_consumer (topic , NULL , conf , NULL );
103+ return consumer ;
88104}
89105
90106static void setup_and_run_metadata_refresh_test (void ) {
91- const char * topic = test_mk_topic_name ("cgrp_metadata" , 1 );
92- int initial_partitions = 2 ;
93- int new_partitions = 4 ;
94- rd_kafka_t * c1 , * c2 , * rk ;
95- const char * group = "grp_metadata" ;
107+ const char * topic = test_mk_topic_name ("cgrp_metadata" , 1 );
108+ int initial_partitions = 2 ;
109+ int new_partitions = 4 ;
110+ rd_kafka_t * c1 , * c2 , * rk ;
111+ const char * group = "grp_metadata" ;
96112
97- SUB_TEST_QUICK ();
113+ SUB_TEST_QUICK ();
98114
99- TEST_SAY ("Creating topic %s with %d partitions\n" , topic , initial_partitions );
100- test_create_topic (NULL , topic , initial_partitions , 1 );
115+ TEST_SAY ("Creating topic %s with %d partitions\n" , topic ,
116+ initial_partitions );
117+ test_create_topic (NULL , topic , initial_partitions , 1 );
101118
102- TEST_SAY ("Creating consumers\n" );
103- c1 = create_consumer (topic , group , test_metadata_log_cb );
104- c2 = create_consumer (topic , group , test_metadata_log_cb );
119+ TEST_SAY ("Creating consumers\n" );
120+ c1 = create_consumer (topic , group , test_metadata_log_cb );
121+ c2 = create_consumer (topic , group , test_metadata_log_cb );
105122
106- rk = test_create_handle (RD_KAFKA_PRODUCER , NULL );
123+ rk = test_create_handle (RD_KAFKA_PRODUCER , NULL );
107124
108- TEST_SAY ("Subscribing to topic %s\n" , topic );
109- test_consumer_subscribe (c1 , topic );
110- test_consumer_subscribe (c2 , topic );
125+ TEST_SAY ("Subscribing to topic %s\n" , topic );
126+ test_consumer_subscribe (c1 , topic );
127+ test_consumer_subscribe (c2 , topic );
111128
112- // Wait for initial assignment
113- test_consumer_wait_assignment (c1 , rd_false );
114- test_consumer_wait_assignment (c2 , rd_false );
129+ // Wait for initial assignment
130+ test_consumer_wait_assignment (c1 , rd_false );
131+ test_consumer_wait_assignment (c2 , rd_false );
115132
116- // Create new partitions
117- TEST_SAY ("Increasing partition count to %d\n" , new_partitions );
118- test_create_partitions (rk , topic , new_partitions );
133+ // Create new partitions
134+ TEST_SAY ("Increasing partition count to %d\n" , new_partitions );
135+ test_create_partitions (rk , topic , new_partitions );
119136
120- // Reset log tracking variables to only consider logs after partition creation
121- reset_log_tracking ();
137+ // Reset log tracking variables to only consider logs after partition
138+ // creation
139+ reset_log_tracking ();
122140
123- // Wait for expected logs for up to 10 seconds
124- wait_for_metadata_refresh_log (10000 );
141+ // Wait for expected logs for up to 10 seconds
142+ wait_for_metadata_refresh_log (10000 );
125143
126- TEST_SAY ("Closing consumers\n" );
127- test_consumer_close (c1 );
128- test_consumer_close (c2 );
129- rd_kafka_destroy (c1 );
130- rd_kafka_destroy (c2 );
144+ TEST_SAY ("Closing consumers\n" );
145+ test_consumer_close (c1 );
146+ test_consumer_close (c2 );
147+ rd_kafka_destroy (c1 );
148+ rd_kafka_destroy (c2 );
131149
132- SUB_TEST_PASS ();
150+ SUB_TEST_PASS ();
133151}
134152
135153int main_0154_metadata_refresh (int argc , char * * argv ) {
136- if (!test_consumer_group_protocol_classic ())
137- setup_and_run_metadata_refresh_test ();
138- return 0 ;
154+ if (!test_consumer_group_protocol_classic ())
155+ setup_and_run_metadata_refresh_test ();
156+ return 0 ;
139157}
0 commit comments