Skip to content

Commit a0df03c

Browse files
craig[bot] and pav-kv committed
Merge #143596
143596: raft: raftLog invariant checks and tweaks r=tbg a=pav-kv Epic: none Release note: none Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 5ced8ed + bf39d78 commit a0df03c

File tree

3 files changed

+86
-20
lines changed

3 files changed

+86
-20
lines changed

pkg/raft/log.go

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,20 +66,26 @@ type raftLog struct {
6666

6767
// committed is the highest log position that is known to be in
6868
// stable storage on a quorum of nodes.
69+
//
70+
// Invariant: committed does not regress.
6971
committed uint64
7072
// applying is the highest log position that the application has
7173
// been instructed to apply to its state machine. Some of these
7274
// entries may be in the process of applying and have not yet
7375
// reached applied.
7476
// Use: The field is incremented when accepting a Ready struct.
75-
// Invariant: applied <= applying && applying <= committed
77+
//
78+
// Invariant: applied <= applying <= committed.
79+
// Invariant: applying does not regress.
7680
applying uint64
7781
// applied is the highest log position that the application has
7882
// successfully applied to its state machine.
7983
// Use: The field is incremented when advancing after the committed
8084
// entries in a Ready struct have been applied (either synchronously
8185
// or asynchronously).
82-
// Invariant: applied <= committed
86+
//
87+
// Invariant: applied <= committed.
88+
// Invariant: applied does not regress.
8389
applied uint64
8490

8591
logger raftlogger.Logger
@@ -286,10 +292,6 @@ func (l *raftLog) hasNextUnstableEnts() bool {
286292
// entries from the unstable log may be returned; otherwise, only entries known
287293
// to reside locally on stable storage will be returned.
288294
func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
289-
if l.hasNextOrInProgressSnapshot() {
290-
// See comment in hasNextCommittedEnts.
291-
return nil
292-
}
293295
lo, hi := l.applying, l.maxAppliableIndex(allowUnstable) // (lo, hi]
294296
if lo >= hi {
295297
// Nothing to apply.
@@ -305,26 +307,37 @@ func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
305307
// hasNextCommittedEnts reports whether there are any entries available for execution.
306308
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
307309
func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
308-
if l.hasNextOrInProgressSnapshot() {
309-
// If we have a snapshot to apply, don't also return any committed
310-
// entries. Doing so raises questions about what should be applied
311-
// first.
312-
return false
313-
}
314-
lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
315-
return lo < hi
310+
return l.applying < l.maxAppliableIndex(allowUnstable)
316311
}
317312

318313
// maxAppliableIndex returns the maximum committed index that can be applied.
319314
// If allowUnstable is true, committed entries from the unstable log can be
320315
// applied; otherwise, only entries known to reside locally on stable storage
321316
// can be applied.
317+
//
318+
// The maxAppliableIndex never regresses, and is always >= l.applying, assuming
319+
// allowUnstable does not change from true to false. As of today, this flag is
320+
// configured statically.
321+
//
322+
// If there is a pending snapshot, maxAppliableIndex returns l.applying, i.e.
323+
// the application of committed entries is paused until the snapshot is applied.
322324
func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 {
323-
hi := l.committed
324-
if !allowUnstable {
325-
hi = min(hi, l.unstable.prev.index)
326-
}
327-
return hi
325+
if l.hasNextOrInProgressSnapshot() {
326+
// If we have a snapshot to apply, don't return any committed entries. Doing
327+
// so raises questions about what should be applied first.
328+
//
329+
// TODO(pav-kv): the answer to the questions is - the snapshot should be
330+
// applied first, and then the entries. The code must make sure that the
331+
// overall sequence of "apply" batches is in the increasing order of the
332+
// commit index.
333+
return l.applying
334+
}
335+
if allowUnstable {
336+
return l.committed
337+
}
338+
// NB: this returns >= l.applying because l.applying <= prev.index, assuming
339+
// that allowUnstable hasn't flipped from true to false.
340+
return min(l.committed, l.unstable.prev.index)
328341
}
329342

330343
// nextUnstableSnapshot returns the snapshot, if present, that is available to

pkg/raft/log_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func TestMatch(t *testing.T) {
6464
t.Run("", func(t *testing.T) {
6565
log := newLog(NewMemoryStorage(), raftlogger.DiscardLogger)
6666
require.True(t, log.append(init))
67+
log.checkInvariants(t)
6768
match, ok := log.match(tt.sl)
6869
require.Equal(t, !tt.notOk, ok)
6970
require.Equal(t, tt.want, match)
@@ -112,6 +113,7 @@ func TestFindConflictByTerm(t *testing.T) {
112113
}})
113114
l := newLog(st, raftlogger.DiscardLogger)
114115
require.True(t, l.append(tt.sl))
116+
l.checkInvariants(t)
115117

116118
index, term := l.findConflictByTerm(tt.index, tt.term)
117119
require.Equal(t, tt.want, index)
@@ -126,6 +128,7 @@ func TestIsUpToDate(t *testing.T) {
126128
init := entryID{}.append(1, 1, 2, 2, 3)
127129
raftLog := newLog(NewMemoryStorage(), raftlogger.DiscardLogger)
128130
require.True(t, raftLog.append(init))
131+
raftLog.checkInvariants(t)
129132
last := raftLog.lastEntryID()
130133
require.Equal(t, entryID{term: 3, index: 5}, last)
131134
for _, tt := range []struct {
@@ -177,8 +180,10 @@ func TestAppend(t *testing.T) {
177180
require.NoError(t, storage.SetHardState(pb.HardState{Term: init.term}))
178181
require.NoError(t, storage.Append(init.entries))
179182
raftLog := newLog(storage, raftlogger.DiscardLogger)
183+
raftLog.checkInvariants(t)
180184

181185
require.Equal(t, !tt.notOk, raftLog.append(tt.app))
186+
raftLog.checkInvariants(t)
182187
if tt.notOk {
183188
require.Equal(t, init.entries, raftLog.allEntries())
184189
return
@@ -242,6 +247,7 @@ func TestLogMaybeAppend(t *testing.T) {
242247
raftLog := newLog(NewMemoryStorage(), raftlogger.DiscardLogger)
243248
require.True(t, raftLog.append(init))
244249
raftLog.committed = commit
250+
raftLog.checkInvariants(t)
245251

246252
t.Run("", func(t *testing.T) {
247253
defer func() {
@@ -250,6 +256,7 @@ func TestLogMaybeAppend(t *testing.T) {
250256
}
251257
}()
252258
ok := raftLog.maybeAppend(app)
259+
raftLog.checkInvariants(t)
253260
require.Equal(t, !tt.notOk, ok)
254261
require.False(t, tt.panic)
255262
require.Equal(t, commit, raftLog.committed) // commit index did not change
@@ -270,9 +277,11 @@ func TestCompactionSideEffects(t *testing.T) {
270277
require.NoError(t, storage.Append(stable.entries))
271278
raftLog := newLog(storage, raftlogger.DiscardLogger)
272279
require.True(t, raftLog.append(unstable))
280+
raftLog.checkInvariants(t)
273281

274282
raftLog.commitTo(unstable.mark())
275283
raftLog.appliedTo(raftLog.committed)
284+
raftLog.checkInvariants(t)
276285

277286
offset := uint64(500)
278287
require.NoError(t, storage.Compact(offset))
@@ -291,6 +300,7 @@ func TestCompactionSideEffects(t *testing.T) {
291300

292301
last := raftLog.lastEntryID()
293302
require.True(t, raftLog.append(last.append(last.term+1)))
303+
raftLog.checkInvariants(t)
294304
require.Equal(t, last.index+1, raftLog.lastIndex())
295305

296306
want := append(stable.entries[offset:], unstable.entries...)
@@ -335,11 +345,13 @@ func TestNextCommittedEnts(t *testing.T) {
335345
raftLog := newLog(storage, raftlogger.DiscardLogger)
336346
require.True(t, raftLog.append(init))
337347
require.NoError(t, storage.Append(init.sub(3, 4)))
348+
raftLog.checkInvariants(t)
338349

339350
raftLog.stableTo(LogMark{Term: init.term, Index: 4})
340351
raftLog.commitTo(LogMark{Term: init.term, Index: 5})
341352
raftLog.appliedTo(tt.applied)
342353
raftLog.acceptApplying(tt.applying)
354+
raftLog.checkInvariants(t)
343355
if tt.snap {
344356
newSnap := snap
345357
newSnap.snap.Metadata.Index = init.lastIndex() + 1
@@ -377,12 +389,14 @@ func TestAcceptApplying(t *testing.T) {
377389
raftLog := newLogWithSize(storage, raftlogger.DiscardLogger, maxSize)
378390
require.True(t, raftLog.append(init))
379391
require.NoError(t, storage.Append(init.sub(3, 4)))
392+
raftLog.checkInvariants(t)
380393

381394
raftLog.stableTo(LogMark{Term: init.term, Index: 4})
382395
raftLog.commitTo(LogMark{Term: init.term, Index: 5})
383396
raftLog.appliedTo(3)
384397

385398
raftLog.acceptApplying(tt.index)
399+
raftLog.checkInvariants(t)
386400
require.Equal(t, tt.wantNext, raftLog.hasNextCommittedEnts(tt.allowUnstable))
387401
})
388402
}
@@ -407,13 +421,16 @@ func TestAppliedTo(t *testing.T) {
407421
raftLog := newLog(storage, raftlogger.DiscardLogger)
408422
require.True(t, raftLog.append(init))
409423
require.NoError(t, storage.Append(init.sub(3, 4)))
424+
raftLog.checkInvariants(t)
410425

411426
raftLog.stableTo(LogMark{Term: init.term, Index: 4})
412427
raftLog.commitTo(LogMark{Term: init.term, Index: 5})
413428
raftLog.appliedTo(3)
414429
raftLog.acceptApplying(applying)
430+
raftLog.checkInvariants(t)
415431

416432
raftLog.appliedTo(tt.applied)
433+
raftLog.checkInvariants(t)
417434
require.Equal(t, tt.applied, raftLog.applied)
418435
require.Equal(t, tt.wantApplying, raftLog.applying)
419436
})
@@ -439,11 +456,13 @@ func TestNextUnstableEnts(t *testing.T) {
439456
raftLog := newLog(storage, raftlogger.DiscardLogger)
440457
require.True(t, raftLog.append(tt))
441458
require.Equal(t, tt.prev, raftLog.unstable.prev)
459+
raftLog.checkInvariants(t)
442460

443461
require.Equal(t, len(tt.entries) != 0, raftLog.hasNextUnstableEnts())
444462
require.Equal(t, tt.entries, raftLog.nextUnstableEnts())
445463
if len(tt.entries) != 0 {
446464
raftLog.stableTo(tt.mark())
465+
raftLog.checkInvariants(t)
447466
}
448467
require.Equal(t, tt.lastEntryID(), raftLog.unstable.prev)
449468
})
@@ -472,7 +491,9 @@ func TestCommitTo(t *testing.T) {
472491
raftLog := newLog(NewMemoryStorage(), raftlogger.DiscardLogger)
473492
require.True(t, raftLog.append(init))
474493
raftLog.committed = commit
494+
raftLog.checkInvariants(t)
475495
raftLog.commitTo(tt.commit)
496+
raftLog.checkInvariants(t)
476497
require.Equal(t, tt.want, raftLog.committed)
477498
})
478499
}
@@ -497,7 +518,9 @@ func TestStableTo(t *testing.T) {
497518
t.Run("", func(t *testing.T) {
498519
raftLog := newLog(NewMemoryStorage(), raftlogger.DiscardLogger)
499520
require.True(t, raftLog.append(init))
521+
raftLog.checkInvariants(t)
500522
raftLog.stableTo(tt.mark)
523+
raftLog.checkInvariants(t)
501524
require.Equal(t, tt.want, raftLog.unstable.prev.index)
502525
})
503526
}
@@ -561,6 +584,7 @@ func TestTermCacheLookUpAfterStableTo(t *testing.T) {
561584
// Imitate a transfer of some unstable entries into storage.
562585
require.NoError(t, storage.Append(tt.init.sub(initID.index, tt.stableTo.Index)))
563586
raftLog.stableTo(tt.stableTo)
587+
raftLog.checkInvariants(t)
564588

565589
// Do term lookup for the parts of raftLog not covered by unstable.
566590
start, end := tt.init.LogSlice.prev.index, raftLog.unstable.prev.index
@@ -603,7 +627,9 @@ func TestStableToWithSnap(t *testing.T) {
603627
require.NoError(t, s.ApplySnapshot(snap))
604628
raftLog := newLog(s, raftlogger.DiscardLogger)
605629
require.True(t, raftLog.append(tt.sl))
630+
raftLog.checkInvariants(t)
606631
raftLog.stableTo(tt.to)
632+
raftLog.checkInvariants(t)
607633
require.Equal(t, tt.want, raftLog.unstable.prev.index)
608634
})
609635
}
@@ -634,8 +660,10 @@ func TestCompaction(t *testing.T) {
634660
require.NoError(t, storage.Append(init.entries))
635661
raftLog := newLog(storage, raftlogger.DiscardLogger)
636662
raftLog.commitTo(init.mark())
663+
raftLog.checkInvariants(t)
637664

638665
raftLog.appliedTo(raftLog.committed)
666+
raftLog.checkInvariants(t)
639667
for j := 0; j < len(tt.compact); j++ {
640668
err := storage.Compact(tt.compact[j])
641669
if err != nil {
@@ -655,6 +683,7 @@ func TestLogRestore(t *testing.T) {
655683
storage := NewMemoryStorage()
656684
storage.ApplySnapshot(pb.Snapshot{Metadata: snap})
657685
raftLog := newLog(storage, raftlogger.DiscardLogger)
686+
raftLog.checkInvariants(t)
658687

659688
require.Zero(t, len(raftLog.allEntries()))
660689
require.Equal(t, index, raftLog.compacted())
@@ -674,6 +703,7 @@ func TestIsOutOfBounds(t *testing.T) {
674703
l := newLog(storage, raftlogger.DiscardLogger)
675704
require.True(t, l.append(entryID{term: 1, index: offset}.
676705
append(intRange(offset+1, offset+num+1)...)))
706+
l.checkInvariants(t)
677707

678708
first := offset + 1
679709
for _, tt := range []struct {
@@ -713,6 +743,7 @@ func TestTerm(t *testing.T) {
713743
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
714744
l := newLog(storage, raftlogger.DiscardLogger)
715745
require.True(t, l.append(entryID{term: 1, index: offset}.append(intRange(1, num)...)))
746+
l.checkInvariants(t)
716747

717748
for _, tt := range []struct {
718749
idx uint64
@@ -744,6 +775,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
744775
term: 1,
745776
snap: pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}},
746777
}))
778+
l.checkInvariants(t)
747779

748780
for _, tt := range []struct {
749781
idx uint64
@@ -785,6 +817,7 @@ func TestSlice(t *testing.T) {
785817
require.NoError(t, storage.Append(entries(offset, half)))
786818
l := newLog(storage, raftlogger.DiscardLogger)
787819
require.True(t, l.append(pbEntryID(&halfe).append(intRange(half+1, last+1)...)))
820+
l.checkInvariants(t)
788821

789822
for _, tt := range []struct {
790823
lo uint64
@@ -872,6 +905,7 @@ func TestScan(t *testing.T) {
872905
l := newLog(storage, raftlogger.DiscardLogger)
873906
require.True(t, l.append(entryID{term: half - 1, index: half - 1}.
874907
append(intRange(half, last+1)...)))
908+
l.checkInvariants(t)
875909

876910
// Test that scan() returns the same entries as slice(), on all inputs.
877911
for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} {
@@ -917,6 +951,24 @@ func mustTerm(term uint64, err error) uint64 {
917951
return term
918952
}
919953

954+
func (l *raftLog) checkInvariants(t testing.TB) {
955+
t.Helper()
956+
l.unstable.checkInvariants(t)
957+
l.termCache.checkInvariants(t)
958+
require.LessOrEqual(t, l.committed, l.lastIndex())
959+
require.LessOrEqual(t, l.applying, l.committed)
960+
require.LessOrEqual(t, l.applied, l.applying)
961+
if l.unstable.snapshotInProgress {
962+
// If a snapshot is being applied, we don't send follow-up entries for
963+
// application until the snapshot is done.
964+
require.Equal(t, l.applying, l.unstable.prev.index)
965+
} else if l.hasNextOrInProgressSnapshot() {
966+
// If there is a pending snapshot, we could only have sent earlier entries
967+
// for application.
968+
require.Less(t, l.applying, l.unstable.prev.index)
969+
}
970+
}
971+
920972
// index is a helper type for generating slices of pb.Entry. The value of index
921973
// is the first entry index in the generated slices.
922974
type index uint64

pkg/raft/term_cache_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,8 @@ func TestTermCacheRandomTruncateAndAppends(t *testing.T) {
238238
}
239239
}
240240

241-
func (tc *termCache) checkInvariants(t *testing.T) {
241+
func (tc *termCache) checkInvariants(t testing.TB) {
242+
t.Helper()
242243
// check termCache is non-empty
243244
require.NotEmptyf(t, tc.cache, "termCache should never be empty")
244245
// check we haven't grown past 2x maxLength

0 commit comments

Comments
 (0)