Skip to content

Commit be4de2b

Browse files
committed
add test case
Signed-off-by: Tilak Raj <tilak.raj94@gmail.com>
1 parent 2036d92 commit be4de2b

File tree

3 files changed

+131
-34
lines changed

3 files changed

+131
-34
lines changed

server/filestore.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11657,7 +11657,6 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
1165711657
// ForceUpdate updates the consumer state without the backwards check.
1165811658
// This is used during recovery when we need to reset the consumer to an earlier sequence.
1165911659
func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
11660-
o.fs.warn("Consumer %q force updating state: %+v", o.name, state)
1166111660
// Sanity checks.
1166211661
if state.AckFloor.Consumer > state.Delivered.Consumer {
1166311662
return fmt.Errorf("bad ack floor for consumer")
@@ -11687,7 +11686,6 @@ func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
1168711686

1168811687
// Replace our state.
1168911688
o.mu.Lock()
11690-
1169111689
o.state.Delivered = state.Delivered
1169211690
o.state.AckFloor = state.AckFloor
1169311691
o.state.Pending = pending

server/filestore_test.go

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8545,15 +8545,8 @@ func TestFileStoreResetConsumerToStreamState(t *testing.T) {
85458545
defer obs.Stop()
85468546

85478547
state := &ConsumerState{}
8548-
state.Delivered = SequencePair{
8549-
Consumer: 5,
8550-
Stream: 5,
8551-
}
8552-
8553-
state.AckFloor = SequencePair{
8554-
Consumer: 5,
8555-
Stream: 5,
8556-
}
8548+
state.Delivered = SequencePair{Consumer: 5, Stream: 5}
8549+
state.AckFloor = SequencePair{Consumer: 5, Stream: 5}
85578550

85588551
// set to 5
85598552
err = obs.Update(state)
@@ -8562,27 +8555,23 @@ func TestFileStoreResetConsumerToStreamState(t *testing.T) {
85628555
currState, err := obs.State()
85638556
require_NoError(t, err)
85648557

8565-
require_Equal(t, fs.state.LastSeq, uint64(30))
8566-
require_Equal(t, fs.state.FirstSeq, uint64(1))
8558+
fsState := fs.State()
8559+
require_Equal(t, fsState.LastSeq, uint64(30))
8560+
require_Equal(t, fsState.FirstSeq, uint64(1))
85678561
require_Equal(t, currState.AckFloor, state.AckFloor)
85688562
require_Equal(t, currState.Delivered, state.Delivered)
85698563
require_Equal(t, len(currState.Redelivered), len(state.Redelivered))
85708564
require_Equal(t, len(currState.Pending), len(state.Pending))
85718565

8572-
// reset our state to 0.
8566+
fs.mu.Lock()
85738567
fs.state.FirstSeq = 0
85748568
fs.state.LastSeq = 0
8569+
fs.mu.Unlock()
85758570

85768571
// set back to lower values
8577-
newState := &ConsumerState{}
8578-
newState.Delivered = SequencePair{
8579-
Consumer: 1,
8580-
Stream: 4,
8581-
}
8582-
8583-
newState.AckFloor = SequencePair{
8584-
Consumer: 1,
8585-
Stream: 3,
8572+
newState := &ConsumerState{
8573+
Delivered: SequencePair{Consumer: 1, Stream: 4},
8574+
AckFloor: SequencePair{Consumer: 1, Stream: 3},
85868575
}
85878576

85888577
// update should fail but force update should pass
@@ -8592,18 +8581,13 @@ func TestFileStoreResetConsumerToStreamState(t *testing.T) {
85928581
err = obs.ForceUpdate(newState)
85938582
require_NoError(t, err)
85948583

8595-
fullState, err := obs.State()
8596-
require_NoError(t, err)
8597-
8598-
borrowedState, err := obs.BorrowState()
8584+
currState, err = obs.State()
85998585
require_NoError(t, err)
86008586

8601-
require_Equal(t, fullState.Delivered, borrowedState.Delivered)
8602-
require_Equal(t, fullState.AckFloor, borrowedState.AckFloor)
8603-
require_Equal(t, newState.Delivered, borrowedState.Delivered)
8604-
require_Equal(t, newState.AckFloor, borrowedState.AckFloor)
8605-
require_Equal(t, len(newState.Redelivered), len(borrowedState.Redelivered))
8606-
require_Equal(t, len(newState.Pending), len(borrowedState.Pending))
8587+
require_Equal(t, currState.AckFloor, newState.AckFloor)
8588+
require_Equal(t, currState.Delivered, newState.Delivered)
8589+
require_Equal(t, len(currState.Redelivered), len(newState.Redelivered))
8590+
require_Equal(t, len(currState.Pending), len(newState.Pending))
86078591
}
86088592

86098593
func TestFileStoreNumPendingMulti(t *testing.T) {

server/jetstream_consumer_test.go

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ import (
2525
"fmt"
2626
"math/rand"
2727
"net/url"
28-
"os"
28+
os "os"
29+
"path/filepath"
2930
"reflect"
3031
"runtime"
3132
"slices"
@@ -10587,3 +10588,117 @@ func TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange(t *test
1058710588
o = mset.lookupConsumer("CONSUMER")
1058810589
require_NotNil(t, o)
1058910590
}
10591+
10592+
func TestJetStreamConsumerReconcileConsumerAfterStreamDataLoss(t *testing.T) {
10593+
test := func(t *testing.T, totalMsgs int) {
10594+
storeDir := t.TempDir()
10595+
conf := createConfFile(t, []byte(fmt.Sprintf(`
10596+
listen: 127.0.0.1:-1
10597+
jetstream: {store_dir: %q}
10598+
`, storeDir)))
10599+
10600+
s, _ := RunServerWithConfig(conf)
10601+
defer s.Shutdown()
10602+
10603+
nc, js := jsClientConnect(t, s)
10604+
defer nc.Close()
10605+
10606+
_, err := js.AddStream(&nats.StreamConfig{
10607+
Name: "TEST",
10608+
Subjects: []string{"foo"},
10609+
})
10610+
require_NoError(t, err)
10611+
10612+
// Publish a few messages.
10613+
for range totalMsgs {
10614+
_, err = js.Publish("foo", nil)
10615+
require_NoError(t, err)
10616+
}
10617+
10618+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
10619+
Durable: "DURABLE",
10620+
AckPolicy: nats.AckExplicitPolicy,
10621+
})
10622+
require_NoError(t, err)
10623+
10624+
sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", nats.BindStream("TEST"))
10625+
require_NoError(t, err)
10626+
defer sub.Drain()
10627+
10628+
// Consume all available messages.
10629+
msgs, err := sub.Fetch(totalMsgs, nats.MaxWait(200*time.Millisecond))
10630+
require_NoError(t, err)
10631+
require_Len(t, len(msgs), totalMsgs)
10632+
for _, msg := range msgs {
10633+
require_NoError(t, msg.AckSync())
10634+
}
10635+
10636+
// Confirm the consumer info reports all messages as delivered and acked.
10637+
lseq := uint64(totalMsgs)
10638+
ci, err := js.ConsumerInfo("TEST", "CONSUMER")
10639+
require_NoError(t, err)
10640+
require_Equal(t, ci.NumPending, 0)
10641+
require_Equal(t, ci.NumAckPending, 0)
10642+
require_Equal(t, ci.Delivered.Stream, lseq)
10643+
require_Equal(t, ci.AckFloor.Stream, lseq)
10644+
require_Equal(t, ci.Delivered.Consumer, lseq)
10645+
10646+
// Shut down the server and manually remove or truncate the message blocks, simulating data loss.
10647+
mset, err := s.globalAccount().lookupStream("TEST")
10648+
require_NoError(t, err)
10649+
fs := mset.store.(*fileStore)
10650+
blk := filepath.Join(fs.fcfg.StoreDir, msgDir, "1.blk")
10651+
index := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
10652+
nc.Close()
10653+
s.Shutdown()
10654+
if totalMsgs > 1 {
10655+
stat, err := os.Stat(blk)
10656+
require_NoError(t, err)
10657+
require_NoError(t, os.Truncate(blk, stat.Size()/2+1))
10658+
} else {
10659+
require_NoError(t, os.Remove(blk))
10660+
}
10661+
require_NoError(t, os.Remove(index))
10662+
10663+
// Restart the server and reconnect.
10664+
s, _ = RunServerWithConfig(conf)
10665+
defer s.Shutdown()
10666+
nc, js = jsClientConnect(t, s)
10667+
defer nc.Close()
10668+
10669+
// Publish another message. Due to the simulated data loss, the stream sequence should continue
10670+
// counting after truncating the corrupted data.
10671+
pubAck, err := js.Publish("foo", nil)
10672+
require_NoError(t, err)
10673+
require_Equal(t, pubAck.Sequence, lseq)
10674+
10675+
sub, err = js.PullSubscribe(_EMPTY_, "CONSUMER", nats.BindStream("TEST"))
10676+
require_NoError(t, err)
10677+
defer sub.Drain()
10678+
10679+
// The consumer should be able to consume above message.
10680+
// Previously the consumer state would not be reconciled and would not be able to consume the message.
10681+
msgs, err = sub.Fetch(1, nats.MaxWait(200*time.Millisecond))
10682+
require_NoError(t, err)
10683+
require_Len(t, len(msgs), 1)
10684+
msg := msgs[0]
10685+
meta, err := msg.Metadata()
10686+
require_NoError(t, err)
10687+
require_Equal(t, meta.Sequence.Stream, lseq)
10688+
require_NoError(t, msg.AckSync())
10689+
10690+
// Confirm the consumer info reports all messages as delivered and acked.
10691+
// But the delivered sequence shouldn't be reset and still move monotonically.
10692+
ci, err = js.ConsumerInfo("TEST", "CONSUMER")
10693+
require_NoError(t, err)
10694+
require_Equal(t, ci.NumPending, 0)
10695+
require_Equal(t, ci.NumAckPending, 0)
10696+
require_Equal(t, ci.Delivered.Stream, lseq)
10697+
require_Equal(t, ci.AckFloor.Stream, lseq)
10698+
require_Equal(t, ci.Delivered.Consumer, lseq+1)
10699+
}
10700+
10701+
for _, totalMsgs := range []int{1, 2} {
10702+
t.Run(fmt.Sprint(totalMsgs), func(t *testing.T) { test(t, totalMsgs) })
10703+
}
10704+
}

0 commit comments

Comments
 (0)