@@ -29,6 +29,7 @@ import (
2929 log "github.com/sirupsen/logrus"
3030 appsv1 "k8s.io/api/apps/v1"
3131 v1 "k8s.io/api/core/v1"
32+ kerrors "k8s.io/apimachinery/pkg/api/errors"
3233 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334 "k8s.io/apimachinery/pkg/fields"
3435 "k8s.io/apimachinery/pkg/util/wait"
@@ -48,6 +49,8 @@ const (
4849 rollingUpdateTimeout = 60 * time .Second
4950)
5051
// rollingUpdateMaxRetries caps how many times an update is attempted against
// a single instance, the first attempt included, before that instance is
// abandoned.
52+ const rollingUpdateMaxRetries = 5
53+
5154// RollingUpdate performs a type of "rolling update" on a series of Deployments
5255// of a PostgreSQL cluster in an attempt to minimize downtime.
5356//
@@ -99,9 +102,8 @@ func RollingUpdate(clientset kubeapi.Interface, restConfig *rest.Config,
99102 for i := range instances [deploymentTypeReplica ] {
100103 deployment := instances [deploymentTypeReplica ][i ]
101104
102- // Try to apply the update. If it returns an error during the process,
103- // continue on to the next replica
104- if err := applyUpdateToPostgresInstance (clientset , restConfig , cluster , deployment , rescale , updateFunc ); err != nil {
105+ if err := applyUpdateToPostgresInstanceWithRetries (clientset , restConfig , cluster ,
106+ deployment , rescale , updateFunc ); err != nil {
105107 log .Error (err )
106108 continue
107109 }
@@ -140,9 +142,10 @@ func RollingUpdate(clientset kubeapi.Interface, restConfig *rest.Config,
140142 // as we should have either already promoted a new primary, or this is a
141143 // single instance cluster
142144 for i := range instances [deploymentTypePrimary ] {
143- if err := applyUpdateToPostgresInstance (clientset , restConfig , cluster ,
145+ if err := applyUpdateToPostgresInstanceWithRetries (clientset , restConfig , cluster ,
144146 instances [deploymentTypePrimary ][i ], rescale , updateFunc ); err != nil {
145147 log .Error (err )
148+ continue
146149 }
147150 }
148151
@@ -207,6 +210,52 @@ func applyUpdateToPostgresInstance(clientset kubeapi.Interface, restConfig *rest
207210 return nil
208211}
209212
213+ // applyUpdateToPostgresInstanceWithRetries calls the
214+ // applyUpdateToPostgresInstance function, but allows for it to retry if there
215+ // are any failures
216+ func applyUpdateToPostgresInstanceWithRetries (clientset kubeapi.Interface , restConfig * rest.Config ,
217+ cluster * crv1.Pgcluster , deployment * appsv1.Deployment , rescale bool ,
218+ updateFunc func (kubeapi.Interface , * crv1.Pgcluster , * appsv1.Deployment ) error ) error {
219+ ctx := context .TODO ()
220+
221+ // Try to apply the update. If it returns an error during the process,
222+ // determine if the error is a conflict. If it is, try again for a few
223+ // times.
224+ //
225+ // If not, try again
226+ for i := 0 ; i < rollingUpdateMaxRetries ; i ++ {
227+ err := applyUpdateToPostgresInstance (clientset , restConfig , cluster ,
228+ deployment , rescale , updateFunc )
229+
230+ if err == nil {
231+ break
232+ }
233+
234+ // if the error is anything other than a conflict, log the error and
235+ // continue through the loop
236+ if ! kerrors .IsConflict (err ) {
237+ return err
238+ }
239+
240+ // if the error is a conflict and the next time through the loop is the
241+ // max number of retries, log that we are giving up.
242+ if i + 1 >= rollingUpdateMaxRetries {
243+ log .Error (err )
244+ return fmt .Errorf ("abandoning updating instance %s" , deployment .Name )
245+ }
246+
247+ // because this is a conflict, reload the deployment
248+ // if the reload errors, let's go through the retry loop again with the
249+ // same deployment object
250+ if d , err := clientset .AppsV1 ().Deployments (deployment .Namespace ).Get (ctx ,
251+ deployment .Name , metav1.GetOptions {}); err == nil {
252+ deployment = d
253+ }
254+ }
255+
256+ return nil
257+ }
258+
210259// generateDeploymentTypeMap takes a list of Deployments and determines what
211260// they represent: a primary (hopefully only one) or replicas
212261func generateDeploymentTypeMap (clientset kubernetes.Interface , cluster * crv1.Pgcluster ) (map [deploymentType ][]* appsv1.Deployment , error ) {
0 commit comments