Skip to content

Commit 600c721

Browse files
leonardocearmru
authored andcommitted
chore: review
Signed-off-by: Leonardo Cecchi <[email protected]>
1 parent c9c691c commit 600c721

File tree

2 files changed

+71
-8
lines changed

2 files changed

+71
-8
lines changed

docs/examples/minio-store.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ spec:
2121
key: ACCESS_SECRET_KEY
2222
wal:
2323
compression: gzip
24+
maxParallel: 8
2425
data:
2526
additionalCommandArgs:
2627
- "--min-chunk-size=5MB"

internal/cnpgi/common/wal.go

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer"
1515
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
1616
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
17+
walUtils "github.com/cloudnative-pg/machinery/pkg/fileutils/wals"
1718
"github.com/cloudnative-pg/machinery/pkg/log"
1819
apierrors "k8s.io/apimachinery/pkg/api/errors"
1920
"k8s.io/apimachinery/pkg/types"
@@ -24,6 +25,34 @@ import (
2425
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
2526
)
2627

28+
// ErrMissingPermissions is raised when the sidecar has no
29+
// permission to download the credentials needed to reach
30+
// the object storage.
31+
// This will be fixed by the reconciliation loop in the
32+
// operator plugin.
33+
var ErrMissingPermissions = fmt.Errorf("no permission to download the backup credentials, retrying")
34+
35+
// SpoolManagementError is raised when a spool management
36+
// error has been detected
37+
type SpoolManagementError struct {
38+
walName string
39+
err error
40+
}
41+
42+
// Error implements the error interface
43+
func (e *SpoolManagementError) Error() string {
44+
return fmt.Sprintf(
45+
"while testing the existence of the WAL file (%s) in the spool directory: %v",
46+
e.walName,
47+
e.err.Error(),
48+
)
49+
}
50+
51+
// Unwrap implements the error interface
52+
func (e *SpoolManagementError) Unwrap() error {
53+
return e.err
54+
}
55+
2756
// WALServiceImplementation is the implementation of the WAL Service
2857
type WALServiceImplementation struct {
2958
wal.UnimplementedWALServer
@@ -67,6 +96,10 @@ func (w WALServiceImplementation) Archive(
6796
contextLogger := log.FromContext(ctx)
6897
contextLogger.Debug("starting wal archive")
6998

99+
baseWalName := path.Base(request.GetSourceFileName())
100+
101+
// Step 1: parse the configuration and get the environment variables needed
102+
// for barman-cloud-wal-archive
70103
configuration, err := config.NewFromClusterJSON(request.ClusterDefinition)
71104
if err != nil {
72105
return nil, err
@@ -87,7 +120,7 @@ func (w WALServiceImplementation) Archive(
87120
)
88121
if err != nil {
89122
if apierrors.IsForbidden(err) {
90-
return nil, errors.New("backup credentials don't yet have access permissions. Will retry reconciliation loop")
123+
return nil, ErrMissingPermissions
91124
}
92125
return nil, err
93126
}
@@ -103,17 +136,46 @@ func (w WALServiceImplementation) Archive(
103136
return nil, err
104137
}
105138

139+
// Step 2: check if this WAL file has not been already archived
140+
var isDeletedFromSpool bool
141+
isDeletedFromSpool, err = arch.DeleteFromSpool(baseWalName)
142+
if err != nil {
143+
return nil, &SpoolManagementError{
144+
walName: baseWalName,
145+
err: err,
146+
}
147+
}
148+
if isDeletedFromSpool {
149+
contextLogger.Info("Archived WAL file (parallel)",
150+
"walName", baseWalName)
151+
return nil, nil
152+
}
153+
154+
// Step 3: gather the WAL files names to archive
106155
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, configuration.ServerName)
107156
if err != nil {
108157
return nil, err
109158
}
110-
barmanConfiguration := &objectStore.Spec.Configuration
111-
maxParallel := 1
112-
if barmanConfiguration.Wal != nil && barmanConfiguration.Wal.MaxParallel > 1 {
113-
maxParallel = barmanConfiguration.Wal.MaxParallel
114-
}
115-
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), maxParallel)
116-
result := arch.ArchiveList(ctx, walList, options)
159+
160+
maxParallel := 1
161+
if objectStore.Spec.Configuration.Wal != nil {
162+
maxParallel = objectStore.Spec.Configuration.Wal.MaxParallel
163+
}
164+
165+
walFilesList := walUtils.GatherReadyWALFiles(
166+
ctx,
167+
walUtils.GatherReadyWALFilesConfig{
168+
MaxResults: maxParallel,
169+
SkipWALs: []string{baseWalName},
170+
PgDataPath: w.PGDataPath,
171+
},
172+
)
173+
174+
// Ensure the requested WAL file is always the first one being
175+
// archived
176+
walFilesList.Ready = append([]string{request.GetSourceFileName()}, walFilesList.Ready...)
177+
178+
result := arch.ArchiveList(ctx, walFilesList.ReadyItemsToSlice(), options)
117179
for _, archiverResult := range result {
118180
if archiverResult.Err != nil {
119181
return nil, archiverResult.Err

0 commit comments

Comments
 (0)