Skip to content

Commit dc61e5b

Browse files
authored
Revert "[KIP-320] Validate assigned partitions before starting to consume from them (#4931)" (#5207)
This reverts commit 13a2bba.
1 parent ad90e9b commit dc61e5b

12 files changed

+66 -407 lines changed

CHANGELOG.md

Lines changed: 0 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -9,8 +9,6 @@ librdkafka v2.12.0 is a feature release:
99
* Fix for KIP-1102 time based re-bootstrap condition (#5177).
1010
* Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672).
1111
* Fix for an error being raised after a commit due to an existing error in the topic partition (#4672).
12-
* Additional KIP-320 related validation: when assigning the offset is validated
13-
if leader epoch is specified (#4931).
1412
* Fix double free of headers in `rd_kafka_produceva` method (@blindspotbounty, #4628).
1513
* Fix to ensure `rd_kafka_query_watermark_offsets` enforces the specified timeout and does not continue beyond timeout expiry (#5201).
1614

@@ -40,18 +38,6 @@ librdkafka v2.12.0 is a feature release:
4038
Happening since 2.5.0. Since 2.10.1 unit tests are failing when run on
4139
big-endian architectures (#5183, @paravoid).
4240

43-
### Consumer fixes
44-
45-
* Issues: #5158.
46-
Additional KIP-320 related validation: when assigning the offset is validated
47-
if a leader epoch is specified. A committed offset could have been truncated
48-
in case of unclean leader election and, when a different member starts
49-
fetching from it on the new leader, it could get an offset out of range
50-
and a subsequent offset reset. The assigned offset is now validated before
51-
we start fetching from it and, in case it was truncated, fetching starts
52-
from last available offset of given leader epoch.
53-
Happens since 2.1.0 (#4931).
54-
5541
### Producer fixes
5642

5743
* Issues: #4627.

src/rdkafka_assignment.c

Lines changed: 18 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -489,54 +489,29 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
489489
*
490490
* Start fetcher for partition and forward partition's
491491
* fetchq to consumer group's queue. */
492-
rd_kafka_fetch_pos_t pos =
493-
rd_kafka_topic_partition_get_fetch_pos(rktpar);
494492

495-
/* Reset the (lib) pause flag which may have
496-
* been set by the cgrp when scheduling the
497-
* rebalance callback. */
493+
rd_kafka_dbg(rk, CGRP, "SRVPEND",
494+
"Starting pending assigned partition "
495+
"%s [%" PRId32 "] at %s",
496+
rktpar->topic, rktpar->partition,
497+
rd_kafka_fetch_pos2str(
498+
rd_kafka_topic_partition_get_fetch_pos(
499+
rktpar)));
500+
501+
/* Reset the (lib) pause flag which may have been set by
502+
* the cgrp when scheduling the rebalance callback. */
498503
rd_kafka_toppar_op_pause_resume(
499504
rktp, rd_false /*resume*/,
500505
RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ);
501506

502-
if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) &&
503-
pos.leader_epoch != -1) {
504-
rd_kafka_dbg(
505-
rk, CGRP, "SRVPEND",
506-
"Validating assigned partition offset "
507-
"%s [%" PRId32 "] at %s",
508-
rktpar->topic, rktpar->partition,
509-
rd_kafka_fetch_pos2str(pos));
510-
511-
rd_kafka_toppar_forward_internal(
512-
rktp, rk->rk_consumer.q);
513-
rd_kafka_toppar_lock(rktp);
514-
rd_kafka_toppar_set_fetch_state(
515-
rktp,
516-
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
517-
rd_kafka_toppar_set_next_fetch_position(rktp,
518-
pos);
519-
rd_kafka_toppar_set_offset_validation_position(
520-
rktp, pos);
521-
rd_kafka_offset_validate(rktp, "offset fetch");
522-
rd_kafka_toppar_unlock(rktp);
523-
524-
} else {
525-
rd_kafka_dbg(
526-
rk, CGRP, "SRVPEND",
527-
"Starting pending assigned partition "
528-
"%s [%" PRId32 "] at %s",
529-
rktpar->topic, rktpar->partition,
530-
rd_kafka_fetch_pos2str(pos));
531-
532-
/* Start the fetcher */
533-
rktp->rktp_started = rd_true;
534-
rk->rk_consumer.assignment.started_cnt++;
535-
536-
rd_kafka_toppar_op_fetch_start(
537-
rktp, pos, rk->rk_consumer.q,
538-
RD_KAFKA_NO_REPLYQ);
539-
}
507+
/* Start the fetcher */
508+
rktp->rktp_started = rd_true;
509+
rk->rk_consumer.assignment.started_cnt++;
510+
511+
rd_kafka_toppar_op_fetch_start(
512+
rktp,
513+
rd_kafka_topic_partition_get_fetch_pos(rktpar),
514+
rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ);
540515

541516

542517
} else if (can_query_offsets) {

src/rdkafka_cgrp.c

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4430,9 +4430,6 @@ static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) {
44304430
"unassign",
44314431
rkcg->rkcg_group_id->str);
44324432
rd_kafka_cgrp_unassign(rkcg);
4433-
4434-
/* Leave group, if desired. */
4435-
rd_kafka_cgrp_leave_maybe(rkcg);
44364433
return;
44374434
}
44384435

src/rdkafka_mock.c

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -548,7 +548,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
548548
rd_kafka_mock_committed_offset_t *
549549
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
550550
const rd_kafkap_str_t *group,
551-
rd_kafka_fetch_pos_t pos,
551+
int64_t offset,
552552
const rd_kafkap_str_t *metadata) {
553553
rd_kafka_mock_committed_offset_t *coff;
554554

@@ -571,13 +571,12 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
571571

572572
coff->metadata = rd_kafkap_str_copy(metadata);
573573

574-
coff->pos = pos;
574+
coff->offset = offset;
575575

576576
rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK",
577-
"Topic %s [%" PRId32
578-
"] committing offset %s"
577+
"Topic %s [%" PRId32 "] committing offset %" PRId64
579578
" for group %.*s",
580-
mpart->topic->name, mpart->id, rd_kafka_fetch_pos2str(pos),
579+
mpart->topic->name, mpart->id, offset,
581580
RD_KAFKAP_STR_PR(group));
582581

583582
return coff;

src/rdkafka_mock_cgrp.c

Lines changed: 2 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -252,21 +252,6 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_sync_set(
252252
return RD_KAFKA_RESP_ERR_NO_ERROR;
253253
}
254254

255-
/**
256-
* @brief Member left the group
257-
* either explicitly or due to a timeout.
258-
* Trigger rebalance if there are still members left.
259-
* If this was the last member, mark the group as empty.
260-
*/
261-
void rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
262-
rd_kafka_mock_cgrp_classic_t *mcgrp,
263-
const char *reason) {
264-
if (mcgrp->member_cnt > 0)
265-
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, reason);
266-
else
267-
rd_kafka_mock_cgrp_classic_set_state(
268-
mcgrp, RD_KAFKA_MOCK_CGRP_STATE_EMPTY, reason);
269-
}
270255

271256
/**
272257
* @brief Member is explicitly leaving the group (through LeaveGroupRequest)
@@ -280,8 +265,7 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_classic_member_leave(
280265

281266
rd_kafka_mock_cgrp_classic_member_destroy(mcgrp, member);
282267

283-
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
284-
mcgrp, "explicit member leave");
268+
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "explicit member leave");
285269

286270
return RD_KAFKA_RESP_ERR_NO_ERROR;
287271
}
@@ -649,8 +633,7 @@ static void rd_kafka_mock_cgrp_classic_session_tmr_cb(rd_kafka_timers_t *rkts,
649633
}
650634

651635
if (timeout_cnt)
652-
rd_kafka_mock_cgrp_classic_member_leave_rebalance_maybe(
653-
mcgrp, "member timeout");
636+
rd_kafka_mock_cgrp_classic_rebalance(mcgrp, "member timeout");
654637
}
655638

656639

src/rdkafka_mock_handlers.c

Lines changed: 12 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -742,7 +742,6 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
742742
int32_t MaxNumOffsets;
743743
rd_kafka_mock_partition_t *mpart = NULL;
744744
rd_kafka_resp_err_t err = all_err;
745-
int32_t LeaderEpoch = -1;
746745

747746
rd_kafka_buf_read_i32(rkbuf, &Partition);
748747

@@ -808,6 +807,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
808807
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) {
809808
/* Response: LeaderEpoch */
810809
const rd_kafka_mock_msgset_t *mset = NULL;
810+
int32_t leader_epoch = -1;
811811
rd_bool_t on_follower = rd_false;
812812

813813
if (mpart) {
@@ -818,12 +818,12 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
818818
if (Offset >= 0 &&
819819
(mset = rd_kafka_mock_msgset_find(
820820
mpart, Offset, on_follower))) {
821-
LeaderEpoch =
821+
leader_epoch =
822822
mset->leader_epoch;
823823
}
824824
}
825825

826-
rd_kafka_buf_write_i32(resp, LeaderEpoch);
826+
rd_kafka_buf_write_i32(resp, leader_epoch);
827827
}
828828

829829
/* Response: Partition tags */
@@ -835,7 +835,7 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
835835
"offset %" PRId64 " (leader epoch %" PRId32
836836
") for %s: %s",
837837
RD_KAFKAP_STR_PR(&Topic), Partition,
838-
Offset, LeaderEpoch,
838+
Offset, mpart ? mpart->leader_epoch : -1,
839839
rd_kafka_offset2str(Timestamp),
840840
rd_kafka_err2str(err));
841841
}
@@ -930,13 +930,12 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
930930
mpart, &GroupId);
931931

932932
/* Response: CommittedOffset */
933-
rd_kafka_buf_write_i64(resp,
934-
coff ? coff->pos.offset : -1);
933+
rd_kafka_buf_write_i64(resp, coff ? coff->offset : -1);
935934

936935
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) {
937936
/* Response: CommittedLeaderEpoch */
938937
rd_kafka_buf_write_i32(
939-
resp, coff ? coff->pos.leader_epoch : -1);
938+
resp, mpart ? mpart->leader_epoch : -1);
940939
}
941940

942941
/* Response: Metadata */
@@ -953,11 +952,10 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
953952
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
954953
"Topic %s [%" PRId32
955954
"] returning "
956-
"committed offset %s"
955+
"committed offset %" PRId64
957956
" for group %s",
958957
mtopic->name, mpart->id,
959-
rd_kafka_fetch_pos2str(coff->pos),
960-
coff->group);
958+
coff->offset, coff->group);
961959
else
962960
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
963961
"Topic %.*s [%" PRId32
@@ -1111,7 +1109,6 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11111109
rd_kafka_mock_partition_t *mpart = NULL;
11121110
rd_kafka_resp_err_t err = all_err;
11131111
int64_t CommittedOffset;
1114-
int32_t CommittedLeaderEpoch = -1;
11151112
rd_kafkap_str_t Metadata;
11161113

11171114
rd_kafka_buf_read_i32(rkbuf, &Partition);
@@ -1129,6 +1126,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11291126
rd_kafka_buf_read_i64(rkbuf, &CommittedOffset);
11301127

11311128
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 6) {
1129+
int32_t CommittedLeaderEpoch;
11321130
rd_kafka_buf_read_i32(rkbuf,
11331131
&CommittedLeaderEpoch);
11341132

@@ -1147,11 +1145,9 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
11471145
rd_kafka_buf_skip_tags(rkbuf);
11481146

11491147
if (!err)
1150-
rd_kafka_mock_commit_offset(
1151-
mpart, &GroupId,
1152-
RD_KAFKA_FETCH_POS(CommittedOffset,
1153-
CommittedLeaderEpoch),
1154-
&Metadata);
1148+
rd_kafka_mock_commit_offset(mpart, &GroupId,
1149+
CommittedOffset,
1150+
&Metadata);
11551151

11561152
/* Response: ErrorCode */
11571153
rd_kafka_buf_write_i16(resp, err);

src/rdkafka_mock_int.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -296,7 +296,7 @@ typedef struct rd_kafka_mock_committed_offset_s {
296296
/**< mpart.committed_offsets */
297297
TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link;
298298
char *group; /**< Allocated along with the struct */
299-
rd_kafka_fetch_pos_t pos; /**< Committed position */
299+
int64_t offset; /**< Committed offset */
300300
rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */
301301
} rd_kafka_mock_committed_offset_t;
302302

@@ -552,7 +552,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
552552
rd_kafka_mock_committed_offset_t *
553553
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
554554
const rd_kafkap_str_t *group,
555-
rd_kafka_fetch_pos_t pos,
555+
int64_t offset,
556556
const rd_kafkap_str_t *metadata);
557557

558558
const rd_kafka_mock_msgset_t *

src/rdkafka_offset.c

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1056,8 +1056,6 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
10561056
end_offset, end_offset_leader_epoch);
10571057

10581058
} else {
1059-
rd_kafka_toppar_set_fetch_state(
1060-
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK);
10611059
rd_kafka_toppar_unlock(rktp);
10621060

10631061
/* Seek to the updated end offset */

src/rdkafka_partition.c

Lines changed: 10 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -38,10 +38,10 @@
3838

3939
#include "rdunittest.h"
4040

41-
const char *rd_kafka_fetch_states[] = {"none", "stopping",
42-
"stopped", "offset-query",
43-
"offset-wait", "validate-epoch-wait",
44-
"validate-seek", "active"};
41+
const char *rd_kafka_fetch_states[] = {"none", "stopping",
42+
"stopped", "offset-query",
43+
"offset-wait", "validate-epoch-wait",
44+
"active"};
4545

4646

4747
static rd_kafka_op_res_t rd_kafka_toppar_op_serve(rd_kafka_t *rk,
@@ -2272,14 +2272,7 @@ static void rd_kafka_toppar_op(rd_kafka_toppar_t *rktp,
22722272
rd_kafka_toppar_op0(rktp, rko, replyq);
22732273
}
22742274

2275-
void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp,
2276-
rd_kafka_q_t *fwdq) {
2277-
rd_kafka_q_lock(rktp->rktp_fetchq);
2278-
if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2279-
rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */
2280-
0 /* no fwd_app */);
2281-
rd_kafka_q_unlock(rktp->rktp_fetchq);
2282-
}
2275+
22832276

22842277
/**
22852278
* Start consuming partition (async operation).
@@ -2296,7 +2289,11 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp,
22962289
rd_kafka_replyq_t replyq) {
22972290
int32_t version;
22982291

2299-
rd_kafka_toppar_forward_internal(rktp, fwdq);
2292+
rd_kafka_q_lock(rktp->rktp_fetchq);
2293+
if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2294+
rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */
2295+
0 /* no fwd_app */);
2296+
rd_kafka_q_unlock(rktp->rktp_fetchq);
23002297

23012298
/* Bump version barrier. */
23022299
version = rd_kafka_toppar_version_new_barrier(rktp);

src/rdkafka_partition.h

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -299,7 +299,6 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
299299
RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
300300
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
301301
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT,
302-
RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK,
303302
RD_KAFKA_TOPPAR_FETCH_ACTIVE,
304303
} rktp_fetch_state; /* Broker thread's state */
305304

@@ -620,8 +619,6 @@ void rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t *rktp,
620619
void rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t *rktp,
621620
rd_kafka_broker_t *rkb);
622621

623-
void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp,
624-
rd_kafka_q_t *fwdq);
625622

626623
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp,
627624
rd_kafka_fetch_pos_t pos,

0 commit comments

Comments
 (0)