Skip to content

Commit f7816e7

Browse files
(2.14) Support durable server-managed stream sourcing/mirroring alternative
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent c06db5e commit f7816e7

File tree

6 files changed

+656
-221
lines changed

6 files changed

+656
-221
lines changed

server/errors.json

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2080,19 +2080,49 @@
20802080
"deprecates": ""
20812081
},
20822082
{
2083-
"constant": "JSMirrorConsumerRequiresAckFCErr",
2083+
"constant": "JSMirrorDurableConsumerCfgInvalid",
20842084
"code": 400,
20852085
"error_code": 10210,
2086+
"description": "stream mirror consumer config is invalid",
2087+
"comment": "",
2088+
"help": "",
2089+
"url": "",
2090+
"deprecates": ""
2091+
},
2092+
{
2093+
"constant": "JSMirrorConsumerRequiresAckFCErr",
2094+
"code": 400,
2095+
"error_code": 10211,
20862096
"description": "stream mirror consumer requires flow control ack policy",
20872097
"comment": "",
20882098
"help": "",
20892099
"url": "",
20902100
"deprecates": ""
20912101
},
2102+
{
2103+
"constant": "JSSourceDurableConsumerCfgInvalid",
2104+
"code": 400,
2105+
"error_code": 10212,
2106+
"description": "stream source consumer config is invalid",
2107+
"comment": "",
2108+
"help": "",
2109+
"url": "",
2110+
"deprecates": ""
2111+
},
2112+
{
2113+
"constant": "JSSourceDurableConsumerDuplicateDetected",
2114+
"code": 400,
2115+
"error_code": 10213,
2116+
"description": "duplicate stream source consumer detected",
2117+
"comment": "",
2118+
"help": "",
2119+
"url": "",
2120+
"deprecates": ""
2121+
},
20922122
{
20932123
"constant": "JSSourceConsumerRequiresAckFCErr",
20942124
"code": 400,
2095-
"error_code": 10211,
2125+
"error_code": 10214,
20962126
"description": "stream source consumer requires flow control ack policy",
20972127
"comment": "",
20982128
"help": "",
@@ -2102,7 +2132,7 @@
21022132
{
21032133
"constant": "JSConsumerAckFCRequiresPushErr",
21042134
"code": 400,
2105-
"error_code": 10212,
2135+
"error_code": 10215,
21062136
"description": "flow control ack policy requires a push based consumer",
21072137
"comment": "",
21082138
"help": "",
@@ -2112,7 +2142,7 @@
21122142
{
21132143
"constant": "JSConsumerAckFCRequiresFCErr",
21142144
"code": 400,
2115-
"error_code": 10213,
2145+
"error_code": 10216,
21162146
"description": "flow control ack policy requires flow control",
21172147
"comment": "",
21182148
"help": "",
@@ -2122,7 +2152,7 @@
21222152
{
21232153
"constant": "JSConsumerAckFCRequiresMaxAckPendingErr",
21242154
"code": 400,
2125-
"error_code": 10214,
2155+
"error_code": 10217,
21262156
"description": "flow control ack policy requires max ack pending",
21272157
"comment": "",
21282158
"help": "",
@@ -2132,7 +2162,7 @@
21322162
{
21332163
"constant": "JSConsumerAckFCRequiresNoAckWaitErr",
21342164
"code": 400,
2135-
"error_code": 10215,
2165+
"error_code": 10218,
21362166
"description": "flow control ack policy requires unset ack wait",
21372167
"comment": "",
21382168
"help": "",
@@ -2142,7 +2172,7 @@
21422172
{
21432173
"constant": "JSConsumerAckFCRequiresNoMaxDeliverErr",
21442174
"code": 400,
2145-
"error_code": 10216,
2175+
"error_code": 10219,
21462176
"description": "flow control ack policy requires unset max deliver",
21472177
"comment": "",
21482178
"help": "",

server/filestore.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12429,11 +12429,8 @@ func (o *consumerFileStore) SetStarting(sseq uint64) error {
1242912429
o.mu.Lock()
1243012430
o.state.Delivered.Stream = sseq
1243112431
o.state.AckFloor.Stream = sseq
12432-
buf, err := o.encodeState()
12432+
buf := encodeConsumerState(&o.state)
1243312433
o.mu.Unlock()
12434-
if err != nil {
12435-
return err
12436-
}
1243712434
return o.writeState(buf)
1243812435
}
1243912436

server/jetstream_cluster_3_test.go

Lines changed: 156 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8329,9 +8329,11 @@ func TestJetStreamClusterDurableStreamMirror(t *testing.T) {
83298329
_, err = jsStreamCreate(t, nc, &StreamConfig{
83308330
Name: "M",
83318331
Mirror: &StreamSource{
8332-
Name: "O",
8333-
ConsumerName: "C",
8334-
ConsumerDeliverSubject: "deliver-subject",
8332+
Name: "O",
8333+
Consumer: &StreamConsumerSource{
8334+
Name: "C",
8335+
DeliverSubject: "deliver-subject",
8336+
},
83358337
},
83368338
Storage: FileStorage,
83378339
Replicas: replicas,
@@ -8363,6 +8365,79 @@ func TestJetStreamClusterDurableStreamMirror(t *testing.T) {
83638365
}
83648366
}
83658367

8368+
func TestJetStreamClusterDurableStreamMirrorServerManaged(t *testing.T) {
8369+
test := func(t *testing.T, replicas int, retention RetentionPolicy) {
8370+
var s *Server
8371+
if replicas == 1 {
8372+
s = RunBasicJetStreamServer(t)
8373+
defer s.Shutdown()
8374+
} else {
8375+
c := createJetStreamClusterExplicit(t, "R3S", 3)
8376+
defer c.shutdown()
8377+
s = c.randomServer()
8378+
}
8379+
8380+
nc, js := jsClientConnect(t, s)
8381+
defer nc.Close()
8382+
8383+
_, err := jsStreamCreate(t, nc, &StreamConfig{
8384+
Name: "O",
8385+
Subjects: []string{"foo"},
8386+
Storage: FileStorage,
8387+
Replicas: replicas,
8388+
Retention: retention,
8389+
})
8390+
require_NoError(t, err)
8391+
8392+
pubAck, err := js.Publish("foo", nil)
8393+
require_NoError(t, err)
8394+
require_Equal(t, pubAck.Sequence, 1)
8395+
8396+
cfg := &StreamConfig{
8397+
Name: "M",
8398+
Mirror: &StreamSource{
8399+
Name: "O",
8400+
Consumer: &StreamConsumerSource{
8401+
Name: "C",
8402+
ServerManaged: true,
8403+
DeliverSubject: "deliver-subject",
8404+
},
8405+
},
8406+
Storage: FileStorage,
8407+
Replicas: replicas,
8408+
}
8409+
_, err = jsStreamCreate(t, nc, cfg)
8410+
require_Error(t, err, NewJSMirrorDurableConsumerCfgInvalidError())
8411+
8412+
cfg.Mirror.Consumer.DeliverSubject = _EMPTY_
8413+
_, err = jsStreamCreate(t, nc, cfg)
8414+
require_NoError(t, err)
8415+
8416+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
8417+
si, err := js.StreamInfo("M")
8418+
if err != nil {
8419+
return err
8420+
}
8421+
if si.Mirror == nil {
8422+
return errors.New("no mirror")
8423+
}
8424+
if si.Mirror.Error != nil {
8425+
return si.Mirror.Error
8426+
}
8427+
_, err = js.GetMsg("M", 1)
8428+
return err
8429+
})
8430+
}
8431+
8432+
for _, replicas := range []int{1, 3} {
8433+
for _, retention := range []RetentionPolicy{LimitsPolicy, WorkQueuePolicy} {
8434+
t.Run(fmt.Sprintf("R%d/%s", replicas, retention), func(t *testing.T) {
8435+
test(t, replicas, retention)
8436+
})
8437+
}
8438+
}
8439+
}
8440+
83668441
func TestJetStreamClusterDurableStreamSource(t *testing.T) {
83678442
test := func(t *testing.T, replicas int, retention RetentionPolicy) {
83688443
var s *Server
@@ -8403,9 +8478,11 @@ func TestJetStreamClusterDurableStreamSource(t *testing.T) {
84038478
_, err = jsStreamCreate(t, nc, &StreamConfig{
84048479
Name: "S",
84058480
Sources: []*StreamSource{{
8406-
Name: "O",
8407-
ConsumerName: "C",
8408-
ConsumerDeliverSubject: "deliver-subject",
8481+
Name: "O",
8482+
Consumer: &StreamConsumerSource{
8483+
Name: "C",
8484+
DeliverSubject: "deliver-subject",
8485+
},
84098486
}},
84108487
Storage: FileStorage,
84118488
Replicas: replicas,
@@ -8436,3 +8513,76 @@ func TestJetStreamClusterDurableStreamSource(t *testing.T) {
84368513
}
84378514
}
84388515
}
8516+
8517+
func TestJetStreamClusterDurableStreamSourceServerManaged(t *testing.T) {
8518+
test := func(t *testing.T, replicas int, retention RetentionPolicy) {
8519+
var s *Server
8520+
if replicas == 1 {
8521+
s = RunBasicJetStreamServer(t)
8522+
defer s.Shutdown()
8523+
} else {
8524+
c := createJetStreamClusterExplicit(t, "R3S", 3)
8525+
defer c.shutdown()
8526+
s = c.randomServer()
8527+
}
8528+
8529+
nc, js := jsClientConnect(t, s)
8530+
defer nc.Close()
8531+
8532+
_, err := jsStreamCreate(t, nc, &StreamConfig{
8533+
Name: "O",
8534+
Subjects: []string{"foo"},
8535+
Storage: FileStorage,
8536+
Replicas: replicas,
8537+
Retention: retention,
8538+
})
8539+
require_NoError(t, err)
8540+
8541+
pubAck, err := js.Publish("foo", nil)
8542+
require_NoError(t, err)
8543+
require_Equal(t, pubAck.Sequence, 1)
8544+
8545+
cfg := &StreamConfig{
8546+
Name: "S",
8547+
Sources: []*StreamSource{{
8548+
Name: "O",
8549+
Consumer: &StreamConsumerSource{
8550+
Name: "C",
8551+
ServerManaged: true,
8552+
DeliverSubject: "deliver-subject",
8553+
},
8554+
}},
8555+
Storage: FileStorage,
8556+
Replicas: replicas,
8557+
}
8558+
_, err = jsStreamCreate(t, nc, cfg)
8559+
require_Error(t, err, NewJSSourceDurableConsumerCfgInvalidError())
8560+
8561+
cfg.Sources[0].Consumer.DeliverSubject = _EMPTY_
8562+
_, err = jsStreamCreate(t, nc, cfg)
8563+
require_NoError(t, err)
8564+
8565+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
8566+
si, err := js.StreamInfo("S")
8567+
if err != nil {
8568+
return err
8569+
}
8570+
if len(si.Sources) != 1 {
8571+
return errors.New("no source")
8572+
}
8573+
if si.Sources[0].Error != nil {
8574+
return si.Sources[0].Error
8575+
}
8576+
_, err = js.GetMsg("S", 1)
8577+
return err
8578+
})
8579+
}
8580+
8581+
for _, replicas := range []int{1, 3} {
8582+
for _, retention := range []RetentionPolicy{LimitsPolicy, WorkQueuePolicy} {
8583+
t.Run(fmt.Sprintf("R%d/%s", replicas, retention), func(t *testing.T) {
8584+
test(t, replicas, retention)
8585+
})
8586+
}
8587+
}
8588+
}

0 commit comments

Comments
 (0)