Log Publisher Fragmented Message Tracker#1931
Conversation
…er. If there is an election in between a fragmented message, the leader will now replay the beginning fragments of this fragmented message up to the commit position to bring the LogAdapter up to date.
| private final LongArrayQueue fragmentedMessageBounds = new LongArrayQueue(Long.MAX_VALUE); | ||
| private long logAdapterRebuildStartPosition = Aeron.NULL_VALUE; | ||
|
|
||
| LongArrayQueue fragmentedMessageBounds() |
Check notice
Code scanning / CodeQL
Exposing internal representation Note
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI about 1 month ago
In general, to avoid exposing internal representation, never return a direct reference to a mutable field. Either (a) change the method to return an immutable/read‑only view of the data or (b) return a defensive copy. Callers that need to modify internal state should instead be given dedicated methods that encapsulate the allowed mutations.
For this specific case, the simplest, least invasive fix is to make fragmentedMessageBounds() return a snapshot of the current queue contents rather than the actual LongArrayQueue instance. Since LongArrayQueue does not have an obvious “unmodifiable view” hook and we should avoid adding non‑standard dependencies, the practical approach is to create a new LongArrayQueue, copy the elements from this.fragmentedMessageBounds into it, and return that. This preserves external behaviour for callers that only read or iterate over the queue, while preventing them from mutating the tracker’s internal queue. If the method is only used for observation (for example, in tests), this change does not break existing functionality.
Concretely, in aeron-cluster/src/main/java/io/aeron/cluster/LogPublisherFragmentedMessageTracker.java, change the fragmentedMessageBounds() method (lines 31–34) so that instead of return fragmentedMessageBounds; it constructs and returns a new LongArrayQueue initialized from the current queue. This requires iterating over fragmentedMessageBounds and offering each long value into the new queue. No additional imports or new classes are needed.
| @@ -30,7 +30,12 @@ | ||
|
|
||
| LongArrayQueue fragmentedMessageBounds() | ||
| { | ||
| return fragmentedMessageBounds; | ||
| final LongArrayQueue copy = new LongArrayQueue(fragmentedMessageBounds.capacity()); | ||
| for (int i = 0, size = fragmentedMessageBounds.size(); i < size; i++) | ||
| { | ||
| copy.offerLong(fragmentedMessageBounds.get(i)); | ||
| } | ||
| return copy; | ||
| } | ||
|
|
||
| long logAdapterRebuildStartPosition() |
Opening draft PR to see what others' thoughts are.
Problem is recovery on the leader if there is an election and the leader's commit position is in between a fragmented message. When the leader moves into election states LEADER_REPLAY or FOLLOWER_REPLAY, the leader will replay from the log position to append position. This is a problem since the beginning fragments of the fragmented message will not be in the LogAdapter's buffer builder.
This manifests itself in a few ways
This implementation revives some of the work from but using a different approach than: #1864. This approach avoids the leader needing to read the log in a non extension case, Will also look to revive other PR in coming days for comparison.
This draft PR is to keep track of the uncommitted fragmented messages offered by the leader. If there is an election in between a fragmented message, the leader will now replay the beginning fragments of this fragmented message up to the commit position to bring the LogAdapter up to date. Implementation will likely need some cleanup as maybe this isn't a very clean approach.