Skip to content

Commit fe19965

Browse files
committed
reset consumer state to stream state
1 parent 4a7566f commit fe19965

File tree

5 files changed

+283
-1
lines changed

5 files changed

+283
-1
lines changed

server/consumer.go

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -963,7 +963,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
963963
}
964964

965965
mset.mu.RLock()
966-
s, js, jsa, cfg, acc := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc
966+
s, js, jsa, cfg, acc, lseq := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc, mset.lseq
967967
mset.mu.RUnlock()
968968

969969
// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
@@ -1228,6 +1228,40 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12281228
o.mu.Lock()
12291229
o.readStoredState()
12301230
o.mu.Unlock()
1231+
1232+
// only for streams with r1; if we are recovering and the consumer sequence is greater than the last sequence,
1233+
// we need to reconcile the consumer state with the stream
1234+
// run only for single replica streams
1235+
if mset.cfg.Replicas == 1 && o.sseq > 2 && lseq < o.sseq && isRecovering {
1236+
o.srv.Warnf("Consumer %q recovering with sseq %d greater than stream last seq %d, reconciling state", o.name, o.sseq, lseq)
1237+
1238+
o.mu.Lock()
1239+
o.reconcileStateWithStream(lseq)
1240+
1241+
// Save the reconciled state
1242+
state := &ConsumerState{
1243+
Delivered: SequencePair{
1244+
Stream: o.sseq - 1,
1245+
Consumer: o.dseq - 1,
1246+
},
1247+
AckFloor: SequencePair{
1248+
Stream: o.asflr,
1249+
Consumer: o.adflr,
1250+
},
1251+
Pending: o.pending,
1252+
Redelivered: o.rdc,
1253+
}
1254+
o.mu.Unlock()
1255+
1256+
err := o.store.ForceUpdate(state)
1257+
if err != nil {
1258+
s.Warnf("Consumer %q error updating state: %v", o.name, err)
1259+
return nil, NewJSConsumerStoreFailedError(err)
1260+
}
1261+
1262+
o.srv.Noticef("Consumer %q reconciled state: sseq=%d dseq=%d asflr=%d adflr=%d pending=%d",
1263+
o.name, o.sseq, o.dseq, o.asflr, o.adflr, len(o.pending))
1264+
}
12311265
} else {
12321266
// Select starting sequence number
12331267
o.selectStartingSeqNo()
@@ -5712,6 +5746,80 @@ func (o *consumer) hasSkipListPending() bool {
57125746
return o.lss != nil && len(o.lss.seqs) > 0
57135747
}
57145748

5749+
}
5750+
5751+
// reconcileStateWithStream reconciles consumer state when the stream has reverted
5752+
// due to data loss (e.g., VM crash). This handles the case where consumer state
5753+
// is ahead of the stream's last sequence.
5754+
// Lock should be held.
5755+
func (o *consumer) reconcileStateWithStream(streamLastSeq uint64) {
5756+
o.srv.Noticef("Consumer %q reconciling state with stream (lseq=%d): before sseq=%d dseq=%d asflr=%d adflr=%d pending=%d rdc=%d",
5757+
o.name, streamLastSeq, o.sseq, o.dseq, o.asflr, o.adflr, len(o.pending), len(o.rdc))
5758+
5759+
// If an ack floor is higher than stream last sequence, reset everything
5760+
if o.asflr > streamLastSeq {
5761+
o.srv.Warnf("Consumer %q ack floor (%d) > stream last seq (%d), resetting all state", o.name, o.asflr, streamLastSeq)
5762+
5763+
o.sseq = streamLastSeq + 1
5764+
o.dseq = 1
5765+
o.adflr = 0
5766+
o.asflr = streamLastSeq
5767+
o.pending = nil
5768+
o.rdc = nil
5769+
return
5770+
}
5771+
5772+
// Remove pending entries that are beyond the stream's last sequence
5773+
if len(o.pending) > 0 {
5774+
for seq := range o.pending {
5775+
if seq > streamLastSeq {
5776+
delete(o.pending, seq)
5777+
}
5778+
}
5779+
}
5780+
5781+
// Remove redelivered entries that are beyond the stream's last sequence
5782+
if len(o.rdc) > 0 {
5783+
for seq := range o.rdc {
5784+
if seq > streamLastSeq {
5785+
delete(o.rdc, seq)
5786+
}
5787+
}
5788+
}
5789+
5790+
// Update starting sequence and delivery sequence based on pending state
5791+
if len(o.pending) == 0 {
5792+
o.sseq = o.asflr + 1
5793+
o.dseq = o.adflr + 1
5794+
} else {
5795+
// Find highest stream sequence in pending
5796+
var maxStreamSeq uint64
5797+
var maxConsumerSeq uint64
5798+
5799+
for streamSeq, p := range o.pending {
5800+
if streamSeq > maxStreamSeq {
5801+
maxStreamSeq = streamSeq
5802+
}
5803+
if p.Sequence > maxConsumerSeq {
5804+
maxConsumerSeq = p.Sequence
5805+
}
5806+
}
5807+
5808+
// Set next sequences based on highest pending
5809+
o.sseq = maxStreamSeq + 1
5810+
o.dseq = maxConsumerSeq + 1
5811+
}
5812+
5813+
// Ensure we don't go beyond the stream
5814+
if o.sseq > streamLastSeq+1 {
5815+
o.sseq = streamLastSeq + 1
5816+
}
5817+
5818+
o.srv.Warnf("Consumer %q reconciled state: after sseq=%d dseq=%d asflr=%d adflr=%d pending=%d rdc=%d",
5819+
o.name, o.sseq, o.dseq, o.asflr, o.adflr, len(o.pending), len(o.rdc))
5820+
}
5821+
5822+
57155823
// Will select the starting sequence.
57165824
func (o *consumer) selectStartingSeqNo() {
57175825
if o.mset == nil || o.mset.store == nil {

server/filestore.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11593,6 +11593,51 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
1159311593
return nil
1159411594
}
1159511595

11596+
// ForceUpdate updates the consumer state without the backwards check.
11597+
// This is used during recovery when we need to reset the consumer to an earlier sequence.
11598+
func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
11599+
o.fs.warn("Consumer %q force updating state: %+v", o.name, state)
11600+
// Sanity checks.
11601+
if state.AckFloor.Consumer > state.Delivered.Consumer {
11602+
return fmt.Errorf("bad ack floor for consumer")
11603+
}
11604+
if state.AckFloor.Stream > state.Delivered.Stream {
11605+
return fmt.Errorf("bad ack floor for stream")
11606+
}
11607+
11608+
// Copy to our state.
11609+
var pending map[uint64]*Pending
11610+
var redelivered map[uint64]uint64
11611+
if len(state.Pending) > 0 {
11612+
pending = make(map[uint64]*Pending, len(state.Pending))
11613+
for seq, p := range state.Pending {
11614+
pending[seq] = &Pending{p.Sequence, p.Timestamp}
11615+
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
11616+
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
11617+
}
11618+
}
11619+
}
11620+
if len(state.Redelivered) > 0 {
11621+
redelivered = make(map[uint64]uint64, len(state.Redelivered))
11622+
for seq, dc := range state.Redelivered {
11623+
redelivered[seq] = dc
11624+
}
11625+
}
11626+
11627+
// Replace our state.
11628+
o.mu.Lock()
11629+
defer o.mu.Unlock()
11630+
11631+
o.state.Delivered = state.Delivered
11632+
o.state.AckFloor = state.AckFloor
11633+
o.state.Pending = pending
11634+
o.state.Redelivered = redelivered
11635+
11636+
o.kickFlusher()
11637+
11638+
return nil
11639+
}
11640+
1159611641
// Will encrypt the state with our asset key. Will be a no-op if encryption not enabled.
1159711642
// Lock should be held.
1159811643
func (o *consumerFileStore) encryptState(buf []byte) ([]byte, error) {

server/filestore_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8518,6 +8518,94 @@ func TestFileStoreRecoverFullStateDetectCorruptState(t *testing.T) {
85188518
require_Error(t, err, errCorruptState)
85198519
}
85208520

8521+
func TestFileStoreResetConsumerToStreamState(t *testing.T) {
8522+
fs, err := newFileStore(
8523+
FileStoreConfig{StoreDir: t.TempDir()},
8524+
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
8525+
require_NoError(t, err)
8526+
defer fs.Stop()
8527+
8528+
msg := []byte("abc")
8529+
for i := 1; i <= 30; i++ {
8530+
_, _, err = fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg, 0)
8531+
require_NoError(t, err)
8532+
}
8533+
8534+
err = fs.writeFullState()
8535+
require_NoError(t, err)
8536+
8537+
obs, err := fs.ConsumerStore("c1", time.Now(), &ConsumerConfig{
8538+
Durable: "c1",
8539+
FilterSubject: "foo.*",
8540+
AckPolicy: AckNone,
8541+
DeliverPolicy: DeliverAll,
8542+
})
8543+
8544+
require_NoError(t, err)
8545+
defer obs.Stop()
8546+
8547+
state := &ConsumerState{}
8548+
state.Delivered = SequencePair{
8549+
Consumer: 5,
8550+
Stream: 5,
8551+
}
8552+
8553+
state.AckFloor = SequencePair{
8554+
Consumer: 5,
8555+
Stream: 5,
8556+
}
8557+
8558+
// set to 5
8559+
err = obs.Update(state)
8560+
require_NoError(t, err)
8561+
8562+
currState, err := obs.State()
8563+
require_NoError(t, err)
8564+
8565+
require_Equal(t, fs.state.LastSeq, uint64(30))
8566+
require_Equal(t, fs.state.FirstSeq, uint64(1))
8567+
require_Equal(t, currState.AckFloor, state.AckFloor)
8568+
require_Equal(t, currState.Delivered, state.Delivered)
8569+
require_Equal(t, len(currState.Redelivered), len(state.Redelivered))
8570+
require_Equal(t, len(currState.Pending), len(state.Pending))
8571+
8572+
// reset our state to 0.
8573+
fs.state.FirstSeq = 0
8574+
fs.state.LastSeq = 0
8575+
8576+
// set back to lower values
8577+
newState := &ConsumerState{}
8578+
newState.Delivered = SequencePair{
8579+
Consumer: 1,
8580+
Stream: 4,
8581+
}
8582+
8583+
newState.AckFloor = SequencePair{
8584+
Consumer: 1,
8585+
Stream: 3,
8586+
}
8587+
8588+
// update should fail but force update should pass
8589+
err = obs.Update(newState)
8590+
require_Error(t, err, fmt.Errorf("old update ignored"))
8591+
8592+
err = obs.ForceUpdate(newState)
8593+
require_NoError(t, err)
8594+
8595+
fullState, err := obs.State()
8596+
require_NoError(t, err)
8597+
8598+
borrowedState, err := obs.BorrowState()
8599+
require_NoError(t, err)
8600+
8601+
require_Equal(t, fullState.Delivered, borrowedState.Delivered)
8602+
require_Equal(t, fullState.AckFloor, borrowedState.AckFloor)
8603+
require_Equal(t, newState.Delivered, borrowedState.Delivered)
8604+
require_Equal(t, newState.AckFloor, borrowedState.AckFloor)
8605+
require_Equal(t, len(newState.Redelivered), len(borrowedState.Redelivered))
8606+
require_Equal(t, len(newState.Pending), len(borrowedState.Pending))
8607+
}
8608+
85218609
func TestFileStoreNumPendingMulti(t *testing.T) {
85228610
fs, err := newFileStore(
85238611
FileStoreConfig{StoreDir: t.TempDir()},

server/memstore.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2311,6 +2311,46 @@ func (o *consumerMemStore) Update(state *ConsumerState) error {
23112311
return nil
23122312
}
23132313

2314+
func (o *consumerMemStore) ForceUpdate(state *ConsumerState) error {
2315+
// Sanity checks.
2316+
if state.AckFloor.Consumer > state.Delivered.Consumer {
2317+
return fmt.Errorf("bad ack floor for consumer")
2318+
}
2319+
if state.AckFloor.Stream > state.Delivered.Stream {
2320+
return fmt.Errorf("bad ack floor for stream")
2321+
}
2322+
2323+
// Copy to our state.
2324+
var pending map[uint64]*Pending
2325+
var redelivered map[uint64]uint64
2326+
if len(state.Pending) > 0 {
2327+
pending = make(map[uint64]*Pending, len(state.Pending))
2328+
for seq, p := range state.Pending {
2329+
pending[seq] = &Pending{p.Sequence, p.Timestamp}
2330+
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
2331+
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
2332+
}
2333+
}
2334+
}
2335+
if len(state.Redelivered) > 0 {
2336+
redelivered = make(map[uint64]uint64, len(state.Redelivered))
2337+
for seq, dc := range state.Redelivered {
2338+
redelivered[seq] = dc
2339+
}
2340+
}
2341+
2342+
// Replace our state.
2343+
o.mu.Lock()
2344+
defer o.mu.Unlock()
2345+
2346+
o.state.Delivered = state.Delivered
2347+
o.state.AckFloor = state.AckFloor
2348+
o.state.Pending = pending
2349+
o.state.Redelivered = redelivered
2350+
2351+
return nil
2352+
}
2353+
23142354
// SetStarting sets our starting stream sequence.
23152355
func (o *consumerMemStore) SetStarting(sseq uint64) error {
23162356
o.mu.Lock()

server/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ type ConsumerStore interface {
363363
UpdateAcks(dseq, sseq uint64) error
364364
UpdateConfig(cfg *ConsumerConfig) error
365365
Update(*ConsumerState) error
366+
ForceUpdate(*ConsumerState) error
366367
State() (*ConsumerState, error)
367368
BorrowState() (*ConsumerState, error)
368369
EncodedState() ([]byte, error)

0 commit comments

Comments
 (0)