Skip to content

[KIP-932] Add rd_kafka_share_commit_sync() API#5374

Open
Pranav Rathi (pranavrth) wants to merge 5 commits intodev_kip-932_queues-for-kafkafrom
dev_kip-932_commit-sync
Open

[KIP-932] Add rd_kafka_share_commit_sync() API#5374
Pranav Rathi (pranavrth) wants to merge 5 commits intodev_kip-932_queues-for-kafkafrom
dev_kip-932_commit-sync

Conversation

@pranavrth
Copy link
Copy Markdown
Member

  1. Core implementation:

    • SHARE_COMMIT_SYNC_FANOUT op type and handler in main thread
    • Segregate acks by partition leader into per-broker pending_commit_sync
    • Dispatch sync ack ops with priority over async_ack_details
    • Timeout timer fills remaining partitions with _TIMED_OUT
    • App thread blocks on temp queue until all broker replies or timeout
  2. Data structures:

    • commit_sync_request in rkcg (abs_timeout, results, wait count, replyq, timer)
    • pending_commit_sync in broker (sync_ack_details, abs_timeout, request_id)
    • SHARE_COMMIT_SYNC_FANOUT and FANOUT_REPLY op types with union members
  3. Broker thread changes:

    • ShareAcknowledge reply handler populates ack_results in reply op
    • Main thread reply handler copies per-partition errors to sync results
    • Pending sync acks dispatched before async when broker becomes free
  4. Tests (0176-share_consumer_commit_sync.c):

    • Basic implicit/explicit ack mode commit_sync
    • No pending acks returns NULL/NULL
    • Commit prevents redelivery (Consumer B gets 0 records)
    • Mixed ack types (ACCEPT/RELEASE/REJECT) with delivery count verification
    • Multiple commit_sync calls with back-to-back no-op commits
    • Multi-topic multi-partition (10 topics x 6 partitions x 10 msgs)
    • Mock: verifies ShareAcknowledge RPC count matches commit_sync calls
    • Mock: timeout handling with 5s RTT / 2s timeout and recovery
    • Mixed commit types (10 async + 1 sync pattern)
    • Mock: broker dispatch priority (sync before async)
  5. Example program and build integration

@pranavrth Pranav Rathi (pranavrth) requested a review from a team as a code owner March 30, 2026 13:01
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip-932_ShareAcknowledge-rpc branch from c9e39d3 to 541611f Compare April 1, 2026 14:16
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip-932_commit-sync branch from 33f3941 to 0988202 Compare April 1, 2026 14:26
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip-932_ShareAcknowledge-rpc branch from 541611f to 469f22b Compare April 2, 2026 01:29
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip-932_commit-sync branch from 0988202 to 2d00c1f Compare April 2, 2026 01:44
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip-932_ShareAcknowledge-rpc branch from 469f22b to 5e7a5de Compare April 2, 2026 08:26
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip-932_commit-sync branch from 43b8005 to 1b69281 Compare April 2, 2026 08:27
Base automatically changed from dev_kip-932_ShareAcknowledge-rpc to dev_kip-932_queues-for-kafka April 6, 2026 07:48
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip-932_commit-sync branch from 1b69281 to 551ca4f Compare April 6, 2026 11:53
1. Core implementation:
   - SHARE_COMMIT_SYNC_FANOUT op type and handler in main thread
   - Segregate acks by partition leader into per-broker pending_commit_sync
   - Dispatch sync ack ops with priority over async_ack_details
   - Timeout timer fills remaining partitions with _TIMED_OUT
   - App thread blocks on temp queue until all broker replies or timeout

2. Data structures:
   - commit_sync_request in rkcg (abs_timeout, results, wait count, replyq, timer)
   - pending_commit_sync in broker (sync_ack_details, abs_timeout, request_id)
   - SHARE_COMMIT_SYNC_FANOUT and FANOUT_REPLY op types with union members

3. Broker thread changes:
   - ShareAcknowledge reply handler populates ack_results in reply op
   - Main thread reply handler copies per-partition errors to sync results
   - Pending sync acks dispatched before async when broker becomes free

4. Tests (0176-share_consumer_commit_sync.c):
   - Basic implicit/explicit ack mode commit_sync
   - No pending acks returns NULL/NULL
   - Commit prevents redelivery (Consumer B gets 0 records)
   - Mixed ack types (ACCEPT/RELEASE/REJECT) with delivery count verification
   - Multiple commit_sync calls with back-to-back no-op commits
   - Multi-topic multi-partition (10 topics x 6 partitions x 10 msgs)
   - Mock: verifies ShareAcknowledge RPC count matches commit_sync calls
   - Mock: timeout handling with 5s RTT / 2s timeout and recovery
   - Mixed commit types (10 async + 1 sync pattern)
   - Mock: broker dispatch priority (sync before async)

5. Example program and build integration
1. Fix intermittent do_test_mixed_commit_types failure: add
   rd_sleep(3) before consumer close to allow in-flight async
   ack requests to complete. The race occurred when commit_async
   grabbed acks from the inflight map but the broker response
   hadn't arrived before consumer close tore down the connection.
   Tighten commit_sync timing assertion from 5000ms to 500ms

2. Add debug logging to commit_async, ShareAcknowledge request
   construction, and ack batch building for better observability

3. Add TODO KIP-932 comments for future improvements:
   - Ack batch merging for adjacent/overlapping offset ranges
   - Using abs_timeout instead of socket.timeout.ms for
     ShareAcknowledge requests in commit_sync path
   - Thread safety check for share fetch broker fields
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor comments


if (!ack_batches) {
rd_kafka_dbg(rk, CGRP, "SHARE",
"No pending acknowledgements to commit sync");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"No pending acknowledgements to commit sync");
"No pending acknowledgements to commit");

rkcg->rkcg_share.commit_sync_id_counter;
rkcg->rkcg_commit_sync_request.abs_timeout = abs_timeout;
rkcg->rkcg_commit_sync_request.replyq = rko->rko_replyq.q;
rko->rko_replyq.q = NULL; /* Take ownership of replyq */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep all the rkcg assignment operations together

Comment on lines +4031 to +4033
if (result_rktpar)
result_rktpar->err =
RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be problematic if result_rktpar is not found? Maybe add a debug log or rd_assert in that case to make sure it should always be there?

* reply.
* @locality main
* thread */
int64_t commit_sync_id_counter; /**< Global counter for
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int64_t commit_sync_id_counter; /**< Global counter for
int64_t commit_sync_request_id_counter; /**< Global counter for

*results; /**< Partition-to-error mapping.
* Populated as broker replies
* arrive. */
int wait_broker_result_count; /**< Number of broker
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int wait_broker_result_count; /**< Number of broker
int brokers_awaiting_result_cnt; /**< Number of broker

or something similar

rd_kafka_dbg(
rkshare->rkshare_rk, CGRP,
"SHAREACK",
" Adding ack detail for offsets "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this extra space?

* and returns per-partition results.
*/

#define MAX_MSGS 500
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed

Comment on lines +4300 to +4302
rd_kafka_share_acknowledge_all_if_implicit(rkshare);

ack_batches = rd_kafka_share_build_ack_details(rk->rk_rkshare);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If multiple app threads call commitSync(), there can be race conditions while building the acks.

rd_kafka_topic_partition_t *result_rktpar;

rktp = rd_kafka_topic_partition_toppar(rk, batch->rktpar);
if (!rktp || !rktp->rktp_leader) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wrap this condition in unlikely()?

Comment on lines +489 to +491
rd_ts_t abs_timeout; /**< Absolute timeout from
* the commit_sync request. */
int64_t commit_sync_request_id; /**< Request ID this
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get rid of these fields? They are only getting used while creating the rko to be sent from MT to BT. In that situation, we can use values from cgrp struct instead.

* Step 3: Dispatch pending commit_sync to the replying broker
* (highest priority). Acks must always be sent.
*/
if (!reply_rkb->rkb_share_fetch_enqueued &&
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the check on rkb_share_fetch_enqueued? We are setting it to false at the beginning of this function.

rko->rko_u.share_commit_sync_fanout.results = NULL;

/* Initialize commit_sync request state */
rkcg->rkcg_share.commit_sync_id_counter++;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having this global counter, can we generate the id using a random no. generator or use abs_timeout value? If yes, we can get rid of commit_sync_id_counter field.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants