Skip to content

Commit 766640a

Browse files
committed
chore: review
Signed-off-by: Leonardo Cecchi <[email protected]>
1 parent b94cdc4 commit 766640a

File tree

2 files changed

+65
-8
lines changed

2 files changed

+65
-8
lines changed

docs/examples/minio-store.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ spec:
2121
key: ACCESS_SECRET_KEY
2222
wal:
2323
compression: gzip
24+
maxParallel: 8
2425
data:
2526
additionalCommandArgs:
2627
- "--min-chunk-size=5MB"

internal/cnpgi/common/wal.go

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer"
1515
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
1616
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
17+
walUtils "github.com/cloudnative-pg/machinery/pkg/fileutils/wals"
1718
"github.com/cloudnative-pg/machinery/pkg/log"
1819
apierrors "k8s.io/apimachinery/pkg/api/errors"
1920
"k8s.io/apimachinery/pkg/types"
@@ -24,6 +25,28 @@ import (
2425
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
2526
)
2627

28+
// ErrMissingPermissions is raised when the sidecar has no
29+
// permission to download the credentials needed to reach
30+
// the object storage.
31+
// This will be fixed by the reconciliation loop in the
32+
// operator plugin.
33+
var ErrMissingPermissions = fmt.Errorf("no permission to download the backup credentials, retrying")
34+
35+
type SpoolManagementError struct {
36+
walName string
37+
err error
38+
}
39+
40+
// Error implements the error interface
41+
func (e *SpoolManagementError) Error() string {
42+
return fmt.Sprintf("while testing the existence of the WAL file in the spool directory: %v", e.err.Error())
43+
}
44+
45+
// Unwrap implements the error interface
46+
func (e *SpoolManagementError) Unwrap() error {
47+
return e.err
48+
}
49+
2750
// WALServiceImplementation is the implementation of the WAL Service
2851
type WALServiceImplementation struct {
2952
wal.UnimplementedWALServer
@@ -67,6 +90,10 @@ func (w WALServiceImplementation) Archive(
6790
contextLogger := log.FromContext(ctx)
6891
contextLogger.Debug("starting wal archive")
6992

93+
baseWalName := path.Base(request.GetSourceFileName())
94+
95+
// Step 1: parse the configuration and get the environment variables needed
96+
// for barman-cloud-wal-archive
7097
configuration, err := config.NewFromClusterJSON(request.ClusterDefinition)
7198
if err != nil {
7299
return nil, err
@@ -87,7 +114,7 @@ func (w WALServiceImplementation) Archive(
87114
)
88115
if err != nil {
89116
if apierrors.IsForbidden(err) {
90-
return nil, errors.New("backup credentials don't yet have access permissions. Will retry reconciliation loop")
117+
return nil, ErrMissingPermissions
91118
}
92119
return nil, err
93120
}
@@ -103,17 +130,46 @@ func (w WALServiceImplementation) Archive(
103130
return nil, err
104131
}
105132

133+
// Step 2: check if this WAL file has not been already archived
134+
var isDeletedFromSpool bool
135+
isDeletedFromSpool, err = arch.DeleteFromSpool(baseWalName)
136+
if err != nil {
137+
return nil, &SpoolManagementError{
138+
walName: baseWalName,
139+
err: err,
140+
}
141+
}
142+
if isDeletedFromSpool {
143+
contextLogger.Info("Archived WAL file (parallel)",
144+
"walName", baseWalName)
145+
return nil, nil
146+
}
147+
148+
// Step 3: gather the WAL files names to archive
106149
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, configuration.ServerName)
107150
if err != nil {
108151
return nil, err
109152
}
110-
barmanConfiguration := &objectStore.Spec.Configuration
111-
maxParallel := 1
112-
if barmanConfiguration.Wal != nil && barmanConfiguration.Wal.MaxParallel > 1 {
113-
maxParallel = barmanConfiguration.Wal.MaxParallel
114-
}
115-
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), maxParallel)
116-
result := arch.ArchiveList(ctx, walList, options)
153+
154+
maxParallel := 1
155+
if objectStore.Spec.Configuration.Wal != nil {
156+
maxParallel = objectStore.Spec.Configuration.Wal.MaxParallel
157+
}
158+
159+
walFilesList := walUtils.GatherReadyWALFiles(
160+
ctx,
161+
walUtils.GatherReadyWALFilesConfig{
162+
MaxResults: maxParallel,
163+
SkipWALs: []string{baseWalName},
164+
PgDataPath: w.PGDataPath,
165+
},
166+
)
167+
168+
// Ensure the requested WAL file is always the first one being
169+
// archived
170+
walFilesList.Ready = append([]string{request.GetSourceFileName()}, walFilesList.Ready...)
171+
172+
result := arch.ArchiveList(ctx, walFilesList.ReadyItemsToSlice(), options)
117173
for _, archiverResult := range result {
118174
if archiverResult.Err != nil {
119175
return nil, archiverResult.Err

0 commit comments

Comments
 (0)