Added MessageCommitter interface and implementation#610
Added MessageCommitter interface and implementation#610alex268 merged 5 commits intoydb-platform:masterfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #610 +/- ##
============================================
- Coverage 68.92% 68.87% -0.06%
- Complexity 3131 3133 +2
============================================
Files 355 359 +4
Lines 15304 15328 +24
Branches 1621 1621
============================================
+ Hits 10549 10557 +8
- Misses 4094 4109 +15
- Partials 661 662 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
topic/src/main/java/tech/ydb/topic/read/impl/MessageDecoder.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
This PR introduces a new commit abstraction (MessageCommitter) for topic reader message/event commits, decoupling commit handling from ReadPartitionSession internals and updating reader components/tests to use commit ranges.
Changes:
- Added
MessageCommitterAPI and aMessageCommitterImplimplementation to manage commit futures and commit requests. - Refactored
MessageandDataReceivedEventto exposegetRangeToCommit()and a session-boundgetCommitter(), withcommit()becoming a default method. - Updated reader/session internals (
ReadPartitionSession,ReadSession, sync reader, deferred committer) and adapted tests accordingly.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| topic/src/main/java/tech/ydb/topic/read/MessageCommitter.java | New public interface for committing ranges/messages. |
| topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java | New implementation backing commit futures and confirmations. |
| topic/src/main/java/tech/ydb/topic/read/Message.java | Adds commit range + committer, makes commit() a default method; deprecates offsets accessor. |
| topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java | Adds commit range + committer, makes commit() a default method; deprecates offsets accessor. |
| topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java | Wires messages to MessageCommitter + commit range. |
| topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java | Provides event commit range + committer instead of direct session commits. |
| topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java | Moves commit handling to MessageCommitterImpl and introduces per-message commit ranges. |
| topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java | Adapts partition session lifecycle/commit confirmation to the new committer flow. |
| topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java | Refactors deferred commit batching to group by MessageCommitter. |
| topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java | Simplifies API from adding collections to adding single ranges. |
| topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java | Updates transaction offset updates to use commit ranges. |
| topic/src/main/java/tech/ydb/topic/read/impl/MessageDecoder.java | Renames “id” parameter to “traceID” in decoding tasks. |
| topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java | Marks the interface as deprecated. |
| topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java | Adds OffsetsRange.of(...) factories. |
| topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java | Updates tests to match new DisjointOffsetRangeSet.add(...) signature. |
| topic/src/test/java/tech/ydb/topic/impl/TopicReadersIntegrationTest.java | Adds a test timeout rule and an ignored test scenario for retention-like behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
No description provided.