Skip to content

Commit f4506a0

Browse files
Refactor some JetStream helper code, add support for specifying JetStream domain (#3485)
This should gracefully handle some more potential errors that the consumer fetches can return with retries, as well as setting some client settings for reconnects etc when using an external NATS Server. Also allow specifying the JetStream domain in case of a leafnode scenario and better manage client reuse across Dendrite. And also update NATS Server to 2.10.24 for good measure. This code is backported from Harmony. Signed-off-by: Neil Alexander <[email protected]> --------- Signed-off-by: Neil Alexander <[email protected]> Co-authored-by: Neil Alexander <[email protected]> Co-authored-by: Till <[email protected]>
1 parent 9de3e84 commit f4506a0

File tree

5 files changed

+133
-110
lines changed

5 files changed

+133
-110
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ require (
2929
github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7
3030
github.com/matrix-org/util v0.0.0-20221111132719-399730281e66
3131
github.com/mattn/go-sqlite3 v1.14.24
32-
github.com/nats-io/nats-server/v2 v2.10.23
32+
github.com/nats-io/nats-server/v2 v2.10.24
3333
github.com/nats-io/nats.go v1.38.0
3434
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
3535
github.com/opentracing/opentracing-go v1.2.0
@@ -115,7 +115,7 @@ require (
115115
github.com/morikuni/aec v1.0.0 // indirect
116116
github.com/mschoch/smat v0.2.0 // indirect
117117
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
118-
github.com/nats-io/jwt/v2 v2.5.8 // indirect
118+
github.com/nats-io/jwt/v2 v2.7.3 // indirect
119119
github.com/nats-io/nkeys v0.4.9 // indirect
120120
github.com/nats-io/nuid v1.0.1 // indirect
121121
github.com/ncruces/go-strftime v0.1.9 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,10 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
268268
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
269269
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
270270
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
271-
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
272-
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
273-
github.com/nats-io/nats-server/v2 v2.10.23 h1:jvfb9cEi5h8UG6HkZgJGdn9f1UPaX3Dohk0PohEekJI=
274-
github.com/nats-io/nats-server/v2 v2.10.23/go.mod h1:hMFnpDT2XUXsvHglABlFl/uroQCCOcW6X/0esW6GpBk=
271+
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
272+
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
273+
github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4=
274+
github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s=
275275
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
276276
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
277277
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=

setup/config/config_jetstream.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type JetStream struct {
1515
// The prefix to use for stream names for this homeserver - really only
1616
// useful if running more than one Dendrite on the same NATS deployment.
1717
TopicPrefix string `yaml:"topic_prefix"`
18+
// The JetStream domain, if needed.
19+
JetStreamDomain string `yaml:"js_domain"`
1820
// Keep all storage in memory. This is mostly useful for unit tests.
1921
InMemory bool `yaml:"in_memory"`
2022
// Disable logging. This is mostly useful for unit tests.

setup/jetstream/helpers.go

Lines changed: 74 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func JetStreamConsumer(
2222
f func(ctx context.Context, msgs []*nats.Msg) bool,
2323
opts ...nats.SubOpt,
2424
) error {
25-
defer func() {
25+
defer func(durable string) {
2626
// If there are existing consumers from before they were pull
2727
// consumers, we need to clean up the old push consumers. However,
2828
// in order to not affect the interest-based policies, we need to
@@ -33,86 +33,93 @@ func JetStreamConsumer(
3333
logrus.WithContext(ctx).Warnf("Failed to clean up old consumer %q", durable)
3434
}
3535
}
36-
}()
36+
}(durable)
3737

38-
name := durable + "Pull"
39-
sub, err := js.PullSubscribe(subj, name, opts...)
38+
durable = durable + "Pull"
39+
sub, err := js.PullSubscribe(subj, durable, opts...)
4040
if err != nil {
4141
sentry.CaptureException(err)
42-
return fmt.Errorf("nats.SubscribeSync: %w", err)
42+
logrus.WithContext(ctx).WithError(err).Warnf("Failed to configure durable %q", durable)
43+
return err
4344
}
44-
go func() {
45-
for {
46-
// If the parent context has given up then there's no point in
47-
// carrying on doing anything, so stop the listener.
48-
select {
49-
case <-ctx.Done():
50-
if err := sub.Unsubscribe(); err != nil {
51-
logrus.WithContext(ctx).Warnf("Failed to unsubscribe %q", durable)
52-
}
53-
return
54-
default:
55-
}
56-
// The context behaviour here is surprising — we supply a context
57-
// so that we can interrupt the fetch if we want, but NATS will still
58-
// enforce its own deadline (roughly 5 seconds by default). Therefore
59-
// it is our responsibility to check whether our context expired or
60-
// not when a context error is returned. Footguns. Footguns everywhere.
61-
msgs, err := sub.Fetch(batch, nats.Context(ctx))
62-
if err != nil {
63-
if err == context.Canceled || err == context.DeadlineExceeded {
64-
// Work out whether it was the JetStream context that expired
65-
// or whether it was our supplied context.
66-
select {
67-
case <-ctx.Done():
68-
// The supplied context expired, so we want to stop the
69-
// consumer altogether.
70-
return
71-
default:
72-
// The JetStream context expired, so the fetch probably
73-
// just timed out and we should try again.
74-
continue
75-
}
76-
} else if errors.Is(err, nats.ErrConsumerDeleted) {
77-
// The consumer was deleted so stop.
45+
go jetStreamConsumerWorker(ctx, sub, subj, batch, f)
46+
return nil
47+
}
48+
49+
func jetStreamConsumerWorker(
50+
ctx context.Context, sub *nats.Subscription, subj string, batch int,
51+
f func(ctx context.Context, msgs []*nats.Msg) bool,
52+
) {
53+
for {
54+
// If the parent context has given up then there's no point in
55+
// carrying on doing anything, so stop the listener.
56+
select {
57+
case <-ctx.Done():
58+
return
59+
default:
60+
}
61+
// The context behaviour here is surprising — we supply a context
62+
// so that we can interrupt the fetch if we want, but NATS will still
63+
// enforce its own deadline (roughly 5 seconds by default). Therefore
64+
// it is our responsibility to check whether our context expired or
65+
// not when a context error is returned. Footguns. Footguns everywhere.
66+
msgs, err := sub.Fetch(batch, nats.Context(ctx))
67+
if err != nil {
68+
if err == context.Canceled || err == context.DeadlineExceeded {
69+
// Work out whether it was the JetStream context that expired
70+
// or whether it was our supplied context.
71+
select {
72+
case <-ctx.Done():
73+
// The supplied context expired, so we want to stop the
74+
// consumer altogether.
7875
return
79-
} else {
80-
// Unfortunately, there's no ErrServerShutdown or similar, so we need to compare the string
81-
if err.Error() == "nats: Server Shutdown" {
82-
logrus.WithContext(ctx).Warn("nats server shutting down")
83-
return
84-
}
85-
// Something else went wrong, so we'll panic.
86-
sentry.CaptureException(err)
87-
logrus.WithContext(ctx).WithField("subject", subj).Fatal(err)
76+
default:
77+
// The JetStream context expired, so the fetch probably
78+
// just timed out and we should try again.
79+
continue
8880
}
81+
} else if errors.Is(err, nats.ErrTimeout) {
82+
// Pull request was invalidated, try again.
83+
continue
84+
} else if errors.Is(err, nats.ErrConsumerLeadershipChanged) {
85+
// Leadership changed so pending pull requests became invalidated,
86+
// just try again.
87+
continue
88+
} else if err.Error() == "nats: Server Shutdown" {
89+
// The server is shutting down, but we'll rely on reconnect
90+
// behaviour to try and either connect us to another node (if
91+
// clustered) or to reconnect when the server comes back up.
92+
continue
93+
} else {
94+
// Something else went wrong.
95+
logrus.WithContext(ctx).WithField("subject", subj).WithError(err).Warn("Error on pull subscriber fetch")
96+
return
8997
}
90-
if len(msgs) < 1 {
98+
}
99+
if len(msgs) < 1 {
100+
continue
101+
}
102+
for _, msg := range msgs {
103+
if err = msg.InProgress(nats.Context(ctx)); err != nil {
104+
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
105+
sentry.CaptureException(err)
91106
continue
92107
}
108+
}
109+
if f(ctx, msgs) {
93110
for _, msg := range msgs {
94-
if err = msg.InProgress(nats.Context(ctx)); err != nil {
95-
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
111+
if err = msg.AckSync(nats.Context(ctx)); err != nil {
112+
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
96113
sentry.CaptureException(err)
97-
continue
98114
}
99115
}
100-
if f(ctx, msgs) {
101-
for _, msg := range msgs {
102-
if err = msg.AckSync(nats.Context(ctx)); err != nil {
103-
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
104-
sentry.CaptureException(err)
105-
}
106-
}
107-
} else {
108-
for _, msg := range msgs {
109-
if err = msg.Nak(nats.Context(ctx)); err != nil {
110-
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
111-
sentry.CaptureException(err)
112-
}
116+
} else {
117+
for _, msg := range msgs {
118+
if err = msg.Nak(nats.Context(ctx)); err != nil {
119+
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
120+
sentry.CaptureException(err)
113121
}
114122
}
115123
}
116-
}()
117-
return nil
124+
}
118125
}

setup/jetstream/nats.go

Lines changed: 51 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"sync"
99
"time"
1010

11-
"github.com/getsentry/sentry-go"
1211
"github.com/sirupsen/logrus"
1312

1413
"github.com/element-hq/dendrite/setup/config"
@@ -36,17 +35,20 @@ func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) {
3635
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
3736
natsLock.Lock()
3837
defer natsLock.Unlock()
39-
// check if we need an in-process NATS Server
40-
if len(cfg.Addresses) != 0 {
41-
// reuse existing connections
42-
if s.nc != nil {
43-
return s.js, s.nc
44-
}
38+
var err error
39+
40+
// If an existing connection exists, return it.
41+
if s.nc != nil && s.js != nil {
42+
return s.js, s.nc
43+
}
44+
45+
// For connecting to an external NATS server.
46+
if len(cfg.Addresses) > 0 {
4547
s.js, s.nc = setupNATS(process, cfg, nil)
4648
return s.js, s.nc
4749
}
48-
if s.Server == nil {
49-
var err error
50+
51+
if len(cfg.Addresses) == 0 && s.Server == nil {
5052
opts := &natsserver.Options{
5153
ServerName: "monolith",
5254
DontListen: true,
@@ -58,8 +60,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
5860
NoLog: cfg.NoLog,
5961
SyncAlways: true,
6062
}
61-
s.Server, err = natsserver.NewServer(opts)
62-
if err != nil {
63+
if s.Server, err = natsserver.NewServer(opts); err != nil {
6364
panic(err)
6465
}
6566
if !cfg.NoLog {
@@ -75,29 +76,42 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
7576
s.WaitForShutdown()
7677
process.ComponentFinished()
7778
}()
79+
if !s.ReadyForConnections(time.Second * 60) {
80+
logrus.Fatalln("NATS did not start in time")
81+
}
7882
}
79-
if !s.ReadyForConnections(time.Second * 60) {
80-
logrus.Fatalln("NATS did not start in time")
81-
}
82-
// reuse existing connections
83-
if s.nc != nil {
84-
return s.js, s.nc
85-
}
86-
nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
87-
if err != nil {
83+
84+
// No existing process connection, create a new one.
85+
if s.nc, err = natsclient.Connect("", natsclient.InProcessServer(s.Server)); err != nil {
8886
logrus.Fatalln("Failed to create NATS client")
8987
}
90-
js, _ := setupNATS(process, cfg, nc)
91-
s.js = js
92-
s.nc = nc
93-
return js, nc
88+
s.js, s.nc = setupNATS(process, cfg, s.nc)
89+
return s.js, s.nc
9490
}
9591

9692
// nolint:gocyclo
9793
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
94+
jsOpts := []natsclient.JSOpt{}
95+
if cfg.JetStreamDomain != "" {
96+
jsOpts = append(jsOpts, natsclient.Domain(cfg.JetStreamDomain))
97+
}
98+
9899
if nc == nil {
99100
var err error
100-
opts := []natsclient.Option{}
101+
opts := []natsclient.Option{
102+
natsclient.Name("Dendrite"),
103+
natsclient.MaxReconnects(-1), // Try forever
104+
natsclient.ReconnectJitter(time.Second, time.Second),
105+
natsclient.ReconnectWait(time.Second * 10),
106+
natsclient.ReconnectHandler(func(c *natsclient.Conn) {
107+
js, jerr := c.JetStream(jsOpts...)
108+
if jerr != nil {
109+
logrus.WithError(jerr).Panic("Unable to get JetStream context in reconnect handler")
110+
return
111+
}
112+
checkAndConfigureStreams(process, cfg, js)
113+
}),
114+
}
101115
if cfg.DisableTLSValidation {
102116
opts = append(opts, natsclient.Secure(&tls.Config{
103117
InsecureSkipVerify: true,
@@ -113,15 +127,19 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
113127
}
114128
}
115129

116-
s, err := nc.JetStream()
130+
js, err := nc.JetStream(jsOpts...)
117131
if err != nil {
118132
logrus.WithError(err).Panic("Unable to get JetStream context")
119133
return nil, nil
120134
}
135+
checkAndConfigureStreams(process, cfg, js)
136+
return js, nc
137+
}
121138

139+
func checkAndConfigureStreams(process *process.ProcessContext, cfg *config.JetStream, js natsclient.JetStreamContext) {
122140
for _, stream := range streams { // streams are defined in streams.go
123141
name := cfg.Prefixed(stream.Name)
124-
info, err := s.StreamInfo(name)
142+
info, err := js.StreamInfo(name)
125143
if err != nil && err != natsclient.ErrStreamNotFound {
126144
logrus.WithError(err).Fatal("Unable to get stream info")
127145
}
@@ -153,11 +171,11 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
153171
case info.Config.MaxAge != stream.MaxAge:
154172
// Try updating the stream first, as many things can be updated
155173
// non-destructively.
156-
if info, err = s.UpdateStream(stream); err != nil {
174+
if info, err = js.UpdateStream(stream); err != nil {
157175
logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name)
158176
// We failed to update the stream, this is a last attempt to get
159177
// things working but may result in data loss.
160-
if err = s.DeleteStream(name); err != nil {
178+
if err = js.DeleteStream(name); err != nil {
161179
logrus.WithError(err).Fatalf("Unable to delete stream %q", name)
162180
}
163181
info = nil
@@ -176,7 +194,7 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
176194
namespaced := *stream
177195
namespaced.Name = name
178196
namespaced.Subjects = subjects
179-
if _, err = s.AddStream(&namespaced); err != nil {
197+
if _, err = js.AddStream(&namespaced); err != nil {
180198
logger := logrus.WithError(err).WithFields(logrus.Fields{
181199
"stream": namespaced.Name,
182200
"subjects": namespaced.Subjects,
@@ -193,10 +211,9 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
193211
// we can't recover anything that was queued on the disk but we
194212
// will still be able to start and run hopefully in the meantime.
195213
logger.WithError(err).Error("Unable to add stream")
196-
sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err))
197214

198215
namespaced.Storage = natsclient.MemoryStorage
199-
if _, err = s.AddStream(&namespaced); err != nil {
216+
if _, err = js.AddStream(&namespaced); err != nil {
200217
// We tried to add the stream in-memory instead but something
201218
// went wrong. That's an unrecoverable situation so we will
202219
// give up at this point.
@@ -208,7 +225,6 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
208225
// disk will be left alone, but our ability to recover from a
209226
// future crash will be limited. Yell about it.
210227
err := fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory", namespaced.Name)
211-
sentry.CaptureException(err)
212228
process.Degraded(err)
213229
}
214230
}
@@ -229,15 +245,13 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
229245
streamName := cfg.Matrix.JetStream.Prefixed(stream)
230246
for _, consumer := range consumers {
231247
consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull"
232-
consumerInfo, err := s.ConsumerInfo(streamName, consumerName)
248+
consumerInfo, err := js.ConsumerInfo(streamName, consumerName)
233249
if err != nil || consumerInfo == nil {
234250
continue
235251
}
236-
if err = s.DeleteConsumer(streamName, consumerName); err != nil {
252+
if err = js.DeleteConsumer(streamName, consumerName); err != nil {
237253
logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream)
238254
}
239255
}
240256
}
241-
242-
return s, nc
243257
}

0 commit comments

Comments
 (0)