Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,9 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rko = (rd_kafka_op_t *)rkmessages[i]->_private;
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rkmessages[i]->offset + 1;
if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset)))
/* Only update position for messages that are not EOF */
if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset) &&
(rkmessages[i]->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)))
rd_kafka_update_app_pos(
rk, rktp,
RD_KAFKA_FETCH_POS(
Expand Down
129 changes: 129 additions & 0 deletions tests/0137-barrier_batch_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,141 @@ static void do_test_consume_batch_control_msgs(void) {
}


/**
* @brief Test that rd_kafka_consume_batch_queue correctly updates consumer
* position when EOF messages are received with enable.partition.eof=true.
*
* This is a regression test for the bug where EOF messages incorrectly
* advanced the consumer position by 2 instead of 1 (last_offset + 2 instead
* of last_offset + 1).
*/
static void do_test_consume_batch_eof_position(void) {
const char *topic;
rd_kafka_t *consumer;
rd_kafka_conf_t *conf;
rd_kafka_queue_t *rkq;
uint64_t testid;
const int partition_cnt = 1;
const int partition = 0;
const int produce_msg_cnt = 5;
const int consume_msg_cnt = 10;
const int timeout_ms = 5000;
const int session_timeout_s = 60;
const int replication_factor = -1;
const int topic_creation_timeout_ms = 5000;
rd_kafka_message_t **rkmessages;
int msg_cnt, i;
int64_t last_real_offset = -1;
int64_t eof_offset = -1;
int64_t position_after_eof;
rd_kafka_topic_partition_list_t *positions;
rd_kafka_resp_err_t err;
int eof_received = 0;

SUB_TEST("Testing EOF position with consume_batch_queue");

/* Create consumer configuration with enable.partition.eof=true */
test_conf_init(&conf, NULL, session_timeout_s);
test_conf_set(conf, "enable.auto.commit", "false");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "enable.partition.eof", "true");

testid = test_id_generate();

topic = test_mk_topic_name("0137-barrier_batch_consume", 1);

/* Create topic */
test_create_topic_wait_exists(NULL, topic, partition_cnt,
replication_factor,
topic_creation_timeout_ms);
test_produce_msgs_easy(topic, testid, partition, produce_msg_cnt);

consumer = test_create_consumer(topic, NULL, conf, NULL);
test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);

/* Consume messages in batches until we get EOF */
rkmessages = malloc(consume_msg_cnt * sizeof(*rkmessages));

while (!eof_received) {
msg_cnt = (int)rd_kafka_consume_batch_queue(
rkq, timeout_ms, rkmessages, consume_msg_cnt);

TEST_ASSERT(msg_cnt >= 0, "consume_batch_queue failed");

if (msg_cnt == 0) {
TEST_WARN("No messages received, retrying...");
continue;
}

/* Check if EOF messages are received */
for (i = 0; i < msg_cnt; i++) {
rd_kafka_message_t *rkm = rkmessages[i];

if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
eof_received = 1;
eof_offset = rkm->offset;
TEST_SAY("Received EOF at offset %" PRId64 "\n",
eof_offset);
} else if (!rkm->err) {
last_real_offset = rkm->offset;
}
}

/* Destroy messages */
for (i = 0; i < msg_cnt; i++)
rd_kafka_message_destroy(rkmessages[i]);
}

rd_free(rkmessages);

/* Test that the last real message offset is the expected value */
TEST_ASSERT(last_real_offset == produce_msg_cnt - 1,
"Expected last message offset %" PRId64 ", got %" PRId64,
(int64_t)(produce_msg_cnt - 1), last_real_offset);

/* Get consumer position after EOF */
positions = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(positions, topic, partition);

err = rd_kafka_position(consumer, positions);
TEST_ASSERT(!err, "rd_kafka_position failed: %s", rd_kafka_err2str(err));

/* Extract the position value from the partition list */
position_after_eof = positions->elems[0].offset;

TEST_SAY(
"Last real message offset: %" PRId64 "\n"
"EOF offset: %" PRId64 "\n"
"Position after EOF: %" PRId64 "\n",
last_real_offset, eof_offset, position_after_eof);

TEST_ASSERT(
position_after_eof == last_real_offset + 1,
"Position after EOF should be %" PRId64 " (last_offset + 1), "
"but got %" PRId64 ". This indicates the EOF offset bug where "
"position incorrectly advances by +2 instead of +1",
last_real_offset + 1, position_after_eof);

rd_kafka_topic_partition_list_destroy(positions);
rd_kafka_queue_destroy(rkq);
test_consumer_close(consumer);
rd_kafka_destroy(consumer);

SUB_TEST_PASS();
}


int main_0137_barrier_batch_consume(int argc, char **argv) {
do_test_consume_batch_with_seek();
do_test_consume_batch_store_offset();
do_test_consume_batch_with_pause_and_resume_different_batch();
do_test_consume_batch_with_pause_and_resume_same_batch();
do_test_consume_batch_control_msgs();
do_test_consume_batch_eof_position();

return 0;
}