Skip to content

Commit d5d0008

Browse files
committed
feat: separate recovery and cluster object store
Signed-off-by: Leonardo Cecchi <[email protected]>
1 parent af60a15 commit d5d0008

File tree

20 files changed

+408
-192
lines changed

20 files changed

+408
-192
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
apiVersion: postgresql.cnpg.io/v1
2+
kind: Cluster
3+
metadata:
4+
name: cluster-restore
5+
spec:
6+
instances: 3
7+
imagePullPolicy: IfNotPresent
8+
9+
bootstrap:
10+
recovery:
11+
source: source
12+
13+
plugins:
14+
- name: barman-cloud.cloudnative-pg.io
15+
parameters:
16+
barmanObjectName: minio-store-bis
17+
18+
externalClusters:
19+
- name: source
20+
plugin:
21+
name: barman-cloud.cloudnative-pg.io
22+
parameters:
23+
barmanObjectName: minio-store
24+
serverName: cluster-example
25+
26+
storage:
27+
size: 1Gi

docs/examples/minio-store-bis.yaml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
apiVersion: barmancloud.cnpg.io/v1
2+
kind: ObjectStore
3+
metadata:
4+
name: minio-store-bis
5+
spec:
6+
configuration:
7+
destinationPath: s3://backups/
8+
endpointURL: http://minio-bis:9000
9+
s3Credentials:
10+
accessKeyId:
11+
name: minio-bis
12+
key: ACCESS_KEY_ID
13+
secretAccessKey:
14+
name: minio-bis
15+
key: ACCESS_SECRET_KEY
16+
wal:
17+
compression: gzip
18+
data:
19+
additionalCommandArgs:
20+
- "--min-chunk-size=5MB"
21+
- "--read-timeout=60"
22+
- "-vv"
23+
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
apiVersion: apps/v1
2+
kind: Deployment
3+
metadata:
4+
name: minio-bis
5+
labels:
6+
app: minio-bis
7+
spec:
8+
replicas: 1
9+
selector:
10+
matchLabels:
11+
app: minio-bis
12+
template:
13+
metadata:
14+
labels:
15+
app: minio-bis
16+
spec:
17+
containers:
18+
- name: minio
19+
image: minio/minio
20+
ports:
21+
- containerPort: 9000
22+
volumeMounts:
23+
- mountPath: /data
24+
name: data
25+
args:
26+
- server
27+
- /data
28+
env:
29+
- name: MINIO_ACCESS_KEY
30+
valueFrom:
31+
secretKeyRef:
32+
name: minio-bis
33+
key: ACCESS_KEY_ID
34+
- name: MINIO_SECRET_KEY
35+
valueFrom:
36+
secretKeyRef:
37+
name: minio-bis
38+
key: ACCESS_SECRET_KEY
39+
volumes:
40+
- name: data
41+
persistentVolumeClaim:
42+
claimName: minio-bis

docs/minio-bis/minio-pvc.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
apiVersion: v1
2+
kind: PersistentVolumeClaim
3+
metadata:
4+
name: minio-bis
5+
spec:
6+
accessModes:
7+
- ReadWriteOnce
8+
volumeMode: Filesystem
9+
resources:
10+
requests:
11+
storage: 1Gi

docs/minio-bis/minio-secret.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
apiVersion: v1
2+
data:
3+
ACCESS_KEY_ID: dVo5YWVndW9OZ29vY2g4bG9odHU4aXRlaTJhaHY0ZGE=
4+
ACCESS_SECRET_KEY: ZWV6b2hkOFNpbm9oeG9od2VpbmdvbjhhaXI1T2h5b2g=
5+
kind: Secret
6+
metadata:
7+
name: minio-bis

docs/minio-bis/minio-service.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
apiVersion: v1
2+
kind: Service
3+
metadata:
4+
name: minio-bis
5+
spec:
6+
selector:
7+
app: minio-bis
8+
ports:
9+
- protocol: TCP
10+
port: 9000
11+
targetPort: 9000

internal/cmd/instance/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@ func NewCmd() *cobra.Command {
3636
}
3737

3838
_ = viper.BindEnv("namespace", "NAMESPACE")
39-
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
4039
_ = viper.BindEnv("cluster-name", "CLUSTER_NAME")
4140
_ = viper.BindEnv("pod-name", "POD_NAME")
4241
_ = viper.BindEnv("pgdata", "PGDATA")
4342
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
43+
44+
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
4445
_ = viper.BindEnv("server-name", "SERVER_NAME")
4546

47+
_ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME")
48+
_ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME")
49+
4650
return cmd
4751
}

internal/cmd/restore/main.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ func NewCmd() *cobra.Command {
2323
"cluster-name",
2424
"pod-name",
2525
"spool-directory",
26-
"barman-object-name",
27-
"server-name",
26+
27+
// IMPORTANT: barman-object-name and server-name are not required
28+
// to restore a cluster.
29+
"recovery-barman-object-name",
30+
"recovery-server-name",
2831
}
2932

3033
for _, k := range requiredSettings {
@@ -42,8 +45,12 @@ func NewCmd() *cobra.Command {
4245
_ = viper.BindEnv("pod-name", "POD_NAME")
4346
_ = viper.BindEnv("pgdata", "PGDATA")
4447
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
48+
4549
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
4650
_ = viper.BindEnv("server-name", "SERVER_NAME")
4751

52+
_ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME")
53+
_ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME")
54+
4855
return cmd
4956
}

internal/cnpgi/common/wal.go

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@ import (
2424

2525
// WALServiceImplementation is the implementation of the WAL Service
2626
type WALServiceImplementation struct {
27-
ServerName string
28-
BarmanObjectKey client.ObjectKey
27+
wal.UnimplementedWALServer
2928
ClusterObjectKey client.ObjectKey
3029
Client client.Client
3130
InstanceName string
3231
SpoolDirectory string
3332
PGDataPath string
3433
PGWALPath string
35-
wal.UnimplementedWALServer
34+
35+
BarmanObjectKey client.ObjectKey
36+
ServerName string
37+
38+
RecoveryBarmanObjectKey client.ObjectKey
39+
RecoveryServerName string
3640
}
3741

3842
// GetCapabilities implements the WALService interface
@@ -123,22 +127,55 @@ func (w WALServiceImplementation) Restore(
123127
ctx context.Context,
124128
request *wal.WALRestoreRequest,
125129
) (*wal.WALRestoreResult, error) {
126-
contextLogger := log.FromContext(ctx)
127-
startTime := time.Now()
130+
// TODO: build full paths
131+
walName := request.GetSourceWalName()
132+
destinationPath := request.GetDestinationFileName()
128133

129134
var cluster cnpgv1.Cluster
130135
if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil {
131136
return nil, err
132137
}
133138

134139
var objectStore barmancloudv1.ObjectStore
135-
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
136-
return nil, err
140+
var serverName string
141+
142+
switch {
143+
case cluster.IsReplica() && cluster.Status.CurrentPrimary == w.InstanceName:
144+
// Designated primary on replica cluster, using recovery object store
145+
serverName = w.RecoveryServerName
146+
if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
147+
return nil, err
148+
}
149+
150+
case cluster.Status.CurrentPrimary == "":
151+
// Recovery from object store, using recovery object store
152+
serverName = w.RecoveryServerName
153+
if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
154+
return nil, err
155+
}
156+
157+
default:
158+
// Using cluster object store
159+
serverName = w.ServerName
160+
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
161+
return nil, err
162+
}
137163
}
138164

139-
// TODO: build full paths
140-
walName := request.GetSourceWalName()
141-
destinationPath := request.GetDestinationFileName()
165+
return &wal.WALRestoreResult{}, w.restoreFromBarmanObjectStore(
166+
ctx, &cluster, &objectStore, serverName, walName, destinationPath)
167+
}
168+
169+
func (w WALServiceImplementation) restoreFromBarmanObjectStore(
170+
ctx context.Context,
171+
cluster *cnpgv1.Cluster,
172+
objectStore *barmancloudv1.ObjectStore,
173+
serverName string,
174+
walName string,
175+
destinationPath string,
176+
) error {
177+
contextLogger := log.FromContext(ctx)
178+
startTime := time.Now()
142179

143180
barmanConfiguration := &objectStore.Spec.Configuration
144181

@@ -151,37 +188,37 @@ func (w WALServiceImplementation) Restore(
151188
os.Environ(),
152189
)
153190
if err != nil {
154-
return nil, fmt.Errorf("while getting recover credentials: %w", err)
191+
return fmt.Errorf("while getting recover credentials: %w", err)
155192
}
156193
env = MergeEnv(env, credentialsEnv)
157194

158-
options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, w.ServerName)
195+
options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, serverName)
159196
if err != nil {
160-
return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
197+
return fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
161198
}
162199

163200
// Create the restorer
164201
var walRestorer *barmanRestorer.WALRestorer
165202
if walRestorer, err = barmanRestorer.New(ctx, env, w.SpoolDirectory); err != nil {
166-
return nil, fmt.Errorf("while creating the restorer: %w", err)
203+
return fmt.Errorf("while creating the restorer: %w", err)
167204
}
168205

169206
// Step 1: check if this WAL file is not already in the spool
170207
var wasInSpool bool
171208
if wasInSpool, err = walRestorer.RestoreFromSpool(walName, destinationPath); err != nil {
172-
return nil, fmt.Errorf("while restoring a file from the spool directory: %w", err)
209+
return fmt.Errorf("while restoring a file from the spool directory: %w", err)
173210
}
174211
if wasInSpool {
175212
contextLogger.Info("Restored WAL file from spool (parallel)",
176213
"walName", walName,
177214
)
178-
return nil, nil
215+
return nil
179216
}
180217

181218
// We skip this step if streaming connection is not available
182-
if isStreamingAvailable(&cluster, w.InstanceName) {
219+
if isStreamingAvailable(cluster, w.InstanceName) {
183220
if err := checkEndOfWALStreamFlag(walRestorer); err != nil {
184-
return nil, err
221+
return err
185222
}
186223
}
187224

@@ -194,7 +231,7 @@ func (w WALServiceImplementation) Restore(
194231
if IsWALFile(walName) {
195232
// If this is a regular WAL file, we try to prefetch
196233
if walFilesList, err = gatherWALFilesToRestore(walName, maxParallel); err != nil {
197-
return nil, fmt.Errorf("while generating the list of WAL files to restore: %w", err)
234+
return fmt.Errorf("while generating the list of WAL files to restore: %w", err)
198235
}
199236
} else {
200237
// This is not a regular WAL file, we fetch it directly
@@ -209,18 +246,18 @@ func (w WALServiceImplementation) Restore(
209246
// is the one that PostgreSQL has requested to restore.
210247
// The failure has already been logged in walRestorer.RestoreList method
211248
if walStatus[0].Err != nil {
212-
return nil, walStatus[0].Err
249+
return walStatus[0].Err
213250
}
214251

215252
// We skip this step if streaming connection is not available
216253
endOfWALStream := isEndOfWALStream(walStatus)
217-
if isStreamingAvailable(&cluster, w.InstanceName) && endOfWALStream {
254+
if isStreamingAvailable(cluster, w.InstanceName) && endOfWALStream {
218255
contextLogger.Info(
219256
"Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found")
220257

221258
err = walRestorer.SetEndOfWALStream()
222259
if err != nil {
223-
return nil, err
260+
return err
224261
}
225262
}
226263

@@ -241,7 +278,7 @@ func (w WALServiceImplementation) Restore(
241278
"downloadTotalTime", time.Since(downloadStartTime),
242279
"totalTime", time.Since(startTime))
243280

244-
return &wal.WALRestoreResult{}, nil
281+
return nil
245282
}
246283

247284
// Status implements the WALService interface

0 commit comments

Comments
 (0)