Skip to content

Commit eacd2bb

Browse files
andrewlecuyerJonathan S. Katz
authored andcommitted
Update Replica Init Logic
The operator will now only initialize replica creation prior to stanza creation if the cluster has s3-only enabled. Otherwise replicas will be created following stanza-creation. Additionally, the InitializeReplicaCreation() function (as used to trigger the creation of replicas via pgreplica custom resources) now uses a Patch() operation instead of an Update() operation. This is to prevent potential conflicts when updating a pgreplica.
1 parent 42f4892 commit eacd2bb

File tree

5 files changed

+475
-22
lines changed

5 files changed

+475
-22
lines changed

internal/controller/controllerutil.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121

2222
"github.com/crunchydata/postgres-operator/internal/config"
23+
"github.com/crunchydata/postgres-operator/internal/kubeapi"
2324
crv1 "github.com/crunchydata/postgres-operator/pkg/apis/crunchydata.com/v1"
2425
pgo "github.com/crunchydata/postgres-operator/pkg/generated/clientset/versioned"
2526
log "github.com/sirupsen/logrus"
@@ -61,17 +62,19 @@ func InitializeReplicaCreation(clientset pgo.Interface, clusterName,
6162
log.Error(err)
6263
return err
6364
}
64-
for _, pgreplica := range pgreplicaList.Items {
6565

66-
if pgreplica.Annotations == nil {
67-
pgreplica.Annotations = make(map[string]string)
66+
for i := range pgreplicaList.Items {
67+
patch, err := kubeapi.NewMergePatch().
68+
Add("metadata", "annotations")(map[string]string{
69+
config.ANNOTATION_PGHA_BOOTSTRAP_REPLICA: "true",
70+
}).Bytes()
71+
if err != nil {
72+
log.Error(err)
6873
}
6974

70-
pgreplica.Annotations[config.ANNOTATION_PGHA_BOOTSTRAP_REPLICA] = "true"
71-
72-
if _, err = clientset.CrunchydataV1().Pgreplicas(namespace).Update(&pgreplica); err != nil {
75+
if _, err := clientset.CrunchydataV1().Pgreplicas(namespace).
76+
Patch(pgreplicaList.Items[i].GetName(), types.MergePatchType, patch, ""); err != nil {
7377
log.Error(err)
74-
return err
7578
}
7679
}
7780
return nil

internal/controller/job/backresthandler.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,18 @@ func (c *Controller) handleBackrestBackupUpdate(job *apiv1.Job) error {
144144
if labels[config.LABEL_PGHA_BACKUP_TYPE] == crv1.BackupTypeBootstrap {
145145
log.Debugf("jobController onUpdate initial backup complete")
146146

147-
controller.SetClusterInitializedStatus(c.Client, labels[config.LABEL_PG_CLUSTER],
148-
job.ObjectMeta.Namespace)
147+
if err := controller.SetClusterInitializedStatus(c.Client, labels[config.LABEL_PG_CLUSTER],
148+
job.ObjectMeta.Namespace); err != nil {
149+
log.Error(err)
150+
return err
151+
}
149152

150-
// now initialize the creation of any replica
151-
controller.InitializeReplicaCreation(c.Client, labels[config.LABEL_PG_CLUSTER],
152-
job.ObjectMeta.Namespace)
153+
// now initialize the creation of any replicas
154+
if err := controller.InitializeReplicaCreation(c.Client, labels[config.LABEL_PG_CLUSTER],
155+
job.ObjectMeta.Namespace); err != nil {
156+
log.Error(err)
157+
return err
158+
}
153159

154160
} else if labels[config.LABEL_PGHA_BACKUP_TYPE] == crv1.BackupTypeFailover {
155161
err := clusteroperator.RemovePrimaryOnRoleChangeTag(c.Client, c.Client.Config,
@@ -197,10 +203,18 @@ func (c *Controller) handleBackrestStanzaCreateUpdate(job *apiv1.Job) error {
197203
if cluster.Spec.Standby {
198204
log.Debugf("job Controller: standby cluster %s will now be set to an initialized "+
199205
"status", clusterName)
200-
controller.SetClusterInitializedStatus(c.Client, clusterName, namespace)
206+
if err := controller.SetClusterInitializedStatus(c.Client, clusterName,
207+
namespace); err != nil {
208+
log.Error(err)
209+
return err
210+
}
201211

202212
// now initialize the creation of any replica
203-
controller.InitializeReplicaCreation(c.Client, clusterName, namespace)
213+
if err := controller.InitializeReplicaCreation(c.Client, clusterName,
214+
namespace); err != nil {
215+
log.Error(err)
216+
return err
217+
}
204218
return nil
205219
}
206220

internal/controller/pod/inithandler.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,21 @@ func (c *Controller) handleStandbyInit(cluster *crv1.Pgcluster) error {
208208
}
209209
backrestoperator.StanzaCreate(namespace, clusterName, c.Client)
210210
} else {
211-
controller.SetClusterInitializedStatus(c.Client, clusterName, namespace)
212-
}
211+
if err := controller.SetClusterInitializedStatus(c.Client, clusterName,
212+
namespace); err != nil {
213+
log.Error(err)
214+
}
213215

214-
// If a standby cluster initialize the creation of any replicas. Replicas
215-
// can be initialized right away, i.e. there is no dependency on
216-
// stanza-creation and/or the creation of any backups, since the replicas
217-
// will be generated from the pgBackRest repository of an external PostgreSQL
218-
// database (which should already exist).
219-
controller.InitializeReplicaCreation(c.Client, clusterName, namespace)
216+
// If a standby cluster with s3 only initialize the creation of any replicas. Replicas
217+
// can be initialized right away, i.e. there is no dependency on
218+
// stanza-creation and/or the creation of any backups, since the replicas
219+
// will be generated from the pgBackRest repository of an external PostgreSQL
220+
// database (which should already exist).
221+
if err := controller.InitializeReplicaCreation(c.Client, clusterName,
222+
namespace); err != nil {
223+
log.Error(err)
224+
}
225+
}
220226

221227
// if this is a pgbouncer enabled cluster, add a pgbouncer
222228
// Note: we only warn if we cannot create the pgBouncer, so eecution can

internal/kubeapi/patch.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package kubeapi
2+
3+
/*
4+
Copyright 2020 - 2021 Crunchy Data Solutions, Inc.
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
import (
19+
"encoding/json"
20+
"strings"
21+
)
22+
23+
// escapeJSONPointer encodes '~' and '/' according to RFC 6901.
24+
var escapeJSONPointer = strings.NewReplacer(
25+
"~", "~0",
26+
"/", "~1",
27+
).Replace
28+
29+
// JSON6902 represents a JSON Patch according to RFC 6902; the same as
30+
// k8s.io/apimachinery/pkg/types.JSONPatchType.
31+
type JSON6902 []interface{}
32+
33+
// NewJSONPatch creates a new JSON Patch according to RFC 6902; the same as
34+
// k8s.io/apimachinery/pkg/types.JSONPatchType.
35+
func NewJSONPatch() *JSON6902 { return &JSON6902{} }
36+
37+
func (*JSON6902) pointer(tokens ...string) string {
38+
var b strings.Builder
39+
40+
for _, t := range tokens {
41+
_ = b.WriteByte('/')
42+
_, _ = b.WriteString(escapeJSONPointer(t))
43+
}
44+
45+
return b.String()
46+
}
47+
48+
// Add appends an "add" operation to patch.
49+
//
50+
// > The "add" operation performs one of the following functions,
51+
// > depending upon what the target location references:
52+
// >
53+
// > o If the target location specifies an array index, a new value is
54+
// > inserted into the array at the specified index.
55+
// >
56+
// > o If the target location specifies an object member that does not
57+
// > already exist, a new member is added to the object.
58+
// >
59+
// > o If the target location specifies an object member that does exist,
60+
// > that member's value is replaced.
61+
//
62+
func (patch *JSON6902) Add(path ...string) func(value interface{}) *JSON6902 {
63+
i := len(*patch)
64+
f := func(value interface{}) *JSON6902 {
65+
(*patch)[i] = map[string]interface{}{
66+
"op": "add",
67+
"path": patch.pointer(path...),
68+
"value": value,
69+
}
70+
return patch
71+
}
72+
73+
*patch = append(*patch, f)
74+
75+
return f
76+
}
77+
78+
// Remove appends a "remove" operation to patch.
79+
//
80+
// > The "remove" operation removes the value at the target location.
81+
// >
82+
// > The target location MUST exist for the operation to be successful.
83+
//
84+
func (patch *JSON6902) Remove(path ...string) *JSON6902 {
85+
*patch = append(*patch, map[string]interface{}{
86+
"op": "remove",
87+
"path": patch.pointer(path...),
88+
})
89+
90+
return patch
91+
}
92+
93+
// Replace appends a "replace" operation to patch.
94+
//
95+
// > The "replace" operation replaces the value at the target location
96+
// > with a new value.
97+
// >
98+
// > The target location MUST exist for the operation to be successful.
99+
//
100+
func (patch *JSON6902) Replace(path ...string) func(value interface{}) *JSON6902 {
101+
i := len(*patch)
102+
f := func(value interface{}) *JSON6902 {
103+
(*patch)[i] = map[string]interface{}{
104+
"op": "replace",
105+
"path": patch.pointer(path...),
106+
"value": value,
107+
}
108+
return patch
109+
}
110+
111+
*patch = append(*patch, f)
112+
113+
return f
114+
}
115+
116+
// Bytes returns the JSON representation of patch.
117+
func (patch JSON6902) Bytes() ([]byte, error) { return json.Marshal(patch) }
118+
119+
// Merge7386 represents a JSON Merge Patch according to RFC 7386; the same as
120+
// k8s.io/apimachinery/pkg/types.MergePatchType.
121+
type Merge7386 map[string]interface{}
122+
123+
// NewMergePatch creates a new JSON Merge Patch according to RFC 7386; the same
124+
// as k8s.io/apimachinery/pkg/types.MergePatchType.
125+
func NewMergePatch() *Merge7386 { return &Merge7386{} }
126+
127+
// Add modifies patch to indicate that the member at path should be added or
128+
// replaced with value.
129+
//
130+
// > If the provided merge patch contains members that do not appear
131+
// > within the target, those members are added. If the target does
132+
// > contain the member, the value is replaced. Null values in the merge
133+
// > patch are given special meaning to indicate the removal of existing
134+
// > values in the target.
135+
//
136+
func (patch *Merge7386) Add(path ...string) func(value interface{}) *Merge7386 {
137+
position := *patch
138+
139+
for len(path) > 1 {
140+
p, ok := position[path[0]].(Merge7386)
141+
if !ok {
142+
p = Merge7386{}
143+
position[path[0]] = p
144+
}
145+
146+
position = p
147+
path = path[1:]
148+
}
149+
150+
if len(path) < 1 {
151+
return func(interface{}) *Merge7386 { return patch }
152+
}
153+
154+
f := func(value interface{}) *Merge7386 {
155+
position[path[0]] = value
156+
return patch
157+
}
158+
159+
position[path[0]] = f
160+
161+
return f
162+
}
163+
164+
// Remove modifies patch to indicate that the member at path should be removed
165+
// if it exists.
166+
func (patch *Merge7386) Remove(path ...string) *Merge7386 {
167+
return patch.Add(path...)(nil)
168+
}
169+
170+
// Bytes returns the JSON representation of patch.
171+
func (patch Merge7386) Bytes() ([]byte, error) { return json.Marshal(patch) }

0 commit comments

Comments
 (0)