diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c
index 92eddccae..808a86dfd 100644
--- a/src/rdkafka_queue.c
+++ b/src/rdkafka_queue.c
@@ -829,7 +829,9 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
                 rko = (rd_kafka_op_t *)rkmessages[i]->_private;
                 rd_kafka_toppar_t *rktp = rko->rko_rktp;
                 int64_t offset          = rkmessages[i]->offset + 1;
-                if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset)))
+                /* Only update position for messages that are not EOF */
+                if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset) &&
+                             (rkmessages[i]->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)))
                         rd_kafka_update_app_pos(
                             rk, rktp,
                             RD_KAFKA_FETCH_POS(
diff --git a/tests/0137-barrier_batch_consume.c b/tests/0137-barrier_batch_consume.c
index 19bec387d..20c51d200 100644
--- a/tests/0137-barrier_batch_consume.c
+++ b/tests/0137-barrier_batch_consume.c
@@ -608,12 +608,154 @@ static void do_test_consume_batch_control_msgs(void) {
 }
 
 
+/**
+ * @brief Test that rd_kafka_consume_batch_queue() leaves the consumer
+ *        position at last_offset + 1 when a partition EOF event is
+ *        received with enable.partition.eof=true.
+ *
+ * This is a regression test for the bug where EOF messages incorrectly
+ * advanced the consumer position by 2 instead of 1 (last_offset + 2 instead
+ * of last_offset + 1).
+ *
+ * The test produces a few messages to a single-partition topic,
+ * batch-consumes from the consumer queue until the EOF event is seen, and
+ * then compares rd_kafka_position() against the last real message offset.
+ */
+static void do_test_consume_batch_eof_position(void) {
+        const char *topic;
+        rd_kafka_t *consumer;
+        rd_kafka_conf_t *conf;
+        rd_kafka_queue_t *rkq;
+        uint64_t testid;
+        const int partition_cnt             = 1;
+        const int partition                 = 0;
+        const int produce_msg_cnt           = 5;
+        const int consume_msg_cnt           = 10;
+        const int timeout_ms                = 5000;
+        const int session_timeout_s         = 60;
+        const int replication_factor        = -1;
+        const int topic_creation_timeout_ms = 5000;
+        rd_kafka_message_t **rkmessages;
+        int msg_cnt, i;
+        int64_t last_real_offset = -1;
+        int64_t eof_offset       = -1;
+        int64_t position_after_eof;
+        rd_kafka_topic_partition_list_t *positions;
+        rd_kafka_resp_err_t err;
+        int eof_received = 0;
+
+        SUB_TEST("Testing EOF position with consume_batch_queue");
+
+        /* Create consumer configuration with enable.partition.eof=true */
+        test_conf_init(&conf, NULL, session_timeout_s);
+        test_conf_set(conf, "enable.auto.commit", "false");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.partition.eof", "true");
+
+        testid = test_id_generate();
+
+        topic = test_mk_topic_name("0137-barrier_batch_consume", 1);
+
+        /* Create topic */
+        test_create_topic_wait_exists(NULL, topic, partition_cnt,
+                                      replication_factor,
+                                      topic_creation_timeout_ms);
+        test_produce_msgs_easy(topic, testid, partition, produce_msg_cnt);
+
+        consumer = test_create_consumer(topic, NULL, conf, NULL);
+        test_consumer_subscribe(consumer, topic);
+        test_consumer_wait_assignment(consumer, rd_false);
+
+        /* Create generic consume queue */
+        rkq = rd_kafka_queue_get_consumer(consumer);
+
+        /* Consume messages in batches until we get EOF */
+        rkmessages = malloc(consume_msg_cnt * sizeof(*rkmessages));
+        TEST_ASSERT(rkmessages != NULL, "Failed to allocate rkmessages");
+
+        while (!eof_received) {
+                msg_cnt = (int)rd_kafka_consume_batch_queue(
+                    rkq, timeout_ms, rkmessages, consume_msg_cnt);
+
+                TEST_ASSERT(msg_cnt >= 0, "consume_batch_queue failed");
+
+                if (msg_cnt == 0) {
+                        TEST_WARN("No messages received, retrying...\n");
+                        continue;
+                }
+
+                /* Check if EOF messages are received */
+                for (i = 0; i < msg_cnt; i++) {
+                        rd_kafka_message_t *rkm = rkmessages[i];
+
+                        if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+                                eof_received = 1;
+                                eof_offset   = rkm->offset;
+                                TEST_SAY("Received EOF at offset %" PRId64 "\n",
+                                         eof_offset);
+                        } else if (!rkm->err) {
+                                last_real_offset = rkm->offset;
+                        }
+                }
+
+                /* Destroy messages */
+                for (i = 0; i < msg_cnt; i++)
+                        rd_kafka_message_destroy(rkmessages[i]);
+        }
+
+        /* Allocated with malloc() above, so release it with free() */
+        free(rkmessages);
+
+        /* Test that the last real message offset is the expected value */
+        TEST_ASSERT(last_real_offset == produce_msg_cnt - 1,
+                    "Expected last message offset %" PRId64 ", got %" PRId64,
+                    (int64_t)(produce_msg_cnt - 1), last_real_offset);
+
+        /* Get consumer position after EOF */
+        positions = rd_kafka_topic_partition_list_new(1);
+        rd_kafka_topic_partition_list_add(positions, topic, partition);
+
+        err = rd_kafka_position(consumer, positions);
+        TEST_ASSERT(!err, "rd_kafka_position failed: %s",
+                    rd_kafka_err2str(err));
+
+        /* Also check the per-partition error set by rd_kafka_position() */
+        TEST_ASSERT(!positions->elems[0].err,
+                    "rd_kafka_position failed for partition %d: %s", partition,
+                    rd_kafka_err2str(positions->elems[0].err));
+
+        /* Extract the position value from the partition list */
+        position_after_eof = positions->elems[0].offset;
+
+        TEST_SAY(
+            "Last real message offset: %" PRId64 "\n"
+            "EOF offset: %" PRId64 "\n"
+            "Position after EOF: %" PRId64 "\n",
+            last_real_offset, eof_offset, position_after_eof);
+
+        TEST_ASSERT(
+            position_after_eof == last_real_offset + 1,
+            "Position after EOF should be %" PRId64 " (last_offset + 1), "
+            "but got %" PRId64 ". This indicates the EOF offset bug where "
+            "position incorrectly advances by +2 instead of +1",
+            last_real_offset + 1, position_after_eof);
+
+        rd_kafka_topic_partition_list_destroy(positions);
+        rd_kafka_queue_destroy(rkq);
+        test_consumer_close(consumer);
+        rd_kafka_destroy(consumer);
+
+        SUB_TEST_PASS();
+}
+
+
 int main_0137_barrier_batch_consume(int argc, char **argv) {
         do_test_consume_batch_with_seek();
         do_test_consume_batch_store_offset();
         do_test_consume_batch_with_pause_and_resume_different_batch();
         do_test_consume_batch_with_pause_and_resume_same_batch();
         do_test_consume_batch_control_msgs();
+        do_test_consume_batch_eof_position();
 
         return 0;
 }