Skip to content

Commit b916519

Browse files
authored
set wal_level for streams in statefulSet sync (#2187)
* set wal_level for streams in statefulSet sync
1 parent 4741b3f commit b916519

File tree

2 files changed

+88
-79
lines changed

2 files changed

+88
-79
lines changed

pkg/cluster/streams.go

Lines changed: 11 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -79,38 +79,6 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
7979
return appIds
8080
}
8181

82-
func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) (bool, error) {
83-
errorMsg := "no pods found to update config"
84-
85-
// if streams are defined wal_level must be switched to logical
86-
requiredPgParameters := map[string]string{"wal_level": "logical"}
87-
88-
// apply config changes in pods
89-
pods, err := c.listPods()
90-
if err != nil {
91-
errorMsg = fmt.Sprintf("could not list pods of the statefulset: %v", err)
92-
}
93-
for i, pod := range pods {
94-
podName := util.NameFromMeta(pods[i].ObjectMeta)
95-
effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod)
96-
if err != nil {
97-
errorMsg = fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err)
98-
continue
99-
}
100-
101-
configPatched, _, err := c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
102-
if err != nil {
103-
errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
104-
continue
105-
}
106-
107-
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
108-
return configPatched, nil
109-
}
110-
111-
return false, fmt.Errorf(errorMsg)
112-
}
113-
11482
func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
11583
createPublications := make(map[string]string)
11684
alterPublications := make(map[string]string)
@@ -273,7 +241,6 @@ func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Co
273241
}
274242

275243
func (c *Cluster) syncStreams() error {
276-
277244
c.setProcessName("syncing streams")
278245

279246
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
@@ -282,20 +249,10 @@ func (c *Cluster) syncStreams() error {
282249
return nil
283250
}
284251

285-
// update config to set wal_level: logical
286-
requiredPatroniConfig := c.Spec.Patroni
287-
requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig)
288-
if err != nil {
289-
return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
290-
}
291-
if requiresRestart {
292-
c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync")
293-
return nil
294-
}
295-
296252
slots := make(map[string]map[string]string)
297253
slotsToSync := make(map[string]map[string]string)
298254
publications := make(map[string]map[string]acidv1.StreamTable)
255+
requiredPatroniConfig := c.Spec.Patroni
299256

300257
if len(requiredPatroniConfig.Slots) > 0 {
301258
slots = requiredPatroniConfig.Slots
@@ -343,13 +300,19 @@ func (c *Cluster) syncStreams() error {
343300
return nil
344301
}
345302

346-
// add extra logical slots to Patroni config
347-
_, err = c.syncPostgresConfig(requiredPatroniConfig)
303+
c.logger.Debug("syncing logical replication slots")
304+
pods, err := c.listPods()
305+
if err != nil {
306+
return fmt.Errorf("could not get list of pods to sync logical replication slots via Patroni API: %v", err)
307+
}
308+
309+
// sync logical replication slots in Patroni config
310+
configPatched, _, _, err := c.syncPatroniConfig(pods, requiredPatroniConfig, nil)
348311
if err != nil {
349-
return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
312+
c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
350313
}
351314

352-
// after Postgres was restarted we can create stream CRDs
315+
// finally sync stream CRDs
353316
err = c.createOrUpdateStreams()
354317
if err != nil {
355318
return err

pkg/cluster/sync.go

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -395,84 +395,130 @@ func (c *Cluster) syncStatefulSet() error {
395395
}
396396
}
397397

398-
// Apply special PostgreSQL parameters that can only be set via the Patroni API.
398+
// apply PostgreSQL parameters that can only be set via the Patroni API.
399399
// it is important to do it after the statefulset pods are there, but before the rolling update
400400
// since those parameters require PostgreSQL restart.
401401
pods, err = c.listPods()
402402
if err != nil {
403-
c.logger.Warnf("could not get list of pods to apply special PostgreSQL parameters only to be set via Patroni API: %v", err)
403+
c.logger.Warnf("could not get list of pods to apply PostgreSQL parameters only to be set via Patroni API: %v", err)
404404
}
405405

406+
requiredPgParameters := c.Spec.Parameters
407+
// if streams are defined wal_level must be switched to logical
408+
if len(c.Spec.Streams) > 0 {
409+
requiredPgParameters["wal_level"] = "logical"
410+
}
411+
412+
// sync Patroni config
413+
if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil {
414+
c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
415+
isSafeToRecreatePods = false
416+
}
417+
418+
// restart Postgres where it is still pending
419+
if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil {
420+
c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err)
421+
isSafeToRecreatePods = false
422+
}
423+
424+
// if we get here we also need to re-create the pods (either leftovers from the old
425+
// statefulset or those that got their configuration from the outdated statefulset)
426+
if len(podsToRecreate) > 0 {
427+
if isSafeToRecreatePods {
428+
c.logger.Debugln("performing rolling update")
429+
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
430+
if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
431+
return fmt.Errorf("could not recreate pods: %v", err)
432+
}
433+
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
434+
} else {
435+
c.logger.Warningf("postpone pod recreation until next sync because of errors during config sync")
436+
}
437+
}
438+
439+
return nil
440+
}
441+
442+
func (c *Cluster) syncPatroniConfig(pods []v1.Pod, requiredPatroniConfig acidv1.Patroni, requiredPgParameters map[string]string) (bool, bool, uint32, error) {
443+
var (
444+
effectivePatroniConfig acidv1.Patroni
445+
effectivePgParameters map[string]string
446+
loopWait uint32
447+
configPatched bool
448+
restartPrimaryFirst bool
449+
err error
450+
)
451+
452+
errors := make([]string, 0)
453+
406454
// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
407-
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
408455
for i, pod := range pods {
409-
patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
456+
podName := util.NameFromMeta(pods[i].ObjectMeta)
457+
effectivePatroniConfig, effectivePgParameters, err = c.patroni.GetConfig(&pod)
410458
if err != nil {
411-
c.logger.Warningf("%v", err)
412-
isSafeToRecreatePods = false
459+
errors = append(errors, fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err))
413460
continue
414461
}
415-
restartWait = patroniConfig.LoopWait
462+
loopWait = effectivePatroniConfig.LoopWait
416463

417464
// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
418-
// do not attempt a restart
419-
if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
420-
// compare config returned from Patroni with what is specified in the manifest
421-
configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters)
465+
if reflect.DeepEqual(effectivePatroniConfig, acidv1.Patroni{}) || len(effectivePgParameters) == 0 {
466+
errors = append(errors, fmt.Sprintf("empty Patroni config on pod %s - skipping config patch", podName))
467+
} else {
468+
configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
422469
if err != nil {
423-
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
470+
errors = append(errors, fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err))
424471
continue
425472
}
426473

427474
// it could take up to LoopWait to apply the config
428475
if configPatched {
429-
time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2)
476+
time.Sleep(time.Duration(loopWait)*time.Second + time.Second*2)
477+
// Patroni's config endpoint is just a "proxy" to DCS.
478+
// It is enough to patch it only once and it doesn't matter which pod is used
430479
break
431480
}
432481
}
433482
}
434483

435-
// restart instances if it is still pending
484+
if len(errors) > 0 {
485+
err = fmt.Errorf("%v", strings.Join(errors, `', '`))
486+
}
487+
488+
return configPatched, restartPrimaryFirst, loopWait, err
489+
}
490+
491+
func (c *Cluster) restartInstances(pods []v1.Pod, restartWait uint32, restartPrimaryFirst bool) (err error) {
492+
errors := make([]string, 0)
436493
remainingPods := make([]*v1.Pod, 0)
494+
437495
skipRole := Master
438496
if restartPrimaryFirst {
439497
skipRole = Replica
440498
}
499+
441500
for i, pod := range pods {
442501
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
443502
if role == skipRole {
444503
remainingPods = append(remainingPods, &pods[i])
445504
continue
446505
}
447506
if err = c.restartInstance(&pod, restartWait); err != nil {
448-
c.logger.Errorf("%v", err)
449-
isSafeToRecreatePods = false
507+
errors = append(errors, fmt.Sprintf("%v", err))
450508
}
451509
}
452510

453511
// in most cases only the master should be left to restart
454512
if len(remainingPods) > 0 {
455513
for _, remainingPod := range remainingPods {
456514
if err = c.restartInstance(remainingPod, restartWait); err != nil {
457-
c.logger.Errorf("%v", err)
458-
isSafeToRecreatePods = false
515+
errors = append(errors, fmt.Sprintf("%v", err))
459516
}
460517
}
461518
}
462519

463-
// if we get here we also need to re-create the pods (either leftovers from the old
464-
// statefulset or those that got their configuration from the outdated statefulset)
465-
if len(podsToRecreate) > 0 {
466-
if isSafeToRecreatePods {
467-
c.logger.Debugln("performing rolling update")
468-
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
469-
if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
470-
return fmt.Errorf("could not recreate pods: %v", err)
471-
}
472-
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
473-
} else {
474-
c.logger.Warningf("postpone pod recreation until next sync")
475-
}
520+
if len(errors) > 0 {
521+
return fmt.Errorf("%v", strings.Join(errors, `', '`))
476522
}
477523

478524
return nil

0 commit comments

Comments
 (0)