Skip to content

Commit acec9a5

Browse files
authored
[KIP-848] Fixes related to Offset Commit previous error and ConsumerGroupHeartbeat not updating member epoch in a case (#4672)
[KIP-848] Fixed a condition where error was being raised in commit due to old error in the topic partition [KIP-848] Fix discarding heartbeat response without epoch update when leaving during inflight HB
1 parent 06b2fcb commit acec9a5

File tree

3 files changed

+24
-13
lines changed

3 files changed

+24
-13
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ librdkafka v2.12.0 is a feature release:
88

99
## Fixes
1010

11+
### General fixes
12+
* Fix for discarding the member epoch in a consumer group heartbeat response when leaving with an inflight HB (#4672).
13+
* Fix for an error being raised after a commit due to an existing error in the topic partition (#4672).
14+
1115
### Telemetry fixes
1216

1317
* Issues: #5179 .

src/rdkafka_cgrp.c

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3085,8 +3085,6 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
30853085

30863086
rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
30873087

3088-
if (rd_kafka_cgrp_will_leave(rkcg))
3089-
err = RD_KAFKA_RESP_ERR__OUTDATED;
30903088
if (err)
30913089
goto err;
30923090

@@ -3106,15 +3104,25 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
31063104
}
31073105

31083106
rd_kafka_buf_read_i32(rkbuf, &member_epoch);
3109-
rd_kafka_buf_read_i32(rkbuf, &heartbeat_interval_ms);
3110-
3111-
int8_t are_assignments_present;
3112-
rd_kafka_buf_read_i8(rkbuf, &are_assignments_present);
31133107
rkcg->rkcg_generation_id = member_epoch;
3108+
3109+
rd_kafka_dbg(rk, CGRP, "HEARTBEAT",
3110+
"ConsumerGroupHeartbeat response received for "
3111+
"member id \"%.*s\" with epoch %d",
3112+
RD_KAFKAP_STR_PR(&member_id), member_epoch);
3113+
3114+
if (rd_kafka_cgrp_will_leave(rkcg)) {
3115+
err = RD_KAFKA_RESP_ERR__OUTDATED;
3116+
goto err;
3117+
}
3118+
3119+
rd_kafka_buf_read_i32(rkbuf, &heartbeat_interval_ms);
31143120
if (heartbeat_interval_ms > 0) {
31153121
rkcg->rkcg_heartbeat_intvl_ms = heartbeat_interval_ms;
31163122
}
31173123

3124+
int8_t are_assignments_present;
3125+
rd_kafka_buf_read_i8(rkbuf, &are_assignments_present);
31183126
if (are_assignments_present == 1) {
31193127
rd_kafka_topic_partition_list_t *assigned_topic_partitions;
31203128
const rd_kafka_topic_partition_field_t assignments_fields[] = {
@@ -3134,11 +3142,10 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
31343142
sizeof(assigned_topic_partitions_str), 0);
31353143
}
31363144

3137-
rd_kafka_dbg(
3138-
rk, CGRP, "HEARTBEAT",
3139-
"ConsumerGroupHeartbeat response received target "
3140-
"assignment \"%s\"",
3141-
assigned_topic_partitions_str);
3145+
rd_kafka_dbg(rk, CGRP, "HEARTBEAT",
3146+
"ConsumerGroupHeartbeat received target "
3147+
"assignment \"%s\"",
3148+
assigned_topic_partitions_str);
31423149
}
31433150

31443151
if (assigned_topic_partitions) {

src/rdkafka_request.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,9 +1726,9 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk,
17261726
continue;
17271727
}
17281728

1729+
rktpar->err = partition->err;
17291730
if (partition->err) {
1730-
rktpar->err = partition->err;
1731-
err = partition->err;
1731+
err = partition->err;
17321732
errcnt++;
17331733
/* Accumulate actions for per-partition
17341734
* errors. */

0 commit comments

Comments
 (0)