Skip to content

Commit 3ef2658

Browse files
authored
Merge branch 'confluentinc:master' into bugfix/4949-fpe-in-rd_hdr_histogram_new
2 parents 9adbcfb + 13a2bba commit 3ef2658

15 files changed

+410
-69
lines changed

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ librdkafka v2.12.0 is a feature release:
77
* Fix for KIP-1102 time based re-bootstrap condition (#5177).
88
* Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672).
99
* Fix for an error being raised after a commit due to an existing error in the topic partition (#4672).
10+
* Additional KIP-320 related validation: when assigning the offset is validated
11+
if leader epoch is specified (#4931).
1012

1113

1214
## Fixes
@@ -32,6 +34,18 @@ librdkafka v2.12.0 is a feature release:
3234
Happening since 2.5.0. Since 2.10.1 unit tests are failing when run on
3335
big-endian architectures (#5183, @paravoid).
3436

37+
### Consumer fixes
38+
39+
* Issues: #5158.
40+
Additional KIP-320 related validation: when assigning the offset is validated
41+
if a leader epoch is specified. A committed offset could have been truncated
42+
in case of unclean leader election and, when a different member starts
43+
fetching from it on the new leader, it could get an offset out of range
44+
and a subsequent offset reset. The assigned offset is now validated before
45+
we start fetching from it and, in case it was truncated, fetching starts
46+
from last available offset of given leader epoch.
47+
Happens since 2.1.0 (#4931).
48+
3549

3650

3751
# librdkafka v2.11.1

src-cpp/rdkafkacpp.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ namespace RdKafka {
112112
* @remark This value should only be used during compile time,
113113
* for runtime checks of version use RdKafka::version()
114114
*/
115-
#define RD_KAFKA_VERSION 0x020b01ff
115+
#define RD_KAFKA_VERSION 0x020c00ff
116116

117117
/**
118118
* @brief Returns the librdkafka version as integer.

src/rdkafka.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ typedef SSIZE_T ssize_t;
167167
* @remark This value should only be used during compile time,
168168
* for runtime checks of version use rd_kafka_version()
169169
*/
170-
#define RD_KAFKA_VERSION 0x020b01ff
170+
#define RD_KAFKA_VERSION 0x020c00ff
171171

172172
/**
173173
* @brief Returns the librdkafka version as integer.

src/rdkafka_assignment.c

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -489,29 +489,54 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
489489
*
490490
* Start fetcher for partition and forward partition's
491491
* fetchq to consumer group's queue. */
492+
rd_kafka_fetch_pos_t pos =
493+
rd_kafka_topic_partition_get_fetch_pos(rktpar);
492494

493-
rd_kafka_dbg(rk, CGRP, "SRVPEND",
494-
"Starting pending assigned partition "
495-
"%s [%" PRId32 "] at %s",
496-
rktpar->topic, rktpar->partition,
497-
rd_kafka_fetch_pos2str(
498-
rd_kafka_topic_partition_get_fetch_pos(
499-
rktpar)));
500-
501-
/* Reset the (lib) pause flag which may have been set by
502-
* the cgrp when scheduling the rebalance callback. */
495+
/* Reset the (lib) pause flag which may have
496+
* been set by the cgrp when scheduling the
497+
* rebalance callback. */
503498
rd_kafka_toppar_op_pause_resume(
504499
rktp, rd_false /*resume*/,
505500
RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ);
506501

507-
/* Start the fetcher */
508-
rktp->rktp_started = rd_true;
509-
rk->rk_consumer.assignment.started_cnt++;
510-
511-
rd_kafka_toppar_op_fetch_start(
512-
rktp,
513-
rd_kafka_topic_partition_get_fetch_pos(rktpar),
514-
rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ);
502+
if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) &&
503+
pos.leader_epoch != -1) {
504+
rd_kafka_dbg(
505+
rk, CGRP, "SRVPEND",
506+
"Validating assigned partition offset "
507+
"%s [%" PRId32 "] at %s",
508+
rktpar->topic, rktpar->partition,
509+
rd_kafka_fetch_pos2str(pos));
510+
511+
rd_kafka_toppar_forward_internal(
512+
rktp, rk->rk_consumer.q);
513+
rd_kafka_toppar_lock(rktp);
514+
rd_kafka_toppar_set_fetch_state(
515+
rktp,
516+
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
517+
rd_kafka_toppar_set_next_fetch_position(rktp,
518+
pos);
519+
rd_kafka_toppar_set_offset_validation_position(
520+
rktp, pos);
521+
rd_kafka_offset_validate(rktp, "offset fetch");
522+
rd_kafka_toppar_unlock(rktp);
523+
524+
} else {
525+
rd_kafka_dbg(
526+
rk, CGRP, "SRVPEND",
527+
"Starting pending assigned partition "
528+
"%s [%" PRId32 "] at %s",
529+
rktpar->topic, rktpar->partition,
530+
rd_kafka_fetch_pos2str(pos));
531+
532+
/* Start the fetcher */
533+
rktp->rktp_started = rd_true;
534+
rk->rk_consumer.assignment.started_cnt++;
535+
536+
rd_kafka_toppar_op_fetch_start(
537+
rktp, pos, rk->rk_consumer.q,
538+
RD_KAFKA_NO_REPLYQ);
539+
}
515540

516541

517542
} else if (can_query_offsets) {

src/rdkafka_cgrp.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4430,6 +4430,9 @@ static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) {
44304430
"unassign",
44314431
rkcg->rkcg_group_id->str);
44324432
rd_kafka_cgrp_unassign(rkcg);
4433+
4434+
/* Leave group, if desired. */
4435+
rd_kafka_cgrp_leave_maybe(rkcg);
44334436
return;
44344437
}
44354438

src/rdkafka_mock.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
548548
rd_kafka_mock_committed_offset_t *
549549
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
550550
const rd_kafkap_str_t *group,
551-
int64_t offset,
551+
rd_kafka_fetch_pos_t pos,
552552
const rd_kafkap_str_t *metadata) {
553553
rd_kafka_mock_committed_offset_t *coff;
554554

@@ -571,12 +571,13 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
571571

572572
coff->metadata = rd_kafkap_str_copy(metadata);
573573

574-
coff->offset = offset;
574+
coff->pos = pos;
575575

576576
rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK",
577-
"Topic %s [%" PRId32 "] committing offset %" PRId64
577+
"Topic %s [%" PRId32
578+
"] committing offset %s"
578579
" for group %.*s",
579-
mpart->topic->name, mpart->id, offset,
580+
mpart->topic->name, mpart->id, rd_kafka_fetch_pos2str(pos),
580581
RD_KAFKAP_STR_PR(group));
581582

582583
return coff;

src/rdkafka_mock_cgrp.c

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,21 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_sync_set(
252252
return RD_KAFKA_RESP_ERR_NO_ERROR;
253253
}
254254

255+
/**
256+
* @brief Member left the group
257+
* either explicitly or due to a timeout.
258+
* Trigger rebalance if there are still members left.
259+
* If this was the last member, mark the group as empty.
260+
*/
261+
void rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
262+
rd_kafka_mock_cgrp_classic_t *mcgrp,
263+
const char *reason) {
264+
if (mcgrp->member_cnt > 0)
265+
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, reason);
266+
else
267+
rd_kafka_mock_cgrp_classic_set_state(
268+
mcgrp, RD_KAFKA_MOCK_CGRP_STATE_EMPTY, reason);
269+
}
255270

256271
/**
257272
* @brief Member is explicitly leaving the group (through LeaveGroupRequest)
@@ -265,7 +280,8 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_leave(
265280

266281
rd_kafka_mock_cgrp_classic_member_destroy(mcgrp, member);
267282

268-
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "explicit member leave");
283+
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
284+
mcgrp, "explicit member leave");
269285

270286
return RD_KAFKA_RESP_ERR_NO_ERROR;
271287
}
@@ -633,7 +649,8 @@ static void rd_kafka_mock_cgrp_classic_session_tmr_cb(rd_kafka_timers_t *rkts,
633649
}
634650

635651
if (timeout_cnt)
636-
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "member timeout");
652+
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
653+
mcgrp, "member timeout");
637654
}
638655

639656

src/rdkafka_mock_handlers.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
742742
int32_t MaxNumOffsets;
743743
rd_kafka_mock_partition_t *mpart = NULL;
744744
rd_kafka_resp_err_t err = all_err;
745+
int32_t LeaderEpoch = -1;
745746

746747
rd_kafka_buf_read_i32(rkbuf, &Partition);
747748

@@ -807,7 +808,6 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
807808
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) {
808809
/* Response: LeaderEpoch */
809810
const rd_kafka_mock_msgset_t *mset = NULL;
810-
int32_t leader_epoch = -1;
811811
rd_bool_t on_follower = rd_false;
812812

813813
if (mpart) {
@@ -818,12 +818,12 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
818818
if (Offset >= 0 &&
819819
(mset = rd_kafka_mock_msgset_find(
820820
mpart, Offset, on_follower))) {
821-
leader_epoch =
821+
LeaderEpoch =
822822
mset->leader_epoch;
823823
}
824824
}
825825

826-
rd_kafka_buf_write_i32(resp, leader_epoch);
826+
rd_kafka_buf_write_i32(resp, LeaderEpoch);
827827
}
828828

829829
/* Response: Partition tags */
@@ -835,7 +835,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
835835
"offset %" PRId64 " (leader epoch %" PRId32
836836
") for %s: %s",
837837
RD_KAFKAP_STR_PR(&Topic), Partition,
838-
Offset, mpart ? mpart->leader_epoch : -1,
838+
Offset, LeaderEpoch,
839839
rd_kafka_offset2str(Timestamp),
840840
rd_kafka_err2str(err));
841841
}
@@ -930,12 +930,13 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
930930
mpart, &GroupId);
931931

932932
/* Response: CommittedOffset */
933-
rd_kafka_buf_write_i64(resp, coff ? coff->offset : -1);
933+
rd_kafka_buf_write_i64(resp,
934+
coff ? coff->pos.offset : -1);
934935

935936
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) {
936937
/* Response: CommittedLeaderEpoch */
937938
rd_kafka_buf_write_i32(
938-
resp, mpart ? mpart->leader_epoch : -1);
939+
resp, coff ? coff->pos.leader_epoch : -1);
939940
}
940941

941942
/* Response: Metadata */
@@ -952,10 +953,11 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
952953
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
953954
"Topic %s [%" PRId32
954955
"] returning "
955-
"committed offset %" PRId64
956+
"committed offset %s"
956957
" for group %s",
957958
mtopic->name, mpart->id,
958-
coff->offset, coff->group);
959+
rd_kafka_fetch_pos2str(coff->pos),
960+
coff->group);
959961
else
960962
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
961963
"Topic %.*s [%" PRId32
@@ -1109,6 +1111,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11091111
rd_kafka_mock_partition_t *mpart = NULL;
11101112
rd_kafka_resp_err_t err = all_err;
11111113
int64_t CommittedOffset;
1114+
int32_t CommittedLeaderEpoch = -1;
11121115
rd_kafkap_str_t Metadata;
11131116

11141117
rd_kafka_buf_read_i32(rkbuf, &Partition);
@@ -1126,7 +1129,6 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11261129
rd_kafka_buf_read_i64(rkbuf, &CommittedOffset);
11271130

11281131
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 6) {
1129-
int32_t CommittedLeaderEpoch;
11301132
rd_kafka_buf_read_i32(rkbuf,
11311133
&CommittedLeaderEpoch);
11321134

@@ -1145,9 +1147,11 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11451147
rd_kafka_buf_skip_tags(rkbuf);
11461148

11471149
if (!err)
1148-
rd_kafka_mock_commit_offset(mpart, &GroupId,
1149-
CommittedOffset,
1150-
&Metadata);
1150+
rd_kafka_mock_commit_offset(
1151+
mpart, &GroupId,
1152+
RD_KAFKA_FETCH_POS(CommittedOffset,
1153+
CommittedLeaderEpoch),
1154+
&Metadata);
11511155

11521156
/* Response: ErrorCode */
11531157
rd_kafka_buf_write_i16(resp, err);

src/rdkafka_mock_int.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ typedef struct rd_kafka_mock_committed_offset_s {
296296
/**< mpart.committed_offsets */
297297
TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link;
298298
char *group; /**< Allocated along with the struct */
299-
int64_t offset; /**< Committed offset */
299+
rd_kafka_fetch_pos_t pos; /**< Committed position */
300300
rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */
301301
} rd_kafka_mock_committed_offset_t;
302302

@@ -552,7 +552,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
552552
rd_kafka_mock_committed_offset_t *
553553
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
554554
const rd_kafkap_str_t *group,
555-
int64_t offset,
555+
rd_kafka_fetch_pos_t pos,
556556
const rd_kafkap_str_t *metadata);
557557

558558
const rd_kafka_mock_msgset_t *

src/rdkafka_offset.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,8 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
10561056
end_offset, end_offset_leader_epoch);
10571057

10581058
} else {
1059+
rd_kafka_toppar_set_fetch_state(
1060+
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK);
10591061
rd_kafka_toppar_unlock(rktp);
10601062

10611063
/* Seek to the updated end offset */

0 commit comments

Comments
 (0)