Skip to content

Commit 99b03e8

Browse files
authored
[KIP-848] Test 0147: mock tests specific for the 848 consumer group protocol (confluentinc#4920)
* [KIP-8484] Mock tests specific for the new consumer group protocol * Expedite HB only after coordinator actually changes Expedite coordinator query when there's a DESTROY_BROKER error. * Flaky test 0067 fix
1 parent 1510d2b commit 99b03e8

File tree

6 files changed

+844
-2
lines changed

6 files changed

+844
-2
lines changed

src/rdkafka_cgrp.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,9 @@ static int rd_kafka_cgrp_coord_update(rd_kafka_cgrp_t *rkcg, int32_t coord_id) {
654654
/* Clear previous broker handle, if any */
655655
if (rkcg->rkcg_curr_coord)
656656
rd_kafka_cgrp_coord_clear_broker(rkcg);
657+
658+
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
659+
rkcg, "coordinator changed");
657660
}
658661

659662

@@ -788,6 +791,8 @@ static void rd_kafka_cgrp_handle_FindCoordinator(rd_kafka_t *rk,
788791

789792
RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
790793

794+
RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__DESTROY_BROKER,
795+
791796
RD_KAFKA_ERR_ACTION_END);
792797

793798

@@ -807,6 +812,12 @@ static void rd_kafka_cgrp_handle_FindCoordinator(rd_kafka_t *rk,
807812
rd_kafka_cgrp_set_last_err(rkcg, ErrorCode);
808813
}
809814

815+
if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY_BROKER) {
816+
/* This error is one-time and should cause
817+
* an immediate retry. */
818+
rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
819+
}
820+
810821
/* Retries are performed by the timer-intervalled
811822
* coord queries, continue querying */
812823
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
@@ -3320,8 +3331,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
33203331
rkcg->rkcg_consumer_flags |=
33213332
RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
33223333
rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
3323-
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
3324-
rkcg, "coordinator query");
3334+
/* If coordinator changes, HB will be expedited. */
33253335
}
33263336

33273337
if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) {

tests/0067-empty_topic.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ static void do_test_empty_topic_consumer() {
7070
RdKafka::err2str(err));
7171
delete md;
7272

73+
/* Await propagation to the whole cluster */
74+
test_wait_topic_exists(consumer->c_ptr(), topic.c_str(), 1 * 1000);
75+
7376
/* Start consumer */
7477
err = consumer->start(rkt, partition, RdKafka::Topic::OFFSET_BEGINNING);
7578
if (err)

0 commit comments

Comments
 (0)