Skip to content

Commit a027376

Browse files
committed
Second version adding back the partition to the pending list while setting fetch_pos_validated to `rd_true'
1 parent 0284602 commit a027376

File tree

5 files changed

+86
-26
lines changed

5 files changed

+86
-26
lines changed

src/rdkafka.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2984,6 +2984,10 @@ static RD_UNUSED int rd_kafka_consume_start0(rd_kafka_topic_t *rkt,
29842984
return -1;
29852985
}
29862986

2987+
/*
2988+
* We cannot validate this offset as the legacy API doesn't
2989+
* provide the leader epoch.
2990+
*/
29872991
rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_FETCH_POS(offset, -1),
29882992
rkq, RD_KAFKA_NO_REPLYQ);
29892993

src/rdkafka_assignment.c

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -492,15 +492,10 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
492492
rd_kafka_fetch_pos_t pos =
493493
rd_kafka_topic_partition_get_fetch_pos(rktpar);
494494

495-
/* Reset the (lib) pause flag which may have
496-
* been set by the cgrp when scheduling the
497-
* rebalance callback. */
498-
rd_kafka_toppar_op_pause_resume(
499-
rktp, rd_false /*resume*/,
500-
RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ);
501-
502495
if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) &&
503-
pos.leader_epoch != -1) {
496+
pos.leader_epoch != -1 &&
497+
!rd_kafka_topic_partition_get_fetch_pos_validated(
498+
rktpar)) {
504499
rd_kafka_dbg(
505500
rk, CGRP, "SRVPEND",
506501
"Validating assigned partition offset "
@@ -529,6 +524,14 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
529524
rktpar->topic, rktpar->partition,
530525
rd_kafka_fetch_pos2str(pos));
531526

527+
/* Reset the (lib) pause flag which may have
528+
* been set by the cgrp when scheduling the
529+
* rebalance callback. */
530+
rd_kafka_toppar_op_pause_resume(
531+
rktp, rd_false /*resume*/,
532+
RD_KAFKA_TOPPAR_F_LIB_PAUSE,
533+
RD_KAFKA_NO_REPLYQ);
534+
532535
/* Start the fetcher */
533536
rktp->rktp_started = rd_true;
534537
rk->rk_consumer.assignment.started_cnt++;

src/rdkafka_offset.c

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,7 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
940940
rd_kafka_toppar_lock(rktp);
941941
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_VALIDATING;
942942
rd_kafka_toppar_unlock(rktp);
943+
rd_bool_t assignment_serve = rd_false;
943944

944945
if (err == RD_KAFKA_RESP_ERR__DESTROY) {
945946
rd_kafka_toppar_destroy(rktp); /* Drop refcnt */
@@ -1084,12 +1085,28 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
10841085
rktp->rktp_partition, end_offset,
10851086
end_offset_leader_epoch);
10861087

1087-
rd_kafka_toppar_set_fetch_state(rktp,
1088-
RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1088+
if (rd_kafka_is_simple_consumer(rk) || rktp->rktp_started) {
1089+
/* Already started, just set state to active */
1090+
rd_kafka_toppar_set_fetch_state(
1091+
rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1092+
} else {
1093+
rd_kafka_topic_partition_t *rktpar_pending =
1094+
rd_kafka_topic_partition_list_add_copy(
1095+
rk->rk_consumer.assignment.pending, rktpar);
1096+
rd_kafka_topic_partition_set_from_fetch_pos(
1097+
rktpar_pending, rktp->rktp_offset_validation_pos);
1098+
rd_kafka_topic_partition_set_fetch_pos_validated(
1099+
rktpar_pending, rd_true);
1100+
/* Serve the assignment after releasing the lock
1101+
* to respect lock order. */
1102+
assignment_serve = rd_true;
1103+
}
10891104
}
10901105

10911106
done:
10921107
rd_kafka_toppar_unlock(rktp);
1108+
if (assignment_serve)
1109+
rd_kafka_assignment_serve(rk);
10931110

10941111
if (parts)
10951112
rd_kafka_topic_partition_list_destroy(parts);

src/rdkafka_partition.c

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2864,6 +2864,44 @@ void rd_kafka_topic_partition_set_current_leader_epoch(
28642864
parpriv->current_leader_epoch = current_leader_epoch;
28652865
}
28662866

2867+
/**
2868+
* @brief Marks the fetch position (offset + leader epoch) as validated or not.
2869+
* @param rktpar Partition object.
2870+
* @param fetch_pos_validated Whether the fetch position has been validated.
2871+
*
2872+
* @remark See KIP-320 for more information.
2873+
*/
2874+
void rd_kafka_topic_partition_set_fetch_pos_validated(
2875+
rd_kafka_topic_partition_t *rktpar,
2876+
rd_bool_t fetch_pos_validated) {
2877+
rd_kafka_topic_partition_private_t *parpriv;
2878+
2879+
/* Avoid allocating private_t if clearing the epoch */
2880+
if (!fetch_pos_validated || !rktpar->_private)
2881+
return;
2882+
2883+
parpriv = rd_kafka_topic_partition_get_private(rktpar);
2884+
parpriv->fetch_pos_validated = fetch_pos_validated;
2885+
}
2886+
2887+
/**
2888+
* @brief Gets the validation status of the fetch position (offset + leader
2889+
* epoch).
2890+
* @param rktpar Partition object.
2891+
* @return Whether the fetch position has been validated.
2892+
*
2893+
* @remark See KIP-320 for more information.
2894+
*/
2895+
rd_bool_t rd_kafka_topic_partition_get_fetch_pos_validated(
2896+
rd_kafka_topic_partition_t *rktpar) {
2897+
const rd_kafka_topic_partition_private_t *parpriv;
2898+
2899+
if (!(parpriv = rktpar->_private))
2900+
return rd_false;
2901+
2902+
return parpriv->fetch_pos_validated;
2903+
}
2904+
28672905
/**
28682906
* @brief Set offset and leader epoch from a fetchpos.
28692907
*/

src/rdkafka_partition.h

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,13 @@ typedef struct rd_kafka_topic_partition_private_s {
499499
int32_t current_leader_epoch;
500500
/** Leader epoch if known, else -1. */
501501
int32_t leader_epoch;
502+
/**
503+
* Is fetch position (offset + leader_epoch) validated?
504+
* This field isn't copied automatically
505+
* when copying or updating the struct to avoid unintended
506+
* skipped validations.
507+
*/
508+
rd_bool_t fetch_pos_validated;
502509
/** Topic id. */
503510
rd_kafka_Uuid_t topic_id;
504511
} rd_kafka_topic_partition_private_t;
@@ -848,30 +855,21 @@ rd_kafka_topic_partition_get_private(rd_kafka_topic_partition_t *rktpar) {
848855
}
849856

850857

851-
/**
852-
* @returns the partition leader current epoch, if relevant and known,
853-
* else -1.
854-
*
855-
* @param rktpar Partition object.
856-
*
857-
* @remark See KIP-320 for more information.
858-
*/
859858
int32_t rd_kafka_topic_partition_get_current_leader_epoch(
860859
const rd_kafka_topic_partition_t *rktpar);
861860

862861

863-
/**
864-
* @brief Sets the partition leader current epoch (use -1 to clear).
865-
*
866-
* @param rktpar Partition object.
867-
* @param leader_epoch Partition leader current epoch, use -1 to reset.
868-
*
869-
* @remark See KIP-320 for more information.
870-
*/
871862
void rd_kafka_topic_partition_set_current_leader_epoch(
872863
rd_kafka_topic_partition_t *rktpar,
873864
int32_t leader_epoch);
874865

866+
void rd_kafka_topic_partition_set_fetch_pos_validated(
867+
rd_kafka_topic_partition_t *rktpar,
868+
rd_bool_t fetch_pos_validated);
869+
870+
rd_bool_t rd_kafka_topic_partition_get_fetch_pos_validated(
871+
rd_kafka_topic_partition_t *rktpar);
872+
875873
/**
876874
* @returns the partition's rktp if set (no refcnt increase), else NULL.
877875
*/

0 commit comments

Comments
 (0)