From 028460271ab5be717bbd89a2f096ec8e4a43f99d Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 29 Sep 2025 16:30:02 +0200 Subject: [PATCH 1/2] Reapply "[KIP-320] Validate assigned partitions before starting to consume from them (#4931)" (#5207) This reverts commit dc61e5bf628ec8dbc23a86af92778d9f4761768b. --- CHANGELOG.md | 14 ++ src/rdkafka_assignment.c | 61 ++++-- src/rdkafka_cgrp.c | 3 + src/rdkafka_mock.c | 9 +- src/rdkafka_mock_cgrp.c | 21 +- src/rdkafka_mock_handlers.c | 28 +-- src/rdkafka_mock_int.h | 4 +- src/rdkafka_offset.c | 2 + src/rdkafka_partition.c | 23 ++- src/rdkafka_partition.h | 3 + tests/0139-offset_validation_mock.c | 300 ++++++++++++++++++++++++++-- tests/test.c | 5 +- 12 files changed, 407 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75834f4dfa..0c9ae9eecf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ librdkafka v2.12.0 is a feature release: * Fix for KIP-1102 time based re-bootstrap condition (#5177). * Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672). * Fix for an error being raised after a commit due to an existing error in the topic partition (#4672). +* Additional KIP-320 related validation: when assigning the offset is validated + if leader epoch is specified (#4931). * Fix double free of headers in `rd_kafka_produceva` method (@blindspotbounty, #4628). * Fix to ensure `rd_kafka_query_watermark_offsets` enforces the specified timeout and does not continue beyond timeout expiry (#5201). @@ -38,6 +40,18 @@ librdkafka v2.12.0 is a feature release: Happening since 2.5.0. Since 2.10.1 unit tests are failing when run on big-endian architectures (#5183, @paravoid). +### Consumer fixes + +* Issues: #5158. + Additional KIP-320 related validation: when assigning the offset is validated + if a leader epoch is specified. A committed offset could have been truncated + in case of unclean leader election and, when a different member starts + fetching from it on the new leader, it could get an offset out of range + and a subsequent offset reset. The assigned offset is now validated before + we start fetching from it and, in case it was truncated, fetching starts + from last available offset of given leader epoch. + Happens since 2.1.0 (#4931). + ### Producer fixes * Issues: #4627. diff --git a/src/rdkafka_assignment.c b/src/rdkafka_assignment.c index 6d1f01913f..ac909c3ab9 100644 --- a/src/rdkafka_assignment.c +++ b/src/rdkafka_assignment.c @@ -489,29 +489,54 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) { * * Start fetcher for partition and forward partition's * fetchq to consumer group's queue. */ + rd_kafka_fetch_pos_t pos = + rd_kafka_topic_partition_get_fetch_pos(rktpar); - rd_kafka_dbg(rk, CGRP, "SRVPEND", - "Starting pending assigned partition " - "%s [%" PRId32 "] at %s", - rktpar->topic, rktpar->partition, - rd_kafka_fetch_pos2str( - rd_kafka_topic_partition_get_fetch_pos( - rktpar))); - - /* Reset the (lib) pause flag which may have been set by - * the cgrp when scheduling the rebalance callback. */ + /* Reset the (lib) pause flag which may have + * been set by the cgrp when scheduling the + * rebalance callback. */ rd_kafka_toppar_op_pause_resume( rktp, rd_false /*resume*/, RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ); - /* Start the fetcher */ - rktp->rktp_started = rd_true; - rk->rk_consumer.assignment.started_cnt++; - - rd_kafka_toppar_op_fetch_start( - rktp, - rd_kafka_topic_partition_get_fetch_pos(rktpar), - rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ); + if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) && + pos.leader_epoch != -1) { + rd_kafka_dbg( + rk, CGRP, "SRVPEND", + "Validating assigned partition offset " + "%s [%" PRId32 "] at %s", + rktpar->topic, rktpar->partition, + rd_kafka_fetch_pos2str(pos)); + + rd_kafka_toppar_forward_internal( + rktp, rk->rk_consumer.q); + rd_kafka_toppar_lock(rktp); + rd_kafka_toppar_set_fetch_state( + rktp, + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT); + rd_kafka_toppar_set_next_fetch_position(rktp, + pos); + rd_kafka_toppar_set_offset_validation_position( + rktp, pos); + rd_kafka_offset_validate(rktp, "offset fetch"); + rd_kafka_toppar_unlock(rktp); + + } else { + rd_kafka_dbg( + rk, CGRP, "SRVPEND", + "Starting pending assigned partition " + "%s [%" PRId32 "] at %s", + rktpar->topic, rktpar->partition, + rd_kafka_fetch_pos2str(pos)); + + /* Start the fetcher */ + rktp->rktp_started = rd_true; + rk->rk_consumer.assignment.started_cnt++; + + rd_kafka_toppar_op_fetch_start( + rktp, pos, rk->rk_consumer.q, + RD_KAFKA_NO_REPLYQ); + } } else if (can_query_offsets) { diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index d6348ba52d..e0aa01bd1a 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -4430,6 +4430,9 @@ static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) { "unassign", rkcg->rkcg_group_id->str); rd_kafka_cgrp_unassign(rkcg); + + /* Leave group, if desired. */ + rd_kafka_cgrp_leave_maybe(rkcg); return; } diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index c8ca39e839..732ae03aaa 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -548,7 +548,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart, rd_kafka_mock_committed_offset_t * rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, const rd_kafkap_str_t *group, - int64_t offset, + rd_kafka_fetch_pos_t pos, const rd_kafkap_str_t *metadata) { rd_kafka_mock_committed_offset_t *coff; @@ -571,12 +571,13 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, coff->metadata = rd_kafkap_str_copy(metadata); - coff->offset = offset; + coff->pos = pos; rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK", - "Topic %s [%" PRId32 "] committing offset %" PRId64 + "Topic %s [%" PRId32 + "] committing offset %s" " for group %.*s", - mpart->topic->name, mpart->id, offset, + mpart->topic->name, mpart->id, rd_kafka_fetch_pos2str(pos), RD_KAFKAP_STR_PR(group)); return coff; diff --git a/src/rdkafka_mock_cgrp.c b/src/rdkafka_mock_cgrp.c index 0c75e003e5..9956b04020 100644 --- a/src/rdkafka_mock_cgrp.c +++ b/src/rdkafka_mock_cgrp.c @@ -252,6 +252,21 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_sync_set( return RD_KAFKA_RESP_ERR_NO_ERROR; } +/** + * @brief Member left the group + * either explicitly or due to a timeout. + * Trigger rebalance if there are still members left. + * If this was the last member, mark the group as empty. + */ +void rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe( + rd_kafka_mock_cgrp_classic_t *mcgrp, + const char *reason) { + if (mcgrp->member_cnt > 0) + rd_kafka_mock_cgrp_classic_rebalance(mcgrp, reason); + else + rd_kafka_mock_cgrp_classic_set_state( + mcgrp, RD_KAFKA_MOCK_CGRP_STATE_EMPTY, reason); +} /** * @brief Member is explicitly leaving the group (through LeaveGroupRequest) @@ -265,7 +280,8 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_leave( rd_kafka_mock_cgrp_classic_member_destroy(mcgrp, member); - rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "explicit member leave"); + rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe( + mcgrp, "explicit member leave"); return RD_KAFKA_RESP_ERR_NO_ERROR; } @@ -633,7 +649,8 @@ static void rd_kafka_mock_cgrp_classic_session_tmr_cb(rd_kafka_timers_t *rkts, } if (timeout_cnt) - rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "member timeout"); + rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe( + mcgrp, "member timeout"); } diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index ad509ecceb..a6a35f34f7 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -742,6 +742,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, int32_t MaxNumOffsets; rd_kafka_mock_partition_t *mpart = NULL; rd_kafka_resp_err_t err = all_err; + int32_t LeaderEpoch = -1; rd_kafka_buf_read_i32(rkbuf, &Partition); @@ -807,7 +808,6 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) { /* Response: LeaderEpoch */ const rd_kafka_mock_msgset_t *mset = NULL; - int32_t leader_epoch = -1; rd_bool_t on_follower = rd_false; if (mpart) { @@ -818,12 +818,12 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, if (Offset >= 0 && (mset = rd_kafka_mock_msgset_find( mpart, Offset, on_follower))) { - leader_epoch = + LeaderEpoch = mset->leader_epoch; } } - rd_kafka_buf_write_i32(resp, leader_epoch); + rd_kafka_buf_write_i32(resp, LeaderEpoch); } /* Response: Partition tags */ @@ -835,7 +835,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, "offset %" PRId64 " (leader epoch %" PRId32 ") for %s: %s", RD_KAFKAP_STR_PR(&Topic), Partition, - Offset, mpart ? mpart->leader_epoch : -1, + Offset, LeaderEpoch, rd_kafka_offset2str(Timestamp), rd_kafka_err2str(err)); } @@ -930,12 +930,13 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn, mpart, &GroupId); /* Response: CommittedOffset */ - rd_kafka_buf_write_i64(resp, coff ? coff->offset : -1); + rd_kafka_buf_write_i64(resp, + coff ? coff->pos.offset : -1); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) { /* Response: CommittedLeaderEpoch */ rd_kafka_buf_write_i32( - resp, mpart ? mpart->leader_epoch : -1); + resp, coff ? coff->pos.leader_epoch : -1); } /* Response: Metadata */ @@ -952,10 +953,11 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn, rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", "Topic %s [%" PRId32 "] returning " - "committed offset %" PRId64 + "committed offset %s" " for group %s", mtopic->name, mpart->id, - coff->offset, coff->group); + rd_kafka_fetch_pos2str(coff->pos), + coff->group); else rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", "Topic %.*s [%" PRId32 @@ -1109,6 +1111,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_partition_t *mpart = NULL; rd_kafka_resp_err_t err = all_err; int64_t CommittedOffset; + int32_t CommittedLeaderEpoch = -1; rd_kafkap_str_t Metadata; rd_kafka_buf_read_i32(rkbuf, &Partition); @@ -1126,7 +1129,6 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_i64(rkbuf, &CommittedOffset); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 6) { - int32_t CommittedLeaderEpoch; rd_kafka_buf_read_i32(rkbuf, &CommittedLeaderEpoch); @@ -1145,9 +1147,11 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_skip_tags(rkbuf); if (!err) - rd_kafka_mock_commit_offset(mpart, &GroupId, - CommittedOffset, - &Metadata); + rd_kafka_mock_commit_offset( + mpart, &GroupId, + RD_KAFKA_FETCH_POS(CommittedOffset, + CommittedLeaderEpoch), + &Metadata); /* Response: ErrorCode */ rd_kafka_buf_write_i16(resp, err); diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 2ef7a2a339..929d3b545b 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -296,7 +296,7 @@ typedef struct rd_kafka_mock_committed_offset_s { /**< mpart.committed_offsets */ TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link; char *group; /**< Allocated along with the struct */ - int64_t offset; /**< Committed offset */ + rd_kafka_fetch_pos_t pos; /**< Committed position */ rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */ } rd_kafka_mock_committed_offset_t; @@ -552,7 +552,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart, rd_kafka_mock_committed_offset_t * rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, const rd_kafkap_str_t *group, - int64_t offset, + rd_kafka_fetch_pos_t pos, const rd_kafkap_str_t *metadata); const rd_kafka_mock_msgset_t * diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index cf21d60c55..3b3fd306ab 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -1056,6 +1056,8 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, end_offset, end_offset_leader_epoch); } else { + rd_kafka_toppar_set_fetch_state( + rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK); rd_kafka_toppar_unlock(rktp); /* Seek to the updated end offset */ diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index ce4f01b467..6e51fe70f9 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -38,10 +38,10 @@ #include "rdunittest.h" -const char *rd_kafka_fetch_states[] = {"none", "stopping", - "stopped", "offset-query", - "offset-wait", "validate-epoch-wait", - "active"}; +const char *rd_kafka_fetch_states[] = {"none", "stopping", + "stopped", "offset-query", + "offset-wait", "validate-epoch-wait", + "validate-seek", "active"}; static rd_kafka_op_res_t rd_kafka_toppar_op_serve(rd_kafka_t *rk, @@ -2272,7 +2272,14 @@ static void rd_kafka_toppar_op(rd_kafka_toppar_t *rktp, rd_kafka_toppar_op0(rktp, rko, replyq); } - +void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp, + rd_kafka_q_t *fwdq) { + rd_kafka_q_lock(rktp->rktp_fetchq); + if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP)) + rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */ + 0 /* no fwd_app */); + rd_kafka_q_unlock(rktp->rktp_fetchq); +} /** * Start consuming partition (async operation). @@ -2289,11 +2296,7 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp, rd_kafka_replyq_t replyq) { int32_t version; - rd_kafka_q_lock(rktp->rktp_fetchq); - if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP)) - rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */ - 0 /* no fwd_app */); - rd_kafka_q_unlock(rktp->rktp_fetchq); + rd_kafka_toppar_forward_internal(rktp, fwdq); /* Bump version barrier. */ version = rd_kafka_toppar_version_new_barrier(rktp); diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index 97a704f03b..d8020ded48 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -299,6 +299,7 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */ RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY, RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT, + RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK, RD_KAFKA_TOPPAR_FETCH_ACTIVE, } rktp_fetch_state; /* Broker thread's state */ @@ -619,6 +620,8 @@ void rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t *rktp, void rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t *rktp, rd_kafka_broker_t *rkb); +void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp, + rd_kafka_q_t *fwdq); rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp, rd_kafka_fetch_pos_t pos, diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index 1253d1d51e..942dd1d6d7 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -353,8 +353,8 @@ static void do_test_store_offset_without_leader_epoch(void) { leader_epoch = rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]); - /* OffsetFetch returns the leader epoch even if not set. */ - TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1, + /* OffsetFetch returns the leader epoch used when committing. */ + TEST_ASSERT(leader_epoch == -1, "expected %d, got %" PRId32, -1, leader_epoch); rd_kafka_topic_partition_list_destroy(rktpars); @@ -375,8 +375,8 @@ static void do_test_store_offset_without_leader_epoch(void) { 5, rktpars->elems[0].offset); leader_epoch = rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]); - /* OffsetFetch returns the leader epoch even if not set. */ - TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1, + /* OffsetFetch returns the leader epoch used when committing. */ + TEST_ASSERT(leader_epoch == -1, "expected %d, got %" PRId32, -1, leader_epoch); rd_kafka_topic_partition_list_destroy(rktpars); @@ -785,6 +785,7 @@ static void do_test_list_offsets_leader_change_rebalance_cb( int64_t low, high; rd_kafka_resp_err_t list_offsets_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_topic_partition_list_t *rktpars_list_offsets; TEST_ASSERT(partitions->cnt == 1, "Expected 1 assigned partition, got %d", @@ -794,23 +795,29 @@ static void do_test_list_offsets_leader_change_rebalance_cb( rd_kafka_mock_partition_set_leader(test->mcluster, test->topic, 0, 2); + /* Set a wrong leader epoch that should not be used + * for listing offsets. */ + rktpars_list_offsets = + rd_kafka_topic_partition_list_copy(partitions); + rktpars_list_offsets->elems[0].offset = 1; + rd_kafka_topic_partition_set_leader_epoch( + &rktpars_list_offsets->elems[0], 1234); do { - /* Set a wrong leader epoch that should not be used - * for listing offsets. */ - rd_kafka_topic_partition_set_leader_epoch( - &partitions->elems[0], 1234); + if (test->variation == 0) { - partitions->elems[0].offset = 1; list_offsets_err = rd_kafka_offsets_for_times( - rk, partitions, 1000); + rk, rktpars_list_offsets, 1000); } else { list_offsets_err = rd_kafka_query_watermark_offsets( - rk, partitions->elems[0].topic, - partitions->elems[0].partition, &low, - &high, 1000); + rk, + rktpars_list_offsets->elems[0].topic, + rktpars_list_offsets->elems[0] + .partition, + &low, &high, 1000); } + retries++; if (retries == 1) { TEST_ASSERT( @@ -821,7 +828,7 @@ static void do_test_list_offsets_leader_change_rebalance_cb( } if (retries > 2) TEST_FAIL( - "Offsets for times failed %d times " + "Failed %d times " " during the rebalance callback", retries); } while (list_offsets_err != RD_KAFKA_RESP_ERR_NO_ERROR); @@ -836,11 +843,13 @@ static void do_test_list_offsets_leader_change_rebalance_cb( /* Mock handler currently returns * RD_KAFKA_OFFSET_SPEC_LATEST * in the offsets for times case */ - TEST_ASSERT(partitions->elems[0].offset == + TEST_ASSERT(rktpars_list_offsets->elems[0].offset == RD_KAFKA_OFFSET_SPEC_LATEST, "Expected offset for times LATEST," " got %" PRId64, - partitions->elems[0].offset); + rktpars_list_offsets->elems[0].offset); + partitions->elems[0].offset = + RD_KAFKA_OFFSET_SPEC_LATEST; } else { TEST_ASSERT(0 == low, "Expected low offset 0" @@ -852,6 +861,7 @@ static void do_test_list_offsets_leader_change_rebalance_cb( high); partitions->elems[0].offset = high; } + rd_kafka_topic_partition_list_destroy(rktpars_list_offsets); test_consumer_assign_by_rebalance_protocol("rebalance", rk, partitions); @@ -924,8 +934,262 @@ static void do_test_list_offsets_leader_change(int variation) { SUB_TEST_PASS(); } -int main_0139_offset_validation_mock(int argc, char **argv) { +typedef enum offset_validation_on_partition_assignment_assign_variation_t { + /** Use subscribe. */ + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_SUBSCRIBE, + /** Use subscribe with a rebalance callback. */ + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_SUBSCRIBE_REBALANCE_CALLBACK, + /** Use assign. */ + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_ASSIGN, + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION__CNT +} offset_validation_on_partition_assignment_assign_variation_t; + +typedef enum offset_validation_on_partition_assignment_commit_variation_t { + /** Commit with leader epoch. */ + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_LEADER_EPOCH, + /** Commit without a leader epoch, no validation expected. */ + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_NO_LEADER_EPOCH, + /** Offset store and commit with leader epoch. */ + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_STORE_LEADER_EPOCH, + /** Offset store and commit without a leader epoch, no validation + expected. */ + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_STORE_NO_LEADER_EPOCH, + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION__CNT +} offset_validation_on_partition_assignment_commit_variation_t; + +static const char + *offset_validation_on_partition_assignment_assign_variation_names[] = { + "subscribe", + "subscribe with rebalance callback", + "assign", +}; + +static const char + *offset_validation_on_partition_assignment_commit_variation_names[] = { + "leader epoch", + "no leader epoch", + "offset store with leader epoch", + "offset store with no leader epoch", +}; + +/** + * @brief Test that a committed offset is validated before starting to + * fetch messages from an assigned partition, if leader epoch is set + * in the assignment. + * If it's not set, no validation is performed and + * there's an offset out of range error and a reset to earliest. + * Assignment and commit are done in different ways. + * + * @param assign_variation The assign variation to test. + * @param incremental If true, use cooperative incremental assignment. + * @param commit_variation The commit variation to test. + * + * @sa `offset_validation_on_partition_assignment_assign_variation_t` + * @sa `offset_validation_on_partition_assignment_commit_variation_t` + */ +static void do_test_offset_validation_on_partition_assignment( + offset_validation_on_partition_assignment_assign_variation_t + assign_variation, + rd_bool_t incremental, + offset_validation_on_partition_assignment_commit_variation_t + commit_variation) { + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + const char *bootstraps; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + int msg_count = 5, leader = 2; + uint64_t testid = test_id_generate(); + size_t matching_requests; + rd_kafka_topic_partition_list_t *to_commit, *to_assign = NULL; + rd_bool_t use_leader_epoch = + commit_variation == + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_LEADER_EPOCH || + commit_variation == + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_STORE_LEADER_EPOCH; + + rd_bool_t use_store_offsets = + commit_variation == + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_STORE_LEADER_EPOCH || + commit_variation == + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_STORE_NO_LEADER_EPOCH; + + + /* No validations when leader epoch is -1 (default) */ + size_t expected_validation_requests = use_leader_epoch ? 1 : 0; + /* Without validation there's an offset out of range and a reset + * to earliest. */ + int expected_msg_cnt = use_leader_epoch ? 0 : msg_count; + + SUB_TEST_QUICK( + "assign variation: %s %s, commit variation: %s", + offset_validation_on_partition_assignment_assign_variation_names + [assign_variation], + incremental ? "incremental" : "eager", + offset_validation_on_partition_assignment_commit_variation_names + [commit_variation]); + + mcluster = test_mock_cluster_new(3, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 1); + rd_kafka_mock_group_initial_rebalance_delay_ms(mcluster, 1); + + TEST_SAY("Producing messages\n"); + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_count, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + TEST_SAY("Consuming messages\n"); + test_conf_init(&conf, NULL, 30); + + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "enable.auto.offset.store", "false"); + test_conf_set(conf, "enable.partition.eof", "true"); + if (incremental) + test_conf_set(conf, "partition.assignment.strategy", + "cooperative-sticky"); + + c1 = test_create_consumer(c1_groupid, NULL, rd_kafka_conf_dup(conf), + NULL); + test_consumer_subscribe(c1, topic); + + /* `msg_count` messages and an EOF because of reset to earliest */ + test_consumer_poll("MSG_ALL", c1, testid, 1, 0, msg_count, NULL); + + TEST_SAY("Committing %s leader epochs\n", + use_leader_epoch ? "with" : "without"); + /* Simulate committing a truncated offset with leader + * epoch 0 or -1. */ + to_commit = test_topic_partitions(1, topic, 0); + to_commit->elems[0].offset = 5 + 2; + if (use_leader_epoch) + rd_kafka_topic_partition_set_leader_epoch(&to_commit->elems[0], + 0); + if (assign_variation == + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_ASSIGN) + to_assign = rd_kafka_topic_partition_list_copy(to_commit); + + if (use_store_offsets) { + TEST_CALL_ERR__(rd_kafka_offsets_store(c1, to_commit)); + TEST_CALL_ERR__(rd_kafka_commit(c1, NULL, rd_false)); + } else { + TEST_CALL_ERR__(rd_kafka_commit(c1, to_commit, rd_false)); + } + rd_kafka_topic_partition_list_destroy(to_commit); + rd_kafka_destroy(c1); + + TEST_SAY("Partition leader change\n"); + /* Leader changes to 2 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + rd_kafka_mock_start_request_tracking(mcluster); + + if (assign_variation == + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_SUBSCRIBE_REBALANCE_CALLBACK) + rd_kafka_conf_set_rebalance_cb(conf, test_rebalance_cb); + + TEST_SAY("New consumer\n"); + /* Destroy conf this time */ + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + + if (assign_variation != + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_ASSIGN) + test_consumer_subscribe(c1, topic); + else { + /* Let's imagine the truncated offset was committed to + * an external DB, here it's assigned again and must + * be validated when a leader epoch is present. */ + test_consumer_assign("assign", c1, to_assign); + rd_kafka_topic_partition_list_destroy(to_assign); + } + + + + /* One EOF only if it starts from the last available offset after + * validation, otherwise there's a reprocessing. */ + test_consumer_poll("MSG_EOF", c1, testid, 1, 0, expected_msg_cnt, NULL); + + TEST_SAY("Pause and resume\n"); + /* Pause the consumer */ + test_consumer_pause_resume_partition(c1, topic, 0, rd_true); + rd_usleep(1 * 1000, NULL); + /* Resume the consumer, it should not validate the offset + * once more. */ + test_consumer_pause_resume_partition(c1, topic, 0, rd_false); + + TEST_SAY("Await %" PRIusz " OffsetForLeaderEpoch requests\n", + expected_validation_requests); + /* Ensure offset was validated or that it wasn't. */ + matching_requests = test_mock_wait_matching_requests( + mcluster, expected_validation_requests, 1000, + is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER(matching_requests == expected_validation_requests, + "Expected %" PRIusz + " OffsetForLeaderEpoch request" + " to broker 1, got %" PRIusz, + expected_validation_requests, matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + rd_kafka_destroy(c1); + + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} + +static void do_test_offset_validation_on_partition_assignment_variations(void) { + offset_validation_on_partition_assignment_assign_variation_t + assign_variation, + max_assign_variation = + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION__CNT; + offset_validation_on_partition_assignment_commit_variation_t + commit_variation, + max_commit_variation = + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION__CNT; + + if (test_quick) { + max_assign_variation = + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_SUBSCRIBE_REBALANCE_CALLBACK; + max_commit_variation = + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_STORE_LEADER_EPOCH; + } + + for ( + assign_variation = + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_SUBSCRIBE; + assign_variation < max_assign_variation; assign_variation++) { + rd_bool_t incremental; + for (incremental = rd_false; incremental <= rd_true; + incremental++) { + + for ( + commit_variation = + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_COMMIT_VARIATION_LEADER_EPOCH; + commit_variation < max_commit_variation; + commit_variation++) { + if (assign_variation == + OFFSET_VALIDATION_ON_PARTITION_ASSIGNMENT_ASSIGN_VARIATION_ASSIGN && + incremental) { + /* To call `incremental_assign` you need + * to subscribe first */ + continue; + } + + do_test_offset_validation_on_partition_assignment( + assign_variation, incremental, + commit_variation); + } + } + } +} + +int main_0139_offset_validation_mock(int argc, char **argv) { TEST_SKIP_MOCK_CLUSTER(0); do_test_no_duplicates_during_offset_validation(); @@ -946,5 +1210,7 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_list_offsets_leader_change(0); do_test_list_offsets_leader_change(1); + do_test_offset_validation_on_partition_assignment_variations(); + return 0; } diff --git a/tests/test.c b/tests/test.c index 86205dd5de..1b3ac07d99 100644 --- a/tests/test.c +++ b/tests/test.c @@ -523,7 +523,7 @@ struct test tests[] = { _TEST(0136_resolve_cb, TEST_F_LOCAL), _TEST(0137_barrier_batch_consume, 0), _TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)), - _TEST(0139_offset_validation_mock, 0), + _TEST(0139_offset_validation_mock, TEST_F_LOCAL), _TEST(0140_commit_metadata, 0), _TEST(0142_reauthentication, 0, TEST_BRKVER(2, 2, 0, 0)), _TEST(0143_exponential_backoff_mock, TEST_F_LOCAL), @@ -7195,6 +7195,9 @@ rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms) { char errstr[256]; int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000); + if (test_flags & TEST_F_LOCAL) + return RD_KAFKA_RESP_ERR_NO_ERROR; /* No topics to delete */ + rk = test_create_producer(); err = test_get_all_test_topics(rk, &topics, &topic_cnt); From 007379062433ac8c6e860d0afc3d65f6f4aea275 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 29 Sep 2025 16:32:59 +0200 Subject: [PATCH 2/2] Second version starting offset validation when partition already joined the consumer group --- src/rdkafka.c | 4 +++ src/rdkafka_assignment.c | 61 ++++++++++++---------------------------- src/rdkafka_offset.c | 2 ++ src/rdkafka_partition.c | 25 ++++++++++------ src/rdkafka_partition.h | 2 -- 5 files changed, 40 insertions(+), 54 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index c6f89ad469..7f65960cae 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2984,6 +2984,10 @@ static RD_UNUSED int rd_kafka_consume_start0(rd_kafka_topic_t *rkt, return -1; } + /* + * We cannot validate this offset as the legacy API doesn't + * provide the leader epoch. + */ rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_FETCH_POS(offset, -1), rkq, RD_KAFKA_NO_REPLYQ); diff --git a/src/rdkafka_assignment.c b/src/rdkafka_assignment.c index ac909c3ab9..6d1f01913f 100644 --- a/src/rdkafka_assignment.c +++ b/src/rdkafka_assignment.c @@ -489,54 +489,29 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) { * * Start fetcher for partition and forward partition's * fetchq to consumer group's queue. */ - rd_kafka_fetch_pos_t pos = - rd_kafka_topic_partition_get_fetch_pos(rktpar); - /* Reset the (lib) pause flag which may have - * been set by the cgrp when scheduling the - * rebalance callback. */ + rd_kafka_dbg(rk, CGRP, "SRVPEND", + "Starting pending assigned partition " + "%s [%" PRId32 "] at %s", + rktpar->topic, rktpar->partition, + rd_kafka_fetch_pos2str( + rd_kafka_topic_partition_get_fetch_pos( + rktpar))); + + /* Reset the (lib) pause flag which may have been set by + * the cgrp when scheduling the rebalance callback. */ rd_kafka_toppar_op_pause_resume( rktp, rd_false /*resume*/, RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ); - if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) && - pos.leader_epoch != -1) { - rd_kafka_dbg( - rk, CGRP, "SRVPEND", - "Validating assigned partition offset " - "%s [%" PRId32 "] at %s", - rktpar->topic, rktpar->partition, - rd_kafka_fetch_pos2str(pos)); - - rd_kafka_toppar_forward_internal( - rktp, rk->rk_consumer.q); - rd_kafka_toppar_lock(rktp); - rd_kafka_toppar_set_fetch_state( - rktp, - RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT); - rd_kafka_toppar_set_next_fetch_position(rktp, - pos); - rd_kafka_toppar_set_offset_validation_position( - rktp, pos); - rd_kafka_offset_validate(rktp, "offset fetch"); - rd_kafka_toppar_unlock(rktp); - - } else { - rd_kafka_dbg( - rk, CGRP, "SRVPEND", - "Starting pending assigned partition " - "%s [%" PRId32 "] at %s", - rktpar->topic, rktpar->partition, - rd_kafka_fetch_pos2str(pos)); - - /* Start the fetcher */ - rktp->rktp_started = rd_true; - rk->rk_consumer.assignment.started_cnt++; - - rd_kafka_toppar_op_fetch_start( - rktp, pos, rk->rk_consumer.q, - RD_KAFKA_NO_REPLYQ); - } + /* Start the fetcher */ + rktp->rktp_started = rd_true; + rk->rk_consumer.assignment.started_cnt++; + + rd_kafka_toppar_op_fetch_start( + rktp, + rd_kafka_topic_partition_get_fetch_pos(rktpar), + rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ); } else if (can_query_offsets) { diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index 3b3fd306ab..395d47d3e4 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -1056,6 +1056,8 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, end_offset, end_offset_leader_epoch); } else { + /* It ensures a validation isn't started again + * until the seek completes. */ rd_kafka_toppar_set_fetch_state( rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK); rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 6e51fe70f9..7693d2ebf7 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -1748,6 +1748,16 @@ static void rd_kafka_toppar_fetch_start(rd_kafka_toppar_t *rktp, "no previously committed offset " "available"); + } else if (pos.leader_epoch >= -1) { + rd_kafka_toppar_set_next_fetch_position(rktp, pos); + rd_kafka_toppar_set_offset_validation_position(rktp, pos); + /* This is necessary as it only starts + * validating from states ACTIVE or + * VALIDATE_EPOCH_WAIT. */ + rd_kafka_toppar_set_fetch_state( + rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT); + rd_kafka_offset_validate(rktp, "validate fetch start position"); + } else { rd_kafka_toppar_set_next_fetch_position(rktp, pos); @@ -2272,14 +2282,7 @@ static void rd_kafka_toppar_op(rd_kafka_toppar_t *rktp, rd_kafka_toppar_op0(rktp, rko, replyq); } -void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp, - rd_kafka_q_t *fwdq) { - rd_kafka_q_lock(rktp->rktp_fetchq); - if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP)) - rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */ - 0 /* no fwd_app */); - rd_kafka_q_unlock(rktp->rktp_fetchq); -} + /** * Start consuming partition (async operation). @@ -2296,7 +2299,11 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp, rd_kafka_replyq_t replyq) { int32_t version; - rd_kafka_toppar_forward_internal(rktp, fwdq); + rd_kafka_q_lock(rktp->rktp_fetchq); + if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP)) + rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */ + 0 /* no fwd_app */); + rd_kafka_q_unlock(rktp->rktp_fetchq); /* Bump version barrier. */ version = rd_kafka_toppar_version_new_barrier(rktp); diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index d8020ded48..7340de2e83 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -620,8 +620,6 @@ void rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t *rktp, void rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t *rktp, rd_kafka_broker_t *rkb); -void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp, - rd_kafka_q_t *fwdq); rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp, rd_kafka_fetch_pos_t pos,