Skip to content

Commit 01ec279

Browse files
authored
Fix flakyness of tests 0014 and 0085 (#5189)
* Fix flakyness test 0085 * Errors that cause a refresh coordinator like NOT_COORDINATOR during an offset fetch should not be propagated to the application.
1 parent ccb95d9 commit 01ec279

File tree

4 files changed

+20
-9
lines changed

4 files changed

+20
-9
lines changed

src/rdkafka_partition.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2162,6 +2162,7 @@ static rd_kafka_op_res_t rd_kafka_toppar_op_serve(rd_kafka_t *rk,
21622162
rd_kafka_toppar_lock(rktp);
21632163

21642164
if (rko->rko_err) {
2165+
int actions;
21652166
rd_kafka_dbg(
21662167
rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
21672168
"Failed to fetch offset for "
@@ -2176,10 +2177,15 @@ static rd_kafka_op_res_t rd_kafka_toppar_op_serve(rd_kafka_t *rk,
21762177
rd_kafka_toppar_unlock(rktp);
21772178

21782179

2179-
/* Propagate error to application */
2180+
actions = rd_kafka_handle_OffsetFetch_err_action(
2181+
NULL, rko->rko_err, NULL);
2182+
/* Propagate error to application. Exclude
2183+
* permanent errors that caused a coordinator
2184+
* refresh like `NOT_COORDINATOR` */
21802185
if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD &&
21812186
rko->rko_err !=
2182-
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
2187+
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT &&
2188+
!(actions & RD_KAFKA_ERR_ACTION_REFRESH))
21832189
rd_kafka_consumer_err(
21842190
rktp->rktp_fetchq, RD_KAFKA_NODEID_UA,
21852191
rko->rko_err, 0, NULL, rktp,

src/rdkafka_request.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,11 @@ void rd_kafka_OffsetForLeaderEpochRequest(
11631163
}
11641164

11651165

1166+
int rd_kafka_handle_OffsetFetch_err_action(rd_kafka_broker_t *rkb,
1167+
rd_kafka_resp_err_t err,
1168+
rd_kafka_buf_t *request) {
1169+
return rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END);
1170+
}
11661171

11671172
/**
11681173
* Generic handler for OffsetFetch responses.
@@ -1362,8 +1367,7 @@ rd_kafka_handle_OffsetFetch(rd_kafka_t *rk,
13621367
seen_cnt, (*offsets)->cnt, retry_unstable,
13631368
rd_kafka_err2str(err));
13641369

1365-
actions =
1366-
rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END);
1370+
actions = rd_kafka_handle_OffsetFetch_err_action(rkb, err, request);
13671371

13681372
if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
13691373
/* Re-query for coordinator */

src/rdkafka_request.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@ void rd_kafka_OffsetForLeaderEpochRequest(
245245
rd_kafka_resp_cb_t *resp_cb,
246246
void *opaque);
247247

248+
int rd_kafka_handle_OffsetFetch_err_action(rd_kafka_broker_t *rkb,
249+
rd_kafka_resp_err_t err,
250+
rd_kafka_buf_t *request);
248251

249252
rd_kafka_resp_err_t
250253
rd_kafka_handle_OffsetFetch(rd_kafka_t *rk,

tests/0085-headers.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
static RdKafka::Producer *producer;
3535
static RdKafka::KafkaConsumer *consumer;
3636
static std::string topic;
37-
static bool first_produce = true;
3837

3938
static void assert_all_headers_match(RdKafka::Headers *actual,
4039
const RdKafka::Headers *expected) {
@@ -88,10 +87,6 @@ static void test_headers(RdKafka::Headers *produce_headers,
8887
Test::Fail("produce() failed: " + RdKafka::err2str(err));
8988

9089
producer->flush(tmout_multip(10 * 1000));
91-
if (first_produce) {
92-
test_wait_topic_exists(producer->c_ptr(), topic.c_str(), 5000);
93-
first_produce = false;
94-
}
9590

9691
if (producer->outq_len() > 0)
9792
Test::Fail(tostr() << "Expected producer to be flushed, "
@@ -363,6 +358,8 @@ int main_0085_headers(int argc, char **argv) {
363358

364359
delete conf;
365360

361+
Test::create_topic_wait_exists(p, topic.c_str(), 1, -1, 5000);
362+
366363
std::vector<RdKafka::TopicPartition *> parts;
367364
parts.push_back(RdKafka::TopicPartition::create(
368365
topic, 0, RdKafka::Topic::OFFSET_BEGINNING));
@@ -385,6 +382,7 @@ int main_0085_headers(int argc, char **argv) {
385382
test_failed_produce();
386383
test_assignment_op();
387384

385+
Test::delete_topic(p, topic.c_str());
388386
c->close();
389387
delete c;
390388
delete p;

0 commit comments

Comments
 (0)