Skip to content

Commit 743aade

Browse files
frittenthekeFxKu
andauthored
Use finalizers to avoid losing delete events and to ensure full resource cleanup (#941)
* Add Finalizer functions to Cluster; add/remove finalizer on Create/Delete events * Check if clusters have a deletion timestamp and we missed that event. Run Delete() and remove finalizer when done. * Fix nil handling when using Service from map; Remove Service, Endpoint entries from their maps - just like with Secrets * Add handling of ResourceNotFound to all delete functions (Service, Endpoint, LogicalBackup CronJob, PDB and Secret) - this is not a real error when deleting things * Emit events when there are issues deleting resources to the user is informed * Depend the removal of the Finalizer on all resources being deleted successfully first. Otherwise the next sync run should let us try again * Add config option to enable finalizers * Removed dangling whitespace at EOL * config.EnableFinalizers is a bool pointer --------- Co-authored-by: Felix Kunde <[email protected]>
1 parent 9581ba9 commit 743aade

File tree

8 files changed

+201
-30
lines changed

8 files changed

+201
-30
lines changed

charts/postgres-operator/crds/operatorconfigurations.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ spec:
205205
enable_cross_namespace_secret:
206206
type: boolean
207207
default: false
208+
enable_finalizers:
209+
type: boolean
210+
default: false
208211
enable_init_containers:
209212
type: boolean
210213
default: true

charts/postgres-operator/values.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ configKubernetes:
123123

124124
# allow user secrets in other namespaces than the Postgres cluster
125125
enable_cross_namespace_secret: false
126+
# use finalizers to ensure all managed resources are deleted prior to the postgresql CR
127+
# this avoids stale resources in case the operator misses a delete event or is not running
128+
# during deletion
129+
enable_finalizers: false
126130
# enables initContainers to run actions before Spilo is started
127131
enable_init_containers: true
128132
# toggles pod anti affinity on the Postgres pods
@@ -166,7 +170,7 @@ configKubernetes:
166170
# defines the template for PDB (Pod Disruption Budget) names
167171
pdb_name_format: "postgres-{cluster}-pdb"
168172
# specify the PVC retention policy when scaling down and/or deleting
169-
persistent_volume_claim_retention_policy:
173+
persistent_volume_claim_retention_policy:
170174
when_deleted: "retain"
171175
when_scaled: "retain"
172176
# switches pod anti affinity type to `preferredDuringSchedulingIgnoredDuringExecution`

pkg/apis/acid.zalan.do/v1/operator_configuration_type.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ type KubernetesMetaConfiguration struct {
104104
PersistentVolumeClaimRetentionPolicy map[string]string `json:"persistent_volume_claim_retention_policy,omitempty"`
105105
EnableReadinessProbe bool `json:"enable_readiness_probe,omitempty"`
106106
EnableCrossNamespaceSecret bool `json:"enable_cross_namespace_secret,omitempty"`
107+
EnableFinalizers *bool `json:"enable_finalizers,omitempty"`
107108
}
108109

109110
// PostgresPodResourcesDefaults defines the spec of default resources

pkg/cluster/cluster.go

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync"
1414
"time"
1515

16+
jsonpatch "github.com/evanphx/json-patch"
1617
"github.com/sirupsen/logrus"
1718
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
1819

@@ -44,6 +45,7 @@ var (
4445
databaseNameRegexp = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
4546
userRegexp = regexp.MustCompile(`^[a-z0-9]([-_a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-_a-z0-9]*[a-z0-9])?)*$`)
4647
patroniObjectSuffixes = []string{"leader", "config", "sync", "failover"}
48+
finalizerName = "postgres-operator.acid.zalan.do"
4749
)
4850

4951
// Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
@@ -260,6 +262,12 @@ func (c *Cluster) Create() (err error) {
260262
}()
261263

262264
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
265+
if c.OpConfig.EnableFinalizers != nil && *c.OpConfig.EnableFinalizers {
266+
c.logger.Info("Adding finalizer.")
267+
if err = c.AddFinalizer(); err != nil {
268+
return fmt.Errorf("could not add Finalizer: %v", err)
269+
}
270+
}
263271
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
264272

265273
for _, role := range []PostgresRole{Master, Replica} {
@@ -763,6 +771,98 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
763771
return true, ""
764772
}
765773

774+
// AddFinalizer patches the postgresql CR to add our finalizer.
775+
func (c *Cluster) AddFinalizer() error {
776+
if c.HasFinalizer() {
777+
c.logger.Debugf("Finalizer %s already exists.", finalizerName)
778+
return nil
779+
}
780+
781+
currentSpec := c.DeepCopy()
782+
newSpec := c.DeepCopy()
783+
newSpec.ObjectMeta.SetFinalizers(append(newSpec.ObjectMeta.Finalizers, finalizerName))
784+
patchBytes, err := getPatchBytes(currentSpec, newSpec)
785+
if err != nil {
786+
return fmt.Errorf("Unable to produce patch to add finalizer: %v", err)
787+
}
788+
789+
updatedSpec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
790+
context.TODO(), c.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
791+
if err != nil {
792+
return fmt.Errorf("Could not add finalizer: %v", err)
793+
}
794+
795+
// update the spec, maintaining the new resourceVersion.
796+
c.setSpec(updatedSpec)
797+
return nil
798+
}
799+
800+
// RemoveFinalizer patches postgresql CR to remove finalizer.
801+
func (c *Cluster) RemoveFinalizer() error {
802+
if !c.HasFinalizer() {
803+
c.logger.Debugf("No finalizer %s exists to remove.", finalizerName)
804+
return nil
805+
}
806+
currentSpec := c.DeepCopy()
807+
newSpec := c.DeepCopy()
808+
newSpec.ObjectMeta.SetFinalizers(removeString(newSpec.ObjectMeta.Finalizers, finalizerName))
809+
patchBytes, err := getPatchBytes(currentSpec, newSpec)
810+
if err != nil {
811+
return fmt.Errorf("Unable to produce patch to remove finalizer: %v", err)
812+
}
813+
814+
updatedSpec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
815+
context.TODO(), c.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
816+
if err != nil {
817+
return fmt.Errorf("Could not remove finalizer: %v", err)
818+
}
819+
820+
// update the spec, maintaining the new resourceVersion.
821+
c.setSpec(updatedSpec)
822+
823+
return nil
824+
}
825+
826+
// HasFinalizer checks if our finalizer is currently set or not
827+
func (c *Cluster) HasFinalizer() bool {
828+
for _, finalizer := range c.ObjectMeta.Finalizers {
829+
if finalizer == finalizerName {
830+
return true
831+
}
832+
}
833+
return false
834+
}
835+
836+
// Iterate through slice and remove certain string, then return cleaned slice
837+
func removeString(slice []string, s string) (result []string) {
838+
for _, item := range slice {
839+
if item == s {
840+
continue
841+
}
842+
result = append(result, item)
843+
}
844+
return result
845+
}
846+
847+
// getPatchBytes will produce a JSONpatch between the two parameters of type acidv1.Postgresql
848+
func getPatchBytes(oldSpec, newSpec *acidv1.Postgresql) ([]byte, error) {
849+
oldData, err := json.Marshal(oldSpec)
850+
if err != nil {
851+
return nil, fmt.Errorf("failed to Marshal oldSpec for postgresql %s/%s: %v", oldSpec.Namespace, oldSpec.Name, err)
852+
}
853+
854+
newData, err := json.Marshal(newSpec)
855+
if err != nil {
856+
return nil, fmt.Errorf("failed to Marshal newSpec for postgresql %s/%s: %v", newSpec.Namespace, newSpec.Name, err)
857+
}
858+
859+
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
860+
if err != nil {
861+
return nil, fmt.Errorf("failed to CreateMergePatch for postgresl %s/%s: %v", oldSpec.Namespace, oldSpec.Name, err)
862+
}
863+
return patchBytes, nil
864+
}
865+
766866
// Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object
767867
// (i.e. service) is treated as an error
768868
// logical backup cron jobs are an exception: a user-initiated Update can enable a logical backup job
@@ -1005,59 +1105,88 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
10051105
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
10061106
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
10071107
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
1008-
func (c *Cluster) Delete() {
1108+
func (c *Cluster) Delete() error {
10091109
c.mu.Lock()
10101110
defer c.mu.Unlock()
1011-
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources")
1111+
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
10121112

10131113
if err := c.deleteStreams(); err != nil {
10141114
c.logger.Warningf("could not delete event streams: %v", err)
1115+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete event streams: %v", err)
10151116
}
1117+
var anyErrors = false
10161118

10171119
// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
10181120
// deleting the cron job also removes pods and batch jobs it created
10191121
if err := c.deleteLogicalBackupJob(); err != nil {
1122+
anyErrors = true
10201123
c.logger.Warningf("could not remove the logical backup k8s cron job; %v", err)
1124+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove the logical backup k8s cron job; %v", err)
10211125
}
10221126

10231127
if err := c.deleteStatefulSet(); err != nil {
1128+
anyErrors = true
10241129
c.logger.Warningf("could not delete statefulset: %v", err)
1130+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete statefulset: %v", err)
10251131
}
10261132

10271133
if err := c.deleteSecrets(); err != nil {
1134+
anyErrors = true
10281135
c.logger.Warningf("could not delete secrets: %v", err)
1136+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete secrets: %v", err)
10291137
}
10301138

10311139
if err := c.deletePodDisruptionBudget(); err != nil {
1140+
anyErrors = true
10321141
c.logger.Warningf("could not delete pod disruption budget: %v", err)
1142+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete pod disruption budget: %v", err)
10331143
}
10341144

10351145
for _, role := range []PostgresRole{Master, Replica} {
10361146

10371147
if !c.patroniKubernetesUseConfigMaps() {
10381148
if err := c.deleteEndpoint(role); err != nil {
1149+
anyErrors = true
10391150
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
1151+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete %s endpoint: %v", role, err)
10401152
}
10411153
}
10421154

10431155
if err := c.deleteService(role); err != nil {
1156+
anyErrors = true
10441157
c.logger.Warningf("could not delete %s service: %v", role, err)
1158+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete %s service: %v", role, err)
10451159
}
10461160
}
10471161

10481162
if err := c.deletePatroniClusterObjects(); err != nil {
1163+
anyErrors = true
10491164
c.logger.Warningf("could not remove leftover patroni objects; %v", err)
1165+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove leftover patroni objects; %v", err)
10501166
}
10511167

10521168
// Delete connection pooler objects anyway, even if it's not mentioned in the
10531169
// manifest, just to not keep orphaned components in case if something went
10541170
// wrong
10551171
for _, role := range [2]PostgresRole{Master, Replica} {
10561172
if err := c.deleteConnectionPooler(role); err != nil {
1173+
anyErrors = true
10571174
c.logger.Warningf("could not remove connection pooler: %v", err)
1175+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove connection pooler: %v", err)
10581176
}
10591177
}
10601178

1179+
// If we are done deleting our various resources we remove the finalizer to let K8S finally delete the Postgres CR
1180+
if anyErrors {
1181+
c.eventRecorder.Event(c.GetReference(), v1.EventTypeWarning, "Delete", "Some resources could be successfully deleted yet")
1182+
return fmt.Errorf("some error(s) occured when deleting resources, NOT removing finalizer yet")
1183+
}
1184+
if err := c.RemoveFinalizer(); err != nil {
1185+
return fmt.Errorf("Done cleaning up, but error when trying to remove our finalizer: %v", err)
1186+
}
1187+
1188+
c.logger.Info("Done cleaning up our resources, removed finalizer.")
1189+
return nil
10611190
}
10621191

10631192
// NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status).

pkg/cluster/resources.go

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,12 @@ func (c *Cluster) deleteStatefulSet() error {
247247
}
248248

249249
err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(context.TODO(), c.Statefulset.Name, c.deleteOptions)
250-
if err != nil {
250+
if k8sutil.ResourceNotFound(err) {
251+
c.logger.Debugf("StatefulSet was already deleted")
252+
} else if err != nil {
251253
return err
252254
}
255+
253256
c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
254257
c.Statefulset = nil
255258

@@ -336,18 +339,21 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
336339
func (c *Cluster) deleteService(role PostgresRole) error {
337340
c.logger.Debugf("deleting service %s", role)
338341

339-
service, ok := c.Services[role]
340-
if !ok {
342+
if c.Services[role] == nil {
341343
c.logger.Debugf("No service for %s role was found, nothing to delete", role)
342344
return nil
343345
}
344346

345-
if err := c.KubeClient.Services(service.Namespace).Delete(context.TODO(), service.Name, c.deleteOptions); err != nil {
346-
return err
347+
if err := c.KubeClient.Services(c.Services[role].Namespace).Delete(context.TODO(), c.Services[role].Name, c.deleteOptions); err != nil {
348+
if k8sutil.ResourceNotFound(err) {
349+
c.logger.Debugf("Service was already deleted")
350+
} else if err != nil {
351+
return err
352+
}
347353
}
348354

349-
c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta))
350-
c.Services[role] = nil
355+
c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(c.Services[role].ObjectMeta))
356+
delete(c.Services, role)
351357

352358
return nil
353359
}
@@ -448,9 +454,12 @@ func (c *Cluster) deletePodDisruptionBudget() error {
448454
err := c.KubeClient.
449455
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
450456
Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions)
451-
if err != nil {
452-
return fmt.Errorf("could not delete pod disruption budget: %v", err)
457+
if k8sutil.ResourceNotFound(err) {
458+
c.logger.Debugf("PodDisruptionBudget was already deleted")
459+
} else if err != nil {
460+
return fmt.Errorf("could not delete PodDisruptionBudget: %v", err)
453461
}
462+
454463
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
455464
c.PodDisruptionBudget = nil
456465

@@ -479,14 +488,16 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error {
479488
return fmt.Errorf("there is no %s endpoint in the cluster", role)
480489
}
481490

482-
if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(
483-
context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil {
484-
return fmt.Errorf("could not delete endpoint: %v", err)
491+
if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil {
492+
if k8sutil.ResourceNotFound(err) {
493+
c.logger.Debugf("Endpoint was already deleted")
494+
} else if err != nil {
495+
return fmt.Errorf("could not delete endpoint: %v", err)
496+
}
485497
}
486498

487499
c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoints[role].ObjectMeta))
488-
489-
c.Endpoints[role] = nil
500+
delete(c.Endpoints, role)
490501

491502
return nil
492503
}
@@ -514,7 +525,9 @@ func (c *Cluster) deleteSecret(uid types.UID, secret v1.Secret) error {
514525
secretName := util.NameFromMeta(secret.ObjectMeta)
515526
c.logger.Debugf("deleting secret %q", secretName)
516527
err := c.KubeClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, c.deleteOptions)
517-
if err != nil {
528+
if k8sutil.ResourceNotFound(err) {
529+
c.logger.Debugf("Secret was already deleted")
530+
} else if err != nil {
518531
return fmt.Errorf("could not delete secret %q: %v", secretName, err)
519532
}
520533
c.logger.Infof("secret %q has been deleted", secretName)
@@ -573,7 +586,14 @@ func (c *Cluster) deleteLogicalBackupJob() error {
573586

574587
c.logger.Info("removing the logical backup job")
575588

576-
return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions)
589+
err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions)
590+
if k8sutil.ResourceNotFound(err) {
591+
c.logger.Debugf("LogicalBackup CronJob was already deleted")
592+
} else if err != nil {
593+
return err
594+
}
595+
596+
return nil
577597
}
578598

579599
// GetServiceMaster returns cluster's kubernetes master Service

pkg/controller/operator_config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
9191
result.SecretNameTemplate = fromCRD.Kubernetes.SecretNameTemplate
9292
result.OAuthTokenSecretName = fromCRD.Kubernetes.OAuthTokenSecretName
9393
result.EnableCrossNamespaceSecret = fromCRD.Kubernetes.EnableCrossNamespaceSecret
94+
result.EnableFinalizers = util.CoalesceBool(fromCRD.Kubernetes.EnableFinalizers, util.False())
9495

9596
result.InfrastructureRolesSecretName = fromCRD.Kubernetes.InfrastructureRolesSecretName
9697
if fromCRD.Kubernetes.InfrastructureRolesDefs != nil {

0 commit comments

Comments
 (0)