Skip to content

Commit 80ef38f

Browse files
FxKuidanovinda
andauthored
add resource annotation and ignore recovery type (#2817)
* add resource annotation and ignore recovery type * Update docs/reference/cluster_manifest.md --------- Co-authored-by: Ida Novindasari <[email protected]>
1 parent 301462c commit 80ef38f

File tree

8 files changed

+150
-24
lines changed

8 files changed

+150
-24
lines changed

charts/postgres-operator/crds/postgresqls.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,9 @@ spec:
514514
type: string
515515
batchSize:
516516
type: integer
517+
cpu:
518+
type: string
519+
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
517520
database:
518521
type: string
519522
enableRecovery:
@@ -522,6 +525,9 @@ spec:
522525
type: object
523526
additionalProperties:
524527
type: string
528+
memory:
529+
type: string
530+
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
525531
tables:
526532
type: object
527533
additionalProperties:
@@ -533,6 +539,8 @@ spec:
533539
type: string
534540
idColumn:
535541
type: string
542+
ignoreRecovery:
543+
type: boolean
536544
payloadColumn:
537545
type: string
538546
recoveryEventType:

docs/reference/cluster_manifest.md

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -652,11 +652,11 @@ can have the following properties:
652652

653653
* **applicationId**
654654
The application name to which the database and CDC belongs to. For each
655-
set of streams with a distinct `applicationId` a separate stream CR as well
656-
as a separate logical replication slot will be created. This means there can
657-
be different streams in the same database and streams with the same
658-
`applicationId` are bundled in one stream CR. The stream CR will be called
659-
like the Postgres cluster plus "-<applicationId>" suffix. Required.
655+
set of streams with a distinct `applicationId` a separate stream resource as
656+
well as a separate logical replication slot will be created. This means there
657+
can be different streams in the same database and streams with the same
658+
`applicationId` are bundled in one stream resource. The stream resource will
659+
be called like the Postgres cluster plus "-<applicationId>" suffix. Required.
660660

661661
* **database**
662662
Name of the database from where events will be published via Postgres'
@@ -667,7 +667,8 @@ can have the following properties:
667667

668668
* **tables**
669669
Defines a map of table names and their properties (`eventType`, `idColumn`
670-
and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
670+
and `payloadColumn`). Required.
671+
The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
671672
The application is responsible for putting events into a (JSON/B or VARCHAR)
672673
payload column of the outbox table in the structure of the specified target
673674
event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/16/logical-replication-publication.html)
@@ -676,12 +677,27 @@ can have the following properties:
676677
committed to the outbox table. The `idColumn` will be used in telemetry for
677678
the CDC operator. The names for `idColumn` and `payloadColumn` can be
678679
configured. Defaults are `id` and `payload`. The target `eventType` has to
679-
be defined. Required.
680+
be defined. One can also specify a `recoveryEventType` that will be used
681+
for a dead letter queue. By enabling `ignoreRecovery`, you can choose to
682+
ignore failing events.
680683

681684
* **filter**
682685
Streamed events can be filtered by a jsonpath expression for each table.
683686
Optional.
684687

688+
* **enableRecovery**
689+
Flag to enable a dead letter queue recovery for all streams tables.
690+
Alternatively, recovery can also be enable for single outbox tables by only
691+
specifying a `recoveryEventType` and no `enableRecovery` flag. When set to
692+
false or missing, events will be retried until consuming succeeded. You can
693+
use a `filter` expression to get rid of poison pills. Optional.
694+
685695
* **batchSize**
686696
Defines the size of batches in which events are consumed. Optional.
687697
Defaults to 1.
698+
699+
* **cpu**
700+
CPU requests to be set as an annotation on the stream resource. Optional.
701+
702+
* **memory**
703+
memory requests to be set as an annotation on the stream resource. Optional.

manifests/postgresql.crd.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,9 @@ spec:
512512
type: string
513513
batchSize:
514514
type: integer
515+
cpu:
516+
type: string
517+
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
515518
database:
516519
type: string
517520
enableRecovery:
@@ -520,6 +523,9 @@ spec:
520523
type: object
521524
additionalProperties:
522525
type: string
526+
memory:
527+
type: string
528+
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
523529
tables:
524530
type: object
525531
additionalProperties:
@@ -531,6 +537,8 @@ spec:
531537
type: string
532538
idColumn:
533539
type: string
540+
ignoreRecovery:
541+
type: boolean
534542
payloadColumn:
535543
type: string
536544
recoveryEventType:

pkg/apis/acid.zalan.do/v1/postgresql_type.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,16 @@ type Stream struct {
258258
Tables map[string]StreamTable `json:"tables"`
259259
Filter map[string]*string `json:"filter,omitempty"`
260260
BatchSize *uint32 `json:"batchSize,omitempty"`
261+
CPU *string `json:"cpu,omitempty"`
262+
Memory *string `json:"memory,omitempty"`
261263
EnableRecovery *bool `json:"enableRecovery,omitempty"`
262264
}
263265

264266
// StreamTable defines properties of outbox tables for FabricEventStreams
265267
type StreamTable struct {
266268
EventType string `json:"eventType"`
267269
RecoveryEventType string `json:"recoveryEventType,omitempty"`
270+
IgnoreRecovery *bool `json:"ignoreRecovery,omitempty"`
268271
IdColumn *string `json:"idColumn,omitempty"`
269272
PayloadColumn *string `json:"payloadColumn,omitempty"`
270273
}

pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cluster/streams.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,16 +178,35 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
178178

179179
func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
180180
eventStreams := make([]zalandov1.EventStream, 0)
181+
resourceAnnotations := map[string]string{}
181182

182183
for _, stream := range c.Spec.Streams {
183184
if stream.ApplicationId != appId {
184185
continue
185186
}
187+
if stream.CPU != nil {
188+
cpu, exists := resourceAnnotations[constants.EventStreamCpuAnnotationKey]
189+
if exists {
190+
isSmaller, _ := util.IsSmallerQuantity(cpu, *stream.CPU)
191+
if isSmaller {
192+
resourceAnnotations[constants.EventStreamCpuAnnotationKey] = *stream.CPU
193+
}
194+
}
195+
}
196+
if stream.Memory != nil {
197+
memory, exists := resourceAnnotations[constants.EventStreamMemoryAnnotationKey]
198+
if exists {
199+
isSmaller, _ := util.IsSmallerQuantity(memory, *stream.Memory)
200+
if isSmaller {
201+
resourceAnnotations[constants.EventStreamMemoryAnnotationKey] = *stream.Memory
202+
}
203+
}
204+
}
186205
for tableName, table := range stream.Tables {
187206
streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
188207
streamFlow := getEventStreamFlow(table.PayloadColumn)
189208
streamSink := getEventStreamSink(stream, table.EventType)
190-
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType)
209+
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType, table.IgnoreRecovery)
191210

192211
eventStreams = append(eventStreams, zalandov1.EventStream{
193212
EventStreamFlow: streamFlow,
@@ -207,7 +226,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
207226
Name: fmt.Sprintf("%s-%s", c.Name, strings.ToLower(util.RandomPassword(5))),
208227
Namespace: c.Namespace,
209228
Labels: c.labelsSet(true),
210-
Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)),
229+
Annotations: c.AnnotationsToPropagate(c.annotationsSet(resourceAnnotations)),
211230
OwnerReferences: c.ownerReferences(),
212231
},
213232
Spec: zalandov1.FabricEventStreamSpec{
@@ -247,14 +266,20 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS
247266
}
248267
}
249268

250-
func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery {
269+
func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string, ignoreRecovery *bool) zalandov1.EventStreamRecovery {
251270
if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
252271
(stream.EnableRecovery == nil && recoveryEventType == "") {
253272
return zalandov1.EventStreamRecovery{
254273
Type: constants.EventStreamRecoveryNoneType,
255274
}
256275
}
257276

277+
if ignoreRecovery != nil && *ignoreRecovery {
278+
return zalandov1.EventStreamRecovery{
279+
Type: constants.EventStreamRecoveryIgnoreType,
280+
}
281+
}
282+
258283
if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
259284
recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
260285
}

pkg/cluster/streams_test.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,18 @@ var (
6565
EventType: "stream-type-b",
6666
RecoveryEventType: "stream-type-b-dlq",
6767
},
68+
"data.foofoobar": {
69+
EventType: "stream-type-c",
70+
IgnoreRecovery: util.True(),
71+
},
6872
},
6973
EnableRecovery: util.True(),
7074
Filter: map[string]*string{
7175
"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
7276
},
7377
BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
78+
CPU: k8sutil.StringToPointer("250m"),
79+
Memory: k8sutil.StringToPointer("500Mi"),
7480
},
7581
},
7682
TeamID: "acid",
@@ -88,6 +94,10 @@ var (
8894
ObjectMeta: metav1.ObjectMeta{
8995
Name: fmt.Sprintf("%s-12345", clusterName),
9096
Namespace: namespace,
97+
Annotations: map[string]string{
98+
constants.EventStreamCpuAnnotationKey: "250m",
99+
constants.EventStreamMemoryAnnotationKey: "500Mi",
100+
},
91101
Labels: map[string]string{
92102
"application": "spilo",
93103
"cluster-name": clusterName,
@@ -180,6 +190,37 @@ var (
180190
Type: constants.EventStreamSourcePGType,
181191
},
182192
},
193+
{
194+
EventStreamFlow: zalandov1.EventStreamFlow{
195+
Type: constants.EventStreamFlowPgGenericType,
196+
},
197+
EventStreamRecovery: zalandov1.EventStreamRecovery{
198+
Type: constants.EventStreamRecoveryIgnoreType,
199+
},
200+
EventStreamSink: zalandov1.EventStreamSink{
201+
EventType: "stream-type-c",
202+
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
203+
Type: constants.EventStreamSinkNakadiType,
204+
},
205+
EventStreamSource: zalandov1.EventStreamSource{
206+
Connection: zalandov1.Connection{
207+
DBAuth: zalandov1.DBAuth{
208+
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
209+
PasswordKey: "password",
210+
Type: constants.EventStreamSourceAuthType,
211+
UserKey: "username",
212+
},
213+
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
214+
SlotName: slotName,
215+
PluginType: constants.EventStreamSourcePluginType,
216+
},
217+
Schema: "data",
218+
EventStreamTable: zalandov1.EventStreamTable{
219+
Name: "foofoobar",
220+
},
221+
Type: constants.EventStreamSourcePGType,
222+
},
223+
},
183224
},
184225
},
185226
}
@@ -528,8 +569,8 @@ func TestSyncStreams(t *testing.T) {
528569

529570
func TestSameStreams(t *testing.T) {
530571
testName := "TestSameStreams"
531-
annotationsA := map[string]string{"owned-by": "acid"}
532-
annotationsB := map[string]string{"owned-by": "foo"}
572+
annotationsA := map[string]string{constants.EventStreamMemoryAnnotationKey: "500Mi"}
573+
annotationsB := map[string]string{constants.EventStreamMemoryAnnotationKey: "1Gi"}
533574

534575
stream1 := zalandov1.EventStream{
535576
EventStreamFlow: zalandov1.EventStreamFlow{},
@@ -621,6 +662,13 @@ func TestSameStreams(t *testing.T) {
621662
match: false,
622663
reason: "event stream specs differ",
623664
},
665+
{
666+
subTest: "event stream annotations differ",
667+
streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil),
668+
streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsA),
669+
match: false,
670+
reason: "event stream specs differ",
671+
},
624672
{
625673
subTest: "event stream annotations differ",
626674
streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA),

pkg/util/constants/streams.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@ package constants
22

33
// PostgreSQL specific constants
44
const (
5-
EventStreamCRDApiVersion = "zalando.org/v1"
6-
EventStreamCRDKind = "FabricEventStream"
7-
EventStreamCRDName = "fabriceventstreams.zalando.org"
8-
EventStreamSourcePGType = "PostgresLogicalReplication"
9-
EventStreamSourceSlotPrefix = "fes"
10-
EventStreamSourcePluginType = "pgoutput"
11-
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
12-
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
13-
EventStreamSinkNakadiType = "Nakadi"
14-
EventStreamRecoveryNoneType = "None"
15-
EventStreamRecoveryDLQType = "DeadLetter"
16-
EventStreamRecoverySuffix = "dead-letter-queue"
5+
EventStreamCRDApiVersion = "zalando.org/v1"
6+
EventStreamCRDKind = "FabricEventStream"
7+
EventStreamCRDName = "fabriceventstreams.zalando.org"
8+
EventStreamSourcePGType = "PostgresLogicalReplication"
9+
EventStreamSourceSlotPrefix = "fes"
10+
EventStreamSourcePluginType = "pgoutput"
11+
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
12+
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
13+
EventStreamSinkNakadiType = "Nakadi"
14+
EventStreamRecoveryDLQType = "DeadLetter"
15+
EventStreamRecoveryIgnoreType = "Ignore"
16+
EventStreamRecoveryNoneType = "None"
17+
EventStreamRecoverySuffix = "dead-letter-queue"
18+
EventStreamCpuAnnotationKey = "fes.zalando.org/FES_CPU"
19+
EventStreamMemoryAnnotationKey = "fes.zalando.org/FES_MEMORY"
1720
)

0 commit comments

Comments
 (0)