Skip to content

Commit 88fd3e5

Browse files
MichaluxPL, leonardoce, fcanovai
authored
feat(wal): parallel WAL archiving (#262)
This patch allows the plugin to archive WAL files in parallel. Fix: #260 Fix: #266 Signed-off-by: MichaluxPL <[email protected]> Signed-off-by: Leonardo Cecchi <[email protected]> Signed-off-by: Francesco Canovai <[email protected]> Co-authored-by: Leonardo Cecchi <[email protected]> Co-authored-by: Francesco Canovai <[email protected]>
1 parent ed1feaa commit 88fd3e5

File tree

2 files changed

+73
-3
lines changed

2 files changed

+73
-3
lines changed

docs/examples/minio-store.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ spec:
2121
key: ACCESS_SECRET_KEY
2222
wal:
2323
compression: gzip
24+
maxParallel: 8
2425
data:
2526
additionalCommandArgs:
2627
- "--min-chunk-size=5MB"

internal/cnpgi/common/wal.go

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer"
1515
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
1616
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
17+
walUtils "github.com/cloudnative-pg/machinery/pkg/fileutils/wals"
1718
"github.com/cloudnative-pg/machinery/pkg/log"
1819
apierrors "k8s.io/apimachinery/pkg/api/errors"
1920
"k8s.io/apimachinery/pkg/types"
@@ -24,6 +25,34 @@ import (
2425
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
2526
)
2627

28+
// ErrMissingPermissions is returned when the sidecar has no
// permission to download the credentials needed to reach
// the object storage.
// This will be fixed by the reconciliation loop in the
// operator plugin.
//
// It is a sentinel value: compare against it with errors.Is.
var ErrMissingPermissions = errors.New("no permission to download the backup credentials, retrying")
34+
35+
// SpoolManagementError is raised when a spool management
// error has been detected.
type SpoolManagementError struct {
	// walName is the name of the WAL file that was being
	// looked up in the spool directory.
	walName string
	// err is the underlying error that was detected.
	err error
}

// Error implements the error interface.
func (e *SpoolManagementError) Error() string {
	// The wrapped error is handed to %v instead of calling
	// e.err.Error() directly: the output is the same for a non-nil
	// error, while a nil one is rendered as "<nil>" rather than
	// triggering a nil pointer dereference.
	return fmt.Sprintf(
		"while testing the existence of the WAL file (%s) in the spool directory: %v",
		e.walName,
		e.err,
	)
}

// Unwrap returns the wrapped error, so that errors.Is and errors.As
// can inspect the underlying cause.
func (e *SpoolManagementError) Unwrap() error {
	return e.err
}
55+
2756
// WALServiceImplementation is the implementation of the WAL Service
2857
type WALServiceImplementation struct {
2958
wal.UnimplementedWALServer
@@ -67,6 +96,10 @@ func (w WALServiceImplementation) Archive(
6796
contextLogger := log.FromContext(ctx)
6897
contextLogger.Debug("starting wal archive")
6998

99+
baseWalName := path.Base(request.GetSourceFileName())
100+
101+
// Step 1: parse the configuration and get the environment variables needed
102+
// for barman-cloud-wal-archive
70103
configuration, err := config.NewFromClusterJSON(request.ClusterDefinition)
71104
if err != nil {
72105
return nil, err
@@ -87,7 +120,7 @@ func (w WALServiceImplementation) Archive(
87120
)
88121
if err != nil {
89122
if apierrors.IsForbidden(err) {
90-
return nil, errors.New("backup credentials don't yet have access permissions. Will retry reconciliation loop")
123+
return nil, ErrMissingPermissions
91124
}
92125
return nil, err
93126
}
@@ -103,12 +136,48 @@ func (w WALServiceImplementation) Archive(
103136
return nil, err
104137
}
105138

139+
// Step 2: check if this WAL file has not been already archived
140+
var isDeletedFromSpool bool
141+
isDeletedFromSpool, err = arch.DeleteFromSpool(baseWalName)
142+
if err != nil {
143+
return nil, &SpoolManagementError{
144+
walName: baseWalName,
145+
err: err,
146+
}
147+
}
148+
if isDeletedFromSpool {
149+
contextLogger.Info("WAL file already archived, skipping",
150+
"walName", baseWalName)
151+
return nil, nil
152+
}
153+
154+
// Step 3: gather the WAL files names to archive
106155
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, configuration.ServerName)
107156
if err != nil {
108157
return nil, err
109158
}
110-
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), 1)
111-
result := arch.ArchiveList(ctx, walList, options)
159+
160+
maxParallel := 1
161+
if objectStore.Spec.Configuration.Wal != nil && objectStore.Spec.Configuration.Wal.MaxParallel > 0 {
162+
maxParallel = objectStore.Spec.Configuration.Wal.MaxParallel
163+
}
164+
165+
maxResults := maxParallel - 1
166+
walFilesList := walUtils.GatherReadyWALFiles(
167+
ctx,
168+
walUtils.GatherReadyWALFilesConfig{
169+
MaxResults: maxResults,
170+
SkipWALs: []string{baseWalName},
171+
PgDataPath: w.PGDataPath,
172+
},
173+
)
174+
175+
// Ensure the requested WAL file is always the first one being
176+
// archived
177+
walFilesList.Ready = append([]string{request.GetSourceFileName()}, walFilesList.Ready...)
178+
contextLogger.Debug("WAL files to archive", "walFilesListReady", walFilesList.Ready)
179+
180+
result := arch.ArchiveList(ctx, walFilesList.ReadyItemsToSlice(), options)
112181
for _, archiverResult := range result {
113182
if archiverResult.Err != nil {
114183
return nil, archiverResult.Err

0 commit comments

Comments
 (0)