Skip to content

Commit 5c11024

Browse files
(2.14) Reset consumer to new starting sequence
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 588a2c3 commit 5c11024

File tree

9 files changed

+580
-1
lines changed

9 files changed

+580
-1
lines changed

server/consumer.go

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,11 +436,13 @@ type consumer struct {
436436
lss *lastSeqSkipList
437437
rlimit *rate.Limiter
438438
reqSub *subscription
439+
resetSub *subscription
439440
ackSub *subscription
440441
ackReplyT string
441442
ackSubj string
442443
nextMsgSubj string
443444
nextMsgReqs *ipQueue[*nextMsgReq]
445+
resetSubj string
444446
maxp int
445447
pblimit int
446448
maxpb int
@@ -1276,6 +1278,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12761278
o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
12771279
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
12781280
o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)
1281+
o.resetSubj = fmt.Sprintf(JSApiConsumerResetT, mn, o.name)
12791282

12801283
// Check/update the inactive threshold
12811284
o.updateInactiveThreshold(&o.cfg)
@@ -1558,6 +1561,10 @@ func (o *consumer) setLeader(isLeader bool) {
15581561
o.mu.Unlock()
15591562
return
15601563
}
1564+
if o.resetSub, err = o.subscribeInternal(o.resetSubj, o.processResetReq); err != nil {
1565+
o.mu.Unlock()
1566+
return
1567+
}
15611568

15621569
// Check on flow control settings.
15631570
if o.cfg.FlowControl {
@@ -1677,8 +1684,9 @@ func (o *consumer) setLeader(isLeader bool) {
16771684
// ok if they are nil, we protect inside unsubscribe()
16781685
o.unsubscribe(o.ackSub)
16791686
o.unsubscribe(o.reqSub)
1687+
o.unsubscribe(o.resetSub)
16801688
o.unsubscribe(o.fcSub)
1681-
o.ackSub, o.reqSub, o.fcSub = nil, nil, nil
1689+
o.ackSub, o.reqSub, o.resetSub, o.fcSub = nil, nil, nil, nil
16821690
if o.infoSub != nil {
16831691
o.srv.sysUnsubscribe(o.infoSub)
16841692
o.infoSub = nil
@@ -2613,6 +2621,78 @@ func (o *consumer) updateSkipped(seq uint64) {
26132621
o.propose(b[:])
26142622
}
26152623

2624+
func (o *consumer) resetStartingSeq(seq uint64, reply string) (uint64, bool, error) {
2625+
o.mu.Lock()
2626+
defer o.mu.Unlock()
2627+
2628+
// Reset to a specific sequence, or back to the ack floor.
2629+
if seq == 0 {
2630+
seq = o.asflr + 1
2631+
} else if o.cfg.DeliverPolicy == DeliverAll {
2632+
// Always allowed.
2633+
goto VALID
2634+
} else if o.cfg.DeliverPolicy == DeliverByStartSequence {
2635+
// Only allowed if not going below what's configured.
2636+
if seq < o.cfg.OptStartSeq {
2637+
return 0, false, errors.New("below start seq")
2638+
}
2639+
goto VALID
2640+
} else if o.cfg.DeliverPolicy == DeliverByStartTime && o.mset != nil {
2641+
// Only allowed if not going below what's configured.
2642+
nseq := o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
2643+
if seq < nseq {
2644+
return 0, false, errors.New("below start time")
2645+
}
2646+
goto VALID
2647+
} else {
2648+
return 0, false, errors.New("not allowed")
2649+
}
2650+
2651+
VALID:
2652+
// Must be a minimum of 1.
2653+
if seq <= 0 {
2654+
seq = 1
2655+
}
2656+
o.resetLocalStartingSeq(seq)
2657+
// Clustered mode and R>1.
2658+
if o.node != nil {
2659+
b := make([]byte, 1+8+len(reply))
2660+
b[0] = byte(resetSeqOp)
2661+
var le = binary.LittleEndian
2662+
le.PutUint64(b[1:], seq)
2663+
copy(b[1+8:], reply)
2664+
o.propose(b[:])
2665+
return seq, false, nil
2666+
} else if o.store != nil {
2667+
o.store.Reset(seq - 1)
2668+
// Cleanup messages that lost interest.
2669+
if o.retention == InterestPolicy {
2670+
if mset := o.mset; mset != nil {
2671+
o.mu.Unlock()
2672+
ss := mset.state()
2673+
o.checkStateForInterestStream(&ss)
2674+
o.mu.Lock()
2675+
}
2676+
}
2677+
2678+
// Recalculate pending, and re-trigger message delivery.
2679+
o.streamNumPending()
2680+
o.signalNewMessages()
2681+
return seq, true, nil
2682+
}
2683+
return seq, false, nil
2684+
}
2685+
2686+
// Lock should be held.
2687+
func (o *consumer) resetLocalStartingSeq(seq uint64) {
2688+
o.pending, o.rdc = nil, nil
2689+
o.rdq = nil
2690+
o.rdqi.Empty()
2691+
o.sseq, o.dseq = seq, 1
2692+
o.adflr, o.asflr = o.dseq-1, o.sseq-1
2693+
o.ldt, o.lat = time.Time{}, time.Time{}
2694+
}
2695+
26162696
func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
26172697
// On exit make sure we nil out pch.
26182698
defer func() {
@@ -4140,6 +4220,48 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
41404220
o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg)))
41414221
}
41424222

4223+
// processResetReq will reset a consumer to a new starting sequence.
4224+
func (o *consumer) processResetReq(_ *subscription, c *client, a *Account, _, reply string, rmsg []byte) {
4225+
if reply == _EMPTY_ {
4226+
return
4227+
}
4228+
4229+
s := o.srv
4230+
var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}}
4231+
4232+
hdr, msg := c.msgParts(rmsg)
4233+
if errorOnRequiredApiLevel(hdr) {
4234+
resp.Error = NewJSRequiredApiLevelError()
4235+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4236+
return
4237+
}
4238+
4239+
// An empty message resets back to the ack floor, otherwise a custom sequence is used.
4240+
var req JSApiConsumerResetRequest
4241+
if len(msg) > 0 {
4242+
if err := json.Unmarshal(msg, &req); err != nil {
4243+
resp.Error = NewJSInvalidJSONError(err)
4244+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4245+
return
4246+
}
4247+
// Resetting to 0 is invalid.
4248+
if req.Seq == 0 {
4249+
resp.Error = NewJSInvalidJSONError(errors.New("reset to zero seq"))
4250+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4251+
return
4252+
}
4253+
}
4254+
resetSeq, canRespond, err := o.resetStartingSeq(req.Seq, reply)
4255+
if err != nil {
4256+
resp.Error = NewJSConsumerInvalidResetError(err)
4257+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4258+
} else if canRespond {
4259+
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
4260+
resp.ResetSeq = resetSeq
4261+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4262+
}
4263+
}
4264+
41434265
func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
41444266
o.mu.Lock()
41454267
defer o.mu.Unlock()
@@ -6096,9 +6218,11 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
60966218
o.active = false
60976219
o.unsubscribe(o.ackSub)
60986220
o.unsubscribe(o.reqSub)
6221+
o.unsubscribe(o.resetSub)
60996222
o.unsubscribe(o.fcSub)
61006223
o.ackSub = nil
61016224
o.reqSub = nil
6225+
o.resetSub = nil
61026226
o.fcSub = nil
61036227
if o.infoSub != nil {
61046228
o.srv.sysUnsubscribe(o.infoSub)

server/errors.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,5 +2018,15 @@
20182018
"help": "",
20192019
"url": "",
20202020
"deprecates": ""
2021+
},
2022+
{
2023+
"constant": "JSConsumerInvalidResetErr",
2024+
"code": 400,
2025+
"error_code": 10204,
2026+
"description": "invalid reset: {err}",
2027+
"comment": "",
2028+
"help": "",
2029+
"url": "",
2030+
"deprecates": ""
20212031
}
20222032
]

server/filestore.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11658,6 +11658,7 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) {
1165811658
func (o *consumerFileStore) SetStarting(sseq uint64) error {
1165911659
o.mu.Lock()
1166011660
o.state.Delivered.Stream = sseq
11661+
o.state.AckFloor.Stream = sseq
1166111662
buf, err := o.encodeState()
1166211663
o.mu.Unlock()
1166311664
if err != nil {
@@ -11682,6 +11683,14 @@ func (o *consumerFileStore) UpdateStarting(sseq uint64) {
1168211683
o.kickFlusher()
1168311684
}
1168411685

11686+
// Reset all values in the store, and reset the starting sequence.
11687+
func (o *consumerFileStore) Reset(sseq uint64) error {
11688+
o.mu.Lock()
11689+
o.state = ConsumerState{}
11690+
o.mu.Unlock()
11691+
return o.SetStarting(sseq)
11692+
}
11693+
1168511694
// HasState returns if this store has a recorded state.
1168611695
func (o *consumerFileStore) HasState() bool {
1168711696
o.mu.Lock()

server/jetstream_api.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ const (
155155
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
156156
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
157157

158+
// JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
159+
JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s"
160+
158161
// JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
159162
JSApiConsumerUnpin = "$JS.API.CONSUMER.UNPIN.*.*"
160163
JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
@@ -757,6 +760,19 @@ type JSApiConsumerGetNextRequest struct {
757760
PriorityGroup
758761
}
759762

763+
// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
764+
type JSApiConsumerResetRequest struct {
765+
Seq uint64 `json:"seq"`
766+
}
767+
768+
type JSApiConsumerResetResponse struct {
769+
ApiResponse
770+
*ConsumerInfo
771+
ResetSeq uint64 `json:"reset_seq"`
772+
}
773+
774+
const JSApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response"
775+
760776
// Structure that holds state for a JetStream API request that is processed
761777
// in a separate long-lived go routine. This is to avoid blocking connections.
762778
type jsAPIRoutedReq struct {

server/jetstream_cluster.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ const (
144144
// Batch stream ops.
145145
batchMsgOp
146146
batchCommitMsgOp
147+
// Consumer rest to specific starting sequence.
148+
resetSeqOp
147149
)
148150

149151
// raftGroups are controlled by the metagroup controller.
@@ -6127,6 +6129,39 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
61276129
o.store.UpdateStarting(sseq - 1)
61286130
}
61296131
o.mu.Unlock()
6132+
case resetSeqOp:
6133+
o.mu.Lock()
6134+
var le = binary.LittleEndian
6135+
sseq := le.Uint64(buf[1:9])
6136+
reply := string(buf[9:])
6137+
o.resetLocalStartingSeq(sseq)
6138+
if o.store != nil {
6139+
o.store.Reset(sseq - 1)
6140+
}
6141+
// Cleanup messages that lost interest.
6142+
if o.retention == InterestPolicy {
6143+
if mset := o.mset; mset != nil {
6144+
o.mu.Unlock()
6145+
ss := mset.state()
6146+
o.checkStateForInterestStream(&ss)
6147+
o.mu.Lock()
6148+
}
6149+
}
6150+
// Recalculate pending, and re-trigger message delivery.
6151+
if !o.isLeader() {
6152+
o.mu.Unlock()
6153+
} else {
6154+
o.streamNumPending()
6155+
o.signalNewMessages()
6156+
s, a := o.srv, o.acc
6157+
o.mu.Unlock()
6158+
if reply != _EMPTY_ {
6159+
var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}}
6160+
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
6161+
resp.ResetSeq = sseq
6162+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
6163+
}
6164+
}
61306165
case addPendingRequest:
61316166
o.mu.Lock()
61326167
if !o.isLeader() {

0 commit comments

Comments
 (0)