Skip to content

Commit 634b7e3

Browse files
committed
add variable to mark reconciled consumers
1 parent fe19965 commit 634b7e3

File tree

2 files changed

+19
-19
lines changed

2 files changed

+19
-19
lines changed

server/consumer.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5746,8 +5746,6 @@ func (o *consumer) hasSkipListPending() bool {
57465746
return o.lss != nil && len(o.lss.seqs) > 0
57475747
}
57485748

5749-
}
5750-
57515749
// reconcileStateWithStream reconciles consumer state when the stream has reverted
57525750
// due to data loss (e.g., VM crash). This handles the case where consumer state
57535751
// is ahead of the stream's last sequence.

server/filestore.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11035,22 +11035,23 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
1103511035
////////////////////////////////////////////////////////////////////////////////
1103611036

1103711037
type consumerFileStore struct {
11038-
mu sync.Mutex
11039-
fs *fileStore
11040-
cfg *FileConsumerInfo
11041-
prf keyGen
11042-
aek cipher.AEAD
11043-
name string
11044-
odir string
11045-
ifn string
11046-
hh *highwayhash.Digest64
11047-
state ConsumerState
11048-
fch chan struct{}
11049-
qch chan struct{}
11050-
flusher bool
11051-
writing bool
11052-
dirty bool
11053-
closed bool
11038+
mu sync.Mutex
11039+
fs *fileStore
11040+
cfg *FileConsumerInfo
11041+
prf keyGen
11042+
aek cipher.AEAD
11043+
name string
11044+
odir string
11045+
ifn string
11046+
hh *highwayhash.Digest64
11047+
state ConsumerState
11048+
fch chan struct{}
11049+
qch chan struct{}
11050+
flusher bool
11051+
writing bool
11052+
dirty bool
11053+
closed bool
11054+
stateLoaded bool // Set when state is explicitly loaded/updated (reconciled).
1105411055
}
1105511056

1105611057
func (fs *fileStore) ConsumerStore(name string, created time.Time, cfg *ConsumerConfig) (ConsumerStore, error) {
@@ -11632,6 +11633,7 @@ func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
1163211633
o.state.AckFloor = state.AckFloor
1163311634
o.state.Pending = pending
1163411635
o.state.Redelivered = redelivered
11636+
o.stateLoaded = true
1163511637

1163611638
o.kickFlusher()
1163711639

@@ -11835,7 +11837,7 @@ func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, er
1183511837
state := &ConsumerState{}
1183611838

1183711839
// See if we have a running state or if we need to read in from disk.
11838-
if o.state.Delivered.Consumer != 0 || o.state.Delivered.Stream != 0 {
11840+
if o.stateLoaded || o.state.Delivered.Consumer != 0 || o.state.Delivered.Stream != 0 {
1183911841
state.Delivered = o.state.Delivered
1184011842
state.AckFloor = o.state.AckFloor
1184111843
if len(o.state.Pending) > 0 {

0 commit comments

Comments
 (0)