Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 104 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

mset.mu.RLock()
s, js, jsa, cfg, acc := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc
s, js, jsa, cfg, acc, lseq := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc, mset.lseq
mset.mu.RUnlock()

// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
Expand Down Expand Up @@ -1227,7 +1227,49 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Restore our saved state.
o.mu.Lock()
o.readStoredState()

replicas := o.cfg.replicas(&mset.cfg)

// Starting sequence represents the next sequence to be delivered, so decrement it
// since that's the minimum amount the stream should have as its last sequence.
sseq := o.sseq
if sseq > 0 {
sseq--
}

o.mu.Unlock()

// A stream observing data loss rolls back in its sequence. Check if we need to reconcile the consumer state
// to ensure new messages aren't skipped.
// Only performed for non-replicated consumers for now.
if replicas == 1 && lseq < sseq && isRecovering {
s.Warnf("JetStream consumer '%s > %s > %s' delivered sequence %d past last stream sequence of %d",
o.acc.Name, o.stream, o.name, sseq, lseq)

o.mu.Lock()
o.reconcileStateWithStream(lseq)

// Save the reconciled state
state := &ConsumerState{
Delivered: SequencePair{
Stream: o.sseq - 1,
Consumer: o.dseq - 1,
},
AckFloor: SequencePair{
Stream: o.asflr,
Consumer: o.adflr,
},
Pending: o.pending,
Redelivered: o.rdc,
}
err := o.store.ForceUpdate(state)
o.mu.Unlock()
if err != nil {
s.Errorf("JetStream consumer '%s > %s > %s' errored while updating state: %v", o.acc.Name, o.stream, o.name, err)
mset.mu.Unlock()
return nil, NewJSConsumerStoreFailedError(err)
}
}
} else {
// Select starting sequence number
o.selectStartingSeqNo()
Expand Down Expand Up @@ -5710,6 +5752,67 @@ func (o *consumer) hasSkipListPending() bool {
return o.lss != nil && len(o.lss.seqs) > 0
}

// reconcileStateWithStream reconciles consumer state when the stream has reverted
// due to data loss (e.g., VM crash). This handles the case where consumer state
// is ahead of the stream's last sequence.
// Lock should be held.
func (o *consumer) reconcileStateWithStream(streamLastSeq uint64) {
	// An ack floor beyond the stream's last sequence means everything the
	// stream still holds has already been acked. Pull the floor back down,
	// and drop all tracked pending/redelivery state. The consumer-side
	// floors keep their highest known values so delivery stays monotonic.
	if o.asflr > streamLastSeq {
		o.asflr = streamLastSeq
		// Delivery floor sits one below the delivered sequence.
		// Guard against underflow when dseq is still zero.
		if o.dseq > 0 {
			o.adflr = o.dseq - 1
		} else {
			o.adflr = 0
		}
		o.pending, o.rdc = nil, nil
	}

	// Drop any pending entries the stream no longer holds.
	// Ranging a nil map is a no-op, and the spec allows deleting
	// entries during iteration.
	for seq := range o.pending {
		if seq > streamLastSeq {
			delete(o.pending, seq)
		}
	}

	// Same for the redelivery counts.
	for seq := range o.rdc {
		if seq > streamLastSeq {
			delete(o.rdc, seq)
		}
	}

	// Recompute the next stream and consumer sequences to deliver.
	if len(o.pending) > 0 {
		// Resume after the highest sequences still outstanding.
		var maxStreamSeq, maxConsumerSeq uint64
		for sseq, p := range o.pending {
			if sseq > maxStreamSeq {
				maxStreamSeq = sseq
			}
			if p.Sequence > maxConsumerSeq {
				maxConsumerSeq = p.Sequence
			}
		}
		o.sseq, o.dseq = maxStreamSeq+1, maxConsumerSeq+1
	} else {
		// Nothing outstanding, resume right after the ack floors.
		o.sseq, o.dseq = o.asflr+1, o.adflr+1
	}
}

// Will select the starting sequence.
func (o *consumer) selectStartingSeqNo() {
if o.mset == nil || o.mset.store == nil {
Expand Down
44 changes: 44 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11654,6 +11654,50 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
return nil
}

// ForceUpdate updates the consumer state without the backwards check.
// This is used during recovery when we need to reset the consumer to an earlier sequence.
func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
	// Sanity checks.
	if state.AckFloor.Consumer > state.Delivered.Consumer {
		return fmt.Errorf("bad ack floor for consumer")
	}
	if state.AckFloor.Stream > state.Delivered.Stream {
		return fmt.Errorf("bad ack floor for stream")
	}

	// Take private copies of pending and redelivered so later mutations
	// by the caller can not affect our stored state. Pending entries must
	// fall strictly between the ack floor and the delivered sequence.
	var pending map[uint64]*Pending
	if len(state.Pending) > 0 {
		pending = make(map[uint64]*Pending, len(state.Pending))
		for seq, p := range state.Pending {
			if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
				return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
			}
			pending[seq] = &Pending{p.Sequence, p.Timestamp}
		}
	}
	var redelivered map[uint64]uint64
	if len(state.Redelivered) > 0 {
		redelivered = make(map[uint64]uint64, len(state.Redelivered))
		for seq, dc := range state.Redelivered {
			redelivered[seq] = dc
		}
	}

	// Replace our state under the lock, encode, then persist outside of it.
	o.mu.Lock()
	o.state.Delivered, o.state.AckFloor = state.Delivered, state.AckFloor
	o.state.Pending, o.state.Redelivered = pending, redelivered
	buf, err := o.encodeState()
	o.mu.Unlock()
	if err != nil {
		return err
	}
	return o.writeState(buf)
}

// Will encrypt the state with our asset key. Will be a no-op if encryption not enabled.
// Lock should be held.
func (o *consumerFileStore) encryptState(buf []byte) ([]byte, error) {
Expand Down
72 changes: 72 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8518,6 +8518,78 @@ func TestFileStoreRecoverFullStateDetectCorruptState(t *testing.T) {
require_Error(t, err, errCorruptState)
}

// TestFileStoreResetConsumerToStreamState verifies that Update refuses to move
// consumer state backwards while ForceUpdate allows it, as needed when a stream
// has lost data and rolled back its sequences.
func TestFileStoreResetConsumerToStreamState(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// Seed the stream with 30 messages and persist full state.
	payload := []byte("abc")
	for i := 1; i <= 30; i++ {
		_, _, err = fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, payload, 0)
		require_NoError(t, err)
	}
	require_NoError(t, fs.writeFullState())

	obs, err := fs.ConsumerStore("c1", time.Now(), &ConsumerConfig{
		Durable:       "c1",
		FilterSubject: "foo.*",
		AckPolicy:     AckNone,
		DeliverPolicy: DeliverAll,
	})
	require_NoError(t, err)
	defer obs.Stop()

	// Move the consumer up to sequence 5 on both tracks.
	initial := &ConsumerState{
		Delivered: SequencePair{Consumer: 5, Stream: 5},
		AckFloor:  SequencePair{Consumer: 5, Stream: 5},
	}
	require_NoError(t, obs.Update(initial))

	got, err := obs.State()
	require_NoError(t, err)

	fsState := fs.State()
	require_Equal(t, fsState.LastSeq, uint64(30))
	require_Equal(t, fsState.FirstSeq, uint64(1))
	require_Equal(t, got.AckFloor, initial.AckFloor)
	require_Equal(t, got.Delivered, initial.Delivered)
	require_Equal(t, len(got.Redelivered), len(initial.Redelivered))
	require_Equal(t, len(got.Pending), len(initial.Pending))

	// Simulate the stream rolling its sequences all the way back.
	fs.mu.Lock()
	fs.state.FirstSeq, fs.state.LastSeq = 0, 0
	fs.mu.Unlock()

	// Now attempt to move the consumer back down to lower sequences.
	rollback := &ConsumerState{
		Delivered: SequencePair{Consumer: 1, Stream: 4},
		AckFloor:  SequencePair{Consumer: 1, Stream: 3},
	}

	// A plain update must be rejected as stale, but a force update must pass.
	err = obs.Update(rollback)
	require_Error(t, err, fmt.Errorf("old update ignored"))
	require_NoError(t, obs.ForceUpdate(rollback))

	got, err = obs.State()
	require_NoError(t, err)
	require_Equal(t, got.AckFloor, rollback.AckFloor)
	require_Equal(t, got.Delivered, rollback.Delivered)
	require_Equal(t, len(got.Redelivered), len(rollback.Redelivered))
	require_Equal(t, len(got.Pending), len(rollback.Pending))
}

func TestFileStoreNumPendingMulti(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
Expand Down
117 changes: 116 additions & 1 deletion server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
"fmt"
"math/rand"
"net/url"
"os"
os "os"
"path/filepath"
"reflect"
"runtime"
"slices"
Expand Down Expand Up @@ -10587,3 +10588,117 @@ func TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange(t *test
o = mset.lookupConsumer("CONSUMER")
require_NotNil(t, o)
}

// TestJetStreamConsumerReconcileConsumerAfterStreamDataLoss simulates on-disk
// data loss (truncated or removed message block plus removed stream state index)
// and verifies that after restart the consumer state is reconciled with the
// rolled-back stream, so newly published messages are still delivered.
func TestJetStreamConsumerReconcileConsumerAfterStreamDataLoss(t *testing.T) {
test := func(t *testing.T, totalMsgs int) {
storeDir := t.TempDir()
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
jetstream: {store_dir: %q}
`, storeDir)))

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name:     "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

// Publish a few messages.
for range totalMsgs {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}

// NOTE(review): this "DURABLE" consumer is created but never fetched from
// below — presumably to have a second consumer present during recovery;
// confirm this is intentional.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable:   "DURABLE",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// Binding with an empty subject creates/uses the durable "CONSUMER".
sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", nats.BindStream("TEST"))
require_NoError(t, err)
defer sub.Drain()

// Consume all available messages.
msgs, err := sub.Fetch(totalMsgs, nats.MaxWait(200*time.Millisecond))
require_NoError(t, err)
require_Len(t, len(msgs), totalMsgs)
for _, msg := range msgs {
require_NoError(t, msg.AckSync())
}

// Confirm the consumer info reports all messages as delivered and acked.
lseq := uint64(totalMsgs)
ci, err := js.ConsumerInfo("TEST", "CONSUMER")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.Delivered.Stream, lseq)
require_Equal(t, ci.AckFloor.Stream, lseq)
require_Equal(t, ci.Delivered.Consumer, lseq)

// Shut down the server and manually remove or truncate the message blocks, simulating data loss.
// Grab the on-disk paths before shutdown; with more than one message truncate
// the block in half, otherwise remove it entirely. The stream state index is
// always removed so recovery must rebuild state from the (damaged) blocks.
mset, err := s.globalAccount().lookupStream("TEST")
require_NoError(t, err)
fs := mset.store.(*fileStore)
blk := filepath.Join(fs.fcfg.StoreDir, msgDir, "1.blk")
index := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
nc.Close()
s.Shutdown()
if totalMsgs > 1 {
stat, err := os.Stat(blk)
require_NoError(t, err)
require_NoError(t, os.Truncate(blk, stat.Size()/2+1))
} else {
require_NoError(t, os.Remove(blk))
}
require_NoError(t, os.Remove(index))

// Restart the server and reconnect.
s, _ = RunServerWithConfig(conf)
defer s.Shutdown()
nc, js = jsClientConnect(t, s)
defer nc.Close()

// Publish another message. Due to the simulated data loss, the stream sequence should continue
// counting after truncating the corrupted data.
// (The last stored message was lost, so the new publish lands at lseq again.)
pubAck, err := js.Publish("foo", nil)
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, lseq)

sub, err = js.PullSubscribe(_EMPTY_, "CONSUMER", nats.BindStream("TEST"))
require_NoError(t, err)
defer sub.Drain()

// The consumer should be able to consume above message.
// Previously the consumer state would not be reconciled and would not be able to consume the message.
msgs, err = sub.Fetch(1, nats.MaxWait(200*time.Millisecond))
require_NoError(t, err)
require_Len(t, len(msgs), 1)
msg := msgs[0]
meta, err := msg.Metadata()
require_NoError(t, err)
require_Equal(t, meta.Sequence.Stream, lseq)
require_NoError(t, msg.AckSync())

// Confirm the consumer info reports all messages as delivered and acked.
// But the delivered sequence shouldn't be reset and still move monotonically.
ci, err = js.ConsumerInfo("TEST", "CONSUMER")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.Delivered.Stream, lseq)
require_Equal(t, ci.AckFloor.Stream, lseq)
require_Equal(t, ci.Delivered.Consumer, lseq+1)
}

// Exercise both the remove-block (1 msg) and truncate-block (2 msgs) paths.
for _, totalMsgs := range []int{1, 2} {
t.Run(fmt.Sprint(totalMsgs), func(t *testing.T) { test(t, totalMsgs) })
}
}
Loading