Skip to content

Commit 5accc6d

Browse files
committed
PBM-1397: implement checkPhysicalBackupDataFiles()
1 parent 63b334d commit 5accc6d

File tree

5 files changed

+73
-27
lines changed

5 files changed

+73
-27
lines changed

pbm/backup/storage.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func CheckBackupDataFiles(ctx context.Context, stg storage.Storage, bcp *BackupM
4646
case defs.LogicalBackup:
4747
return checkLogicalBackupDataFiles(ctx, stg, bcp)
4848
case defs.PhysicalBackup, defs.IncrementalBackup:
49-
return checkPhysicalBackupFiles(ctx, stg, bcp)
49+
return checkPhysicalBackupDataFiles(ctx, stg, bcp)
5050
case defs.ExternalBackup:
5151
return nil // no files available
5252
}
@@ -111,8 +111,65 @@ func checkLogicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *Ba
111111
return errors.Join(errs...)
112112
}
113113

114-
func checkPhysicalBackupFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
115-
return nil
114+
func checkPhysicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *BackupMeta) error {
115+
eg := util.NewErrorGroup(runtime.NumCPU() * 2)
116+
for _, rs := range bcp.Replsets {
117+
eg.Go(func() error {
118+
var filelist Filelist
119+
if version.HasFilelistFile(bcp.PBMVersion) {
120+
var err error
121+
filelist, err = ReadFilelistForReplset(stg, bcp.Name, rs.Name)
122+
if err != nil {
123+
return errors.Wrapf(err, "read filelist for replset %s", rs.Name)
124+
}
125+
} else {
126+
filelist = rs.Files
127+
}
128+
if len(filelist) == 0 {
129+
return errors.Errorf("empty filelist for replset %s", rs.Name)
130+
}
131+
132+
for _, f := range filelist {
133+
if f.Len <= 0 {
134+
continue // no file expected
135+
}
136+
137+
eg.Go(func() error {
138+
filepath := path.Join(bcp.Name, rs.Name, f.Path(bcp.Compression))
139+
stat, err := stg.FileStat(filepath)
140+
if err != nil {
141+
return errors.Wrapf(err, "file %s", filepath)
142+
}
143+
if stat.Size == 0 {
144+
return errors.Errorf("empty file %s", filepath)
145+
}
146+
147+
return nil
148+
})
149+
}
150+
151+
return nil
152+
})
153+
}
154+
155+
errs := eg.Wait()
156+
return errors.Join(errs...)
157+
}
158+
159+
func ReadFilelistForReplset(stg storage.Storage, bcpName, rsName string) (Filelist, error) {
160+
pfFilepath := path.Join(bcpName, rsName, FilelistName)
161+
rdr, err := stg.SourceReader(pfFilepath)
162+
if err != nil {
163+
return nil, errors.Wrapf(err, "open %q", pfFilepath)
164+
}
165+
defer rdr.Close()
166+
167+
filelist, err := ReadFilelist(rdr)
168+
if err != nil {
169+
return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath)
170+
}
171+
172+
return filelist, nil
116173
}
117174

118175
func ReadArchiveNamespaces(stg storage.Storage, metafile string) ([]*archive.Namespace, error) {

pbm/backup/types.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"io"
66
"os"
7+
"path/filepath"
78

89
"go.mongodb.org/mongo-driver/bson"
910
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -156,6 +157,15 @@ func (f File) String() string {
156157
return fmt.Sprintf("%s [%d:%d]", f.Name, f.Off, f.Len)
157158
}
158159

160+
func (f File) Path(c compress.CompressionType) string {
161+
src := filepath.Join(f.Name + c.Suffix())
162+
if f.Len == 0 {
163+
return src
164+
}
165+
166+
return fmt.Sprintf("%s.%d-%d", src, f.Off, f.Len)
167+
}
168+
159169
func (f *File) WriteTo(w io.Writer) (int64, error) {
160170
fd, err := os.Open(f.Name)
161171
if err != nil {

pbm/restore/logical.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,6 @@ func (r *Restore) RunSnapshot(
759759
usersAndRolesOpt restoreUsersAndRolesOption,
760760
) error {
761761
var rdr io.ReadCloser
762-
763762
var err error
764763
if version.IsLegacyArchive(bcp.PBMVersion) {
765764
sr, err := r.bcpStg.SourceReader(dump)

pbm/restore/physical.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,10 +1089,7 @@ func (r *PhysRestore) copyFiles() (*s3.DownloadStat, error) {
10891089
for i := len(r.files) - 1; i >= 0; i-- {
10901090
set := r.files[i]
10911091
for _, f := range set.Data {
1092-
src := filepath.Join(set.BcpName, setName, f.Name+set.Cmpr.Suffix())
1093-
if f.Len != 0 {
1094-
src += fmt.Sprintf(".%d-%d", f.Off, f.Len)
1095-
}
1092+
src := filepath.Join(set.BcpName, setName, f.Path(set.Cmpr))
10961093
// cut dbpath from destination if there is any (see PBM-1058)
10971094
fname := f.Name
10981095
if set.dbpath != "" {

sdk/impl.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package sdk
22

33
import (
44
"context"
5-
"path"
65
"runtime"
76
"time"
87

@@ -192,7 +191,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error {
192191
rs := &bcp.Replsets[i]
193192

194193
eg.Go(func() error {
195-
filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name)
194+
filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name)
196195
if err != nil {
197196
return errors.Wrapf(err, "get filelist for %q [rs: %s] backup", bcp.Name, rs.Name)
198197
}
@@ -226,7 +225,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error {
226225
rs := &bcp.Replsets[i]
227226

228227
eg.Go(func() error {
229-
filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name)
228+
filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name)
230229
if err != nil {
231230
return errors.Wrapf(err, "fetch files for %q [rs: %s] backup", bcp.Name, rs.Name)
232231
}
@@ -254,22 +253,6 @@ func getStorageForRead(ctx context.Context, bcp *backup.BackupMeta) (storage.Sto
254253
return stg, nil
255254
}
256255

257-
func getFilelistForReplset(stg storage.Storage, bcpName, rsName string) (backup.Filelist, error) {
258-
pfFilepath := path.Join(bcpName, rsName, backup.FilelistName)
259-
rdr, err := stg.SourceReader(pfFilepath)
260-
if err != nil {
261-
return nil, errors.Wrapf(err, "open %q", pfFilepath)
262-
}
263-
defer rdr.Close()
264-
265-
filelist, err := backup.ReadFilelist(rdr)
266-
if err != nil {
267-
return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath)
268-
}
269-
270-
return filelist, nil
271-
}
272-
273256
func (c *Client) GetRestoreByName(ctx context.Context, name string) (*RestoreMetadata, error) {
274257
return restore.GetRestoreMeta(ctx, c.conn, name)
275258
}

0 commit comments

Comments
 (0)