Skip to content

Commit 3d9d025

Browse files
authored
patch event stream CRD instead of update (#2535)
1 parent 569fc57 commit 3d9d025

File tree

2 files changed

+53
-3
lines changed

2 files changed

+53
-3
lines changed

pkg/cluster/streams.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cluster
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"reflect"
78
"sort"
@@ -13,6 +14,7 @@ import (
1314
"github.com/zalando/postgres-operator/pkg/util/constants"
1415
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
"k8s.io/apimachinery/pkg/types"
1618
)
1719

1820
func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, error) {
@@ -29,8 +31,12 @@ func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, err
2931

3032
func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error {
3133
c.setProcessName("updating event streams")
32-
33-
if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}); err != nil {
34+
patch, err := json.Marshal(newEventStreams)
35+
if err != nil {
36+
return fmt.Errorf("could not marshal new event stream CRD %q: %v", newEventStreams.Name, err)
37+
}
38+
if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Patch(
39+
context.TODO(), newEventStreams.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
3440
return err
3541
}
3642

pkg/cluster/streams_test.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,21 @@ func TestSameStreams(t *testing.T) {
294294
},
295295
}
296296

297+
stream3 := zalandov1.EventStream{
298+
EventStreamFlow: zalandov1.EventStreamFlow{},
299+
EventStreamRecovery: zalandov1.EventStreamRecovery{
300+
Type: constants.EventStreamRecoveryNoneType,
301+
},
302+
EventStreamSink: zalandov1.EventStreamSink{
303+
EventType: "stream-type-b",
304+
},
305+
EventStreamSource: zalandov1.EventStreamSource{
306+
EventStreamTable: zalandov1.EventStreamTable{
307+
Name: "bar",
308+
},
309+
},
310+
}
311+
297312
tests := []struct {
298313
subTest string
299314
streamsA []zalandov1.EventStream
@@ -336,6 +351,13 @@ func TestSameStreams(t *testing.T) {
336351
match: false,
337352
reason: "number of defined streams is different",
338353
},
354+
{
355+
subTest: "event stream recovery specs differ",
356+
streamsA: []zalandov1.EventStream{stream2},
357+
streamsB: []zalandov1.EventStream{stream3},
358+
match: false,
359+
reason: "event stream specs differ",
360+
},
339361
}
340362

341363
for _, tt := range tests {
@@ -409,6 +431,28 @@ func TestUpdateFabricEventStream(t *testing.T) {
409431

410432
result := cluster.generateFabricEventStream(appId)
411433
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
412-
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streams.Items[0], result)
434+
t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
435+
}
436+
437+
// disable recovery
438+
for _, stream := range pg.Spec.Streams {
439+
if stream.ApplicationId == appId {
440+
stream.EnableRecovery = util.False()
441+
}
442+
}
443+
patchData, err = specPatch(pg.Spec)
444+
assert.NoError(t, err)
445+
446+
pgPatched, err = cluster.KubeClient.Postgresqls(namespace).Patch(
447+
context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
448+
assert.NoError(t, err)
449+
450+
cluster.Postgresql.Spec = pgPatched.Spec
451+
err = cluster.createOrUpdateStreams()
452+
assert.NoError(t, err)
453+
454+
result = cluster.generateFabricEventStream(appId)
455+
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
456+
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
413457
}
414458
}

0 commit comments

Comments
 (0)