Skip to content

Commit 8ab959a

Browse files
craig[bot]kev-cao
andcommitted
Merge #152363
152363: backup: create helpers for fetching backup indexes r=msbutler a=kev-cao This commit adds some helper functions for fetching backup indexes for a backup chain. Epic: CRDB-47942 Release note: None Co-authored-by: Kevin Cao <[email protected]>
2 parents 8a15c13 + 074543d commit 8ab959a

File tree

3 files changed

+502
-4
lines changed

3 files changed

+502
-4
lines changed

pkg/backup/backupdest/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ go_library(
2626
"//pkg/sql",
2727
"//pkg/sql/pgwire/pgcode",
2828
"//pkg/sql/pgwire/pgerror",
29+
"//pkg/util/ctxgroup",
2930
"//pkg/util/encoding",
3031
"//pkg/util/hlc",
3132
"//pkg/util/ioctx",
@@ -56,6 +57,7 @@ go_test(
5657
"//pkg/backup/backuptestutils",
5758
"//pkg/backup/backuputils",
5859
"//pkg/base",
60+
"//pkg/blobs",
5961
"//pkg/ccl",
6062
"//pkg/cloud",
6163
"//pkg/cloud/cloudpb",

pkg/backup/backupdest/backup_index.go

Lines changed: 180 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"context"
1111
"fmt"
1212
"path"
13+
"slices"
1314
"strings"
15+
"time"
1416

1517
"github.com/cockroachdb/cockroach/pkg/backup/backupbase"
1618
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
@@ -20,6 +22,7 @@ import (
2022
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2123
"github.com/cockroachdb/cockroach/pkg/security/username"
2224
"github.com/cockroachdb/cockroach/pkg/sql"
25+
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
2326
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2427
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2528
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -45,6 +48,7 @@ func WriteBackupIndexMetadata(
4548
if err != nil {
4649
return errors.Wrapf(err, "creating external storage")
4750
}
51+
defer indexStore.Close()
4852

4953
if shouldWrite, err := shouldWriteIndex(
5054
ctx, execCfg, indexStore, details,
@@ -105,10 +109,9 @@ func WriteBackupIndexMetadata(
105109
// words, we can remove these checks in v26.2+.
106110
func IndexExists(ctx context.Context, store cloud.ExternalStorage, subdir string) (bool, error) {
107111
var indexExists bool
108-
indexSubdir := path.Join(backupbase.BackupIndexDirectoryPath, flattenSubdirForIndex(subdir))
109112
if err := store.List(
110113
ctx,
111-
indexSubdir,
114+
indexSubdir(subdir),
112115
"/",
113116
func(file string) error {
114117
indexExists = true
@@ -123,6 +126,172 @@ func IndexExists(ctx context.Context, store cloud.ExternalStorage, subdir string
123126
return indexExists, nil
124127
}
125128

129+
// ListIndexes lists all the index files for a backup chain rooted by the full
130+
// backup indicated by the subdir. The store should be rooted at the default
131+
// collection URI (the one that contains the `index/` directory). It returns
132+
// the basenames of the listed index files. It assumes that the subdir is
133+
// resolved and not `LATEST`.
134+
//
135+
// Note: The indexes are returned in ascending end time order, with ties broken
136+
// by ascending start time order. This matches the order that backup manifests
137+
// are returned in.
138+
func ListIndexes(
139+
ctx context.Context, store cloud.ExternalStorage, subdir string,
140+
) ([]string, error) {
141+
var indexBasenames []string
142+
if err := store.List(
143+
ctx,
144+
indexSubdir(subdir)+"/",
145+
"",
146+
func(file string) error {
147+
indexBasenames = append(indexBasenames, path.Base(file))
148+
return nil
149+
},
150+
); err != nil {
151+
return nil, errors.Wrapf(err, "listing indexes in %s", subdir)
152+
}
153+
154+
timeMemo := make(map[string][2]time.Time)
155+
indexTimesFromFile := func(basename string) (time.Time, time.Time, error) {
156+
if times, ok := timeMemo[basename]; ok {
157+
return times[0], times[1], nil
158+
}
159+
start, end, err := parseIndexFilename(basename)
160+
if err != nil {
161+
return time.Time{}, time.Time{}, err
162+
}
163+
timeMemo[basename] = [2]time.Time{start, end}
164+
return start, end, nil
165+
}
166+
var sortErr error
167+
slices.SortFunc(indexBasenames, func(a, b string) int {
168+
aStart, aEnd, err := indexTimesFromFile(a)
169+
if err != nil {
170+
sortErr = err
171+
}
172+
bStart, bEnd, err := indexTimesFromFile(b)
173+
if err != nil {
174+
sortErr = err
175+
}
176+
if aEnd.Before(bEnd) {
177+
return -1
178+
} else if aEnd.After(bEnd) {
179+
return 1
180+
}
181+
// End times are equal, so break tie with start time.
182+
if aStart.Before(bStart) {
183+
return -1
184+
} else {
185+
return 1
186+
}
187+
})
188+
if sortErr != nil {
189+
return nil, errors.Wrapf(sortErr, "sorting index filenames")
190+
}
191+
192+
return indexBasenames, nil
193+
}
194+
195+
// GetBackupTreeIndexMetadata concurrently retrieves the index metadata for all
196+
// backups within the specified subdir, up to the specified end time, inclusive.
197+
// The store should be rooted at the collection URI that contains the `index/`
198+
// directory. Indexes are returned in ascending end time order, with ties broken
199+
// by ascending start time order. If the end time is not covered by the backups
200+
// in the subdir, an error is returned.
201+
//
202+
// Note: If endTime is provided, GetBackupTreeIndexMetadata will return ALL
203+
// backups that could be used to restore to endTime. So even if a compacted
204+
// backup can be used to restore to endTime, the incremental backups that
205+
// make up the compacted backup will also be returned.
206+
func GetBackupTreeIndexMetadata(
207+
ctx context.Context, store cloud.ExternalStorage, subdir string, endTime hlc.Timestamp,
208+
) ([]backuppb.BackupIndexMetadata, error) {
209+
indexBasenames, err := ListIndexes(ctx, store, subdir)
210+
if err != nil {
211+
return nil, err
212+
}
213+
214+
indexes := make([]backuppb.BackupIndexMetadata, len(indexBasenames))
215+
g := ctxgroup.WithContext(ctx)
216+
for i, basename := range indexBasenames {
217+
g.GoCtx(func(ctx context.Context) error {
218+
reader, size, err := store.ReadFile(
219+
ctx, path.Join(indexSubdir(subdir), basename), cloud.ReadOptions{},
220+
)
221+
if err != nil {
222+
return errors.Wrapf(err, "reading index file %s", basename)
223+
}
224+
defer reader.Close(ctx)
225+
226+
bytes := make([]byte, size)
227+
if _, err := reader.Read(ctx, bytes); err != nil {
228+
return errors.Wrapf(err, "reading index file %s bytes", basename)
229+
}
230+
231+
index := backuppb.BackupIndexMetadata{}
232+
if err := protoutil.Unmarshal(bytes, &index); err != nil {
233+
return errors.Wrapf(err, "unmarshalling index file %s", basename)
234+
}
235+
indexes[i] = index
236+
return nil
237+
})
238+
}
239+
240+
if err := g.Wait(); err != nil {
241+
return nil, errors.Wrapf(err, "getting backup index metadata")
242+
}
243+
244+
if endTime.IsEmpty() {
245+
return indexes, nil
246+
}
247+
248+
coveringIdx := slices.IndexFunc(indexes, func(index backuppb.BackupIndexMetadata) bool {
249+
return index.StartTime.Less(endTime) && endTime.LessEq(index.EndTime)
250+
})
251+
if coveringIdx == -1 {
252+
return nil, errors.Newf(`backups in "%s" do not cover end time %s`, subdir, endTime)
253+
}
254+
coverEndTime := indexes[coveringIdx].EndTime
255+
// To include all components of a compacted backup, we need to include all
256+
// backups with the same end time.
257+
for ; coveringIdx < len(indexes); coveringIdx++ {
258+
if !indexes[coveringIdx].EndTime.Equal(coverEndTime) {
259+
break
260+
}
261+
}
262+
return indexes[:coveringIdx], nil
263+
}
264+
265+
// ParseIndexFilename parses the start and end timestamps from the index
266+
// filename.
267+
//
268+
// Note: The timestamps are only millisecond-precise and so do not represent the
269+
// exact nano-specific times in the corresponding backup manifest.
270+
func parseIndexFilename(basename string) (start time.Time, end time.Time, err error) {
271+
invalidFmtErr := errors.Newf("invalid index filename format: %s", basename)
272+
273+
if !strings.HasSuffix(basename, "_metadata.pb") {
274+
return time.Time{}, time.Time{}, invalidFmtErr
275+
}
276+
parts := strings.Split(basename, "_")
277+
if len(parts) != 4 {
278+
return time.Time{}, time.Time{}, invalidFmtErr
279+
}
280+
281+
if parts[1] != "0" {
282+
start, err = time.Parse(backupbase.BackupIndexFilenameTimestampFormat, parts[1])
283+
if err != nil {
284+
return time.Time{}, time.Time{}, errors.Join(invalidFmtErr, err)
285+
}
286+
}
287+
end, err = time.Parse(backupbase.BackupIndexFilenameTimestampFormat, parts[2])
288+
if err != nil {
289+
return time.Time{}, time.Time{}, errors.Join(invalidFmtErr, err)
290+
}
291+
292+
return start, end, nil
293+
}
294+
126295
// shouldWriteIndex determines if a backup index file should be written for a
127296
// given backup. The rule is:
128297
// 1. An index should only be written on a v25.4+ cluster.
@@ -170,8 +339,7 @@ func getBackupIndexFilePath(subdir string, startTime, endTime hlc.Timestamp) (st
170339
return "", errors.AssertionFailedf("expected subdir to be resolved and not be 'LATEST'")
171340
}
172341
return backuputils.JoinURLPath(
173-
backupbase.BackupIndexDirectoryPath,
174-
flattenSubdirForIndex(subdir),
342+
indexSubdir(subdir),
175343
getBackupIndexFileName(startTime, endTime),
176344
), nil
177345
}
@@ -192,6 +360,14 @@ func getBackupIndexFileName(startTime, endTime hlc.Timestamp) string {
192360
)
193361
}
194362

363+
// indexSubdir is a convenient helper function to get the corresponding index
364+
// path for a given full backup subdir. The path is relative to the root of the
365+
// collection URI and does not contain a trailing slash. It assumes that subdir
366+
// has been resolved and is not `LATEST`.
367+
func indexSubdir(subdir string) string {
368+
return path.Join(backupbase.BackupIndexDirectoryPath, flattenSubdirForIndex(subdir))
369+
}
370+
195371
// flattenSubdirForIndex flattens a full backup subdirectory to be used in the
196372
// index. Note that this path does not contain a trailing or leading slash.
197373
// It assumes subdir is not `LATEST` and has been resolved.

0 commit comments

Comments
 (0)