Skip to content

Commit 06197f9

Browse files
FIX (chunking): Add backuping chunk by chunk without buffering in RAM and improve cancelation process
1 parent fe72e9e commit 06197f9

File tree

13 files changed

+692
-106
lines changed

13 files changed

+692
-106
lines changed

backend/internal/features/backups/backups/backup_context_manager.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,46 +2,59 @@ package backups
22

33
import (
44
"context"
5-
"errors"
65
"sync"
76

87
"github.com/google/uuid"
98
)
109

1110
type BackupContextManager struct {
12-
mu sync.RWMutex
13-
cancelFuncs map[uuid.UUID]context.CancelFunc
11+
mu sync.RWMutex
12+
cancelFuncs map[uuid.UUID]context.CancelFunc
13+
cancelledBackups map[uuid.UUID]bool
1414
}
1515

1616
func NewBackupContextManager() *BackupContextManager {
1717
return &BackupContextManager{
18-
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
18+
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
19+
cancelledBackups: make(map[uuid.UUID]bool),
1920
}
2021
}
2122

2223
func (m *BackupContextManager) RegisterBackup(backupID uuid.UUID, cancelFunc context.CancelFunc) {
2324
m.mu.Lock()
2425
defer m.mu.Unlock()
2526
m.cancelFuncs[backupID] = cancelFunc
27+
delete(m.cancelledBackups, backupID)
2628
}
2729

2830
func (m *BackupContextManager) CancelBackup(backupID uuid.UUID) error {
2931
m.mu.Lock()
3032
defer m.mu.Unlock()
3133

34+
if m.cancelledBackups[backupID] {
35+
return nil
36+
}
37+
3238
cancelFunc, exists := m.cancelFuncs[backupID]
33-
if !exists {
34-
return errors.New("backup is not in progress or already completed")
39+
if exists {
40+
cancelFunc()
41+
delete(m.cancelFuncs, backupID)
3542
}
3643

37-
cancelFunc()
38-
delete(m.cancelFuncs, backupID)
44+
m.cancelledBackups[backupID] = true
3945

4046
return nil
4147
}
4248

49+
func (m *BackupContextManager) IsCancelled(backupID uuid.UUID) bool {
50+
m.mu.RLock()
51+
defer m.mu.RUnlock()
52+
return m.cancelledBackups[backupID]
53+
}
54+
4355
func (m *BackupContextManager) UnregisterBackup(backupID uuid.UUID) {
4456
m.mu.Lock()
4557
defer m.mu.Unlock()
4658
delete(m.cancelFuncs, backupID)
59+
delete(m.cancelledBackups, backupID)
4760
}

backend/internal/features/backups/backups/controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package backups
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"io"
@@ -701,7 +702,7 @@ func createTestBackup(
701702
dummyContent := []byte("dummy backup content for testing")
702703
reader := strings.NewReader(string(dummyContent))
703704
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
704-
if err := storages[0].SaveFile(encryption.GetFieldEncryptor(), logger, backup.ID, reader); err != nil {
705+
if err := storages[0].SaveFile(context.Background(), encryption.GetFieldEncryptor(), logger, backup.ID, reader); err != nil {
705706
panic(fmt.Sprintf("Failed to create test backup file: %v", err))
706707
}
707708

backend/internal/features/backups/backups/service.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,12 @@ func (s *BackupService) MakeBackup(databaseID uuid.UUID, isLastTry bool) {
275275
errMsg := err.Error()
276276

277277
// Check if backup was cancelled (not due to shutdown)
278-
if strings.Contains(errMsg, "backup cancelled") && !strings.Contains(errMsg, "shutdown") {
278+
isCancelled := strings.Contains(errMsg, "backup cancelled") ||
279+
strings.Contains(errMsg, "context canceled") ||
280+
errors.Is(err, context.Canceled)
281+
isShutdown := strings.Contains(errMsg, "shutdown")
282+
283+
if isCancelled && !isShutdown {
279284
backup.Status = BackupStatusCanceled
280285
backup.BackupDurationMs = time.Since(start).Milliseconds()
281286
backup.BackupSizeMb = 0

backend/internal/features/backups/backups/usecases/postgresql/create_backup_uc.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ type CreatePostgresqlBackupUsecase struct {
4545
fieldEncryptor encryption.FieldEncryptor
4646
}
4747

48+
type writeResult struct {
49+
bytesWritten int
50+
writeErr error
51+
}
52+
4853
// Execute creates a backup of the database
4954
func (uc *CreatePostgresqlBackupUsecase) Execute(
5055
ctx context.Context,
@@ -172,7 +177,7 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
172177
// Start streaming into storage in its own goroutine
173178
saveErrCh := make(chan error, 1)
174179
go func() {
175-
saveErr := storage.SaveFile(uc.fieldEncryptor, uc.logger, backupID, storageReader)
180+
saveErr := storage.SaveFile(ctx, uc.fieldEncryptor, uc.logger, backupID, storageReader)
176181
saveErrCh <- saveErr
177182
}()
178183

@@ -195,12 +200,10 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
195200
copyResultCh <- err
196201
}()
197202

198-
// Wait for the copy to finish first, then the dump process
199203
copyErr := <-copyResultCh
200204
bytesWritten := <-bytesWrittenCh
201205
waitErr := cmd.Wait()
202206

203-
// Check for shutdown or cancellation before finalizing
204207
select {
205208
case <-ctx.Done():
206209
uc.cleanupOnCancellation(encryptionWriter, storageWriter, saveErrCh)
@@ -213,7 +216,6 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
213216
return nil, err
214217
}
215218

216-
// Wait until storage ends reading
217219
saveErr := <-saveErrCh
218220
stderrOutput := <-stderrCh
219221

@@ -267,7 +269,23 @@ func (uc *CreatePostgresqlBackupUsecase) copyWithShutdownCheck(
267269

268270
bytesRead, readErr := src.Read(buf)
269271
if bytesRead > 0 {
270-
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
272+
writeResultCh := make(chan writeResult, 1)
273+
go func() {
274+
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
275+
writeResultCh <- writeResult{bytesWritten, writeErr}
276+
}()
277+
278+
var bytesWritten int
279+
var writeErr error
280+
281+
select {
282+
case <-ctx.Done():
283+
return totalBytesWritten, fmt.Errorf("copy cancelled during write: %w", ctx.Err())
284+
case result := <-writeResultCh:
285+
bytesWritten = result.bytesWritten
286+
writeErr = result.writeErr
287+
}
288+
271289
if bytesWritten < 0 || bytesRead < bytesWritten {
272290
bytesWritten = 0
273291
if writeErr == nil {
@@ -354,6 +372,9 @@ func (uc *CreatePostgresqlBackupUsecase) createBackupContext(
354372
select {
355373
case <-ctx.Done():
356374
return
375+
case <-parentCtx.Done():
376+
cancel()
377+
return
357378
case <-ticker.C:
358379
if config.IsShouldShutdown() {
359380
cancel()

backend/internal/features/restores/controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package restores
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"io"
@@ -340,7 +341,7 @@ func createTestBackup(
340341
dummyContent := []byte("dummy backup content for testing")
341342
reader := strings.NewReader(string(dummyContent))
342343
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
343-
if err := storages[0].SaveFile(fieldEncryptor, logger, backup.ID, reader); err != nil {
344+
if err := storages[0].SaveFile(context.Background(), fieldEncryptor, logger, backup.ID, reader); err != nil {
344345
panic(fmt.Sprintf("Failed to create test backup file: %v", err))
345346
}
346347

backend/internal/features/storages/interfaces.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package storages
22

33
import (
4+
"context"
45
"io"
56
"log/slog"
67
"postgresus-backend/internal/util/encryption"
@@ -10,6 +11,7 @@ import (
1011

1112
type StorageFileSaver interface {
1213
SaveFile(
14+
ctx context.Context,
1315
encryptor encryption.FieldEncryptor,
1416
logger *slog.Logger,
1517
fileID uuid.UUID,

backend/internal/features/storages/model.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package storages
22

33
import (
4+
"context"
45
"errors"
56
"io"
67
"log/slog"
@@ -30,12 +31,13 @@ type Storage struct {
3031
}
3132

3233
func (s *Storage) SaveFile(
34+
ctx context.Context,
3335
encryptor encryption.FieldEncryptor,
3436
logger *slog.Logger,
3537
fileID uuid.UUID,
3638
file io.Reader,
3739
) error {
38-
err := s.getSpecificStorage().SaveFile(encryptor, logger, fileID, file)
40+
err := s.getSpecificStorage().SaveFile(ctx, encryptor, logger, fileID, file)
3941
if err != nil {
4042
lastSaveError := err.Error()
4143
s.LastSaveError = &lastSaveError

backend/internal/features/storages/model_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func Test_Storage_BasicOperations(t *testing.T) {
167167
fileID := uuid.New()
168168

169169
err = tc.storage.SaveFile(
170+
context.Background(),
170171
encryptor,
171172
logger.GetLogger(),
172173
fileID,
@@ -189,6 +190,7 @@ func Test_Storage_BasicOperations(t *testing.T) {
189190

190191
fileID := uuid.New()
191192
err = tc.storage.SaveFile(
193+
context.Background(),
192194
encryptor,
193195
logger.GetLogger(),
194196
fileID,
@@ -238,7 +240,7 @@ func setupS3Container(ctx context.Context) (*S3Container, error) {
238240
secretKey := "testpassword"
239241
bucketName := "test-bucket"
240242
region := "us-east-1"
241-
endpoint := fmt.Sprintf("localhost:%s", env.TestMinioPort)
243+
endpoint := fmt.Sprintf("127.0.0.1:%s", env.TestMinioPort)
242244

243245
// Create MinIO client and ensure bucket exists
244246
minioClient, err := minio.New(endpoint, &minio.Options{

0 commit comments

Comments
 (0)