[KIP-932]: Implement acknowledgement callback#5380
Open
Pratyush Ranjan (PratRanj07) wants to merge 8 commits intodev_kip-932_commit-syncfrom
Open
[KIP-932]: Implement acknowledgement callback#5380Pratyush Ranjan (PratRanj07) wants to merge 8 commits intodev_kip-932_commit-syncfrom
Pratyush Ranjan (PratRanj07) wants to merge 8 commits intodev_kip-932_commit-syncfrom
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
0988202 to
2d00c1f
Compare
1. Core implementation: - SHARE_COMMIT_SYNC_FANOUT op type and handler in main thread - Segregate acks by partition leader into per-broker pending_commit_sync - Dispatch sync ack ops with priority over async_ack_details - Timeout timer fills remaining partitions with _TIMED_OUT - App thread blocks on temp queue until all broker replies or timeout 2. Data structures: - commit_sync_request in rkcg (abs_timeout, results, wait count, replyq, timer) - pending_commit_sync in broker (sync_ack_details, abs_timeout, request_id) - SHARE_COMMIT_SYNC_FANOUT and FANOUT_REPLY op types with union members 3. Broker thread changes: - ShareAcknowledge reply handler populates ack_results in reply op - Main thread reply handler copies per-partition errors to sync results - Pending sync acks dispatched before async when broker becomes free 4. Tests (0176-share_consumer_commit_sync.c): - Basic implicit/explicit ack mode commit_sync - No pending acks returns NULL/NULL - Commit prevents redelivery (Consumer B gets 0 records) - Mixed ack types (ACCEPT/RELEASE/REJECT) with delivery count verification - Multiple commit_sync calls with back-to-back no-op commits - Multi-topic multi-partition (10 topics x 6 partitions x 10 msgs) - Mock: verifies ShareAcknowledge RPC count matches commit_sync calls - Mock: timeout handling with 5s RTT / 2s timeout and recovery - Mixed commit types (10 async + 1 sync pattern) - Mock: broker dispatch priority (sync before async) 5. Example program and build integration
1. Fix intermittent do_test_mixed_commit_types failure: add
rd_sleep(3) before consumer close to allow in-flight async
ack requests to complete. The race occurred when commit_async
grabbed acks from the inflight map but the broker response
hadn't arrived before consumer close tore down the connection.
Tighten commit_sync timing assertion from 5000ms to 500ms
2. Add debug logging to commit_async, ShareAcknowledge request
construction, and ack batch building for better observability
3. Add TODO KIP-932 comments for future improvements:
- Ack batch merging for adjacent/overlapping offset ranges
- Using abs_timeout instead of socket.timeout.ms for
ShareAcknowledge requests in commit_sync path
- Thread safety check for share fetch broker fields
43b8005 to
1b69281
Compare
debdba8 to
e2f7d39
Compare
551ca4f to
bfa47a3
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR intends to implement acknowledgement callback and call it for: