Skip to content

Commit af98a34

Browse files
committed
raft: lift max size from raftLog
There is no more built-in flow control in the raftLog type. This commit moves the final piece of it out. There is only one place where this limit is still used: hasUnappliedConfChanges check scans the log when campaigning. Epic: none Release note: none
1 parent 2e43ba7 commit af98a34

File tree

4 files changed

+13
-34
lines changed

4 files changed

+13
-34
lines changed

pkg/raft/log.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -83,36 +83,20 @@ type raftLog struct {
8383
applied uint64
8484

8585
logger raftlogger.Logger
86-
87-
// maxApplyingEntsSize limits the outstanding byte size of the messages
88-
// returned from calls to nextCommittedEnts that have not been acknowledged
89-
// by a call to appliedTo.
90-
maxApplyingEntsSize entryEncodingSize
9186
}
9287

93-
// newLog returns log using the given storage and default options. It
94-
// recovers the log to the state that it just commits and applies the
95-
// latest snapshot.
88+
// newLog returns a raft log initialized to the state in the given storage.
9689
func newLog(storage Storage, logger raftlogger.Logger) *raftLog {
97-
return newLogWithSize(storage, logger, noLimit)
98-
}
99-
100-
// newLogWithSize returns a log using the given storage and max
101-
// message size.
102-
func newLogWithSize(
103-
storage Storage, logger raftlogger.Logger, maxApplyingEntsSize entryEncodingSize,
104-
) *raftLog {
10590
compacted, lastIndex := storage.Compacted(), storage.LastIndex()
10691
lastTerm, err := storage.Term(lastIndex)
10792
if err != nil {
10893
panic(err) // TODO(pav-kv): the storage should always cache the last term.
10994
}
11095
last := entryID{term: lastTerm, index: lastIndex}
11196
return &raftLog{
112-
storage: storage,
113-
unstable: newUnstable(last, logger),
114-
termCache: newTermCache(termCacheSize, last),
115-
maxApplyingEntsSize: maxApplyingEntsSize,
97+
storage: storage,
98+
unstable: newUnstable(last, logger),
99+
termCache: newTermCache(termCacheSize, last),
116100

117101
// Initialize our committed and applied pointers to the time of the last
118102
// compaction.

pkg/raft/log_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,6 @@ func TestNextCommittedEnts(t *testing.T) {
373373
}
374374

375375
func TestAcceptApplying(t *testing.T) {
376-
maxSize := entryEncodingSize(100)
377376
snap := pb.Snapshot{
378377
Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},
379378
}
@@ -398,7 +397,7 @@ func TestAcceptApplying(t *testing.T) {
398397
t.Run("", func(t *testing.T) {
399398
storage := NewMemoryStorage()
400399
require.NoError(t, storage.ApplySnapshot(snap))
401-
raftLog := newLogWithSize(storage, raftlogger.DiscardLogger, maxSize)
400+
raftLog := newLog(storage, raftlogger.DiscardLogger)
402401
require.True(t, raftLog.append(init))
403402
require.NoError(t, storage.Append(init.sub(3, 4)))
404403
raftLog.checkInvariants(t)

pkg/raft/node_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -645,11 +645,8 @@ func TestCommitPagination(t *testing.T) {
645645

646646
func TestCommitPaginationWithAsyncStorageWrites(t *testing.T) {
647647
s := newTestMemoryStorage(withPeers(1))
648-
cfg := newTestConfig(1, 10, 1, s)
649-
cfg.MaxCommittedSizePerReady = 2048
648+
rn := newTestRawNode(1, 10, 1, s)
650649

651-
rn, err := NewRawNode(cfg)
652-
require.NoError(t, err)
653650
require.NoError(t, rn.Campaign())
654651

655652
// Persist vote.

pkg/raft/raft.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,10 @@ type raft struct {
323323

324324
maxMsgSize entryEncodingSize
325325
maxUncommittedSize entryPayloadSize
326+
// maxCommittedPageSize limits the size of committed entries that can be
327+
// loaded into memory in hasUnappliedConfChanges.
328+
// TODO(#131559): avoid this loading in the first place, and remove this.
329+
maxCommittedPageSize entryEncodingSize
326330

327331
config quorum.Config
328332
trk tracker.ProgressTracker
@@ -445,7 +449,7 @@ func newRaft(c *Config) *raft {
445449
if err := c.validate(); err != nil {
446450
panic(err.Error())
447451
}
448-
raftlog := newLogWithSize(c.Storage, c.Logger, entryEncodingSize(c.MaxCommittedSizePerReady))
452+
raftlog := newLog(c.Storage, c.Logger)
449453
hs, cs, err := c.Storage.InitialState()
450454
if err != nil {
451455
panic(err) // TODO(bdarnell)
@@ -457,6 +461,7 @@ func newRaft(c *Config) *raft {
457461
raftLog: raftlog,
458462
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
459463
maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
464+
maxCommittedPageSize: entryEncodingSize(c.MaxCommittedSizePerReady),
460465
lazyReplication: c.LazyReplication,
461466
electionTimeout: c.ElectionTick,
462467
electionTimeoutJitter: c.ElectionJitterTick,
@@ -1474,13 +1479,7 @@ func (r *raft) hasUnappliedConfChanges() bool {
14741479
// Scan all unapplied committed entries to find a config change. Paginate the
14751480
// scan, to avoid a potentially unlimited memory spike.
14761481
lo, hi := r.raftLog.applied, r.raftLog.committed
1477-
// Reuse the maxApplyingEntsSize limit because it is used for similar purposes
1478-
// (limiting the read of unapplied committed entries) when raft sends entries
1479-
// via the Ready struct for application.
1480-
// TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
1481-
// outside the raft package.
1482-
pageSize := r.raftLog.maxApplyingEntsSize
1483-
if err := r.raftLog.scan(lo, hi, pageSize, func(ents []pb.Entry) error {
1482+
if err := r.raftLog.scan(lo, hi, r.maxCommittedPageSize, func(ents []pb.Entry) error {
14841483
for i := range ents {
14851484
if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
14861485
found = true

0 commit comments

Comments
 (0)