Skip to content

Commit e30edd2

Browse files
leonardoce and fcanovai authored
feat: separate recovery and cluster object store (#76)
Signed-off-by: Leonardo Cecchi <[email protected]>
Signed-off-by: Francesco Canovai <[email protected]>
Co-authored-by: Francesco Canovai <[email protected]>
1 parent af60a15 commit e30edd2

File tree

15 files changed

+314
-192
lines changed

15 files changed

+314
-192
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
apiVersion: postgresql.cnpg.io/v1
2+
kind: Cluster
3+
metadata:
4+
name: cluster-restore
5+
spec:
6+
instances: 3
7+
imagePullPolicy: IfNotPresent
8+
9+
bootstrap:
10+
recovery:
11+
source: source
12+
13+
plugins:
14+
- name: barman-cloud.cloudnative-pg.io
15+
parameters:
16+
barmanObjectName: minio-store-bis
17+
18+
externalClusters:
19+
- name: source
20+
plugin:
21+
name: barman-cloud.cloudnative-pg.io
22+
parameters:
23+
barmanObjectName: minio-store
24+
serverName: cluster-example
25+
26+
storage:
27+
size: 1Gi

internal/cmd/instance/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@ func NewCmd() *cobra.Command {
3636
}
3737

3838
_ = viper.BindEnv("namespace", "NAMESPACE")
39-
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
4039
_ = viper.BindEnv("cluster-name", "CLUSTER_NAME")
4140
_ = viper.BindEnv("pod-name", "POD_NAME")
4241
_ = viper.BindEnv("pgdata", "PGDATA")
4342
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
43+
44+
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
4445
_ = viper.BindEnv("server-name", "SERVER_NAME")
4546

47+
_ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME")
48+
_ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME")
49+
4650
return cmd
4751
}

internal/cmd/restore/main.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ func NewCmd() *cobra.Command {
2323
"cluster-name",
2424
"pod-name",
2525
"spool-directory",
26-
"barman-object-name",
27-
"server-name",
26+
27+
// IMPORTANT: barman-object-name and server-name are not required
28+
// to restore a cluster.
29+
"recovery-barman-object-name",
30+
"recovery-server-name",
2831
}
2932

3033
for _, k := range requiredSettings {
@@ -42,8 +45,12 @@ func NewCmd() *cobra.Command {
4245
_ = viper.BindEnv("pod-name", "POD_NAME")
4346
_ = viper.BindEnv("pgdata", "PGDATA")
4447
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
48+
4549
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
4650
_ = viper.BindEnv("server-name", "SERVER_NAME")
4751

52+
_ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME")
53+
_ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME")
54+
4855
return cmd
4956
}

internal/cnpgi/common/wal.go

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@ import (
2424

2525
// WALServiceImplementation is the implementation of the WAL Service
2626
type WALServiceImplementation struct {
27-
ServerName string
28-
BarmanObjectKey client.ObjectKey
27+
wal.UnimplementedWALServer
2928
ClusterObjectKey client.ObjectKey
3029
Client client.Client
3130
InstanceName string
3231
SpoolDirectory string
3332
PGDataPath string
3433
PGWALPath string
35-
wal.UnimplementedWALServer
34+
35+
BarmanObjectKey client.ObjectKey
36+
ServerName string
37+
38+
RecoveryBarmanObjectKey client.ObjectKey
39+
RecoveryServerName string
3640
}
3741

3842
// GetCapabilities implements the WALService interface
@@ -123,22 +127,55 @@ func (w WALServiceImplementation) Restore(
123127
ctx context.Context,
124128
request *wal.WALRestoreRequest,
125129
) (*wal.WALRestoreResult, error) {
126-
contextLogger := log.FromContext(ctx)
127-
startTime := time.Now()
130+
// TODO: build full paths
131+
walName := request.GetSourceWalName()
132+
destinationPath := request.GetDestinationFileName()
128133

129134
var cluster cnpgv1.Cluster
130135
if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil {
131136
return nil, err
132137
}
133138

134139
var objectStore barmancloudv1.ObjectStore
135-
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
136-
return nil, err
140+
var serverName string
141+
142+
switch {
143+
case cluster.IsReplica() && cluster.Status.CurrentPrimary == w.InstanceName:
144+
// Designated primary on replica cluster, using recovery object store
145+
serverName = w.RecoveryServerName
146+
if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
147+
return nil, err
148+
}
149+
150+
case cluster.Status.CurrentPrimary == "":
151+
// Recovery from object store, using recovery object store
152+
serverName = w.RecoveryServerName
153+
if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
154+
return nil, err
155+
}
156+
157+
default:
158+
// Using cluster object store
159+
serverName = w.ServerName
160+
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
161+
return nil, err
162+
}
137163
}
138164

139-
// TODO: build full paths
140-
walName := request.GetSourceWalName()
141-
destinationPath := request.GetDestinationFileName()
165+
return &wal.WALRestoreResult{}, w.restoreFromBarmanObjectStore(
166+
ctx, &cluster, &objectStore, serverName, walName, destinationPath)
167+
}
168+
169+
func (w WALServiceImplementation) restoreFromBarmanObjectStore(
170+
ctx context.Context,
171+
cluster *cnpgv1.Cluster,
172+
objectStore *barmancloudv1.ObjectStore,
173+
serverName string,
174+
walName string,
175+
destinationPath string,
176+
) error {
177+
contextLogger := log.FromContext(ctx)
178+
startTime := time.Now()
142179

143180
barmanConfiguration := &objectStore.Spec.Configuration
144181

@@ -151,37 +188,37 @@ func (w WALServiceImplementation) Restore(
151188
os.Environ(),
152189
)
153190
if err != nil {
154-
return nil, fmt.Errorf("while getting recover credentials: %w", err)
191+
return fmt.Errorf("while getting recover credentials: %w", err)
155192
}
156193
env = MergeEnv(env, credentialsEnv)
157194

158-
options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, w.ServerName)
195+
options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, serverName)
159196
if err != nil {
160-
return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
197+
return fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
161198
}
162199

163200
// Create the restorer
164201
var walRestorer *barmanRestorer.WALRestorer
165202
if walRestorer, err = barmanRestorer.New(ctx, env, w.SpoolDirectory); err != nil {
166-
return nil, fmt.Errorf("while creating the restorer: %w", err)
203+
return fmt.Errorf("while creating the restorer: %w", err)
167204
}
168205

169206
// Step 1: check if this WAL file is not already in the spool
170207
var wasInSpool bool
171208
if wasInSpool, err = walRestorer.RestoreFromSpool(walName, destinationPath); err != nil {
172-
return nil, fmt.Errorf("while restoring a file from the spool directory: %w", err)
209+
return fmt.Errorf("while restoring a file from the spool directory: %w", err)
173210
}
174211
if wasInSpool {
175212
contextLogger.Info("Restored WAL file from spool (parallel)",
176213
"walName", walName,
177214
)
178-
return nil, nil
215+
return nil
179216
}
180217

181218
// We skip this step if streaming connection is not available
182-
if isStreamingAvailable(&cluster, w.InstanceName) {
219+
if isStreamingAvailable(cluster, w.InstanceName) {
183220
if err := checkEndOfWALStreamFlag(walRestorer); err != nil {
184-
return nil, err
221+
return err
185222
}
186223
}
187224

@@ -194,7 +231,7 @@ func (w WALServiceImplementation) Restore(
194231
if IsWALFile(walName) {
195232
// If this is a regular WAL file, we try to prefetch
196233
if walFilesList, err = gatherWALFilesToRestore(walName, maxParallel); err != nil {
197-
return nil, fmt.Errorf("while generating the list of WAL files to restore: %w", err)
234+
return fmt.Errorf("while generating the list of WAL files to restore: %w", err)
198235
}
199236
} else {
200237
// This is not a regular WAL file, we fetch it directly
@@ -209,18 +246,18 @@ func (w WALServiceImplementation) Restore(
209246
// is the one that PostgreSQL has requested to restore.
210247
// The failure has already been logged in walRestorer.RestoreList method
211248
if walStatus[0].Err != nil {
212-
return nil, walStatus[0].Err
249+
return walStatus[0].Err
213250
}
214251

215252
// We skip this step if streaming connection is not available
216253
endOfWALStream := isEndOfWALStream(walStatus)
217-
if isStreamingAvailable(&cluster, w.InstanceName) && endOfWALStream {
254+
if isStreamingAvailable(cluster, w.InstanceName) && endOfWALStream {
218255
contextLogger.Info(
219256
"Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found")
220257

221258
err = walRestorer.SetEndOfWALStream()
222259
if err != nil {
223-
return nil, err
260+
return err
224261
}
225262
}
226263

@@ -241,7 +278,7 @@ func (w WALServiceImplementation) Restore(
241278
"downloadTotalTime", time.Since(downloadStartTime),
242279
"totalTime", time.Since(startTime))
243280

244-
return &wal.WALRestoreResult{}, nil
281+
return nil
245282
}
246283

247284
// Status implements the WALService interface

internal/cnpgi/instance/internal/client/client.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package client
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"sync"
78
"time"
89

@@ -21,32 +22,41 @@ type cachedSecret struct {
2122
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
2223
type ExtendedClient struct {
2324
client.Client
24-
barmanObjectKey client.ObjectKey
25-
cachedSecrets []*cachedSecret
26-
mux *sync.Mutex
27-
ttl int
25+
barmanObjectKeys []client.ObjectKey
26+
cachedSecrets []*cachedSecret
27+
mux *sync.Mutex
28+
ttl int
2829
}
2930

3031
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
3132
func NewExtendedClient(
3233
baseClient client.Client,
33-
objectStoreKey client.ObjectKey,
34+
objectStoreKeys []client.ObjectKey,
3435
) client.Client {
3536
return &ExtendedClient{
36-
Client: baseClient,
37-
barmanObjectKey: objectStoreKey,
38-
mux: &sync.Mutex{},
37+
Client: baseClient,
38+
barmanObjectKeys: objectStoreKeys,
39+
mux: &sync.Mutex{},
3940
}
4041
}
4142

4243
func (e *ExtendedClient) refreshTTL(ctx context.Context) error {
43-
var object v1.ObjectStore
44-
if err := e.Get(ctx, e.barmanObjectKey, &object); err != nil {
45-
return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err)
46-
}
44+
minTTL := math.MaxInt
45+
46+
for _, key := range e.barmanObjectKeys {
47+
var object v1.ObjectStore
48+
49+
if err := e.Get(ctx, key, &object); err != nil {
50+
return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err)
51+
}
4752

48-
e.ttl = object.Spec.InstanceSidecarConfiguration.GetCacheTTL()
53+
currentTTL := object.Spec.InstanceSidecarConfiguration.GetCacheTTL()
54+
if currentTTL < minTTL {
55+
minTTL = currentTTL
56+
}
57+
}
4958

59+
e.ttl = minTTL
5060
return nil
5161
}
5262

internal/cnpgi/instance/internal/client/client_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ var _ = Describe("ExtendedClient Get", func() {
5555
baseClient := fake.NewClientBuilder().
5656
WithScheme(scheme).
5757
WithObjects(secretInClient, objectStore).Build()
58-
extendedClient = NewExtendedClient(baseClient, client.ObjectKeyFromObject(objectStore)).(*ExtendedClient)
58+
extendedClient = NewExtendedClient(baseClient, []client.ObjectKey{
59+
client.ObjectKeyFromObject(objectStore),
60+
}).(*ExtendedClient)
5961
})
6062

6163
It("returns secret from cache if not expired", func(ctx SpecContext) {

0 commit comments

Comments
 (0)