Skip to content

Commit c06db5e

Browse files
(2.14) Support durable and flow-controled stream sourcing/mirroring
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 22f7dad commit c06db5e

15 files changed

+726
-62
lines changed

server/consumer.go

Lines changed: 85 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ const (
336336
AckAll
337337
// AckExplicit requires ack or nack for all messages.
338338
AckExplicit
339+
// AckFlowControl functions like AckAll, but acks based on responses to flow control.
340+
AckFlowControl
339341
)
340342

341343
func (a AckPolicy) String() string {
@@ -344,6 +346,8 @@ func (a AckPolicy) String() string {
344346
return "none"
345347
case AckAll:
346348
return "all"
349+
case AckFlowControl:
350+
return "flow_control"
347351
default:
348352
return "explicit"
349353
}
@@ -651,7 +655,7 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
651655
config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold
652656
}
653657
// Set proper default for max ack pending if we are ack explicit and none has been set.
654-
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
658+
if config.MaxAckPending == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll || config.AckPolicy == AckFlowControl) {
655659
ackPending := JsDefaultMaxAckPending
656660
if lim.MaxAckPending > 0 && lim.MaxAckPending < ackPending {
657661
ackPending = lim.MaxAckPending
@@ -673,6 +677,12 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
673677
if config.PriorityPolicy == PriorityPinnedClient && config.PinnedTTL == 0 {
674678
config.PinnedTTL = JsDefaultPinnedTTL
675679
}
680+
681+
// Set default values for flow control policy.
682+
if config.AckPolicy == AckFlowControl && !pedantic {
683+
config.FlowControl = true
684+
config.Heartbeat = sourceHealthHB
685+
}
676686
return nil
677687
}
678688

@@ -722,6 +732,31 @@ func checkConsumerCfg(
722732
return NewJSConsumerAckWaitNegativeError()
723733
}
724734

735+
// Ack Flow Control policy requires push-based flow-controlled consumer.
736+
if config.AckPolicy == AckFlowControl {
737+
if config.DeliverSubject == _EMPTY_ {
738+
return NewJSConsumerAckFCRequiresPushError()
739+
}
740+
if !config.FlowControl {
741+
return NewJSConsumerAckFCRequiresFCError()
742+
}
743+
// We currently limit using heartbeat of 1s, since those are used for ephemeral sourcing consumers as well.
744+
// We could decide to relax this in the future, but need to be careful to not allow a heartbeat larger
745+
// than the stalled source timeout.
746+
if config.Heartbeat != sourceHealthHB {
747+
return NewJSStreamInvalidConfigError(fmt.Errorf("flow control ack policy heartbeat needs to be 1s"))
748+
}
749+
if config.MaxAckPending <= 0 {
750+
return NewJSConsumerAckFCRequiresMaxAckPendingError()
751+
}
752+
if config.AckWait != 0 || len(config.BackOff) > 0 {
753+
return NewJSConsumerAckFCRequiresNoAckWaitError()
754+
}
755+
if config.MaxDeliver > 0 {
756+
return NewJSConsumerAckFCRequiresNoMaxDeliverError()
757+
}
758+
}
759+
725760
// Check if we have a BackOff defined that MaxDeliver is within range etc.
726761
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && lbo > config.MaxDeliver {
727762
return NewJSConsumerMaxDeliverBackoffError()
@@ -1081,7 +1116,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
10811116
// Check on stream type conflicts with WorkQueues.
10821117
if cfg.Retention == WorkQueuePolicy && !config.Direct {
10831118
// Force explicit acks here.
1084-
if config.AckPolicy != AckExplicit {
1119+
if config.AckPolicy != AckExplicit && config.AckPolicy != AckFlowControl {
10851120
mset.mu.Unlock()
10861121
return nil, NewJSConsumerWQRequiresExplicitAckError()
10871122
}
@@ -1595,7 +1630,7 @@ func (o *consumer) setLeader(isLeader bool) {
15951630
}
15961631

15971632
var err error
1598-
if o.cfg.AckPolicy != AckNone {
1633+
if o.cfg.AckPolicy != AckNone && o.cfg.AckPolicy != AckFlowControl {
15991634
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
16001635
o.mu.Unlock()
16011636
return
@@ -3442,7 +3477,9 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
34423477
}
34433478

34443479
// Check if this ack is above the current pointer to our next to deliver.
3445-
if sseq >= o.sseq {
3480+
// Ignore if it's a flow-controlled consumer, its state could end up further ahead
3481+
// since its state is not replicated before delivery.
3482+
if sseq >= o.sseq && !o.cfg.FlowControl {
34463483
// Let's make sure this is valid.
34473484
// This is only received on the consumer leader, so should never be higher
34483485
// than the last stream sequence. But could happen if we've just become
@@ -3500,7 +3537,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
35003537
}
35013538
delete(o.rdc, sseq)
35023539
o.removeFromRedeliverQueue(sseq)
3503-
case AckAll:
3540+
case AckAll, AckFlowControl:
35043541
// no-op
35053542
if dseq <= o.adflr || sseq <= o.asflr {
35063543
o.mu.Unlock()
@@ -3660,7 +3697,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
36603697
}
36613698

36623699
switch o.cfg.AckPolicy {
3663-
case AckNone, AckAll:
3700+
case AckNone, AckAll, AckFlowControl:
36643701
needAck = sseq > asflr
36653702
case AckExplicit:
36663703
if sseq > asflr {
@@ -5259,7 +5296,15 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
52595296
if o.isActive() {
52605297
o.mu.RLock()
52615298
o.sendIdleHeartbeat(odsubj)
5299+
flowControl := o.cfg.AckPolicy == AckFlowControl && len(o.pending) > 0
52625300
o.mu.RUnlock()
5301+
5302+
// Send flow control on EOS if it's used for acknowledgements.
5303+
if flowControl {
5304+
o.mu.Lock()
5305+
o.sendFlowControl()
5306+
o.mu.Unlock()
5307+
}
52635308
}
52645309
// Reset our idle heartbeat timer.
52655310
hb.Reset(hbd)
@@ -5435,7 +5480,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
54355480
// Update delivered first.
54365481
o.updateDelivered(dseq, seq, dc, ts)
54375482

5438-
if ap == AckExplicit || ap == AckAll {
5483+
if ap == AckExplicit || ap == AckAll || ap == AckFlowControl {
54395484
o.trackPending(seq, dseq)
54405485
} else if ap == AckNone {
54415486
o.adflr = dseq
@@ -5487,19 +5532,23 @@ func (o *consumer) needFlowControl(sz int) bool {
54875532
if o.fcid == _EMPTY_ && o.pbytes > o.maxpb/2 {
54885533
return true
54895534
}
5535+
// Or, when acking based on flow control, we need to send it if we've hit the max pending limit earlier.
5536+
if o.fcid == _EMPTY_ && o.cfg.AckPolicy == AckFlowControl && o.maxp > 0 && len(o.pending) >= o.maxp {
5537+
return true
5538+
}
54905539
// If we have an existing outstanding FC, check to see if we need to expand the o.fcsz
54915540
if o.fcid != _EMPTY_ && (o.pbytes-o.fcsz) >= o.maxpb {
54925541
o.fcsz += sz
54935542
}
54945543
return false
54955544
}
54965545

5497-
func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, subj, _ string, _ []byte) {
5546+
func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, subj, _ string, rmsg []byte) {
54985547
o.mu.Lock()
5499-
defer o.mu.Unlock()
55005548

55015549
// Ignore if not the latest we have sent out.
55025550
if subj != o.fcid {
5551+
o.mu.Unlock()
55035552
return
55045553
}
55055554

@@ -5519,6 +5568,21 @@ func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, su
55195568
o.fcid, o.fcsz = _EMPTY_, 0
55205569

55215570
o.signalNewMessages()
5571+
o.mu.Unlock()
5572+
5573+
hdr, _ := c.msgParts(rmsg)
5574+
if len(hdr) > 0 {
5575+
ldseq := parseInt64(sliceHeader(JSLastConsumerSeq, hdr))
5576+
lsseq := parseInt64(sliceHeader(JSLastStreamSeq, hdr))
5577+
if lsseq > 0 {
5578+
// Delivered sequence is allowed to be zero as a response
5579+
// to flow control without any deliveries.
5580+
if ldseq <= 0 {
5581+
ldseq = 0
5582+
}
5583+
o.processAckMsg(uint64(lsseq), uint64(ldseq), 1, _EMPTY_, false)
5584+
}
5585+
}
55225586
}
55235587

55245588
// Lock should be held.
@@ -5702,8 +5766,9 @@ func (o *consumer) checkPending() {
57025766
defer o.mu.Unlock()
57035767

57045768
mset := o.mset
5769+
ttl := int64(o.cfg.AckWait)
57055770
// On stop, mset and timer will be nil.
5706-
if o.closed || mset == nil || o.ptmr == nil {
5771+
if o.closed || mset == nil || o.ptmr == nil || ttl == 0 {
57075772
o.stopAndClearPtmr()
57085773
return
57095774
}
@@ -5714,7 +5779,6 @@ func (o *consumer) checkPending() {
57145779
fseq := state.FirstSeq
57155780

57165781
now := time.Now().UnixNano()
5717-
ttl := int64(o.cfg.AckWait)
57185782
next := int64(o.ackWait(0))
57195783
// However, if there is backoff, initializes with the largest backoff.
57205784
// It will be adjusted as needed.
@@ -6809,6 +6873,12 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
68096873
}
68106874

68116875
func (o *consumer) resetPtmr(delay time.Duration) {
6876+
// A delay of zero means it should be stopped.
6877+
if delay == 0 {
6878+
o.stopAndClearPtmr()
6879+
return
6880+
}
6881+
68126882
if o.ptmr == nil {
68136883
o.ptmr = time.AfterFunc(delay, o.checkPending)
68146884
} else {
@@ -6818,6 +6888,10 @@ func (o *consumer) resetPtmr(delay time.Duration) {
68186888
}
68196889

68206890
func (o *consumer) stopAndClearPtmr() {
6891+
// If the end time is unset, short-circuit since the timer will already be stopped.
6892+
if o.ptmrEnd.IsZero() {
6893+
return
6894+
}
68216895
stopAndClearTimer(&o.ptmr)
68226896
o.ptmrEnd = time.Time{}
68236897
}

server/errors.json

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2078,5 +2078,75 @@
20782078
"help": "",
20792079
"url": "",
20802080
"deprecates": ""
2081+
},
2082+
{
2083+
"constant": "JSMirrorConsumerRequiresAckFCErr",
2084+
"code": 400,
2085+
"error_code": 10210,
2086+
"description": "stream mirror consumer requires flow control ack policy",
2087+
"comment": "",
2088+
"help": "",
2089+
"url": "",
2090+
"deprecates": ""
2091+
},
2092+
{
2093+
"constant": "JSSourceConsumerRequiresAckFCErr",
2094+
"code": 400,
2095+
"error_code": 10211,
2096+
"description": "stream source consumer requires flow control ack policy",
2097+
"comment": "",
2098+
"help": "",
2099+
"url": "",
2100+
"deprecates": ""
2101+
},
2102+
{
2103+
"constant": "JSConsumerAckFCRequiresPushErr",
2104+
"code": 400,
2105+
"error_code": 10212,
2106+
"description": "flow control ack policy requires a push based consumer",
2107+
"comment": "",
2108+
"help": "",
2109+
"url": "",
2110+
"deprecates": ""
2111+
},
2112+
{
2113+
"constant": "JSConsumerAckFCRequiresFCErr",
2114+
"code": 400,
2115+
"error_code": 10213,
2116+
"description": "flow control ack policy requires flow control",
2117+
"comment": "",
2118+
"help": "",
2119+
"url": "",
2120+
"deprecates": ""
2121+
},
2122+
{
2123+
"constant": "JSConsumerAckFCRequiresMaxAckPendingErr",
2124+
"code": 400,
2125+
"error_code": 10214,
2126+
"description": "flow control ack policy requires max ack pending",
2127+
"comment": "",
2128+
"help": "",
2129+
"url": "",
2130+
"deprecates": ""
2131+
},
2132+
{
2133+
"constant": "JSConsumerAckFCRequiresNoAckWaitErr",
2134+
"code": 400,
2135+
"error_code": 10215,
2136+
"description": "flow control ack policy requires unset ack wait",
2137+
"comment": "",
2138+
"help": "",
2139+
"url": "",
2140+
"deprecates": ""
2141+
},
2142+
{
2143+
"constant": "JSConsumerAckFCRequiresNoMaxDeliverErr",
2144+
"code": 400,
2145+
"error_code": 10216,
2146+
"description": "flow control ack policy requires unset max deliver",
2147+
"comment": "",
2148+
"help": "",
2149+
"url": "",
2150+
"deprecates": ""
20812151
}
20822152
]

server/filestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12563,7 +12563,7 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
1256312563
}
1256412564

1256512565
// Check for AckAll here.
12566-
if o.cfg.AckPolicy == AckAll {
12566+
if o.cfg.AckPolicy == AckAll || o.cfg.AckPolicy == AckFlowControl {
1256712567
sgap := sseq - o.state.AckFloor.Stream
1256812568
o.state.AckFloor.Consumer = dseq
1256912569
o.state.AckFloor.Stream = sseq

server/jetstream_api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,7 @@ type JSApiConsumerResetRequest struct {
772772
Seq uint64 `json:"seq,omitempty"`
773773
}
774774

775+
// JSApiConsumerResetResponse is a superset of JSApiConsumerCreateResponse, but including an explicit ResetSeq.
775776
type JSApiConsumerResetResponse struct {
776777
ApiResponse
777778
*ConsumerInfo

server/jetstream_cluster.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4122,7 +4122,7 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
41224122
if needLock {
41234123
mset.mu.RLock()
41244124
}
4125-
mset.sendFlowControlReply(reply)
4125+
mset.sendFlowControlReply(reply, hdr)
41264126
if needLock {
41274127
mset.mu.RUnlock()
41284128
}
@@ -6604,7 +6604,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
66046604
o.lat = time.Now()
66056605

66066606
var sagap uint64
6607-
if o.cfg.AckPolicy == AckAll {
6607+
if o.cfg.AckPolicy == AckAll || o.cfg.AckPolicy == AckFlowControl {
66086608
// Always use the store state, as o.asflr is skipped ahead already.
66096609
// Capture before updating store.
66106610
state, err := o.store.BorrowState()
@@ -9107,7 +9107,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
91079107
// Check if we are work queue policy.
91089108
// We will do pre-checks here to avoid thrashing meta layer.
91099109
if sa.Config.Retention == WorkQueuePolicy && !cfg.Direct {
9110-
if cfg.AckPolicy != AckExplicit {
9110+
if cfg.AckPolicy != AckExplicit && cfg.AckPolicy != AckFlowControl {
91119111
resp.Error = NewJSConsumerWQRequiresExplicitAckError()
91129112
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
91139113
return

0 commit comments

Comments
 (0)