Skip to content

Commit 2036d92

Browse files
committed
reconcile consumer state to stream seq
1 parent 1723f25 commit 2036d92

File tree

5 files changed

+279
-1
lines changed

5 files changed

+279
-1
lines changed

server/consumer.go

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -963,7 +963,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
963963
}
964964

965965
mset.mu.RLock()
966-
s, js, jsa, cfg, acc := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc
966+
s, js, jsa, cfg, acc, lseq := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc, mset.lseq
967967
mset.mu.RUnlock()
968968

969969
// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
@@ -1227,7 +1227,49 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12271227
// Restore our saved state.
12281228
o.mu.Lock()
12291229
o.readStoredState()
1230+
1231+
replicas := o.cfg.replicas(&mset.cfg)
1232+
1233+
// Starting sequence represents the next sequence to be delivered, so decrement it
1234+
// since that's the minimum amount the stream should have as its last sequence.
1235+
sseq := o.sseq
1236+
if sseq > 0 {
1237+
sseq--
1238+
}
1239+
12301240
o.mu.Unlock()
1241+
1242+
// A stream observing data loss rolls back in its sequence. Check if we need to reconcile the consumer state
1243+
// to ensure new messages aren't skipped.
1244+
// Only performed for non-replicated consumers for now.
1245+
if replicas == 1 && lseq < sseq && isRecovering {
1246+
s.Warnf("JetStream consumer '%s > %s > %s' delivered sequence %d past last stream sequence of %d",
1247+
o.acc.Name, o.stream, o.name, sseq, lseq)
1248+
1249+
o.mu.Lock()
1250+
o.reconcileStateWithStream(lseq)
1251+
1252+
// Save the reconciled state
1253+
state := &ConsumerState{
1254+
Delivered: SequencePair{
1255+
Stream: o.sseq - 1,
1256+
Consumer: o.dseq - 1,
1257+
},
1258+
AckFloor: SequencePair{
1259+
Stream: o.asflr,
1260+
Consumer: o.adflr,
1261+
},
1262+
Pending: o.pending,
1263+
Redelivered: o.rdc,
1264+
}
1265+
err := o.store.ForceUpdate(state)
1266+
o.mu.Unlock()
1267+
if err != nil {
1268+
s.Errorf("JetStream consumer '%s > %s > %s' errored while updating state: %v", o.acc.Name, o.stream, o.name, err)
1269+
mset.mu.Unlock()
1270+
return nil, NewJSConsumerStoreFailedError(err)
1271+
}
1272+
}
12311273
} else {
12321274
// Select starting sequence number
12331275
o.selectStartingSeqNo()
@@ -5710,6 +5752,67 @@ func (o *consumer) hasSkipListPending() bool {
57105752
return o.lss != nil && len(o.lss.seqs) > 0
57115753
}
57125754

5755+
// reconcileStateWithStream reconciles consumer state when the stream has reverted
5756+
// due to data loss (e.g., VM crash). This handles the case where consumer state
5757+
// is ahead of the stream's last sequence.
5758+
// Lock should be held.
5759+
func (o *consumer) reconcileStateWithStream(streamLastSeq uint64) {
5760+
// If an ack floor is higher than stream last sequence,
5761+
// reset back down but keep the highest known sequences.
5762+
if o.asflr > streamLastSeq {
5763+
o.asflr = streamLastSeq
5764+
// Delivery floor is one below the delivered sequence,
5765+
// but if it is zero somehow, ensure we don't underflow.
5766+
o.adflr = o.dseq
5767+
if o.adflr > 0 {
5768+
o.adflr--
5769+
}
5770+
o.pending = nil
5771+
o.rdc = nil
5772+
}
5773+
5774+
// Remove pending entries that are beyond the stream's last sequence
5775+
if len(o.pending) > 0 {
5776+
for seq := range o.pending {
5777+
if seq > streamLastSeq {
5778+
delete(o.pending, seq)
5779+
}
5780+
}
5781+
}
5782+
5783+
// Remove redelivered entries that are beyond the stream's last sequence
5784+
if len(o.rdc) > 0 {
5785+
for seq := range o.rdc {
5786+
if seq > streamLastSeq {
5787+
delete(o.rdc, seq)
5788+
}
5789+
}
5790+
}
5791+
5792+
// Update starting sequence and delivery sequence based on pending state
5793+
if len(o.pending) == 0 {
5794+
o.sseq = o.asflr + 1
5795+
o.dseq = o.adflr + 1
5796+
} else {
5797+
// Find highest stream sequence in pending
5798+
var maxStreamSeq uint64
5799+
var maxConsumerSeq uint64
5800+
5801+
for streamSeq, p := range o.pending {
5802+
if streamSeq > maxStreamSeq {
5803+
maxStreamSeq = streamSeq
5804+
}
5805+
if p.Sequence > maxConsumerSeq {
5806+
maxConsumerSeq = p.Sequence
5807+
}
5808+
}
5809+
5810+
// Set next sequences based on highest pending
5811+
o.sseq = maxStreamSeq + 1
5812+
o.dseq = maxConsumerSeq + 1
5813+
}
5814+
}
5815+
57135816
// Will select the starting sequence.
57145817
func (o *consumer) selectStartingSeqNo() {
57155818
if o.mset == nil || o.mset.store == nil {

server/filestore.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11654,6 +11654,52 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
1165411654
return nil
1165511655
}
1165611656

11657+
// ForceUpdate updates the consumer state without the backwards check.
11658+
// This is used during recovery when we need to reset the consumer to an earlier sequence.
11659+
func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
11660+
o.fs.warn("Consumer %q force updating state: %+v", o.name, state)
11661+
// Sanity checks.
11662+
if state.AckFloor.Consumer > state.Delivered.Consumer {
11663+
return fmt.Errorf("bad ack floor for consumer")
11664+
}
11665+
if state.AckFloor.Stream > state.Delivered.Stream {
11666+
return fmt.Errorf("bad ack floor for stream")
11667+
}
11668+
11669+
// Copy to our state.
11670+
var pending map[uint64]*Pending
11671+
var redelivered map[uint64]uint64
11672+
if len(state.Pending) > 0 {
11673+
pending = make(map[uint64]*Pending, len(state.Pending))
11674+
for seq, p := range state.Pending {
11675+
pending[seq] = &Pending{p.Sequence, p.Timestamp}
11676+
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
11677+
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
11678+
}
11679+
}
11680+
}
11681+
if len(state.Redelivered) > 0 {
11682+
redelivered = make(map[uint64]uint64, len(state.Redelivered))
11683+
for seq, dc := range state.Redelivered {
11684+
redelivered[seq] = dc
11685+
}
11686+
}
11687+
11688+
// Replace our state.
11689+
o.mu.Lock()
11690+
11691+
o.state.Delivered = state.Delivered
11692+
o.state.AckFloor = state.AckFloor
11693+
o.state.Pending = pending
11694+
o.state.Redelivered = redelivered
11695+
buf, err := o.encodeState()
11696+
o.mu.Unlock()
11697+
if err != nil {
11698+
return err
11699+
}
11700+
return o.writeState(buf)
11701+
}
11702+
1165711703
// Will encrypt the state with our asset key. Will be a no-op if encryption not enabled.
1165811704
// Lock should be held.
1165911705
func (o *consumerFileStore) encryptState(buf []byte) ([]byte, error) {

server/filestore_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8518,6 +8518,94 @@ func TestFileStoreRecoverFullStateDetectCorruptState(t *testing.T) {
85188518
require_Error(t, err, errCorruptState)
85198519
}
85208520

8521+
func TestFileStoreResetConsumerToStreamState(t *testing.T) {
8522+
fs, err := newFileStore(
8523+
FileStoreConfig{StoreDir: t.TempDir()},
8524+
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
8525+
require_NoError(t, err)
8526+
defer fs.Stop()
8527+
8528+
msg := []byte("abc")
8529+
for i := 1; i <= 30; i++ {
8530+
_, _, err = fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg, 0)
8531+
require_NoError(t, err)
8532+
}
8533+
8534+
err = fs.writeFullState()
8535+
require_NoError(t, err)
8536+
8537+
obs, err := fs.ConsumerStore("c1", time.Now(), &ConsumerConfig{
8538+
Durable: "c1",
8539+
FilterSubject: "foo.*",
8540+
AckPolicy: AckNone,
8541+
DeliverPolicy: DeliverAll,
8542+
})
8543+
8544+
require_NoError(t, err)
8545+
defer obs.Stop()
8546+
8547+
state := &ConsumerState{}
8548+
state.Delivered = SequencePair{
8549+
Consumer: 5,
8550+
Stream: 5,
8551+
}
8552+
8553+
state.AckFloor = SequencePair{
8554+
Consumer: 5,
8555+
Stream: 5,
8556+
}
8557+
8558+
// set to 5
8559+
err = obs.Update(state)
8560+
require_NoError(t, err)
8561+
8562+
currState, err := obs.State()
8563+
require_NoError(t, err)
8564+
8565+
require_Equal(t, fs.state.LastSeq, uint64(30))
8566+
require_Equal(t, fs.state.FirstSeq, uint64(1))
8567+
require_Equal(t, currState.AckFloor, state.AckFloor)
8568+
require_Equal(t, currState.Delivered, state.Delivered)
8569+
require_Equal(t, len(currState.Redelivered), len(state.Redelivered))
8570+
require_Equal(t, len(currState.Pending), len(state.Pending))
8571+
8572+
// reset our state to 0.
8573+
fs.state.FirstSeq = 0
8574+
fs.state.LastSeq = 0
8575+
8576+
// set back to lower values
8577+
newState := &ConsumerState{}
8578+
newState.Delivered = SequencePair{
8579+
Consumer: 1,
8580+
Stream: 4,
8581+
}
8582+
8583+
newState.AckFloor = SequencePair{
8584+
Consumer: 1,
8585+
Stream: 3,
8586+
}
8587+
8588+
// update should fail but force update should pass
8589+
err = obs.Update(newState)
8590+
require_Error(t, err, fmt.Errorf("old update ignored"))
8591+
8592+
err = obs.ForceUpdate(newState)
8593+
require_NoError(t, err)
8594+
8595+
fullState, err := obs.State()
8596+
require_NoError(t, err)
8597+
8598+
borrowedState, err := obs.BorrowState()
8599+
require_NoError(t, err)
8600+
8601+
require_Equal(t, fullState.Delivered, borrowedState.Delivered)
8602+
require_Equal(t, fullState.AckFloor, borrowedState.AckFloor)
8603+
require_Equal(t, newState.Delivered, borrowedState.Delivered)
8604+
require_Equal(t, newState.AckFloor, borrowedState.AckFloor)
8605+
require_Equal(t, len(newState.Redelivered), len(borrowedState.Redelivered))
8606+
require_Equal(t, len(newState.Pending), len(borrowedState.Pending))
8607+
}
8608+
85218609
func TestFileStoreNumPendingMulti(t *testing.T) {
85228610
fs, err := newFileStore(
85238611
FileStoreConfig{StoreDir: t.TempDir()},

server/memstore.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2311,6 +2311,46 @@ func (o *consumerMemStore) Update(state *ConsumerState) error {
23112311
return nil
23122312
}
23132313

2314+
func (o *consumerMemStore) ForceUpdate(state *ConsumerState) error {
2315+
// Sanity checks.
2316+
if state.AckFloor.Consumer > state.Delivered.Consumer {
2317+
return fmt.Errorf("bad ack floor for consumer")
2318+
}
2319+
if state.AckFloor.Stream > state.Delivered.Stream {
2320+
return fmt.Errorf("bad ack floor for stream")
2321+
}
2322+
2323+
// Copy to our state.
2324+
var pending map[uint64]*Pending
2325+
var redelivered map[uint64]uint64
2326+
if len(state.Pending) > 0 {
2327+
pending = make(map[uint64]*Pending, len(state.Pending))
2328+
for seq, p := range state.Pending {
2329+
pending[seq] = &Pending{p.Sequence, p.Timestamp}
2330+
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
2331+
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
2332+
}
2333+
}
2334+
}
2335+
if len(state.Redelivered) > 0 {
2336+
redelivered = make(map[uint64]uint64, len(state.Redelivered))
2337+
for seq, dc := range state.Redelivered {
2338+
redelivered[seq] = dc
2339+
}
2340+
}
2341+
2342+
// Replace our state.
2343+
o.mu.Lock()
2344+
defer o.mu.Unlock()
2345+
2346+
o.state.Delivered = state.Delivered
2347+
o.state.AckFloor = state.AckFloor
2348+
o.state.Pending = pending
2349+
o.state.Redelivered = redelivered
2350+
2351+
return nil
2352+
}
2353+
23142354
// SetStarting sets our starting stream sequence.
23152355
func (o *consumerMemStore) SetStarting(sseq uint64) error {
23162356
o.mu.Lock()

server/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ type ConsumerStore interface {
363363
UpdateAcks(dseq, sseq uint64) error
364364
UpdateConfig(cfg *ConsumerConfig) error
365365
Update(*ConsumerState) error
366+
ForceUpdate(*ConsumerState) error
366367
State() (*ConsumerState, error)
367368
BorrowState() (*ConsumerState, error)
368369
EncodedState() ([]byte, error)

0 commit comments

Comments
 (0)