Skip to content

Commit 20ed352

Browse files
authored
[KIP-848] Fixed not adhering to HB interval when auto commit interval was less than HB interval (confluentinc#5114)
1 parent a4a050b commit 20ed352

File tree

4 files changed

+89
-22
lines changed

4 files changed

+89
-22
lines changed

src/rdkafka_cgrp.c

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6399,43 +6399,49 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
63996399
rd_kafka_cgrp_consumer_incr_unassign_done(rkcg);
64006400
break;
64016401

6402-
case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
6403-
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
6404-
rkcg, "back to steady state");
6405-
6406-
if (rkcg->rkcg_rebalance_rejoin) {
6407-
rkcg->rkcg_rebalance_rejoin = rd_false;
6408-
rd_kafka_cgrp_rejoin(
6409-
rkcg,
6410-
"rejoining group to redistribute "
6411-
"previously owned partitions to other "
6412-
"group members");
6413-
break;
6414-
}
6415-
6416-
/* FALLTHRU */
6402+
case RD_KAFKA_CGRP_JOIN_STATE_INIT:
6403+
case RD_KAFKA_CGRP_JOIN_STATE_STEADY: {
64176404

6418-
case RD_KAFKA_CGRP_JOIN_STATE_INIT: {
6419-
rd_bool_t still_in_group = rd_true;
6405+
rd_bool_t not_in_group = rd_false;
64206406
/*
64216407
* There may be a case when no partitions are
64226408
* assigned to this consumer. In this case, while terminating
64236409
* the consumer can be in STEADY or INIT state and won't go
64246410
* to intermediate state. In this scenario, last leave call is
64256411
* done from here.
64266412
*/
6427-
still_in_group &= !rd_kafka_cgrp_leave_maybe(rkcg);
6413+
not_in_group |= rd_kafka_cgrp_leave_maybe(rkcg);
64286414

64296415
/* Check if cgrp is trying to terminate, which is safe to do
64306416
* in these two states. Otherwise we'll need to wait for
64316417
* the current state to decommission. */
6432-
still_in_group &= !rd_kafka_cgrp_try_terminate(rkcg);
6418+
not_in_group |= rd_kafka_cgrp_try_terminate(rkcg);
6419+
6420+
if (not_in_group)
6421+
break;
64336422

6434-
if (still_in_group)
6423+
if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT) {
64356424
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
6436-
rkcg, "back to init state");
6425+
rkcg, "Assignment Done: in init state");
6426+
} else if (rkcg->rkcg_rebalance_rejoin) {
6427+
/* No need to expedite the HB here as it's being
6428+
* expedited in the rejoin call. */
6429+
rkcg->rkcg_rebalance_rejoin = rd_false;
6430+
rd_kafka_cgrp_rejoin(
6431+
rkcg,
6432+
"Assignment Done: rejoining group to redistribute "
6433+
"previously owned partitions to other "
6434+
"group members");
6435+
} else if (rkcg->rkcg_consumer_flags &
6436+
RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) {
6437+
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
6438+
rkcg,
6439+
"Assignment Done: in steady state, waiting for "
6440+
"ack");
6441+
}
64376442
break;
64386443
}
6444+
64396445
default:
64406446
break;
64416447
}

tests/0147-consumer_group_consumer_mock.c

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,59 @@ static void do_test_metadata_unknown_topic_id_tests(void) {
805805
}
806806
}
807807

808+
/**
 * @brief Check that the consumer adheres to the heartbeat interval when
 *        auto.commit.interval.ms (100) is lower than the heartbeat
 *        interval set on the mock cluster (1000).
 *
 * Starts request tracking on the mock cluster, subscribes the consumer,
 * sleeps, then counts the tracked requests matched by
 * is_heartbeat_request() and asserts that the count is between 3 and 5.
 */
static void do_test_adherence_to_hb_interval(void) {
809+
rd_kafka_mock_cluster_t *mcluster;
810+
const char *bootstraps;
811+
rd_kafka_topic_partition_list_t *subscription;
812+
rd_kafka_t *c;
813+
const char *topic =
814+
test_mk_topic_name("do_test_adherence_to_hb_interval", 1);
815+
rd_kafka_conf_t *conf;
816+
size_t heartbeat_request_count = 0;
817+
818+
SUB_TEST_QUICK("do_test_adherence_to_hb_interval");
819+
820+
/* Single-broker mock cluster with a heartbeat interval of 1000
 * (NOTE(review): milliseconds, going by the setter's _ms suffix). */
mcluster = test_mock_cluster_new(1, &bootstraps);
821+
rd_kafka_mock_set_group_consumer_heartbeat_interval_ms(mcluster, 1000);
822+
rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
823+
824+
test_conf_init(&conf, NULL, 0);
825+
test_conf_set(conf, "bootstrap.servers", bootstraps);
826+
test_conf_set(conf, "auto.offset.reset", "earliest");
827+
/* Auto commit interval deliberately lower than the heartbeat
 * interval configured above: this is the scenario under test. */
test_conf_set(conf, "auto.commit.interval.ms", "100");
828+
/* The topic name is passed as the first argument as well
 * (NOTE(review): presumably the group id -- confirm). */
c = test_create_consumer(topic, NULL, conf, NULL);
829+
830+
subscription = rd_kafka_topic_partition_list_new(1);
831+
rd_kafka_topic_partition_list_add(subscription, topic,
832+
RD_KAFKA_PARTITION_UA);
833+
834+
/* Tracking is started before subscribing so that the requests
 * triggered by the subscription are part of the count below. */
rd_kafka_mock_start_request_tracking(mcluster);
835+
TEST_SAY("Subscribing to topic\n");
836+
TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
837+
rd_kafka_topic_partition_list_destroy(subscription);
838+
839+
TEST_SAY("Subscription done, waiting for heartbeats\n");
840+
841+
/* Let heartbeats accumulate before counting them.
 * NOTE(review): rd_sleep() presumably takes seconds, i.e. about two
 * heartbeat intervals -- confirm. */
rd_sleep(2);
842+
843+
heartbeat_request_count = test_mock_get_matching_request_cnt(
844+
mcluster, is_heartbeat_request, NULL);
845+
TEST_SAY("Heartbeat request count: %zu\n", heartbeat_request_count);
846+
847+
/* Fail if the number of heartbeats is outside the 3..5 window.
 * NOTE(review): the window presumably allows for the extra
 * heartbeats sent while joining on top of the periodic ones; a
 * higher count would mean heartbeats are sent more often than the
 * configured interval -- confirm the bounds. */
848+
TEST_ASSERT(heartbeat_request_count >= 3 &&
849+
heartbeat_request_count <= 5,
850+
"Expected between 3 and 5 heartbeats, got %zu",
851+
heartbeat_request_count);
852+
853+
rd_kafka_mock_stop_request_tracking(mcluster);
854+
855+
rd_kafka_destroy(c);
856+
test_mock_cluster_destroy(mcluster);
857+
858+
SUB_TEST_PASS();
859+
}
860+
808861
int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
809862
TEST_SKIP_MOCK_CLUSTER(0);
810863

@@ -821,5 +874,7 @@ int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
821874

822875
do_test_metadata_unknown_topic_id_tests();
823876

877+
do_test_adherence_to_hb_interval();
878+
824879
return 0;
825880
}

tests/test.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7423,7 +7423,7 @@ rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
74237423
* received by mock cluster \p mcluster, matching
74247424
* function \p match , called with opaque \p opaque .
74257425
*/
7426-
static size_t test_mock_get_matching_request_cnt(
7426+
size_t test_mock_get_matching_request_cnt(
74277427
rd_kafka_mock_cluster_t *mcluster,
74287428
rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
74297429
void *opaque) {

tests/test.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,12 @@ rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms);
877877
void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster);
878878
rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
879879
const char **bootstraps);
880+
881+
/**
 * @returns presumably the number of requests received by mock cluster
 *          \p mcluster for which \p match , called with \p opaque ,
 *          returns true (NOTE(review): confirm against the definition
 *          in test.c).
 */
size_t test_mock_get_matching_request_cnt(
882+
rd_kafka_mock_cluster_t *mcluster,
883+
rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
884+
void *opaque);
885+
880886
size_t test_mock_wait_matching_requests(
881887
rd_kafka_mock_cluster_t *mcluster,
882888
size_t num,

0 commit comments

Comments
 (0)