4242import java .util .List ;
4343import java .util .Map ;
4444import java .util .Objects ;
45- import java .util .Optional ;
4645import java .util .concurrent .CompletableFuture ;
47- import java .util .concurrent .TimeUnit ;
4846import java .util .concurrent .atomic .AtomicBoolean ;
4947import java .util .function .LongUnaryOperator ;
5048
5149/**
5250 * An abstract implementation of {@link LogAppender}.
5351 */
5452public abstract class LogAppenderBase implements LogAppender {
53+ /** For storing log entries to create an {@link AppendEntriesRequestProto}. */
54+ private class EntryBuffer {
55+ /** A queue for limiting the byte size and element size. */
56+ private final DataQueue <EntryWithData > queue ;
57+ /** A map for releasing {@link ReferenceCountedObject}s. */
58+ private final Map <Long , ReferenceCountedObject <EntryWithData >> map = new HashMap <>();
59+
60+ EntryBuffer () {
61+ final RaftProperties properties = server .getRaftServer ().getProperties ();
62+ final SizeInBytes bufferByteLimit = RaftServerConfigKeys .Log .Appender .bufferByteLimit (properties );
63+ final int bufferElementLimit = RaftServerConfigKeys .Log .Appender .bufferElementLimit (properties );
64+ this .queue = new DataQueue <>(name , bufferByteLimit , bufferElementLimit , EntryWithData ::getSerializedSize );
65+ }
66+
67+ void retain () {
68+ for (ReferenceCountedObject <EntryWithData > ref : map .values ()) {
69+ ref .retain ();
70+ }
71+ }
72+
73+ void release () {
74+ for (ReferenceCountedObject <EntryWithData > ref : map .values ()) {
75+ ref .release ();
76+ }
77+ }
78+
79+ int size () {
80+ return map .size ();
81+ }
82+
83+ boolean putNew (long index , ReferenceCountedObject <EntryWithData > ref ) {
84+ if (!queue .offer (ref .get ())) {
85+ ref .release ();
86+ return false ;
87+ }
88+ final ReferenceCountedObject <EntryWithData > previous = map .put (index , ref );
89+ Preconditions .assertNull (previous , () -> "previous with index " + index );
90+ return true ;
91+ }
92+
93+ void releaseAndClear () {
94+ release ();
95+ map .clear ();
96+ }
97+
98+ List <LogEntryProto > pollList (long heartbeatWaitTimeMs ) throws RaftLogIOException {
99+ try {
100+ return queue .pollList (heartbeatWaitTimeMs , EntryWithData ::getEntry , null );
101+ } catch (RaftLogIOException e ) {
102+ releaseAndClear ();
103+ throw e ;
104+ } finally {
105+ for (EntryWithData entry : queue ) {
106+ // Release remaining entries.
107+ final ReferenceCountedObject <EntryWithData > removed = map .remove (entry .getIndex ());
108+ Objects .requireNonNull (removed , "removed == null" );
109+ removed .release ();
110+ }
111+ queue .clear ();
112+ }
113+ }
114+ }
115+
55116 private final String name ;
56117 private final RaftServer .Division server ;
57118 private final LeaderState leaderState ;
58119 private final FollowerInfo follower ;
59120
60- private final DataQueue <EntryWithData > buffer ;
61121 private final int snapshotChunkMaxSize ;
62122
63123 private final LogAppenderDaemon daemon ;
@@ -75,9 +135,6 @@ protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, F
75135 final RaftProperties properties = server .getRaftServer ().getProperties ();
76136 this .snapshotChunkMaxSize = RaftServerConfigKeys .Log .Appender .snapshotChunkSizeMax (properties ).getSizeInt ();
77137
78- final SizeInBytes bufferByteLimit = RaftServerConfigKeys .Log .Appender .bufferByteLimit (properties );
79- final int bufferElementLimit = RaftServerConfigKeys .Log .Appender .bufferElementLimit (properties );
80- this .buffer = new DataQueue <>(this , bufferByteLimit , bufferElementLimit , EntryWithData ::getSerializedSize );
81138 this .daemon = new LogAppenderDaemon (this );
82139 this .eventAwaitForSignal = new AwaitForSignal (name );
83140
@@ -210,13 +267,13 @@ protected LongUnaryOperator getNextIndexForError(long newNextIndex) {
210267 final long n = oldNextIndex <= 0L ? oldNextIndex : Math .min (oldNextIndex - 1 , newNextIndex );
211268 if (m > n ) {
212269 if (m > newNextIndex ) {
213- LOG .info ("Set nextIndex to matchIndex + 1 (= " + m + ")" );
270+ LOG .info ("{}: Set nextIndex to matchIndex + 1 (= {})" , name , m );
214271 }
215272 return m ;
216273 } else if (oldNextIndex <= 0L ) {
217274 return oldNextIndex ; // no change.
218275 } else {
219- LOG .info ("Decrease nextIndex to " + n );
276+ LOG .info ("{}: Decrease nextIndex to {}" , name , n );
220277 return n ;
221278 }
222279 };
@@ -227,18 +284,18 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he
227284 throw new UnsupportedOperationException ("Use nextAppendEntriesRequest(" + callId + ", " + heartbeat +") instead." );
228285 }
229286
230- /**
231- * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
232- * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
233- * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
234- *
235- * @param callId The call id of the returned request.
236- * @param heartbeat the returned request must be a heartbeat.
237- *
238- * @return a retained reference of {@link AppendEntriesRequestProto} object.
239- * Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
240- * after use.
241- */
287+ /**
288+ * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
289+ * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
290+ * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
291+ *
292+ * @param callId The call id of the returned request.
293+ * @param heartbeat the returned request must be a heartbeat.
294+ *
295+ * @return a retained reference of {@link AppendEntriesRequestProto} object.
296+ * Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
297+ * after use.
298+ */
242299 protected ReferenceCountedObject <AppendEntriesRequestProto > nextAppendEntriesRequest (long callId , boolean heartbeat )
243300 throws RaftLogIOException {
244301 final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs ();
@@ -253,56 +310,24 @@ protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesReq
253310 return ref ;
254311 }
255312
256- Preconditions .assertTrue (buffer .isEmpty (), () -> "buffer has " + buffer .getNumElements () + " elements." );
257-
258313 final long snapshotIndex = follower .getSnapshotIndex ();
259- final long leaderNext = getRaftLog ().getNextIndex ();
260314 final long followerNext = follower .getNextIndex ();
261- final long halfMs = heartbeatWaitTimeMs /2 ;
262- final Map <Long , ReferenceCountedObject <EntryWithData >> offered = new HashMap <>();
263- for (long next = followerNext ; leaderNext > next && getHeartbeatWaitTimeMs () - halfMs > 0 ; next ++) {
264- final ReferenceCountedObject <EntryWithData > entryWithData ;
265- try {
266- entryWithData = getRaftLog ().retainEntryWithData (next );
267- if (!buffer .offer (entryWithData .get ())) {
268- entryWithData .release ();
269- break ;
270- }
271- offered .put (next , entryWithData );
272- } catch (Exception e ){
273- for (ReferenceCountedObject <EntryWithData > ref : offered .values ()) {
274- ref .release ();
275- }
276- offered .clear ();
277- throw e ;
278- }
279- }
280- if (buffer .isEmpty ()) {
315+ final EntryBuffer entryBuffer = readLogEntries (followerNext , heartbeatWaitTimeMs );
316+ if (entryBuffer == null ) {
281317 return null ;
282318 }
283319
284- final List <LogEntryProto > protos ;
285- try {
286- protos = buffer .pollList (getHeartbeatWaitTimeMs (), EntryWithData ::getEntry ,
287- (entry , time , exception ) -> LOG .warn ("Failed to get {} in {}" ,
288- entry , time .toString (TimeUnit .MILLISECONDS , 3 ), exception ));
289- } catch (RaftLogIOException e ) {
290- for (ReferenceCountedObject <EntryWithData > ref : offered .values ()) {
291- ref .release ();
292- }
293- offered .clear ();
294- throw e ;
295- } finally {
296- for (EntryWithData entry : buffer ) {
297- // Release remaining entries.
298- Optional .ofNullable (offered .remove (entry .getIndex ())).ifPresent (ReferenceCountedObject ::release );
299- }
300- buffer .clear ();
301- }
320+ final List <LogEntryProto > protos = entryBuffer .pollList (heartbeatWaitTimeMs );
321+ Preconditions .assertSame (entryBuffer .size (), protos .size (), "#protos" );
302322 assertProtos (protos , followerNext , previous , snapshotIndex );
303323 AppendEntriesRequestProto appendEntriesProto =
304324 leaderState .newAppendEntriesRequestProto (follower , protos , previous , callId );
305- return ReferenceCountedObject .delegateFrom (offered .values (), appendEntriesProto );
325+
326+ final ReferenceCountedObject <AppendEntriesRequestProto > ref = ReferenceCountedObject .wrap (
327+ appendEntriesProto , entryBuffer ::retain , entryBuffer ::release );
328+ ref .retain ();
329+ entryBuffer .release ();
330+ return ref ;
306331 }
307332
308333 private void assertProtos (List <LogEntryProto > protos , long nextIndex , TermIndex previous , long snapshotIndex ) {
@@ -324,6 +349,31 @@ private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex
324349 }
325350 }
326351
352+ private EntryBuffer readLogEntries (long followerNext , long heartbeatWaitTimeMs ) throws RaftLogIOException {
353+ final RaftLog raftLog = getRaftLog ();
354+ final long leaderNext = raftLog .getNextIndex ();
355+ final long halfMs = heartbeatWaitTimeMs /2 ;
356+ EntryBuffer entryBuffer = null ;
357+ for (long next = followerNext ; leaderNext > next && getHeartbeatWaitTimeMs () - halfMs > 0 ; next ++) {
358+ final ReferenceCountedObject <EntryWithData > ref ;
359+ try {
360+ ref = raftLog .retainEntryWithData (next );
361+ if (entryBuffer == null ) {
362+ entryBuffer = new EntryBuffer ();
363+ }
364+ if (!entryBuffer .putNew (next , ref )) {
365+ break ;
366+ }
367+ } catch (Exception e ){
368+ if (entryBuffer != null ) {
369+ entryBuffer .releaseAndClear ();
370+ }
371+ throw e ;
372+ }
373+ }
374+ return entryBuffer ;
375+ }
376+
327377 @ Override
328378 public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest (TermIndex firstAvailableLogTermIndex ) {
329379 Preconditions .assertTrue (firstAvailableLogTermIndex .getIndex () >= 0 );
0 commit comments