Skip to content

Commit e2bcca8

Browse files
armru, leonardoce, and mnencia
authored
fix(backup,snapshot): avoid parallel actions on endpoints (cloudnative-pg#6890)
Multiple backup operations on the same instance were able to execute simultaneously, potentially causing conflicts and data inconsistencies. This patch introduces a mechanism to serialize these operations, ensuring that only one backup can proceed at a time per instance.

Partially Closes cloudnative-pg#6761

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Co-authored-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Co-authored-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
1 parent 6d01b4f commit e2bcca8

File tree

6 files changed

+160
-111
lines changed

6 files changed

+160
-111
lines changed

pkg/management/postgres/webserver/backup_connection.go

Lines changed: 20 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,6 @@ const (
5757
var replicationSlotInvalidCharacters = regexp.MustCompile(`[^a-z0-9_]`)
5858

5959
type backupConnection struct {
60-
sync sync.Mutex
6160
immediateCheckpoint bool
6261
waitForArchive bool
6362
conn *sql.Conn
@@ -66,44 +65,6 @@ type backupConnection struct {
6665
err error
6766
}
6867

69-
func (bc *backupConnection) setPhase(phase BackupConnectionPhase, backupName string) {
70-
bc.sync.Lock()
71-
defer bc.sync.Unlock()
72-
if backupName != bc.data.BackupName {
73-
return
74-
}
75-
bc.data.Phase = phase
76-
}
77-
78-
func (bc *backupConnection) closeConnection(backupName string) error {
79-
bc.sync.Lock()
80-
defer bc.sync.Unlock()
81-
if backupName != bc.data.BackupName {
82-
return nil
83-
}
84-
85-
return bc.conn.Close()
86-
}
87-
88-
func (bc *backupConnection) forceCloseConnection() error {
89-
bc.sync.Lock()
90-
defer bc.sync.Unlock()
91-
92-
return bc.conn.Close()
93-
}
94-
95-
func (bc *backupConnection) executeWithLock(backupName string, cb func() error) {
96-
bc.sync.Lock()
97-
defer bc.sync.Unlock()
98-
if backupName != bc.data.BackupName {
99-
return
100-
}
101-
102-
if err := cb(); err != nil {
103-
bc.err = err
104-
}
105-
}
106-
10768
func newBackupConnection(
10869
ctx context.Context,
10970
instance *postgres.Instance,
@@ -139,8 +100,10 @@ func newBackupConnection(
139100
}, nil
140101
}
141102

142-
func (bc *backupConnection) startBackup(ctx context.Context, backupName string) {
103+
func (bc *backupConnection) startBackup(ctx context.Context, sync *sync.Mutex) {
143104
contextLogger := log.FromContext(ctx).WithValues("step", "start")
105+
sync.Lock()
106+
defer sync.Unlock()
144107

145108
if bc == nil {
146109
return
@@ -152,7 +115,7 @@ func (bc *backupConnection) startBackup(ctx context.Context, backupName string)
152115
}
153116
contextLogger.Error(bc.err, "encountered error while starting backup")
154117

155-
if err := bc.closeConnection(backupName); err != nil {
118+
if err := bc.conn.Close(); err != nil {
156119
if !errors.Is(err, sql.ErrConnDone) {
157120
contextLogger.Error(err, "while closing backup connection")
158121
}
@@ -180,25 +143,25 @@ func (bc *backupConnection) startBackup(ctx context.Context, backupName string)
180143
bc.immediateCheckpoint)
181144
}
182145

183-
bc.executeWithLock(backupName, func() error {
184-
if err := row.Scan(&bc.data.BeginLSN); err != nil {
185-
return fmt.Errorf("while scanning backup start: %w", err)
186-
}
187-
bc.data.Phase = Started
146+
if err := row.Scan(&bc.data.BeginLSN); err != nil {
147+
bc.err = fmt.Errorf("while scanning backup start: %w", err)
148+
return
149+
}
188150

189-
return nil
190-
})
151+
bc.data.Phase = Started
191152
}
192153

193-
func (bc *backupConnection) stopBackup(ctx context.Context, backupName string) {
154+
func (bc *backupConnection) stopBackup(ctx context.Context, sync *sync.Mutex) {
194155
contextLogger := log.FromContext(ctx).WithValues("step", "stop")
156+
sync.Lock()
157+
defer sync.Unlock()
195158

196159
if bc == nil {
197160
return
198161
}
199162

200163
defer func() {
201-
if err := bc.closeConnection(backupName); err != nil {
164+
if err := bc.conn.Close(); err != nil {
202165
if !errors.Is(err, sql.ErrConnDone) {
203166
contextLogger.Error(err, "while closing backup connection")
204167
}
@@ -218,12 +181,11 @@ func (bc *backupConnection) stopBackup(ctx context.Context, backupName string) {
218181
"SELECT lsn, labelfile, spcmapfile FROM pg_catalog.pg_backup_stop(wait_for_archive => $1);", bc.waitForArchive)
219182
}
220183

221-
bc.executeWithLock(backupName, func() error {
222-
if err := row.Scan(&bc.data.EndLSN, &bc.data.LabelFile, &bc.data.SpcmapFile); err != nil {
223-
contextLogger.Error(err, "while stopping PostgreSQL physical backup")
224-
return fmt.Errorf("while scanning backup stop: %w", err)
225-
}
226-
bc.data.Phase = Completed
227-
return nil
228-
})
184+
if err := row.Scan(&bc.data.EndLSN, &bc.data.LabelFile, &bc.data.SpcmapFile); err != nil {
185+
contextLogger.Error(err, "while stopping PostgreSQL physical backup")
186+
bc.err = fmt.Errorf("while scanning backup stop: %w", err)
187+
return
188+
}
189+
190+
bc.data.Phase = Completed
229191
}

pkg/management/postgres/webserver/client/local/backup.go

Lines changed: 26 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -33,8 +33,16 @@ import (
3333
// BackupClient is the interface to interact with the backup endpoints
3434
type BackupClient interface {
3535
StatusWithErrors(ctx context.Context, pod *corev1.Pod) (*webserver.Response[webserver.BackupResultData], error)
36-
Start(ctx context.Context, pod *corev1.Pod, sbq webserver.StartBackupRequest) error
37-
Stop(ctx context.Context, pod *corev1.Pod, sbq webserver.StopBackupRequest) error
36+
Start(
37+
ctx context.Context,
38+
pod *corev1.Pod,
39+
sbq webserver.StartBackupRequest,
40+
) (*webserver.Response[webserver.BackupResultData], error)
41+
Stop(
42+
ctx context.Context,
43+
pod *corev1.Pod,
44+
sbq webserver.StopBackupRequest,
45+
) (*webserver.Response[webserver.BackupResultData], error)
3846
}
3947

4048
// backupClientImpl a client to interact with the instance backup endpoints
@@ -59,40 +67,46 @@ func (c *backupClientImpl) StatusWithErrors(
5967
}
6068

6169
// Start runs the pg_start_backup
62-
func (c *backupClientImpl) Start(ctx context.Context, pod *corev1.Pod, sbq webserver.StartBackupRequest) error {
70+
func (c *backupClientImpl) Start(
71+
ctx context.Context,
72+
pod *corev1.Pod,
73+
sbq webserver.StartBackupRequest,
74+
) (*webserver.Response[webserver.BackupResultData], error) {
6375
scheme := remote.GetStatusSchemeFromPod(pod)
6476
httpURL := url.Build(scheme.ToString(), pod.Status.PodIP, url.PathPgModeBackup, url.StatusPort)
6577

6678
// Marshalling the payload to JSON
6779
jsonBody, err := json.Marshal(sbq)
6880
if err != nil {
69-
return fmt.Errorf("failed to marshal start payload: %w", err)
81+
return nil, fmt.Errorf("failed to marshal start payload: %w", err)
7082
}
7183

7284
req, err := http.NewRequestWithContext(ctx, "POST", httpURL, bytes.NewReader(jsonBody))
7385
if err != nil {
74-
return err
86+
return nil, err
7587
}
7688
req.Header.Set("Content-Type", "application/json")
7789

78-
_, err = executeRequestWithError[struct{}](ctx, c.cli, req, false)
79-
return err
90+
return executeRequestWithError[webserver.BackupResultData](ctx, c.cli, req, true)
8091
}
8192

8293
// Stop runs the command pg_stop_backup
83-
func (c *backupClientImpl) Stop(ctx context.Context, pod *corev1.Pod, sbq webserver.StopBackupRequest) error {
94+
func (c *backupClientImpl) Stop(
95+
ctx context.Context,
96+
pod *corev1.Pod,
97+
sbq webserver.StopBackupRequest,
98+
) (*webserver.Response[webserver.BackupResultData], error) {
8499
scheme := remote.GetStatusSchemeFromPod(pod)
85100
httpURL := url.Build(scheme.ToString(), pod.Status.PodIP, url.PathPgModeBackup, url.StatusPort)
86101
// Marshalling the payload to JSON
87102
jsonBody, err := json.Marshal(sbq)
88103
if err != nil {
89-
return fmt.Errorf("failed to marshal stop payload: %w", err)
104+
return nil, fmt.Errorf("failed to marshal stop payload: %w", err)
90105
}
91106

92107
req, err := http.NewRequestWithContext(ctx, "PUT", httpURL, bytes.NewReader(jsonBody))
93108
if err != nil {
94-
return err
109+
return nil, err
95110
}
96-
_, err = executeRequestWithError[webserver.BackupResultData](ctx, c.cli, req, false)
97-
return err
111+
return executeRequestWithError[webserver.BackupResultData](ctx, c.cli, req, true)
98112
}

pkg/management/postgres/webserver/remote.go

Lines changed: 49 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,13 @@ import (
2727
"os"
2828
"os/exec"
2929
"path"
30+
"sync"
3031
"time"
3132

3233
"github.com/cloudnative-pg/machinery/pkg/execlog"
3334
"github.com/cloudnative-pg/machinery/pkg/fileutils"
3435
"github.com/cloudnative-pg/machinery/pkg/log"
36+
"go.uber.org/multierr"
3537
apierrs "k8s.io/apimachinery/pkg/api/errors"
3638
"sigs.k8s.io/controller-runtime/pkg/client"
3739

@@ -46,11 +48,22 @@ import (
4648
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
4749
)
4850

51+
const errCodeAnotherRequestInProgress = "ANOTHER_REQUEST_IN_PROGRESS"
52+
53+
// IsRetryableError checks if the error is retryable
54+
func IsRetryableError(err *Error) bool {
55+
if err == nil {
56+
return false
57+
}
58+
return err.Code == errCodeAnotherRequestInProgress
59+
}
60+
4961
type remoteWebserverEndpoints struct {
5062
typedClient client.Client
5163
instance *postgres.Instance
5264
currentBackup *backupConnection
5365
readinessChecker *readiness.Data
66+
ongoingRequest sync.Mutex
5467
}
5568

5669
// StartBackupRequest the required data to execute the pg_start_backup
@@ -128,6 +141,7 @@ func (ws *remoteWebserverEndpoints) cleanupStaleCollections(ctx context.Context)
128141
log.Warning("Closing stale PostgreSQL backup connection")
129142

130143
if err := bc.conn.Close(); err != nil {
144+
bc.err = multierr.Append(bc.err, err)
131145
log.Error(err, "Error while closing stale PostgreSQL backup connection")
132146
}
133147
bc.data.Phase = Completed
@@ -142,13 +156,13 @@ func (ws *remoteWebserverEndpoints) cleanupStaleCollections(ctx context.Context)
142156
return
143157
}
144158

159+
ws.ongoingRequest.Lock()
160+
defer ws.ongoingRequest.Unlock()
161+
145162
if bc.data.Phase == Completed || bc.data.BackupName == "" {
146163
return
147164
}
148165

149-
bc.sync.Lock()
150-
defer bc.sync.Unlock()
151-
152166
if bc.err != nil {
153167
closeBackupConnection(bc)
154168
return
@@ -317,6 +331,11 @@ func (ws *remoteWebserverEndpoints) updateInstanceManager(
317331
// nolint: gocognit
318332
func (ws *remoteWebserverEndpoints) backup(w http.ResponseWriter, req *http.Request) {
319333
log.Trace("request method", "method", req.Method)
334+
if !ws.ongoingRequest.TryLock() {
335+
sendUnprocessableEntityJSONResponse(w, errCodeAnotherRequestInProgress, "")
336+
return
337+
}
338+
defer ws.ongoingRequest.Unlock()
320339

321340
switch req.Method {
322341
case http.MethodGet:
@@ -351,10 +370,10 @@ func (ws *remoteWebserverEndpoints) backup(w http.ResponseWriter, req *http.Requ
351370
}
352371
}()
353372
if ws.currentBackup != nil {
354-
log.Info("trying to close the current backup connection",
373+
log.Debug("trying to close the current backup connection",
355374
"backupName", ws.currentBackup.data.BackupName,
356375
)
357-
if err := ws.currentBackup.forceCloseConnection(); err != nil {
376+
if err := ws.currentBackup.conn.Close(); err != nil {
358377
if !errors.Is(err, sql.ErrConnDone) {
359378
log.Error(err, "Error while closing backup connection (start)")
360379
}
@@ -371,8 +390,12 @@ func (ws *remoteWebserverEndpoints) backup(w http.ResponseWriter, req *http.Requ
371390
sendUnprocessableEntityJSONResponse(w, "CANNOT_INITIALIZE_CONNECTION", err.Error())
372391
return
373392
}
374-
go ws.currentBackup.startBackup(context.Background(), p.BackupName)
375-
sendJSONResponseWithData(w, 200, struct{}{})
393+
go ws.currentBackup.startBackup(context.Background(), &ws.ongoingRequest)
394+
395+
res := Response[BackupResultData]{
396+
Data: &ws.currentBackup.data,
397+
}
398+
sendJSONResponseWithData(w, 200, res)
376399
return
377400

378401
case http.MethodPut:
@@ -398,8 +421,23 @@ func (ws *remoteWebserverEndpoints) backup(w http.ResponseWriter, req *http.Requ
398421
return
399422
}
400423

424+
if ws.currentBackup.err != nil {
425+
if err := ws.currentBackup.conn.Close(); err != nil {
426+
if !errors.Is(err, sql.ErrConnDone) {
427+
log.Error(err, "Error while closing backup connection (stop)")
428+
}
429+
}
430+
431+
sendUnprocessableEntityJSONResponse(w, "BACKUP_FAILED", ws.currentBackup.err.Error())
432+
return
433+
}
434+
435+
res := Response[BackupResultData]{
436+
Data: &ws.currentBackup.data,
437+
}
438+
401439
if ws.currentBackup.data.Phase == Closing {
402-
sendJSONResponseWithData(w, 200, struct{}{})
440+
sendJSONResponseWithData(w, 200, res)
403441
return
404442
}
405443

@@ -409,19 +447,10 @@ func (ws *remoteWebserverEndpoints) backup(w http.ResponseWriter, req *http.Requ
409447
return
410448
}
411449

412-
if ws.currentBackup.err != nil {
413-
if err := ws.currentBackup.closeConnection(p.BackupName); err != nil {
414-
if !errors.Is(err, sql.ErrConnDone) {
415-
log.Error(err, "Error while closing backup connection (stop)")
416-
}
417-
}
450+
ws.currentBackup.data.Phase = Closing
418451

419-
sendJSONResponseWithData(w, 200, struct{}{})
420-
return
421-
}
422-
ws.currentBackup.setPhase(Closing, p.BackupName)
423-
go ws.currentBackup.stopBackup(context.Background(), p.BackupName)
424-
sendJSONResponseWithData(w, 200, struct{}{})
452+
go ws.currentBackup.stopBackup(context.Background(), &ws.ongoingRequest)
453+
sendJSONResponseWithData(w, 200, res)
425454
return
426455
}
427456
}

pkg/management/postgres/webserver/webserver.go

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -50,18 +50,19 @@ type Response[T interface{}] struct {
5050
Error *Error `json:"error,omitempty"`
5151
}
5252

53-
// EnsureDataIsPresent returns an error if the data is field is nil
54-
func (body Response[T]) EnsureDataIsPresent() error {
55-
status := body.Data
56-
if status != nil {
57-
return nil
58-
}
59-
53+
// GetError returns an error if an error response is detected or if the data
54+
// field is nil
55+
func (body Response[T]) GetError() error {
6056
if body.Error != nil {
6157
return fmt.Errorf("encountered a body error while preparing, code: '%s', message: %s",
6258
body.Error.Code, body.Error.Message)
6359
}
6460

61+
status := body.Data
62+
if status != nil {
63+
return nil
64+
}
65+
6566
return fmt.Errorf("encounteered an empty body while expecting it to not be empty")
6667
}
6768

0 commit comments

Comments
 (0)