3737import org .apache .ratis .util .SizeInBytes ;
3838import org .apache .ratis .util .TimeDuration ;
3939
40+ import java .util .Collection ;
4041import java .util .Collections ;
4142import java .util .HashMap ;
4243import java .util .List ;
4344import java .util .Map ;
4445import java .util .Objects ;
45- import java .util .Optional ;
4646import java .util .concurrent .CompletableFuture ;
47- import java .util .concurrent .TimeUnit ;
4847import java .util .concurrent .atomic .AtomicBoolean ;
4948import java .util .function .LongUnaryOperator ;
5049
5150/**
5251 * An abstract implementation of {@link LogAppender}.
5352 */
5453public abstract class LogAppenderBase implements LogAppender {
54+ /** For buffering log entries to create an {@link EntryList}. */
55+ private static class EntryBuffer {
56+ /** A queue for limiting the byte size, number of elements and poll time. */
57+ private final DataQueue <EntryWithData > queue ;
58+ /** A map for releasing {@link ReferenceCountedObject}s. */
59+ private final Map <Long , ReferenceCountedObject <EntryWithData >> references = new HashMap <>();
60+
61+ EntryBuffer (Object name , RaftProperties properties ) {
62+ final SizeInBytes bufferByteLimit = RaftServerConfigKeys .Log .Appender .bufferByteLimit (properties );
63+ final int bufferElementLimit = RaftServerConfigKeys .Log .Appender .bufferElementLimit (properties );
64+ this .queue = new DataQueue <>(name , bufferByteLimit , bufferElementLimit , EntryWithData ::getSerializedSize );
65+ }
66+
67+ boolean putNew (long index , ReferenceCountedObject <EntryWithData > retained ) {
68+ if (!queue .offer (retained .get ())) {
69+ retained .release ();
70+ return false ;
71+ }
72+ final ReferenceCountedObject <EntryWithData > previous = references .put (index , retained );
73+ Preconditions .assertNull (previous , () -> "previous with index " + index );
74+ return true ;
75+ }
76+
77+ void releaseAllAndClear () {
78+ for (ReferenceCountedObject <EntryWithData > ref : references .values ()) {
79+ ref .release ();
80+ }
81+ references .clear ();
82+ queue .clear ();
83+ }
84+
85+ EntryList pollList (long heartbeatWaitTimeMs ) throws RaftLogIOException {
86+ final List <LogEntryProto > protos ;
87+ try {
88+ protos = queue .pollList (heartbeatWaitTimeMs , EntryWithData ::getEntry , null );
89+ } catch (Exception e ) {
90+ releaseAllAndClear ();
91+ throw e ;
92+ } finally {
93+ for (EntryWithData entry : queue ) {
94+ // Remove and release remaining entries.
95+ final ReferenceCountedObject <EntryWithData > removed = references .remove (entry .getIndex ());
96+ Objects .requireNonNull (removed , "removed == null" );
97+ removed .release ();
98+ }
99+ queue .clear ();
100+ }
101+ return new EntryList (protos , references );
102+ }
103+ }
104+
105+ /** Storing log entries and their references. */
106+ private static class EntryList {
107+ private final List <LogEntryProto > protos ;
108+ private final Collection <ReferenceCountedObject <EntryWithData >> references ;
109+
110+ EntryList (List <LogEntryProto > protos , Map <Long , ReferenceCountedObject <EntryWithData >> references ) {
111+ Preconditions .assertSame (references .size (), protos .size (), "#entries" );
112+ this .protos = Collections .unmodifiableList (protos );
113+ this .references = Collections .unmodifiableCollection (references .values ());
114+ }
115+
116+ List <LogEntryProto > getProtos () {
117+ return protos ;
118+ }
119+
120+ void retain () {
121+ for (ReferenceCountedObject <EntryWithData > ref : references ) {
122+ ref .retain ();
123+ }
124+ }
125+
126+ void release () {
127+ for (ReferenceCountedObject <EntryWithData > ref : references ) {
128+ ref .release ();
129+ }
130+ }
131+ }
132+
55133 private final String name ;
56134 private final RaftServer .Division server ;
57135 private final LeaderState leaderState ;
58136 private final FollowerInfo follower ;
59137
60- private final DataQueue <EntryWithData > buffer ;
61138 private final int snapshotChunkMaxSize ;
62139
63140 private final LogAppenderDaemon daemon ;
@@ -75,9 +152,6 @@ protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, F
75152 final RaftProperties properties = server .getRaftServer ().getProperties ();
76153 this .snapshotChunkMaxSize = RaftServerConfigKeys .Log .Appender .snapshotChunkSizeMax (properties ).getSizeInt ();
77154
78- final SizeInBytes bufferByteLimit = RaftServerConfigKeys .Log .Appender .bufferByteLimit (properties );
79- final int bufferElementLimit = RaftServerConfigKeys .Log .Appender .bufferElementLimit (properties );
80- this .buffer = new DataQueue <>(this , bufferByteLimit , bufferElementLimit , EntryWithData ::getSerializedSize );
81155 this .daemon = new LogAppenderDaemon (this );
82156 this .eventAwaitForSignal = new AwaitForSignal (name );
83157
@@ -210,13 +284,13 @@ protected LongUnaryOperator getNextIndexForError(long newNextIndex) {
210284 final long n = oldNextIndex <= 0L ? oldNextIndex : Math .min (oldNextIndex - 1 , newNextIndex );
211285 if (m > n ) {
212286 if (m > newNextIndex ) {
213- LOG .info ("Set nextIndex to matchIndex + 1 (= " + m + ")" );
287+ LOG .info ("{}: Set nextIndex to matchIndex + 1 (= {})" , name , m );
214288 }
215289 return m ;
216290 } else if (oldNextIndex <= 0L ) {
217291 return oldNextIndex ; // no change.
218292 } else {
219- LOG .info ("Decrease nextIndex to " + n );
293+ LOG .info ("{}: Decrease nextIndex to {}" , name , n );
220294 return n ;
221295 }
222296 };
@@ -227,18 +301,18 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he
227301 throw new UnsupportedOperationException ("Use nextAppendEntriesRequest(" + callId + ", " + heartbeat +") instead." );
228302 }
229303
230- /**
231- * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
232- * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
233- * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
234- *
235- * @param callId The call id of the returned request.
236- * @param heartbeat the returned request must be a heartbeat.
237- *
238- * @return a retained reference of {@link AppendEntriesRequestProto} object.
239- * Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
240- * after use.
241- */
304+ /**
305+ * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
306+ * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
307+ * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
308+ *
309+ * @param callId The call id of the returned request.
310+ * @param heartbeat the returned request must be a heartbeat.
311+ *
312+ * @return a retained reference of {@link AppendEntriesRequestProto} object.
313+ * Since the returned reference is retained,
314+ * the caller must call {@link ReferenceCountedObject#release()}} after use.
315+ */
242316 protected ReferenceCountedObject <AppendEntriesRequestProto > nextAppendEntriesRequest (long callId , boolean heartbeat )
243317 throws RaftLogIOException {
244318 final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs ();
@@ -253,56 +327,23 @@ protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesReq
253327 return ref ;
254328 }
255329
256- Preconditions .assertTrue (buffer .isEmpty (), () -> "buffer has " + buffer .getNumElements () + " elements." );
257-
258330 final long snapshotIndex = follower .getSnapshotIndex ();
259- final long leaderNext = getRaftLog ().getNextIndex ();
260331 final long followerNext = follower .getNextIndex ();
261- final long halfMs = heartbeatWaitTimeMs /2 ;
262- final Map <Long , ReferenceCountedObject <EntryWithData >> offered = new HashMap <>();
263- for (long next = followerNext ; leaderNext > next && getHeartbeatWaitTimeMs () - halfMs > 0 ; next ++) {
264- final ReferenceCountedObject <EntryWithData > entryWithData ;
265- try {
266- entryWithData = getRaftLog ().retainEntryWithData (next );
267- if (!buffer .offer (entryWithData .get ())) {
268- entryWithData .release ();
269- break ;
270- }
271- offered .put (next , entryWithData );
272- } catch (Exception e ){
273- for (ReferenceCountedObject <EntryWithData > ref : offered .values ()) {
274- ref .release ();
275- }
276- offered .clear ();
277- throw e ;
278- }
279- }
280- if (buffer .isEmpty ()) {
332+ final EntryBuffer entryBuffer = readLogEntries (followerNext , heartbeatWaitTimeMs );
333+ if (entryBuffer == null ) {
281334 return null ;
282335 }
283336
284- final List <LogEntryProto > protos ;
285- try {
286- protos = buffer .pollList (getHeartbeatWaitTimeMs (), EntryWithData ::getEntry ,
287- (entry , time , exception ) -> LOG .warn ("Failed to get {} in {}" ,
288- entry , time .toString (TimeUnit .MILLISECONDS , 3 ), exception ));
289- } catch (RaftLogIOException e ) {
290- for (ReferenceCountedObject <EntryWithData > ref : offered .values ()) {
291- ref .release ();
292- }
293- offered .clear ();
294- throw e ;
295- } finally {
296- for (EntryWithData entry : buffer ) {
297- // Release remaining entries.
298- Optional .ofNullable (offered .remove (entry .getIndex ())).ifPresent (ReferenceCountedObject ::release );
299- }
300- buffer .clear ();
301- }
337+ final EntryList entryList = entryBuffer .pollList (heartbeatWaitTimeMs );
338+ final List <LogEntryProto > protos = entryList .getProtos ();
302339 assertProtos (protos , followerNext , previous , snapshotIndex );
303340 AppendEntriesRequestProto appendEntriesProto =
304341 leaderState .newAppendEntriesRequestProto (follower , protos , previous , callId );
305- return ReferenceCountedObject .delegateFrom (offered .values (), appendEntriesProto );
342+ final ReferenceCountedObject <AppendEntriesRequestProto > ref = ReferenceCountedObject .wrap (
343+ appendEntriesProto , entryList ::retain , entryList ::release );
344+ ref .retain ();
345+ entryList .release ();
346+ return ref ;
306347 }
307348
308349 private void assertProtos (List <LogEntryProto > protos , long nextIndex , TermIndex previous , long snapshotIndex ) {
@@ -324,6 +365,31 @@ private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex
324365 }
325366 }
326367
368+ private EntryBuffer readLogEntries (long followerNext , long heartbeatWaitTimeMs ) throws RaftLogIOException {
369+ final RaftLog raftLog = getRaftLog ();
370+ final long leaderNext = raftLog .getNextIndex ();
371+ final long halfMs = heartbeatWaitTimeMs /2 ;
372+ EntryBuffer entryBuffer = null ;
373+ for (long next = followerNext ; leaderNext > next && getHeartbeatWaitTimeMs () - halfMs > 0 ; next ++) {
374+ final ReferenceCountedObject <EntryWithData > retained ;
375+ try {
376+ retained = raftLog .retainEntryWithData (next );
377+ if (entryBuffer == null ) {
378+ entryBuffer = new EntryBuffer (name , server .getRaftServer ().getProperties ());
379+ }
380+ if (!entryBuffer .putNew (next , retained )) {
381+ break ;
382+ }
383+ } catch (Exception e ) {
384+ if (entryBuffer != null ) {
385+ entryBuffer .releaseAllAndClear ();
386+ }
387+ throw e ;
388+ }
389+ }
390+ return entryBuffer ;
391+ }
392+
327393 @ Override
328394 public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest (TermIndex firstAvailableLogTermIndex ) {
329395 Preconditions .assertTrue (firstAvailableLogTermIndex .getIndex () >= 0 );
0 commit comments