@@ -24,15 +24,19 @@ import (
2424
2525// WALServiceImplementation is the implementation of the WAL Service
2626type WALServiceImplementation struct {
27- ServerName string
28- BarmanObjectKey client.ObjectKey
27+ wal.UnimplementedWALServer
2928 ClusterObjectKey client.ObjectKey
3029 Client client.Client
3130 InstanceName string
3231 SpoolDirectory string
3332 PGDataPath string
3433 PGWALPath string
35- wal.UnimplementedWALServer
34+
35+ BarmanObjectKey client.ObjectKey
36+ ServerName string
37+
38+ RecoveryBarmanObjectKey client.ObjectKey
39+ RecoveryServerName string
3640}
3741
3842// GetCapabilities implements the WALService interface
@@ -123,22 +127,54 @@ func (w WALServiceImplementation) Restore(
123127 ctx context.Context ,
124128 request * wal.WALRestoreRequest ,
125129) (* wal.WALRestoreResult , error ) {
126- contextLogger := log .FromContext (ctx )
127- startTime := time .Now ()
130+ // TODO: build full paths
131+ walName := request .GetSourceWalName ()
132+ destinationPath := request .GetDestinationFileName ()
128133
129134 var cluster cnpgv1.Cluster
130135 if err := w .Client .Get (ctx , w .ClusterObjectKey , & cluster ); err != nil {
131136 return nil , err
132137 }
133138
134139 var objectStore barmancloudv1.ObjectStore
135- if err := w .Client .Get (ctx , w .BarmanObjectKey , & objectStore ); err != nil {
136- return nil , err
140+ var serverName string
141+
142+ switch {
143+ case cluster .IsReplica () && cluster .Status .CurrentPrimary == w .InstanceName :
144+ // Designated primary on replica cluster, using recovery object store
145+ serverName = w .RecoveryServerName
146+ if err := w .Client .Get (ctx , w .RecoveryBarmanObjectKey , & objectStore ); err != nil {
147+ return nil , err
148+ }
149+
150+ case cluster .Status .CurrentPrimary == "" :
151+ // Recovery from object store, using recovery object store
152+ serverName = w .RecoveryServerName
153+ if err := w .Client .Get (ctx , w .RecoveryBarmanObjectKey , & objectStore ); err != nil {
154+ return nil , err
155+ }
156+
157+ default :
158+ // Using cluster object store
159+ serverName = w .ServerName
160+ if err := w .Client .Get (ctx , w .BarmanObjectKey , & objectStore ); err != nil {
161+ return nil , err
162+ }
137163 }
138164
139- // TODO: build full paths
140- walName := request .GetSourceWalName ()
141- destinationPath := request .GetDestinationFileName ()
165+ return & wal.WALRestoreResult {}, w .RestoreEx (ctx , & cluster , & objectStore , serverName , walName , destinationPath )
166+ }
167+
168+ func (w WALServiceImplementation ) RestoreEx (
169+ ctx context.Context ,
170+ cluster * cnpgv1.Cluster ,
171+ objectStore * barmancloudv1.ObjectStore ,
172+ serverName string ,
173+ walName string ,
174+ destinationPath string ,
175+ ) error {
176+ contextLogger := log .FromContext (ctx )
177+ startTime := time .Now ()
142178
143179 barmanConfiguration := & objectStore .Spec .Configuration
144180
@@ -151,37 +187,37 @@ func (w WALServiceImplementation) Restore(
151187 os .Environ (),
152188 )
153189 if err != nil {
154- return nil , fmt .Errorf ("while getting recover credentials: %w" , err )
190+ return fmt .Errorf ("while getting recover credentials: %w" , err )
155191 }
156192 env = MergeEnv (env , credentialsEnv )
157193
158- options , err := barmanCommand .CloudWalRestoreOptions (ctx , barmanConfiguration , w . ServerName )
194+ options , err := barmanCommand .CloudWalRestoreOptions (ctx , barmanConfiguration , serverName )
159195 if err != nil {
160- return nil , fmt .Errorf ("while getting barman-cloud-wal-restore options: %w" , err )
196+ return fmt .Errorf ("while getting barman-cloud-wal-restore options: %w" , err )
161197 }
162198
163199 // Create the restorer
164200 var walRestorer * barmanRestorer.WALRestorer
165201 if walRestorer , err = barmanRestorer .New (ctx , env , w .SpoolDirectory ); err != nil {
166- return nil , fmt .Errorf ("while creating the restorer: %w" , err )
202+ return fmt .Errorf ("while creating the restorer: %w" , err )
167203 }
168204
169205 // Step 1: check if this WAL file is not already in the spool
170206 var wasInSpool bool
171207 if wasInSpool , err = walRestorer .RestoreFromSpool (walName , destinationPath ); err != nil {
172- return nil , fmt .Errorf ("while restoring a file from the spool directory: %w" , err )
208+ return fmt .Errorf ("while restoring a file from the spool directory: %w" , err )
173209 }
174210 if wasInSpool {
175211 contextLogger .Info ("Restored WAL file from spool (parallel)" ,
176212 "walName" , walName ,
177213 )
178- return nil , nil
214+ return nil
179215 }
180216
181217 // We skip this step if streaming connection is not available
182- if isStreamingAvailable (& cluster , w .InstanceName ) {
218+ if isStreamingAvailable (cluster , w .InstanceName ) {
183219 if err := checkEndOfWALStreamFlag (walRestorer ); err != nil {
184- return nil , err
220+ return err
185221 }
186222 }
187223
@@ -194,7 +230,7 @@ func (w WALServiceImplementation) Restore(
194230 if IsWALFile (walName ) {
195231 // If this is a regular WAL file, we try to prefetch
196232 if walFilesList , err = gatherWALFilesToRestore (walName , maxParallel ); err != nil {
197- return nil , fmt .Errorf ("while generating the list of WAL files to restore: %w" , err )
233+ return fmt .Errorf ("while generating the list of WAL files to restore: %w" , err )
198234 }
199235 } else {
200236 // This is not a regular WAL file, we fetch it directly
@@ -209,18 +245,18 @@ func (w WALServiceImplementation) Restore(
209245 // is the one that PostgreSQL has requested to restore.
210246 // The failure has already been logged in walRestorer.RestoreList method
211247 if walStatus [0 ].Err != nil {
212- return nil , walStatus [0 ].Err
248+ return walStatus [0 ].Err
213249 }
214250
215251 // We skip this step if streaming connection is not available
216252 endOfWALStream := isEndOfWALStream (walStatus )
217- if isStreamingAvailable (& cluster , w .InstanceName ) && endOfWALStream {
253+ if isStreamingAvailable (cluster , w .InstanceName ) && endOfWALStream {
218254 contextLogger .Info (
219255 "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found" )
220256
221257 err = walRestorer .SetEndOfWALStream ()
222258 if err != nil {
223- return nil , err
259+ return err
224260 }
225261 }
226262
@@ -241,7 +277,7 @@ func (w WALServiceImplementation) Restore(
241277 "downloadTotalTime" , time .Since (downloadStartTime ),
242278 "totalTime" , time .Since (startTime ))
243279
244- return & wal. WALRestoreResult {}, nil
280+ return nil
245281}
246282
247283// Status implements the WALService interface
0 commit comments