@@ -4,25 +4,20 @@ import (
4
4
"bytes"
5
5
"context"
6
6
"fmt"
7
- "runtime"
8
7
"sync"
9
8
"sync/atomic"
10
9
"time"
11
10
12
11
"go.mongodb.org/mongo-driver/bson/primitive"
13
12
"go.mongodb.org/mongo-driver/mongo"
14
- "golang.org/x/sync/errgroup"
15
13
16
- "github.com/percona/percona-backup-mongodb/internal"
17
- "github.com/percona/percona-backup-mongodb/internal/backup"
18
14
"github.com/percona/percona-backup-mongodb/internal/config"
19
15
"github.com/percona/percona-backup-mongodb/internal/connect"
20
16
"github.com/percona/percona-backup-mongodb/internal/ctrl"
21
17
"github.com/percona/percona-backup-mongodb/internal/defs"
22
18
"github.com/percona/percona-backup-mongodb/internal/errors"
23
19
"github.com/percona/percona-backup-mongodb/internal/lock"
24
20
"github.com/percona/percona-backup-mongodb/internal/log"
25
- "github.com/percona/percona-backup-mongodb/internal/oplog/oplogtmp"
26
21
"github.com/percona/percona-backup-mongodb/internal/resync"
27
22
"github.com/percona/percona-backup-mongodb/internal/storage"
28
23
"github.com/percona/percona-backup-mongodb/internal/topo"
@@ -158,244 +153,6 @@ func (a *Agent) Start(ctx context.Context) error {
158
153
}
159
154
}
160
155
161
- // Delete deletes backup(s) from the store and cleans up its metadata
162
- func (a * Agent ) Delete (ctx context.Context , d * ctrl.DeleteBackupCmd , opid ctrl.OPID , ep config.Epoch ) {
163
- logger := log .FromContext (ctx )
164
-
165
- if d == nil {
166
- l := logger .NewEvent (string (ctrl .CmdDeleteBackup ), "" , opid .String (), ep .TS ())
167
- l .Error ("missed command" )
168
- return
169
- }
170
-
171
- l := logger .NewEvent (string (ctrl .CmdDeleteBackup ), "" , opid .String (), ep .TS ())
172
- ctx = log .SetLogEventToContext (ctx , l )
173
-
174
- nodeInfo , err := topo .GetNodeInfoExt (ctx , a .nodeConn )
175
- if err != nil {
176
- l .Error ("get node info data: %v" , err )
177
- return
178
- }
179
-
180
- if ! nodeInfo .IsLeader () {
181
- l .Info ("not a member of the leader rs, skipping" )
182
- return
183
- }
184
-
185
- epts := ep .TS ()
186
- lock := lock .NewOpLock (a .leadConn , lock.LockHeader {
187
- Replset : a .brief .SetName ,
188
- Node : a .brief .Me ,
189
- Type : ctrl .CmdDeleteBackup ,
190
- OPID : opid .String (),
191
- Epoch : & epts ,
192
- })
193
-
194
- got , err := a .acquireLock (ctx , lock , l , nil )
195
- if err != nil {
196
- l .Error ("acquire lock: %v" , err )
197
- return
198
- }
199
- if ! got {
200
- l .Debug ("skip: lock not acquired" )
201
- return
202
- }
203
- defer func () {
204
- if err := lock .Release (); err != nil {
205
- l .Error ("release lock: %v" , err )
206
- }
207
- }()
208
-
209
- switch {
210
- case d .OlderThan > 0 :
211
- t := time .Unix (d .OlderThan , 0 ).UTC ()
212
- obj := t .Format ("2006-01-02T15:04:05Z" )
213
- l = logger .NewEvent (string (ctrl .CmdDeleteBackup ), obj , opid .String (), ep .TS ())
214
- l .Info ("deleting backups older than %v" , t )
215
- err := backup .DeleteOlderThan (ctx , a .leadConn , t , l )
216
- if err != nil {
217
- l .Error ("deleting: %v" , err )
218
- return
219
- }
220
- case d .Backup != "" :
221
- l = logger .NewEvent (string (ctrl .CmdDeleteBackup ), d .Backup , opid .String (), ep .TS ())
222
- l .Info ("deleting backup" )
223
- err := backup .DeleteBackup (ctx , a .leadConn , d .Backup , l )
224
- if err != nil {
225
- l .Error ("deleting: %v" , err )
226
- return
227
- }
228
- default :
229
- l .Error ("malformed command received in Delete() of backup: %v" , d )
230
- return
231
- }
232
-
233
- l .Info ("done" )
234
- }
235
-
236
- // DeletePITR deletes PITR chunks from the store and cleans up its metadata
237
- func (a * Agent ) DeletePITR (ctx context.Context , d * ctrl.DeletePITRCmd , opid ctrl.OPID , ep config.Epoch ) {
238
- logger := log .FromContext (ctx )
239
-
240
- if d == nil {
241
- l := logger .NewEvent (string (ctrl .CmdDeletePITR ), "" , opid .String (), ep .TS ())
242
- l .Error ("missed command" )
243
- return
244
- }
245
-
246
- l := logger .NewEvent (string (ctrl .CmdDeletePITR ), "" , opid .String (), ep .TS ())
247
- ctx = log .SetLogEventToContext (ctx , l )
248
-
249
- nodeInfo , err := topo .GetNodeInfoExt (ctx , a .nodeConn )
250
- if err != nil {
251
- l .Error ("get node info data: %v" , err )
252
- return
253
- }
254
-
255
- if ! nodeInfo .IsLeader () {
256
- l .Info ("not a member of the leader rs, skipping" )
257
- return
258
- }
259
-
260
- epts := ep .TS ()
261
- lock := lock .NewOpLock (a .leadConn , lock.LockHeader {
262
- Replset : a .brief .SetName ,
263
- Node : a .brief .Me ,
264
- Type : ctrl .CmdDeletePITR ,
265
- OPID : opid .String (),
266
- Epoch : & epts ,
267
- })
268
-
269
- got , err := a .acquireLock (ctx , lock , l , nil )
270
- if err != nil {
271
- l .Error ("acquire lock: %v" , err )
272
- return
273
- }
274
- if ! got {
275
- l .Debug ("skip: lock not acquired" )
276
- return
277
- }
278
- defer func () {
279
- if err := lock .Release (); err != nil {
280
- l .Error ("release lock: %v" , err )
281
- }
282
- }()
283
-
284
- if d .OlderThan > 0 {
285
- t := time .Unix (d .OlderThan , 0 ).UTC ()
286
- obj := t .Format ("2006-01-02T15:04:05Z" )
287
- l = logger .NewEvent (string (ctrl .CmdDeletePITR ), obj , opid .String (), ep .TS ())
288
- l .Info ("deleting pitr chunks older than %v" , t )
289
- err = oplogtmp .DeletePITR (ctx , a .leadConn , & t , l )
290
- } else {
291
- l = logger .NewEvent (string (ctrl .CmdDeletePITR ), "_all_" , opid .String (), ep .TS ())
292
- l .Info ("deleting all pitr chunks" )
293
- err = oplogtmp .DeletePITR (ctx , a .leadConn , nil , l )
294
- }
295
- if err != nil {
296
- l .Error ("deleting: %v" , err )
297
- return
298
- }
299
-
300
- l .Info ("done" )
301
- }
302
-
303
- // Cleanup deletes backups and PITR chunks from the store and cleans up its metadata
304
- func (a * Agent ) Cleanup (ctx context.Context , d * ctrl.CleanupCmd , opid ctrl.OPID , ep config.Epoch ) {
305
- logger := log .FromContext (ctx )
306
-
307
- if d == nil {
308
- l := logger .NewEvent (string (ctrl .CmdCleanup ), "" , opid .String (), ep .TS ())
309
- l .Error ("missed command" )
310
- return
311
- }
312
-
313
- l := logger .NewEvent (string (ctrl .CmdCleanup ), "" , opid .String (), ep .TS ())
314
- ctx = log .SetLogEventToContext (ctx , l )
315
-
316
- if d == nil {
317
- l .Error ("missed command" )
318
- return
319
- }
320
-
321
- nodeInfo , err := topo .GetNodeInfoExt (ctx , a .nodeConn )
322
- if err != nil {
323
- l .Error ("get node info data: %v" , err )
324
- return
325
- }
326
- if ! nodeInfo .IsLeader () {
327
- l .Info ("not a member of the leader rs, skipping" )
328
- return
329
- }
330
-
331
- epts := ep .TS ()
332
- lock := lock .NewOpLock (a .leadConn , lock.LockHeader {
333
- Replset : a .brief .SetName ,
334
- Node : a .brief .Me ,
335
- Type : ctrl .CmdCleanup ,
336
- OPID : opid .String (),
337
- Epoch : & epts ,
338
- })
339
-
340
- got , err := a .acquireLock (ctx , lock , l , nil )
341
- if err != nil {
342
- l .Error ("acquire lock: %v" , err )
343
- return
344
- }
345
- if ! got {
346
- l .Debug ("skip: lock not acquired" )
347
- return
348
- }
349
- defer func () {
350
- if err := lock .Release (); err != nil {
351
- l .Error ("release lock: %v" , err )
352
- }
353
- }()
354
-
355
- stg , err := util .GetStorage (ctx , a .leadConn , l )
356
- if err != nil {
357
- l .Error ("get storage: " + err .Error ())
358
- }
359
-
360
- eg := errgroup.Group {}
361
- eg .SetLimit (runtime .NumCPU ())
362
-
363
- cr , err := internal .MakeCleanupInfo (ctx , a .leadConn , d .OlderThan )
364
- if err != nil {
365
- l .Error ("make cleanup report: " + err .Error ())
366
- return
367
- }
368
-
369
- for i := range cr .Chunks {
370
- name := cr .Chunks [i ].FName
371
-
372
- eg .Go (func () error {
373
- err := stg .Delete (name )
374
- return errors .Wrapf (err , "delete chunk file %q" , name )
375
- })
376
- }
377
- if err := eg .Wait (); err != nil {
378
- l .Error (err .Error ())
379
- }
380
-
381
- for i := range cr .Backups {
382
- bcp := & cr .Backups [i ]
383
-
384
- eg .Go (func () error {
385
- err := backup .DeleteBackupFiles (bcp , stg )
386
- return errors .Wrapf (err , "delete backup files %q" , bcp .Name )
387
- })
388
- }
389
- if err := eg .Wait (); err != nil {
390
- l .Error (err .Error ())
391
- }
392
-
393
- err = resync .ResyncStorage (ctx , a .leadConn , l )
394
- if err != nil {
395
- l .Error ("storage resync: " + err .Error ())
396
- }
397
- }
398
-
399
156
// Resync uploads a backup list from the remote store
400
157
func (a * Agent ) Resync (ctx context.Context , opid ctrl.OPID , ep config.Epoch ) {
401
158
logger := log .FromContext (ctx )
0 commit comments