76 changes: 64 additions & 12 deletions server/jetstream_cluster.go
@@ -2603,9 +2603,20 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// fully recovered from disk.
isRecovering := true

doSnapshot := func() {
if mset == nil || isRecovering || isRestore {
return
snapState := struct {
// Only one snapshot is in progress at any given time
inProgress bool
// Keep track of the store state; if the snapshot is successful, update lastState
curState SimpleState
// This is where we want to receive async snapshot results
ch chan InstalledSnapshot
}{
ch: make(chan InstalledSnapshot, 1),
}

wantSnapshot := func() bool {
if mset == nil || isRecovering || isRestore || snapState.inProgress {
return false
}

// Before we actually calculate the detailed state and encode it, let's check the
@@ -2618,18 +2629,56 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// consumers on idle streams but better to be safe than sorry!
ne, nb := n.Size()
if curState == lastState && ne < compactNumMin && nb < compactSizeMin {
@neilalexander (Member) commented on Nov 13, 2025:

I'm wondering if this does the right thing, given that there are also the firstNeedsUpdate and lastNeedsUpdate bools. If either of those is true then the First and Last may be untrustworthy for this comparison. My feeling is that they shouldn't be set for a filtered state of _EMPTY_, but is it possible to double check?

Contributor Author replied:

I tried to make sure that the optimization in this patch makes no changes in the logic of when to create a snapshot. And I also verified that under the same workload, this patch will roughly result in the same number of snapshot operations as the original code.
Having said that, I do think that checking the store's state is unnecessary. And I did play with changing these conditions a bit. However, I'd prefer to do this in a separate patch.

Contributor Author replied:

Just elaborating on my previous comment. The primary reason for taking snapshots of the stream is to trim raft's log. The logic implemented here involves the size and number of entries in the log, plus this curState and lastState comparison. But that does not tell us anything about how much of raft's log we can actually get rid of. A better strategy would be the following: we compact if we can get rid of at least one entire block, and we should not bother compacting partial blocks during normal operation.
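
For illustration, a minimal sketch of such a block-aligned trigger, assuming hypothetical inputs (bytesBelowApplied, blockSize) that are not part of this PR:

// Hypothetical sketch (illustrative only, not part of this patch): snapshot
// only when compacting up to the applied index would release at least one
// whole block of the WAL. Both parameters are assumed to be obtainable from
// the store's state.
func wouldFreeWholeBlock(bytesBelowApplied, blockSize uint64) bool {
	if blockSize == 0 {
		return false
	}
	return bytesBelowApplied/blockSize >= 1
}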

return
return false
}

// Make sure all pending data is flushed before allowing snapshots.
mset.flushAllPending()
if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState = curState
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
snapState.curState = curState
return true
}

handleSnapshotErr := func(err error) {
switch err {
case nil:
lastState = snapState.curState
case errNoSnapAvailable, errNodeClosed, errCatchupsRunning:
// ignore the error
default:
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",
mset.acc.Name, mset.name(), n.Group(), err)
}

}

doSnapshot := func() {
if wantSnapshot() {
// Make sure all pending data is flushed before allowing snapshots.
mset.flushAllPending()
err := n.InstallSnapshot(mset.stateSnapshot())
handleSnapshotErr(err)
}
}

doSnapshotAsync := func() {
if wantSnapshot() {
// Make sure all pending data is flushed before allowing snapshots.
mset.flushAllPending()
n.InstallSnapshotAsync(mset.stateSnapshot(), snapState.ch)
snapState.inProgress = true
}
}

asyncSnapshotDone := func(snap InstalledSnapshot) {
handleSnapshotErr(snap.Err)
snapState.inProgress = false
}

defer func() {
if snapState.inProgress {
s := <-snapState.ch
asyncSnapshotDone(s)
}
}()

// We will establish a restoreDoneCh no matter what. Will never be triggered unless
// we replace with the restore chan.
restoreDoneCh := make(<-chan error)
@@ -2701,6 +2750,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

for {
select {
case s := <-snapState.ch:
// async snapshot is done, handle the result
asyncSnapshotDone(s)
case <-s.quitCh:
// Server shutting down, but we might receive this before qch, so try to snapshot.
doSnapshot()
@@ -2808,7 +2860,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Check about snapshotting
// If we have at least min entries to compact, go ahead and try to snapshot/compact.
if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs {
doSnapshot()
doSnapshotAsync()
}

case isLeader = <-lch:
@@ -2903,7 +2955,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
stopDirectMonitoring()

case <-t.C:
doSnapshot()
doSnapshotAsync()

case <-uch:
// keep stream assignment current
110 changes: 87 additions & 23 deletions server/raft.go
@@ -41,6 +41,7 @@ type RaftNode interface {
ProposeMulti(entries []*Entry) error
ForwardProposal(entry []byte) error
InstallSnapshot(snap []byte) error
InstallSnapshotAsync(snap []byte, ch chan<- InstalledSnapshot)
SendSnapshot(snap []byte) error
NeedSnapshot() bool
Applied(index uint64) (entries uint64, bytes uint64)
@@ -233,6 +234,7 @@ type raft struct {
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
membChanging bool // There is a membership change proposal in progress
deleted bool // If the node was deleted.
snapshotting bool // Snapshot is in progress
}

type proposedEntry struct {
@@ -311,6 +313,7 @@ var (
errNodeRemoved = errors.New("raft: peer was removed")
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
errNoSnapAvailable = errors.New("raft: no snapshot available")
errSnapInProgress = errors.New("raft: snapshot is already in progress")
errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
errSnapshotCorrupt = errors.New("raft: snapshot corrupt")
errTooManyPrefs = errors.New("raft: stepdown requires at most one preferred new leader")
@@ -1261,49 +1264,110 @@ func (n *raft) SendSnapshot(data []byte) error {
return nil
}

// Used to install a snapshot for the given term and applied index. This will release
// all of the log entries up to and including index. This should not be called with
// entries that have been applied to the FSM but have not been applied to the raft state.
func (n *raft) InstallSnapshot(data []byte) error {
if n.State() == Closed {
return errNodeClosed
}
type InstalledSnapshot struct {
Term uint64
Index uint64
Path string
Err error
}

n.Lock()
defer n.Unlock()
func (n *raft) installSnapshotAsyncLocked(data []byte, ch chan<- InstalledSnapshot) {
if n.snapshotting {
ch <- InstalledSnapshot{Err: errSnapInProgress}
return
}

// If a write error has occurred already then stop here.
if werr := n.werr; werr != nil {
return werr
if n.werr != nil {
ch <- InstalledSnapshot{Err: n.werr}
return
}

// Check that a catchup isn't already taking place. If it is then we won't
// allow installing snapshots until it is done.
// Check that a catchup isn't already taking place. If it is then we
// won't allow installing snapshots until it is done.
if len(n.progress) > 0 || n.paused {
return errCatchupsRunning
ch <- InstalledSnapshot{Err: errCatchupsRunning}
return
}

if n.applied == 0 {
n.debug("Not snapshotting as there are no applied entries")
return errNoSnapAvailable
ch <- InstalledSnapshot{Err: errNoSnapAvailable}
return
}

var term uint64
if ae, _ := n.loadEntry(n.applied); ae != nil {
term = ae.term
} else {
ae, _ := n.loadEntry(n.applied)
if ae == nil {
n.debug("Not snapshotting as entry %d is not available", n.applied)
return errNoSnapAvailable
ch <- InstalledSnapshot{Err: errNoSnapAvailable}
return
}

n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied)
n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), ae.term, n.applied)

return n.installSnapshot(&snapshot{
lastTerm: term,
encoded := n.encodeSnapshot(&snapshot{
lastTerm: ae.term,
lastIndex: n.applied,
peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}),
data: data,
})

snapDir := filepath.Join(n.sd, snapshotsDir)
snapFile := filepath.Join(snapDir, fmt.Sprintf(snapFileT, ae.term, n.applied))
snap := InstalledSnapshot{Term: ae.term, Index: n.applied, Path: snapFile}

n.snapshotting = true

go func() {
snap.Err = writeFileWithSync(snap.Path, encoded, defaultFilePerms)
n.Lock()
if n.State() == Closed {
snap.Err = errNodeClosed
}
if snap.Err == nil {
// Delete our previous snapshot file if it exists.
if n.snapfile != _EMPTY_ && n.snapfile != snap.Path {
os.Remove(n.snapfile)
}
// Remember our latest snapshot file.
n.snapfile = snap.Path
_, snap.Err = n.wal.Compact(snap.Index + 1)
if snap.Err != nil {
n.setWriteErrLocked(snap.Err)
} else {
var state StreamState
n.wal.FastState(&state)
n.papplied = snap.Index
n.bytes = state.Bytes
}
}
n.snapshotting = false
n.Unlock()
ch <- snap
}()
}

// InstallSnapshotAsync installs a snapshot asynchronously. It writes the
// snapshot to disk and compacts the WAL in a separate goroutine. The caller
// is notified of the result on the provided channel.
func (n *raft) InstallSnapshotAsync(data []byte, ch chan<- InstalledSnapshot) {
if n.State() == Closed {
ch <- InstalledSnapshot{Err: errNodeClosed}
return
}
n.Lock()
defer n.Unlock()
n.installSnapshotAsyncLocked(data, ch)
}

// InstallSnapshot installs a snapshot for the current applied index. This is a
// synchronous call that will block until the snapshot is installed, and will
// release all of the log entries up to the applied index.
func (n *raft) InstallSnapshot(data []byte) error {
Member commented:

Does the performance change due to this now creating both a new ch for every call and spinning up a goroutine? Specifically for the paths that don't use async snapshots, like meta and consumers.

Contributor Author replied:

Good question. I did it this way because the goroutine that is spawned acquires a lock on raft only after it has written the snapshot file. My thinking was that we can get some of the benefits also when calling InstallSnapshot. And now that I think about it, it might work very nicely if combined with some of the optimizations in #7355.
I believe there is not too much overhead in spawning a goroutine and making the channel, compared to what the rest of the method is doing (create the snapshot file + a few syncs + compact). I could measure the overhead by replacing the doSnapshotAsync calls with doSnapshot calls, running the same benchmark, and comparing it to baseline.
We could also consider using InstallSnapshotAsync with meta and consumers. I haven't done so because I don't have a way to benchmark those yet.

Contributor Author replied:

I tried to evaluate the overhead as explained above. Basically, in monitorStream I replaced doSnapshotAsync with doSnapshot. I got the following results:
overhead.pdf

As expected, the overhead is small and sometimes we even get some benefits in terms of latency. Alternatively, we could keep a goroutine ready to go, linked to an IPQueue.
Or, if we want to err on the safe side, I could change the patch so that nothing changes in the case of regular InstallSnapshot.

Member replied:

Think it's fine for the scope of this PR. But maybe indeed we'll need to do some more profiling (also including the other perf improvements for 2.14) and see whether we should keep a goroutine ready to go and link it to an IPQueue like you mention.
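
For reference, a minimal sketch of the "goroutine ready to go" idea, using a plain channel in place of an IPQueue (illustrative only, not part of this PR; it reuses writeFileWithSync, defaultFilePerms and InstalledSnapshot from the diff above):

// Hypothetical sketch: a single long-lived snapshot worker fed from a queue,
// instead of spawning a goroutine per InstallSnapshotAsync call.
type snapshotJob struct {
	encoded []byte                   // already-encoded snapshot
	path    string                   // destination snapshot file
	done    chan<- InstalledSnapshot // where the caller receives the result
}

func startSnapshotWorker(jobs <-chan snapshotJob) {
	go func() {
		for job := range jobs {
			err := writeFileWithSync(job.path, job.encoded, defaultFilePerms)
			job.done <- InstalledSnapshot{Path: job.path, Err: err}
		}
	}()
}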

ch := make(chan InstalledSnapshot, 1)
n.InstallSnapshotAsync(data, ch)
snap := <-ch
return snap.Err
}

// Install the snapshot.
38 changes: 38 additions & 0 deletions server/raft_test.go
@@ -4652,3 +4652,41 @@ func TestNRGMustNotResetVoteOnStepDownOrLeaderTransfer(t *testing.T) {
require_Equal(t, n.term, 1)
require_Equal(t, n.vote, nats0)
}

func TestNRGAsyncSnapshotInProgress(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}
nats0 := "S1Nunr6R" // "nats-0"

aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

n.processAppendEntry(aeMsg1, n.aesub)
n.processAppendEntry(aeHeartbeat, n.aesub)
n.Applied(1)

ch1 := make(chan InstalledSnapshot, 1)
ch2 := make(chan InstalledSnapshot, 1)

n.Lock()
n.installSnapshotAsyncLocked(nil, ch1)
n.installSnapshotAsyncLocked(nil, ch2)

select {
case s := <-ch2:
require_Error(t, s.Err, errSnapInProgress)
case <-time.After(5 * time.Second):
t.Fatalf("Unexpected time out while waiting for snapshot result")
}
n.Unlock()

select {
case s := <-ch1:
require_NoError(t, s.Err)
case <-time.After(5 * time.Second):
t.Fatalf("Unexpected time out while waiting for snapshot result")
}
}