Skip to content

Commit 67a0316

Browse files
committed
Separate info requests into their own JS API queue
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent c3e0b35 commit 67a0316

File tree

8 files changed

+130
-56
lines changed

8 files changed

+130
-56
lines changed

server/events.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,8 +1053,14 @@ func (s *Server) sendStatsz(subj string) {
10531053
Size: mg.ClusterSize(),
10541054
}
10551055
}
1056-
if ipq := s.jsAPIRoutedReqs; ipq != nil && jStat.Meta != nil {
1057-
jStat.Meta.Pending = ipq.len()
1056+
if jStat.Meta != nil {
1057+
if ipq := s.jsAPIRoutedReqs; ipq != nil {
1058+
jStat.Meta.PendingRequests = ipq.len()
1059+
}
1060+
if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
1061+
jStat.Meta.PendingInfos = ipq.len()
1062+
}
1063+
jStat.Meta.Pending = jStat.Meta.PendingRequests + jStat.Meta.PendingInfos
10581064
}
10591065
}
10601066
jStat.Limits = &s.getOpts().JetStreamLimits

server/jetstream.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"time"
3232

3333
"github.com/minio/highwayhash"
34+
"github.com/nats-io/nats-server/v2/server/gsl"
3435
"github.com/nats-io/nats-server/v2/server/sysmem"
3536
"github.com/nats-io/nats-server/v2/server/tpm"
3637
"github.com/nats-io/nkeys"
@@ -102,22 +103,24 @@ type JetStreamAPIStats struct {
102103
// This is for internal accounting for JetStream for this server.
103104
type jetStream struct {
104105
// These are here first because of atomics on 32bit systems.
105-
apiInflight int64
106-
apiTotal int64
107-
apiErrors int64
108-
memReserved int64
109-
storeReserved int64
110-
memUsed int64
111-
storeUsed int64
112-
queueLimit int64
113-
clustered int32
114-
mu sync.RWMutex
115-
srv *Server
116-
config JetStreamConfig
117-
cluster *jetStreamCluster
118-
accounts map[string]*jsAccount
119-
apiSubs *Sublist
120-
started time.Time
106+
apiInflight int64
107+
apiTotal int64
108+
apiErrors int64
109+
memReserved int64
110+
storeReserved int64
111+
memUsed int64
112+
storeUsed int64
113+
queueLimit int64
114+
infoQueueLimit int64
115+
clustered int32
116+
mu sync.RWMutex
117+
srv *Server
118+
config JetStreamConfig
119+
cluster *jetStreamCluster
120+
accounts map[string]*jsAccount
121+
apiSubs *Sublist
122+
infoSubs *gsl.SimpleSublist // Subjects for info-specific queue.
123+
started time.Time
121124

122125
// System level request to purge a stream move
123126
accountPurge *subscription
@@ -412,7 +415,7 @@ func (s *Server) initJetStreamEncryption() (err error) {
412415

413416
// enableJetStream will start up the JetStream subsystem.
414417
func (s *Server) enableJetStream(cfg JetStreamConfig) error {
415-
js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache()}
418+
js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache(), infoSubs: gsl.NewSimpleSublist()}
416419
s.gcbMu.Lock()
417420
if s.gcbOutMax = s.getOpts().JetStreamMaxCatchup; s.gcbOutMax == 0 {
418421
s.gcbOutMax = defaultMaxTotalCatchupOutBytes
@@ -421,6 +424,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
421424

422425
// TODO: Not currently reloadable.
423426
atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit)
427+
atomic.StoreInt64(&js.infoQueueLimit, s.getOpts().JetStreamInfoQueueLimit)
424428

425429
s.js.Store(js)
426430

server/jetstream_api.go

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -859,11 +859,19 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
859859
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
860860
// header from the msg body. No other references are needed.
861861
// Check pending and warn if getting backed up.
862-
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
863-
limit := atomic.LoadInt64(&js.queueLimit)
862+
var queue *ipQueue[*jsAPIRoutedReq]
863+
var limit int64
864+
if js.infoSubs.HasInterest(subject) {
865+
queue = s.jsAPIRoutedInfoReqs
866+
limit = atomic.LoadInt64(&js.infoQueueLimit)
867+
} else {
868+
queue = s.jsAPIRoutedReqs
869+
limit = atomic.LoadInt64(&js.queueLimit)
870+
}
871+
pending, _ := queue.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
864872
if pending >= int(limit) {
865-
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
866-
drained := int64(s.jsAPIRoutedReqs.drain())
873+
s.rateLimitFormatWarnf("%s limit reached, dropping %d requests", queue.name, pending)
874+
drained := int64(queue.drain())
867875
atomic.AddInt64(&js.apiInflight, -drained)
868876

869877
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
@@ -883,29 +891,45 @@ func (s *Server) processJSAPIRoutedRequests() {
883891
defer s.grWG.Done()
884892

885893
s.mu.RLock()
886-
queue := s.jsAPIRoutedReqs
894+
queue, infoqueue := s.jsAPIRoutedReqs, s.jsAPIRoutedInfoReqs
887895
client := &client{srv: s, kind: JETSTREAM}
888896
s.mu.RUnlock()
889897

890898
js := s.getJetStream()
891899

900+
processFromQueue := func(ipq *ipQueue[*jsAPIRoutedReq]) {
901+
// Only pop one item at a time here, otherwise if the system is recovering
902+
// from queue buildup, then one worker will pull off all the tasks and the
903+
// others will be starved of work.
904+
if r, ok := ipq.popOne(); ok && r != nil {
905+
client.pa = r.pa
906+
start := time.Now()
907+
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
908+
if dur := time.Since(start); dur >= readLoopReportThreshold {
909+
s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
910+
}
911+
atomic.AddInt64(&js.apiInflight, -1)
912+
}
913+
}
914+
892915
for {
916+
// The first select case prioritizes queue; we will only fall through
917+
// to the second select case that considers infoqueue if queue is empty.
918+
// This effectively means infos are deprioritized.
893919
select {
894920
case <-queue.ch:
895-
// Only pop one item at a time here, otherwise if the system is recovering
896-
// from queue buildup, then one worker will pull off all the tasks and the
897-
// others will be starved of work.
898-
for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
899-
client.pa = r.pa
900-
start := time.Now()
901-
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
902-
if dur := time.Since(start); dur >= readLoopReportThreshold {
903-
s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
904-
}
905-
atomic.AddInt64(&js.apiInflight, -1)
906-
}
921+
processFromQueue(queue)
907922
case <-s.quitCh:
908923
return
924+
default:
925+
select {
926+
case <-infoqueue.ch:
927+
processFromQueue(infoqueue)
928+
case <-queue.ch:
929+
processFromQueue(queue)
930+
case <-s.quitCh:
931+
return
932+
}
909933
}
910934
}
911935
}
@@ -924,7 +948,8 @@ func (s *Server) setJetStreamExportSubs() error {
924948
if mp > maxProcs {
925949
mp = maxProcs
926950
}
927-
s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
951+
s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API queue")
952+
s.jsAPIRoutedInfoReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API info queue")
928953
for i := 0; i < mp; i++ {
929954
s.startGoRoutine(s.processJSAPIRoutedRequests)
930955
}
@@ -940,16 +965,13 @@ func (s *Server) setJetStreamExportSubs() error {
940965
}
941966

942967
// API handles themselves.
968+
// infopairs are deprioritized compared to pairs in processJSAPIRoutedRequests.
943969
pairs := []struct {
944970
subject string
945971
handler msgHandler
946972
}{
947-
{JSApiAccountInfo, s.jsAccountInfoRequest},
948973
{JSApiStreamCreate, s.jsStreamCreateRequest},
949974
{JSApiStreamUpdate, s.jsStreamUpdateRequest},
950-
{JSApiStreams, s.jsStreamNamesRequest},
951-
{JSApiStreamList, s.jsStreamListRequest},
952-
{JSApiStreamInfo, s.jsStreamInfoRequest},
953975
{JSApiStreamDelete, s.jsStreamDeleteRequest},
954976
{JSApiStreamPurge, s.jsStreamPurgeRequest},
955977
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
@@ -962,23 +984,40 @@ func (s *Server) setJetStreamExportSubs() error {
962984
{JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
963985
{JSApiConsumerCreate, s.jsConsumerCreateRequest},
964986
{JSApiDurableCreate, s.jsConsumerCreateRequest},
965-
{JSApiConsumers, s.jsConsumerNamesRequest},
966-
{JSApiConsumerList, s.jsConsumerListRequest},
967-
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
968987
{JSApiConsumerDelete, s.jsConsumerDeleteRequest},
969988
{JSApiConsumerPause, s.jsConsumerPauseRequest},
970989
{JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
971990
}
991+
infopairs := []struct {
992+
subject string
993+
handler msgHandler
994+
}{
995+
{JSApiAccountInfo, s.jsAccountInfoRequest},
996+
{JSApiStreams, s.jsStreamNamesRequest},
997+
{JSApiStreamList, s.jsStreamListRequest},
998+
{JSApiStreamInfo, s.jsStreamInfoRequest},
999+
{JSApiConsumers, s.jsConsumerNamesRequest},
1000+
{JSApiConsumerList, s.jsConsumerListRequest},
1001+
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
1002+
}
9721003

9731004
js.mu.Lock()
9741005
defer js.mu.Unlock()
9751006

976-
for _, p := range pairs {
1007+
// As well as populating js.apiSubs for the dispatch function to use, we
1008+
// will also populate js.infoSubs, so that the dispatch function can
1009+
// decide quickly whether or not the request is an info request.
1010+
for _, p := range append(infopairs, pairs...) {
9771011
sub := &subscription{subject: []byte(p.subject), icb: p.handler}
9781012
if err := js.apiSubs.Insert(sub); err != nil {
9791013
return err
9801014
}
9811015
}
1016+
for _, p := range infopairs {
1017+
if err := js.infoSubs.Insert(p.subject, struct{}{}); err != nil {
1018+
return err
1019+
}
1020+
}
9821021

9831022
return nil
9841023
}

server/jetstream_cluster_4_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2977,10 +2977,13 @@ func TestJetStreamClusterAPILimitDefault(t *testing.T) {
29772977
for _, s := range c.servers {
29782978
s.optsMu.RLock()
29792979
lim := s.opts.JetStreamRequestQueueLimit
2980+
ilim := s.opts.JetStreamInfoQueueLimit
29802981
s.optsMu.RUnlock()
29812982

29822983
require_Equal(t, lim, JSDefaultRequestQueueLimit)
2984+
require_Equal(t, ilim, JSDefaultRequestQueueLimit)
29832985
require_Equal(t, atomic.LoadInt64(&s.getJetStream().queueLimit), JSDefaultRequestQueueLimit)
2986+
require_Equal(t, atomic.LoadInt64(&s.getJetStream().infoQueueLimit), JSDefaultRequestQueueLimit)
29842987
}
29852988
}
29862989

@@ -5384,7 +5387,7 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
53845387
require_NoError(t, nc.PublishMsg(msg))
53855388
}
53865389
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
5387-
if queued := leader.jsAPIRoutedReqs.len(); queued != count {
5390+
if queued := leader.jsAPIRoutedInfoReqs.len(); queued != count {
53885391
return fmt.Errorf("expected %d queued requests, got %d", count, queued)
53895392
}
53905393
return nil

server/monitor.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1570,8 +1570,12 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
15701570
v.Meta.Replicas = ci.Replicas
15711571
}
15721572
if ipq := s.jsAPIRoutedReqs; ipq != nil {
1573-
v.Meta.Pending = ipq.len()
1573+
v.Meta.PendingRequests = ipq.len()
15741574
}
1575+
if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
1576+
v.Meta.PendingInfos = ipq.len()
1577+
}
1578+
v.Meta.Pending = v.Meta.PendingRequests + v.Meta.PendingInfos
15751579
}
15761580
}
15771581
}
@@ -3010,13 +3014,15 @@ type MetaSnapshotStats struct {
30103014

30113015
// MetaClusterInfo shows information about the meta group.
30123016
type MetaClusterInfo struct {
3013-
Name string `json:"name,omitempty"` // Name is the name of the cluster
3014-
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
3015-
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
3016-
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
3017-
Size int `json:"cluster_size"` // Size is the known size of the cluster
3018-
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
3019-
Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics
3017+
Name string `json:"name,omitempty"` // Name is the name of the cluster
3018+
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
3019+
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
3020+
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
3021+
Size int `json:"cluster_size"` // Size is the known size of the cluster
3022+
Pending int `json:"pending"` // Pending is the total number of queued JS API requests (PendingRequests + PendingInfos)
3023+
PendingRequests int `json:"pending_requests"` // PendingRequests is how many CRUD operations are queued for processing
3024+
PendingInfos int `json:"pending_infos"` // PendingInfos is how many info operations are queued for processing
3025+
Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics
30203026
}
30213027

30223028
// JSInfo has detailed information on JetStream.
@@ -3239,8 +3245,12 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
32393245
jsi.Meta.Replicas = ci.Replicas
32403246
}
32413247
if ipq := s.jsAPIRoutedReqs; ipq != nil {
3242-
jsi.Meta.Pending = ipq.len()
3248+
jsi.Meta.PendingRequests = ipq.len()
3249+
}
3250+
if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
3251+
jsi.Meta.PendingInfos = ipq.len()
32433252
}
3253+
jsi.Meta.Pending = jsi.Meta.PendingRequests + jsi.Meta.PendingInfos
32443254
// Add meta snapshot stats
32453255
jsi.Meta.Snapshot = &MetaSnapshotStats{
32463256
PendingEntries: entries,

server/opts.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ type Options struct {
387387
JetStreamTpm JSTpmOpts
388388
JetStreamMaxCatchup int64
389389
JetStreamRequestQueueLimit int64
390+
JetStreamInfoQueueLimit int64
390391
JetStreamMetaCompact uint64
391392
JetStreamMetaCompactSize uint64
392393
JetStreamMetaCompactSync bool
@@ -2641,6 +2642,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
26412642
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
26422643
}
26432644
opts.JetStreamRequestQueueLimit = lim
2645+
case "info_queue_limit":
2646+
lim, ok := mv.(int64)
2647+
if !ok {
2648+
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
2649+
}
2650+
opts.JetStreamInfoQueueLimit = lim
26442651
case "meta_compact":
26452652
thres, ok := mv.(int64)
26462653
if !ok || thres < 0 {
@@ -6006,6 +6013,9 @@ func setBaselineOptions(opts *Options) {
60066013
if opts.JetStreamRequestQueueLimit <= 0 {
60076014
opts.JetStreamRequestQueueLimit = JSDefaultRequestQueueLimit
60086015
}
6016+
if opts.JetStreamInfoQueueLimit <= 0 {
6017+
opts.JetStreamInfoQueueLimit = opts.JetStreamRequestQueueLimit
6018+
}
60096019
}
60106020

60116021
func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 {

server/opts_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func TestDefaultOptions(t *testing.T) {
7676
JetStreamMaxStore: -1,
7777
SyncInterval: 2 * time.Minute,
7878
JetStreamRequestQueueLimit: JSDefaultRequestQueueLimit,
79+
JetStreamInfoQueueLimit: JSDefaultRequestQueueLimit,
7980
}
8081

8182
opts := &Options{}

server/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,8 @@ type Server struct {
367367
syncOutSem chan struct{}
368368

369369
// Queue to process JS API requests that come from routes (or gateways)
370-
jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]
370+
jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]
371+
jsAPIRoutedInfoReqs *ipQueue[*jsAPIRoutedReq]
371372

372373
// Delayed API responses.
373374
delayedAPIResponses *ipQueue[*delayedAPIResponse]

0 commit comments

Comments
 (0)