Skip to content

Commit 0073790

Browse files
committed
Second version starting offset validation when partition already joined the consumer group
1 parent 0284602 commit 0073790

File tree

5 files changed

+40
-54
lines changed

5 files changed

+40
-54
lines changed

src/rdkafka.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2984,6 +2984,10 @@ static RD_UNUSED int rd_kafka_consume_start0(rd_kafka_topic_t *rkt,
29842984
return -1;
29852985
}
29862986

2987+
/*
2988+
* We cannot validate this offset as the legacy API doesn't
2989+
* provide the leader epoch.
2990+
*/
29872991
rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_FETCH_POS(offset, -1),
29882992
rkq, RD_KAFKA_NO_REPLYQ);
29892993

src/rdkafka_assignment.c

Lines changed: 18 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -489,54 +489,29 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
489489
*
490490
* Start fetcher for partition and forward partition's
491491
* fetchq to consumer group's queue. */
492-
rd_kafka_fetch_pos_t pos =
493-
rd_kafka_topic_partition_get_fetch_pos(rktpar);
494492

495-
/* Reset the (lib) pause flag which may have
496-
* been set by the cgrp when scheduling the
497-
* rebalance callback. */
493+
rd_kafka_dbg(rk, CGRP, "SRVPEND",
494+
"Starting pending assigned partition "
495+
"%s [%" PRId32 "] at %s",
496+
rktpar->topic, rktpar->partition,
497+
rd_kafka_fetch_pos2str(
498+
rd_kafka_topic_partition_get_fetch_pos(
499+
rktpar)));
500+
501+
/* Reset the (lib) pause flag which may have been set by
502+
* the cgrp when scheduling the rebalance callback. */
498503
rd_kafka_toppar_op_pause_resume(
499504
rktp, rd_false /*resume*/,
500505
RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ);
501506

502-
if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) &&
503-
pos.leader_epoch != -1) {
504-
rd_kafka_dbg(
505-
rk, CGRP, "SRVPEND",
506-
"Validating assigned partition offset "
507-
"%s [%" PRId32 "] at %s",
508-
rktpar->topic, rktpar->partition,
509-
rd_kafka_fetch_pos2str(pos));
510-
511-
rd_kafka_toppar_forward_internal(
512-
rktp, rk->rk_consumer.q);
513-
rd_kafka_toppar_lock(rktp);
514-
rd_kafka_toppar_set_fetch_state(
515-
rktp,
516-
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
517-
rd_kafka_toppar_set_next_fetch_position(rktp,
518-
pos);
519-
rd_kafka_toppar_set_offset_validation_position(
520-
rktp, pos);
521-
rd_kafka_offset_validate(rktp, "offset fetch");
522-
rd_kafka_toppar_unlock(rktp);
523-
524-
} else {
525-
rd_kafka_dbg(
526-
rk, CGRP, "SRVPEND",
527-
"Starting pending assigned partition "
528-
"%s [%" PRId32 "] at %s",
529-
rktpar->topic, rktpar->partition,
530-
rd_kafka_fetch_pos2str(pos));
531-
532-
/* Start the fetcher */
533-
rktp->rktp_started = rd_true;
534-
rk->rk_consumer.assignment.started_cnt++;
535-
536-
rd_kafka_toppar_op_fetch_start(
537-
rktp, pos, rk->rk_consumer.q,
538-
RD_KAFKA_NO_REPLYQ);
539-
}
507+
/* Start the fetcher */
508+
rktp->rktp_started = rd_true;
509+
rk->rk_consumer.assignment.started_cnt++;
510+
511+
rd_kafka_toppar_op_fetch_start(
512+
rktp,
513+
rd_kafka_topic_partition_get_fetch_pos(rktpar),
514+
rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ);
540515

541516

542517
} else if (can_query_offsets) {

src/rdkafka_offset.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,8 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
10561056
end_offset, end_offset_leader_epoch);
10571057

10581058
} else {
1059+
/* It ensures a validation isn't started again
1060+
* until the seek completes. */
10591061
rd_kafka_toppar_set_fetch_state(
10601062
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_SEEK);
10611063
rd_kafka_toppar_unlock(rktp);

src/rdkafka_partition.c

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1748,6 +1748,16 @@ static void rd_kafka_toppar_fetch_start(rd_kafka_toppar_t *rktp,
17481748
"no previously committed offset "
17491749
"available");
17501750

1751+
} else if (pos.leader_epoch >= -1) {
1752+
rd_kafka_toppar_set_next_fetch_position(rktp, pos);
1753+
rd_kafka_toppar_set_offset_validation_position(rktp, pos);
1754+
/* This is necessary as it only starts
1755+
* validating from states ACTIVE or
1756+
* VALIDATE_EPOCH_WAIT. */
1757+
rd_kafka_toppar_set_fetch_state(
1758+
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
1759+
rd_kafka_offset_validate(rktp, "validate fetch start position");
1760+
17511761
} else {
17521762
rd_kafka_toppar_set_next_fetch_position(rktp, pos);
17531763

@@ -2272,14 +2282,7 @@ static void rd_kafka_toppar_op(rd_kafka_toppar_t *rktp,
22722282
rd_kafka_toppar_op0(rktp, rko, replyq);
22732283
}
22742284

2275-
void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp,
2276-
rd_kafka_q_t *fwdq) {
2277-
rd_kafka_q_lock(rktp->rktp_fetchq);
2278-
if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2279-
rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */
2280-
0 /* no fwd_app */);
2281-
rd_kafka_q_unlock(rktp->rktp_fetchq);
2282-
}
2285+
22832286

22842287
/**
22852288
* Start consuming partition (async operation).
@@ -2296,7 +2299,11 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp,
22962299
rd_kafka_replyq_t replyq) {
22972300
int32_t version;
22982301

2299-
rd_kafka_toppar_forward_internal(rktp, fwdq);
2302+
rd_kafka_q_lock(rktp->rktp_fetchq);
2303+
if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2304+
rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, 0, /* no do_lock */
2305+
0 /* no fwd_app */);
2306+
rd_kafka_q_unlock(rktp->rktp_fetchq);
23002307

23012308
/* Bump version barrier. */
23022309
version = rd_kafka_toppar_version_new_barrier(rktp);

src/rdkafka_partition.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,8 +620,6 @@ void rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t *rktp,
620620
void rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t *rktp,
621621
rd_kafka_broker_t *rkb);
622622

623-
void rd_kafka_toppar_forward_internal(rd_kafka_toppar_t *rktp,
624-
rd_kafka_q_t *fwdq);
625623

626624
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp,
627625
rd_kafka_fetch_pos_t pos,

0 commit comments

Comments
 (0)