Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ librdkafka v2.12.0 is a feature release:
* Fix for KIP-1102 time based re-bootstrap condition (#5177).
* Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672).
* Fix for an error being raised after a commit due to an existing error in the topic partition (#4672).
* Additional KIP-320 related validation: when assigning the offset is validated
if leader epoch is specified (#4931).
* Fix double free of headers in `rd_kafka_produceva` method (@blindspotbounty, #4628).
* Fix to ensure `rd_kafka_query_watermark_offsets` enforces the specified timeout and does not continue beyond timeout expiry (#5201).

Expand Down Expand Up @@ -38,6 +40,18 @@ librdkafka v2.12.0 is a feature release:
Happening since 2.5.0. Since 2.10.1 unit tests are failing when run on
big-endian architectures (#5183, @paravoid).

### Consumer fixes

* Issues: #5158.
Additional KIP-320 related validation: when assigning the offset is validated
if a leader epoch is specified. A committed offset could have been truncated
in case of unclean leader election and, when a different member starts
fetching from it on the new leader, it could get an offset out of range
and a subsequent offset reset. The assigned offset is now validated before
we start fetching from it and, in case it was truncated, fetching starts
from last available offset of given leader epoch.
Happens since 2.1.0 (#4931).

### Producer fixes

* Issues: #4627.
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2984,6 +2984,10 @@ static RD_UNUSED int rd_kafka_consume_start0(rd_kafka_topic_t *rkt,
return -1;
}

/*
* We cannot validate this offset as the legacy API doesn't
* provide the leader epoch.
*/
rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_FETCH_POS(offset, -1),
rkq, RD_KAFKA_NO_REPLYQ);

Expand Down
3 changes: 3 additions & 0 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -4430,6 +4430,9 @@ static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) {
"unassign",
rkcg->rkcg_group_id->str);
rd_kafka_cgrp_unassign(rkcg);

/* Leave group, if desired. */
rd_kafka_cgrp_leave_maybe(rkcg);
return;
}

Expand Down
9 changes: 5 additions & 4 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_committed_offset_t *
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
const rd_kafkap_str_t *group,
int64_t offset,
rd_kafka_fetch_pos_t pos,
const rd_kafkap_str_t *metadata) {
rd_kafka_mock_committed_offset_t *coff;

Expand All @@ -571,12 +571,13 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,

coff->metadata = rd_kafkap_str_copy(metadata);

coff->offset = offset;
coff->pos = pos;

rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK",
"Topic %s [%" PRId32 "] committing offset %" PRId64
"Topic %s [%" PRId32
"] committing offset %s"
" for group %.*s",
mpart->topic->name, mpart->id, offset,
mpart->topic->name, mpart->id, rd_kafka_fetch_pos2str(pos),
RD_KAFKAP_STR_PR(group));

return coff;
Expand Down
21 changes: 19 additions & 2 deletions src/rdkafka_mock_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,21 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_sync_set(
return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
* @brief Member left the group
* either explicitly or due to a timeout.
* Trigger rebalance if there are still members left.
* If this was the last member, mark the group as empty.
*/
void rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
rd_kafka_mock_cgrp_classic_t *mcgrp,
const char *reason) {
if (mcgrp->member_cnt > 0)
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, reason);
else
rd_kafka_mock_cgrp_classic_set_state(
mcgrp, RD_KAFKA_MOCK_CGRP_STATE_EMPTY, reason);
}

/**
* @brief Member is explicitly leaving the group (through LeaveGroupRequest)
Expand All @@ -265,7 +280,8 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_leave(

rd_kafka_mock_cgrp_classic_member_destroy(mcgrp, member);

rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "explicit member leave");
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
mcgrp, "explicit member leave");

return RD_KAFKA_RESP_ERR_NO_ERROR;
}
Expand Down Expand Up @@ -633,7 +649,8 @@ static void rd_kafka_mock_cgrp_classic_session_tmr_cb(rd_kafka_timers_t *rkts,
}

if (timeout_cnt)
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "member timeout");
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
mcgrp, "member timeout");
}


Expand Down
28 changes: 16 additions & 12 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
int32_t MaxNumOffsets;
rd_kafka_mock_partition_t *mpart = NULL;
rd_kafka_resp_err_t err = all_err;
int32_t LeaderEpoch = -1;

rd_kafka_buf_read_i32(rkbuf, &Partition);

Expand Down Expand Up @@ -807,7 +808,6 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) {
/* Response: LeaderEpoch */
const rd_kafka_mock_msgset_t *mset = NULL;
int32_t leader_epoch = -1;
rd_bool_t on_follower = rd_false;

if (mpart) {
Expand All @@ -818,12 +818,12 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
if (Offset >= 0 &&
(mset = rd_kafka_mock_msgset_find(
mpart, Offset, on_follower))) {
leader_epoch =
LeaderEpoch =
mset->leader_epoch;
}
}

rd_kafka_buf_write_i32(resp, leader_epoch);
rd_kafka_buf_write_i32(resp, LeaderEpoch);
}

/* Response: Partition tags */
Expand All @@ -835,7 +835,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
"offset %" PRId64 " (leader epoch %" PRId32
") for %s: %s",
RD_KAFKAP_STR_PR(&Topic), Partition,
Offset, mpart ? mpart->leader_epoch : -1,
Offset, LeaderEpoch,
rd_kafka_offset2str(Timestamp),
rd_kafka_err2str(err));
}
Expand Down Expand Up @@ -930,12 +930,13 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
mpart, &GroupId);

/* Response: CommittedOffset */
rd_kafka_buf_write_i64(resp, coff ? coff->offset : -1);
rd_kafka_buf_write_i64(resp,
coff ? coff->pos.offset : -1);

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) {
/* Response: CommittedLeaderEpoch */
rd_kafka_buf_write_i32(
resp, mpart ? mpart->leader_epoch : -1);
resp, coff ? coff->pos.leader_epoch : -1);
}

/* Response: Metadata */
Expand All @@ -952,10 +953,11 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Topic %s [%" PRId32
"] returning "
"committed offset %" PRId64
"committed offset %s"
" for group %s",
mtopic->name, mpart->id,
coff->offset, coff->group);
rd_kafka_fetch_pos2str(coff->pos),
coff->group);
else
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Topic %.*s [%" PRId32
Expand Down Expand Up @@ -1109,6 +1111,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
rd_kafka_mock_partition_t *mpart = NULL;
rd_kafka_resp_err_t err = all_err;
int64_t CommittedOffset;
int32_t CommittedLeaderEpoch = -1;
rd_kafkap_str_t Metadata;

rd_kafka_buf_read_i32(rkbuf, &Partition);
Expand All @@ -1126,7 +1129,6 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_i64(rkbuf, &CommittedOffset);

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 6) {
int32_t CommittedLeaderEpoch;
rd_kafka_buf_read_i32(rkbuf,
&CommittedLeaderEpoch);

Expand All @@ -1145,9 +1147,11 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_skip_tags(rkbuf);

if (!err)
rd_kafka_mock_commit_offset(mpart, &GroupId,
CommittedOffset,
&Metadata);
rd_kafka_mock_commit_offset(
mpart, &GroupId,
RD_KAFKA_FETCH_POS(CommittedOffset,
CommittedLeaderEpoch),
&Metadata);

/* Response: ErrorCode */
rd_kafka_buf_write_i16(resp, err);
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ typedef struct rd_kafka_mock_committed_offset_s {
/**< mpart.committed_offsets */
TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link;
char *group; /**< Allocated along with the struct */
int64_t offset; /**< Committed offset */
rd_kafka_fetch_pos_t pos; /**< Committed position */
rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */
} rd_kafka_mock_committed_offset_t;

Expand Down Expand Up @@ -552,7 +552,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_committed_offset_t *
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
const rd_kafkap_str_t *group,
int64_t offset,
rd_kafka_fetch_pos_t pos,
const rd_kafkap_str_t *metadata);

const rd_kafka_mock_msgset_t *
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,10 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
end_offset, end_offset_leader_epoch);

} else {
/* It ensures a validation isn't started again
* until the seek completes. */
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK);
rd_kafka_toppar_unlock(rktp);

/* Seek to the updated end offset */
Expand Down
18 changes: 14 additions & 4 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@

#include "rdunittest.h"

const char *rd_kafka_fetch_states[] = {"none", "stopping",
"stopped", "offset-query",
"offset-wait", "validate-epoch-wait",
"active"};
const char *rd_kafka_fetch_states[] = {"none", "stopping",
"stopped", "offset-query",
"offset-wait", "validate-epoch-wait",
"validate-seek", "active"};


static rd_kafka_op_res_t rd_kafka_toppar_op_serve(rd_kafka_t *rk,
Expand Down Expand Up @@ -1748,6 +1748,16 @@ static void rd_kafka_toppar_fetch_start(rd_kafka_toppar_t *rktp,
"no previously committed offset "
"available");

} else if (pos.leader_epoch >= -1) {
rd_kafka_toppar_set_next_fetch_position(rktp, pos);
rd_kafka_toppar_set_offset_validation_position(rktp, pos);
/* This is necessary as it only starts
* validating from states ACTIVE or
* VALIDATE_EPOCH_WAIT. */
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
rd_kafka_offset_validate(rktp, "validate fetch start position");

} else {
rd_kafka_toppar_set_next_fetch_position(rktp, pos);

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT,
RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK,
RD_KAFKA_TOPPAR_FETCH_ACTIVE,
} rktp_fetch_state; /* Broker thread's state */

Expand Down
Loading