Skip to content

Commit 13a2bba

Browse files
authored
[KIP-320] Validate assigned partitions before starting to consume from them (#4931)
* Fetched committed offsets should be validated before starting to consume from it. Failing test and mock handler implementation for returning the committed offset leader epoch instead of current leader epoch. * Validate the offsets before starting to fetch assigned partitions * Add more test cases for partition assignment offset validation * Fix for test 0139 subtest `do_test_store_offset_without_leader_epoch` . When fetching an offset it returns the leader epoch used when committing, not the current leader epoch. Given the mock cluster fix the test needs to be changed. * Fix test `0139` subtest `do_test_list_offsets_leader_change`: use cloned partition list for listing offsets, to avoid the fake leader epoch is then used for validation when assigning. Fix ListOffsets mock handler for logging the correct returned leader epoch. * Changelog entry * Reduce number of tests in quick mode * Add a new fetch state when finishing validating and starting to seek after a truncation, to avoid a second repeated validation and possibly duplicated messages. * Increase single test timeout * Fix to leave the group in `rd_kafka_cgrp_incr_unassign_done` if terminate was requested, as done in `rd_kafka_cgrp_unassign_done` and `rd_kafka_cgrp_consumer_incr_unassign_done` * Mock cluster, set the group as empty when last member leaves instead of triggering a rebalance * Test 0139 with mock cluster marked as local. Doesn't delete topic if tests are local only as it's possible there's no cluster to connect to and it speeds up completing the test * Resume the partition before fetch start or before validation
1 parent c85c9ea commit 13a2bba

12 files changed

+407
-66
lines changed

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ librdkafka v2.12.0 is a feature release:
77
* Fix for KIP-1102 time based re-bootstrap condition (#5177).
88
* Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672).
99
* Fix for an error being raised after a commit due to an existing error in the topic partition (#4672).
10+
* Additional KIP-320 related validation: when assigning the offset is validated
11+
if leader epoch is specified (#4931).
1012

1113

1214
## Fixes
@@ -31,6 +33,18 @@ librdkafka v2.12.0 is a feature release:
3133
Happening since 2.5.0. Since 2.10.1 unit tests are failing when run on
3234
big-endian architectures (#5183, @paravoid).
3335

36+
### Consumer fixes
37+
38+
* Issues: #5158.
39+
Additional KIP-320 related validation: when assigning the offset is validated
40+
if a leader epoch is specified. A committed offset could have been truncated
41+
in case of unclean leader election and, when a different member starts
42+
fetching from it on the new leader, it could get an offset out of range
43+
and a subsequent offset reset. The assigned offset is now validated before
44+
we start fetching from it and, in case it was truncated, fetching starts
45+
from last available offset of given leader epoch.
46+
Happens since 2.1.0 (#4931).
47+
3448

3549

3650
# librdkafka v2.11.1

src/rdkafka_assignment.c

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -489,29 +489,54 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
489489
*
490490
* Start fetcher for partition and forward partition's
491491
* fetchq to consumer group's queue. */
492+
rd_kafka_fetch_pos_t pos =
493+
rd_kafka_topic_partition_get_fetch_pos(rktpar);
492494

493-
rd_kafka_dbg(rk, CGRP, "SRVPEND",
494-
"Starting pending assigned partition "
495-
"%s [%" PRId32 "] at %s",
496-
rktpar->topic, rktpar->partition,
497-
rd_kafka_fetch_pos2str(
498-
rd_kafka_topic_partition_get_fetch_pos(
499-
rktpar)));
500-
501-
/* Reset the (lib) pause flag which may have been set by
502-
* the cgrp when scheduling the rebalance callback. */
495+
/* Reset the (lib) pause flag which may have
496+
* been set by the cgrp when scheduling the
497+
* rebalance callback. */
503498
rd_kafka_toppar_op_pause_resume(
504499
rktp, rd_false /*resume*/,
505500
RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ);
506501

507-
/* Start the fetcher */
508-
rktp->rktp_started = rd_true;
509-
rk->rk_consumer.assignment.started_cnt++;
510-
511-
rd_kafka_toppar_op_fetch_start(
512-
rktp,
513-
rd_kafka_topic_partition_get_fetch_pos(rktpar),
514-
rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ);
502+
if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) &&
503+
pos.leader_epoch != -1) {
504+
rd_kafka_dbg(
505+
rk, CGRP, "SRVPEND",
506+
"Validating assigned partition offset "
507+
"%s [%" PRId32 "] at %s",
508+
rktpar->topic, rktpar->partition,
509+
rd_kafka_fetch_pos2str(pos));
510+
511+
rd_kafka_toppar_forward_internal(
512+
rktp, rk->rk_consumer.q);
513+
rd_kafka_toppar_lock(rktp);
514+
rd_kafka_toppar_set_fetch_state(
515+
rktp,
516+
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
517+
rd_kafka_toppar_set_next_fetch_position(rktp,
518+
pos);
519+
rd_kafka_toppar_set_offset_validation_position(
520+
rktp, pos);
521+
rd_kafka_offset_validate(rktp, "offset fetch");
522+
rd_kafka_toppar_unlock(rktp);
523+
524+
} else {
525+
rd_kafka_dbg(
526+
rk, CGRP, "SRVPEND",
527+
"Starting pending assigned partition "
528+
"%s [%" PRId32 "] at %s",
529+
rktpar->topic, rktpar->partition,
530+
rd_kafka_fetch_pos2str(pos));
531+
532+
/* Start the fetcher */
533+
rktp->rktp_started = rd_true;
534+
rk->rk_consumer.assignment.started_cnt++;
535+
536+
rd_kafka_toppar_op_fetch_start(
537+
rktp, pos, rk->rk_consumer.q,
538+
RD_KAFKA_NO_REPLYQ);
539+
}
515540

516541

517542
} else if (can_query_offsets) {

src/rdkafka_cgrp.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4430,6 +4430,9 @@ static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) {
44304430
"unassign",
44314431
rkcg->rkcg_group_id->str);
44324432
rd_kafka_cgrp_unassign(rkcg);
4433+
4434+
/* Leave group, if desired. */
4435+
rd_kafka_cgrp_leave_maybe(rkcg);
44334436
return;
44344437
}
44354438

src/rdkafka_mock.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
548548
rd_kafka_mock_committed_offset_t *
549549
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
550550
const rd_kafkap_str_t *group,
551-
int64_t offset,
551+
rd_kafka_fetch_pos_t pos,
552552
const rd_kafkap_str_t *metadata) {
553553
rd_kafka_mock_committed_offset_t *coff;
554554

@@ -571,12 +571,13 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
571571

572572
coff->metadata = rd_kafkap_str_copy(metadata);
573573

574-
coff->offset = offset;
574+
coff->pos = pos;
575575

576576
rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK",
577-
"Topic %s [%" PRId32 "] committing offset %" PRId64
577+
"Topic %s [%" PRId32
578+
"] committing offset %s"
578579
" for group %.*s",
579-
mpart->topic->name, mpart->id, offset,
580+
mpart->topic->name, mpart->id, rd_kafka_fetch_pos2str(pos),
580581
RD_KAFKAP_STR_PR(group));
581582

582583
return coff;

src/rdkafka_mock_cgrp.c

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,21 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_sync_set(
252252
return RD_KAFKA_RESP_ERR_NO_ERROR;
253253
}
254254

255+
/**
256+
* @brief Member left the group
257+
* either explicitly or due to a timeout.
258+
* Trigger rebalance if there are still members left.
259+
* If this was the last member, mark the group as empty.
260+
*/
261+
void rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
262+
rd_kafka_mock_cgrp_classic_t *mcgrp,
263+
const char *reason) {
264+
if (mcgrp->member_cnt > 0)
265+
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, reason);
266+
else
267+
rd_kafka_mock_cgrp_classic_set_state(
268+
mcgrp, RD_KAFKA_MOCK_CGRP_STATE_EMPTY, reason);
269+
}
255270

256271
/**
257272
* @brief Member is explicitly leaving the group (through LeaveGroupRequest)
@@ -265,7 +280,8 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_leave(
265280

266281
rd_kafka_mock_cgrp_classic_member_destroy(mcgrp, member);
267282

268-
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "explicit member leave");
283+
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
284+
mcgrp, "explicit member leave");
269285

270286
return RD_KAFKA_RESP_ERR_NO_ERROR;
271287
}
@@ -633,7 +649,8 @@ static void rd_kafka_mock_cgrp_classic_session_tmr_cb(rd_kafka_timers_t *rkts,
633649
}
634650

635651
if (timeout_cnt)
636-
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "member timeout");
652+
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
653+
mcgrp, "member timeout");
637654
}
638655

639656

src/rdkafka_mock_handlers.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
742742
int32_t MaxNumOffsets;
743743
rd_kafka_mock_partition_t *mpart = NULL;
744744
rd_kafka_resp_err_t err = all_err;
745+
int32_t LeaderEpoch = -1;
745746

746747
rd_kafka_buf_read_i32(rkbuf, &Partition);
747748

@@ -807,7 +808,6 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
807808
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) {
808809
/* Response: LeaderEpoch */
809810
const rd_kafka_mock_msgset_t *mset = NULL;
810-
int32_t leader_epoch = -1;
811811
rd_bool_t on_follower = rd_false;
812812

813813
if (mpart) {
@@ -818,12 +818,12 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
818818
if (Offset >= 0 &&
819819
(mset = rd_kafka_mock_msgset_find(
820820
mpart, Offset, on_follower))) {
821-
leader_epoch =
821+
LeaderEpoch =
822822
mset->leader_epoch;
823823
}
824824
}
825825

826-
rd_kafka_buf_write_i32(resp, leader_epoch);
826+
rd_kafka_buf_write_i32(resp, LeaderEpoch);
827827
}
828828

829829
/* Response: Partition tags */
@@ -835,7 +835,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
835835
"offset %" PRId64 " (leader epoch %" PRId32
836836
") for %s: %s",
837837
RD_KAFKAP_STR_PR(&Topic), Partition,
838-
Offset, mpart ? mpart->leader_epoch : -1,
838+
Offset, LeaderEpoch,
839839
rd_kafka_offset2str(Timestamp),
840840
rd_kafka_err2str(err));
841841
}
@@ -930,12 +930,13 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
930930
mpart, &GroupId);
931931

932932
/* Response: CommittedOffset */
933-
rd_kafka_buf_write_i64(resp, coff ? coff->offset : -1);
933+
rd_kafka_buf_write_i64(resp,
934+
coff ? coff->pos.offset : -1);
934935

935936
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) {
936937
/* Response: CommittedLeaderEpoch */
937938
rd_kafka_buf_write_i32(
938-
resp, mpart ? mpart->leader_epoch : -1);
939+
resp, coff ? coff->pos.leader_epoch : -1);
939940
}
940941

941942
/* Response: Metadata */
@@ -952,10 +953,11 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
952953
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
953954
"Topic %s [%" PRId32
954955
"] returning "
955-
"committed offset %" PRId64
956+
"committed offset %s"
956957
" for group %s",
957958
mtopic->name, mpart->id,
958-
coff->offset, coff->group);
959+
rd_kafka_fetch_pos2str(coff->pos),
960+
coff->group);
959961
else
960962
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
961963
"Topic %.*s [%" PRId32
@@ -1109,6 +1111,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11091111
rd_kafka_mock_partition_t *mpart = NULL;
11101112
rd_kafka_resp_err_t err = all_err;
11111113
int64_t CommittedOffset;
1114+
int32_t CommittedLeaderEpoch = -1;
11121115
rd_kafkap_str_t Metadata;
11131116

11141117
rd_kafka_buf_read_i32(rkbuf, &Partition);
@@ -1126,7 +1129,6 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11261129
rd_kafka_buf_read_i64(rkbuf, &CommittedOffset);
11271130

11281131
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 6) {
1129-
int32_t CommittedLeaderEpoch;
11301132
rd_kafka_buf_read_i32(rkbuf,
11311133
&CommittedLeaderEpoch);
11321134

@@ -1145,9 +1147,11 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11451147
rd_kafka_buf_skip_tags(rkbuf);
11461148

11471149
if (!err)
1148-
rd_kafka_mock_commit_offset(mpart, &GroupId,
1149-
CommittedOffset,
1150-
&Metadata);
1150+
rd_kafka_mock_commit_offset(
1151+
mpart, &GroupId,
1152+
RD_KAFKA_FETCH_POS(CommittedOffset,
1153+
CommittedLeaderEpoch),
1154+
&Metadata);
11511155

11521156
/* Response: ErrorCode */
11531157
rd_kafka_buf_write_i16(resp, err);

src/rdkafka_mock_int.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ typedef struct rd_kafka_mock_committed_offset_s {
296296
/**< mpart.committed_offsets */
297297
TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link;
298298
char *group; /**< Allocated along with the struct */
299-
int64_t offset; /**< Committed offset */
299+
rd_kafka_fetch_pos_t pos; /**< Committed position */
300300
rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */
301301
} rd_kafka_mock_committed_offset_t;
302302

@@ -552,7 +552,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
552552
rd_kafka_mock_committed_offset_t *
553553
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
554554
const rd_kafkap_str_t *group,
555-
int64_t offset,
555+
rd_kafka_fetch_pos_t pos,
556556
const rd_kafkap_str_t *metadata);
557557

558558
const rd_kafka_mock_msgset_t *

src/rdkafka_offset.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,8 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
10561056
end_offset, end_offset_leader_epoch);
10571057

10581058
} else {
1059+
rd_kafka_toppar_set_fetch_state(
1060+
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK);
10591061
rd_kafka_toppar_unlock(rktp);
10601062

10611063
/* Seek to the updated end offset */

src/rdkafka_partition.c

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@
3838

3939
#include "rdunittest.h"
4040

41-
const char *rd_kafka_fetch_states[] = {"none", "stopping",
42-
"stopped", "offset-query",
43-
"offset-wait", "validate-epoch-wait",
44-
"active"};
41+
const char *rd_kafka_fetch_states[] = {"none", "stopping",
42+
"stopped", "offset-query",
43+
"offset-wait", "validate-epoch-wait",
44+
"validate-seek", "active"};
4545

4646

4747
static rd_kafka_op_res_t rd_kafka_toppar_op_serve(rd_kafka_t *rk,
@@ -2266,7 +2266,14 @@ static void rd_kafka_toppar_op(rd_kafka_toppar_t *rktp,
22662266
rd_kafka_toppar_op0(rktp, rko, replyq);
22672267
}
22682268

2269-
2269+
void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp,
2270+
rd_kafka_q_t *fwdq) {
2271+
rd_kafka_q_lock(rktp->rktp_fetchq);
2272+
if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2273+
rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */
2274+
0 /* no fwd_app */);
2275+
rd_kafka_q_unlock(rktp->rktp_fetchq);
2276+
}
22702277

22712278
/**
22722279
* Start consuming partition (async operation).
@@ -2283,11 +2290,7 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp,
22832290
rd_kafka_replyq_t replyq) {
22842291
int32_t version;
22852292

2286-
rd_kafka_q_lock(rktp->rktp_fetchq);
2287-
if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2288-
rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */
2289-
0 /* no fwd_app */);
2290-
rd_kafka_q_unlock(rktp->rktp_fetchq);
2293+
rd_kafka_toppar_forward_internal(rktp, fwdq);
22912294

22922295
/* Bump version barrier. */
22932296
version = rd_kafka_toppar_version_new_barrier(rktp);

src/rdkafka_partition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
299299
RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
300300
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
301301
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT,
302+
RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK,
302303
RD_KAFKA_TOPPAR_FETCH_ACTIVE,
303304
} rktp_fetch_state; /* Broker thread's state */
304305

@@ -619,6 +620,8 @@ void rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t *rktp,
619620
void rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t *rktp,
620621
rd_kafka_broker_t *rkb);
621622

623+
void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp,
624+
rd_kafka_q_t *fwdq);
622625

623626
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp,
624627
rd_kafka_fetch_pos_t pos,

0 commit comments

Comments
 (0)