Skip to content

Commit 33f3941

Browse files
committed
[KIP-932] Fix intermittent test and add debug logging
1. Fix intermittent do_test_mixed_commit_types failure: add rd_sleep(3) before consumer close to allow in-flight async ack requests to complete. The race occurred when commit_async grabbed acks from the inflight map but the broker response hadn't arrived before consumer close tore down the connection. Tighten commit_sync timing assertion from 5000ms to 500ms 2. Add debug logging to commit_async, ShareAcknowledge request construction, and ack batch building for better observability 3. Add TODO KIP-932 comments for future improvements: - Ack batch merging for adjacent/overlapping offset ranges - Using abs_timeout instead of socket.timeout.ms for ShareAcknowledge requests in commit_sync path
1 parent 8549895 commit 33f3941

File tree

4 files changed

+179
-124
lines changed

4 files changed

+179
-124
lines changed

src/rdkafka.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3443,6 +3443,20 @@ static void rd_kafka_share_segregate_acks_by_leader(rd_kafka_t *rk,
34433443
* is then fully destroyed (freeing its entries). */
34443444
rd_kafka_share_ack_batch_entry_t *entry;
34453445
int j;
3446+
/*
3447+
* TODO KIP-932: Merge with overlapping offsets into
3448+
* single entry with single type. For example, if
3449+
* existing has an ACCEPT for offsets 0-50 and new
3450+
* batch has ACCEPT for offsets 51-100, they can be
3451+
* merged into a single ACCEPT for offsets 0-100.
3452+
* Currently, we are blindly adding entries from the
3453+
* new batch into the existing batch, which means we
3454+
* may end up with multiple adjacent or overlapping
3455+
* entries of the same type that could have been
3456+
* merged. This is not incorrect, but it is less
3457+
* efficient in terms of the network bandwidth in the
3458+
* RPC call.
3459+
*/
34463460
RD_LIST_FOREACH(entry, &batch->entries, j) {
34473461
rd_list_add(
34483462
&existing->entries,
@@ -4212,6 +4226,8 @@ rd_kafka_error_t *rd_kafka_share_commit_async(rd_kafka_share_t *rkshare) {
42124226
rd_kafka_op_t *rko;
42134227
rd_list_t *ack_batches;
42144228

4229+
rd_kafka_dbg(rk, CGRP, "SHARE", "Committing asynchronously");
4230+
42154231
/* Drain rk_rep for all pending callbacks (non-blocking) */
42164232
rd_kafka_q_serve(rk->rk_rep, RD_POLL_NOWAIT, 0, RD_KAFKA_Q_CB_CALLBACK,
42174233
rd_kafka_poll_cb, NULL);
@@ -4274,6 +4290,9 @@ rd_kafka_share_commit_sync(rd_kafka_share_t *rkshare,
42744290

42754291
*partitions = NULL;
42764292

4293+
rd_kafka_dbg(rk, CGRP, "SHARE",
4294+
"Committing synchronously with timeout %d ms", timeout_ms);
4295+
42774296
/* Drain rk_rep for all pending callbacks (non-blocking) */
42784297
rd_kafka_q_serve(rk->rk_rep, RD_POLL_NOWAIT, 0, RD_KAFKA_Q_CB_CALLBACK,
42794298
rd_kafka_poll_cb, NULL);

src/rdkafka_fetcher.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2651,6 +2651,11 @@ void rd_kafka_ShareAcknowledgeRequest(rd_kafka_broker_t *rkb,
26512651
/* Topics array with acknowledgement batches */
26522652
of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
26532653

2654+
rd_kafka_dbg(rkb->rkb_rk, FETCH, "SHAREACK",
2655+
"Building ShareAcknowledge request with %d ack toppars "
2656+
"and %d total ack entries",
2657+
ack_details_cnt, total_ack_entries);
2658+
26542659
if (ack_details) {
26552660
rd_kafka_Uuid_t *topic_id_last = NULL;
26562661
rd_kafka_share_ack_batches_t *batches;
@@ -2690,11 +2695,25 @@ void rd_kafka_ShareAcknowledgeRequest(rd_kafka_broker_t *rkb,
26902695
rd_kafka_buf_write_i32(rkbuf,
26912696
batches->rktpar->partition);
26922697

2698+
rd_kafka_dbg(rkb->rkb_rk, FETCH, "SHAREACK",
2699+
"Adding ack for topic %s [%" PRId32
2700+
"] with %d entries",
2701+
batches->rktpar->topic,
2702+
batches->rktpar->partition,
2703+
rd_list_cnt(&batches->entries));
2704+
26932705
/* Write acknowledgement batches */
26942706
entries_cnt = rd_list_cnt(&batches->entries);
26952707
rd_kafka_buf_write_arraycnt(rkbuf, entries_cnt);
26962708

26972709
RD_LIST_FOREACH(entry, &batches->entries, m) {
2710+
rd_kafka_dbg(
2711+
rkb->rkb_rk, FETCH, "SHAREACK",
2712+
"Adding ack entry with start offset "
2713+
"%" PRId64 ", end offset %" PRId64
2714+
", type %d",
2715+
entry->start_offset, entry->end_offset,
2716+
entry->types[0]);
26982717
/* FirstOffset */
26992718
rd_kafka_buf_write_i64(rkbuf,
27002719
entry->start_offset);

src/rdkafka_share_acknowledgement.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,13 @@ rd_list_t *rd_kafka_share_build_ack_details(rd_kafka_share_t *rkshare) {
372372
->response_leader_epoch,
373373
0);
374374
}
375+
rd_kafka_dbg(
376+
rkshare->rkshare_rk, CGRP,
377+
"SHAREACK",
378+
" Adding ack detail for offsets "
379+
"[%" PRId64 " - %" PRId64
380+
"] with type %d",
381+
run_start, run_end, run_type);
375382
/* Collated: 1 type for entire
376383
* range */
377384
rd_list_add(

0 commit comments

Comments
 (0)