diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index b86a889e8af..51415abebdb 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2603,9 +2603,20 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
     // fully recovered from disk.
     isRecovering := true
 
-    doSnapshot := func() {
-        if mset == nil || isRecovering || isRestore {
-            return
+    snapState := struct {
+        // Only one snapshot is in progress at any given time
+        inProgress bool
+        // Keep track of store state; if the snapshot is successful, update lastState
+        curState SimpleState
+        // This is where we want to receive async snapshot results
+        ch chan InstalledSnapshot
+    }{
+        ch: make(chan InstalledSnapshot, 1),
+    }
+
+    wantSnapshot := func() bool {
+        if mset == nil || isRecovering || isRestore || snapState.inProgress {
+            return false
         }
 
         // Before we actually calculate the detailed state and encode it, let's check the
@@ -2618,18 +2629,56 @@
         // consumers on idle streams but better to be safe than sorry!
         ne, nb := n.Size()
         if curState == lastState && ne < compactNumMin && nb < compactSizeMin {
-            return
+            return false
         }
 
-        // Make sure all pending data is flushed before allowing snapshots.
-        mset.flushAllPending()
-        if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
-            lastState = curState
-        } else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
-            s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
+        snapState.curState = curState
+        return true
+    }
+
+    handleSnapshotErr := func(err error) {
+        switch err {
+        case nil:
+            lastState = snapState.curState
+        case errNoSnapAvailable, errNodeClosed, errCatchupsRunning:
+            // Ignore the error
+        default:
+            s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",
+                mset.acc.Name, mset.name(), n.Group(), err)
         }
     }
 
+    doSnapshot := func() {
+        if wantSnapshot() {
+            // Make sure all pending data is flushed before allowing snapshots.
+            mset.flushAllPending()
+            err := n.InstallSnapshot(mset.stateSnapshot())
+            handleSnapshotErr(err)
+        }
+    }
+
+    doSnapshotAsync := func() {
+        if wantSnapshot() {
+            // Make sure all pending data is flushed before allowing snapshots.
+            mset.flushAllPending()
+            n.InstallSnapshotAsync(mset.stateSnapshot(), snapState.ch)
+            snapState.inProgress = true
+        }
+    }
+
+    asyncSnapshotDone := func(snap InstalledSnapshot) {
+        handleSnapshotErr(snap.Err)
+        snapState.inProgress = false
+    }
+
+    defer func() {
+        if snapState.inProgress {
+            s := <-snapState.ch
+            asyncSnapshotDone(s)
+        }
+    }()
+
     // We will establish a restoreDoneCh no matter what. Will never be triggered unless
     // we replace with the restore chan.
     restoreDoneCh := make(<-chan error)
@@ -2701,6 +2750,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 
     for {
         select {
+        case s := <-snapState.ch:
+            // Async snapshot is done, handle the result
+            asyncSnapshotDone(s)
         case <-s.quitCh:
             // Server shutting down, but we might receive this before qch, so try to snapshot.
             doSnapshot()
@@ -2808,7 +2860,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
             // Check about snapshotting
             // If we have at least min entries to compact, go ahead and try to snapshot/compact.
             if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs {
-                doSnapshot()
+                doSnapshotAsync()
             }
 
         case isLeader = <-lch:
@@ -2903,7 +2955,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
             stopDirectMonitoring()
 
         case <-t.C:
-            doSnapshot()
+            doSnapshotAsync()
 
         case <-uch:
             // keep stream assignment current
diff --git a/server/raft.go b/server/raft.go
index 591552b58e6..93933fe3bca 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -41,6 +41,7 @@ type RaftNode interface {
     ProposeMulti(entries []*Entry) error
     ForwardProposal(entry []byte) error
     InstallSnapshot(snap []byte) error
+    InstallSnapshotAsync(snap []byte, ch chan<- InstalledSnapshot)
     SendSnapshot(snap []byte) error
     NeedSnapshot() bool
     Applied(index uint64) (entries uint64, bytes uint64)
@@ -233,6 +234,7 @@ type raft struct {
     scaleUp      bool // The node is part of a scale up, puts us in observer mode until the log contains data.
     membChanging bool // There is a membership change proposal in progress
     deleted      bool // If the node was deleted.
+    snapshotting bool // Snapshot is in progress
 }
 
 type proposedEntry struct {
@@ -311,6 +313,7 @@ var (
     errNodeRemoved      = errors.New("raft: peer was removed")
     errBadSnapName      = errors.New("raft: snapshot name could not be parsed")
     errNoSnapAvailable  = errors.New("raft: no snapshot available")
+    errSnapInProgress   = errors.New("raft: snapshot is already in progress")
     errCatchupsRunning  = errors.New("raft: snapshot can not be installed while catchups running")
     errSnapshotCorrupt  = errors.New("raft: snapshot corrupt")
     errTooManyPrefs     = errors.New("raft: stepdown requires at most one preferred new leader")
@@ -1261,49 +1264,110 @@ func (n *raft) SendSnapshot(data []byte) error {
     return nil
 }
 
-// Used to install a snapshot for the given term and applied index. This will release
-// all of the log entries up to and including index. This should not be called with
-// entries that have been applied to the FSM but have not been applied to the raft state.
-func (n *raft) InstallSnapshot(data []byte) error {
-    if n.State() == Closed {
-        return errNodeClosed
-    }
+type InstalledSnapshot struct {
+    Term  uint64
+    Index uint64
+    Path  string
+    Err   error
+}
 
-    n.Lock()
-    defer n.Unlock()
+func (n *raft) installSnapshotAsyncLocked(data []byte, ch chan<- InstalledSnapshot) {
+    if n.snapshotting {
+        ch <- InstalledSnapshot{Err: errSnapInProgress}
+        return
+    }
 
     // If a write error has occurred already then stop here.
-    if werr := n.werr; werr != nil {
-        return werr
+    if n.werr != nil {
+        ch <- InstalledSnapshot{Err: n.werr}
+        return
     }
 
-    // Check that a catchup isn't already taking place. If it is then we won't
-    // allow installing snapshots until it is done.
+    // Check that a catchup isn't already taking place. If it is then we
+    // won't allow installing snapshots until it is done.
     if len(n.progress) > 0 || n.paused {
-        return errCatchupsRunning
+        ch <- InstalledSnapshot{Err: errCatchupsRunning}
+        return
     }
 
     if n.applied == 0 {
         n.debug("Not snapshotting as there are no applied entries")
-        return errNoSnapAvailable
+        ch <- InstalledSnapshot{Err: errNoSnapAvailable}
+        return
     }
 
-    var term uint64
-    if ae, _ := n.loadEntry(n.applied); ae != nil {
-        term = ae.term
-    } else {
+    ae, _ := n.loadEntry(n.applied)
+    if ae == nil {
         n.debug("Not snapshotting as entry %d is not available", n.applied)
-        return errNoSnapAvailable
+        ch <- InstalledSnapshot{Err: errNoSnapAvailable}
+        return
     }
 
-    n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied)
+    n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), ae.term, n.applied)
 
-    return n.installSnapshot(&snapshot{
-        lastTerm:  term,
+    encoded := n.encodeSnapshot(&snapshot{
+        lastTerm:  ae.term,
         lastIndex: n.applied,
         peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}),
         data:      data,
     })
+
+    snapDir := filepath.Join(n.sd, snapshotsDir)
+    snapFile := filepath.Join(snapDir, fmt.Sprintf(snapFileT, ae.term, n.applied))
+    snap := InstalledSnapshot{Term: ae.term, Index: n.applied, Path: snapFile}
+
+    n.snapshotting = true
+
+    go func() {
+        snap.Err = writeFileWithSync(snap.Path, encoded, defaultFilePerms)
+        n.Lock()
+        if n.State() == Closed {
+            snap.Err = errNodeClosed
+        }
+        if snap.Err == nil {
+            // Delete our previous snapshot file if it exists.
+            if n.snapfile != _EMPTY_ && n.snapfile != snap.Path {
+                os.Remove(n.snapfile)
+            }
+            // Remember our latest snapshot file.
+            n.snapfile = snap.Path
+            _, snap.Err = n.wal.Compact(snap.Index + 1)
+            if snap.Err != nil {
+                n.setWriteErrLocked(snap.Err)
+            } else {
+                var state StreamState
+                n.wal.FastState(&state)
+                n.papplied = snap.Index
+                n.bytes = state.Bytes
+            }
+        }
+        n.snapshotting = false
+        n.Unlock()
+        ch <- snap
+    }()
+}
+
+// InstallSnapshotAsync installs a snapshot asynchronously. It writes the
+// snapshot to disk and compacts the WAL in a separate goroutine. The caller
+// is notified of the result on the provided channel.
+func (n *raft) InstallSnapshotAsync(data []byte, ch chan<- InstalledSnapshot) {
+    if n.State() == Closed {
+        ch <- InstalledSnapshot{Err: errNodeClosed}
+        return
+    }
+    n.Lock()
+    defer n.Unlock()
+    n.installSnapshotAsyncLocked(data, ch)
+}
+
+// InstallSnapshot installs a snapshot for the current applied index. This is a
+// synchronous call that will block until the snapshot is installed, and will
+// release all of the log entries up to the applied index.
+func (n *raft) InstallSnapshot(data []byte) error {
+    ch := make(chan InstalledSnapshot, 1)
+    n.InstallSnapshotAsync(data, ch)
+    snap := <-ch
+    return snap.Err
 }
 
 // Install the snapshot.
diff --git a/server/raft_test.go b/server/raft_test.go
index 39dd8b60e95..d0fb03c6f28 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -4652,3 +4652,41 @@ func TestNRGMustNotResetVoteOnStepDownOrLeaderTransfer(t *testing.T) {
     require_Equal(t, n.term, 1)
     require_Equal(t, n.vote, nats0)
 }
+
+func TestNRGAsyncSnapshotInProgress(t *testing.T) {
+    n, cleanup := initSingleMemRaftNode(t)
+    defer cleanup()
+
+    esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
+    entries := []*Entry{newEntry(EntryNormal, esm)}
+    nats0 := "S1Nunr6R" // "nats-0"
+
+    aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
+    aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
+
+    n.processAppendEntry(aeMsg1, n.aesub)
+    n.processAppendEntry(aeHeartbeat, n.aesub)
+    n.Applied(1)
+
+    ch1 := make(chan InstalledSnapshot, 1)
+    ch2 := make(chan InstalledSnapshot, 1)
+
+    n.Lock()
+    n.installSnapshotAsyncLocked(nil, ch1)
+    n.installSnapshotAsyncLocked(nil, ch2)
+
+    select {
+    case s := <-ch2:
+        require_Error(t, s.Err, errSnapInProgress)
+    case <-time.After(5 * time.Second):
+        t.Fatalf("Unexpected time out while waiting for snapshot result")
+    }
+    n.Unlock()
+
+    select {
+    case s := <-ch1:
+        require_NoError(t, s.Err)
+    case <-time.After(5 * time.Second):
+        t.Fatalf("Unexpected time out while waiting for snapshot result")
+    }
+}
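
Illustrative usage (not part of the patch): a minimal, self-contained Go sketch of the pattern this change relies on. A worker goroutine writes the snapshot file and reports the outcome on a buffered result channel, while the caller guards against overlapping snapshots with an in-progress flag and drains any outstanding result before exiting. The names here (installedSnapshot, installSnapshotAsync) are hypothetical stand-ins, not nats-server APIs.

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
        "time"
    )

    // installedSnapshot mirrors the shape of the result type above: where the
    // snapshot landed and whether writing it failed.
    type installedSnapshot struct {
        path string
        err  error
    }

    // installSnapshotAsync writes data to path in a goroutine and reports the
    // result on ch. The channel should be buffered so the worker never blocks.
    func installSnapshotAsync(data []byte, path string, ch chan<- installedSnapshot) {
        go func() {
            err := os.WriteFile(path, data, 0o640)
            ch <- installedSnapshot{path: path, err: err}
        }()
    }

    func main() {
        dir, err := os.MkdirTemp("", "snap")
        if err != nil {
            panic(err)
        }
        defer os.RemoveAll(dir)

        ch := make(chan installedSnapshot, 1) // buffered: the worker can finish even if nobody is receiving yet
        inProgress := false

        ticker := time.NewTicker(50 * time.Millisecond)
        defer ticker.Stop()

        for i := 0; i < 4; i++ {
            select {
            case res := <-ch:
                // Async snapshot finished; clear the guard and record the outcome.
                inProgress = false
                fmt.Println("snapshot done:", res.path, res.err)
            case <-ticker.C:
                // Only start a new snapshot if the previous one has completed,
                // the same role snapState.inProgress plays in monitorStream.
                if !inProgress {
                    inProgress = true
                    installSnapshotAsync([]byte("state"), filepath.Join(dir, "snap.bin"), ch)
                }
            }
        }

        // Drain any outstanding result before exiting, mirroring the deferred
        // drain in monitorStream.
        if inProgress {
            res := <-ch
            fmt.Println("snapshot done:", res.path, res.err)
        }
    }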