Skip to content

Commit b2d8145

Browse files
authored
Stream restore plan selection across levels (#1076)
1 parent 2a03354 commit b2d8145

File tree

3 files changed

+384
-47
lines changed

3 files changed

+384
-47
lines changed

replica.go

Lines changed: 151 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,64 +1001,90 @@ func CalcRestorePlan(ctx context.Context, client ReplicaClient, txID ltx.TXID, t
10011001

10021002
// Start with latest snapshot before target TXID or timestamp.
10031003
// Pass useMetadata flag to enable accurate timestamp fetching for timestamp-based restore.
1004-
if a, err := FindLTXFiles(ctx, client, SnapshotLevel, !timestamp.IsZero(), func(info *ltx.FileInfo) (bool, error) {
1004+
var snapshot *ltx.FileInfo
1005+
snapshotItr, err := client.LTXFiles(ctx, SnapshotLevel, 0, !timestamp.IsZero())
1006+
if err != nil {
1007+
return nil, err
1008+
}
1009+
for snapshotItr.Next() {
1010+
info := snapshotItr.Item()
10051011
logger.Debug("finding snapshot before target TXID or timestamp", "snapshot", info.MaxTXID)
1006-
if txID != 0 {
1007-
return info.MaxTXID <= txID, nil
1008-
} else if !timestamp.IsZero() {
1009-
return info.CreatedAt.Before(timestamp), nil
1012+
if txID != 0 && info.MaxTXID > txID {
1013+
continue
10101014
}
1011-
return true, nil
1012-
}); err != nil {
1015+
if !timestamp.IsZero() && !info.CreatedAt.Before(timestamp) {
1016+
continue
1017+
}
1018+
snapshot = info
1019+
}
1020+
if err := snapshotItr.Close(); err != nil {
10131021
return nil, err
1014-
} else if len(a) > 0 {
1015-
logger.Debug("found snapshot before target TXID or timestamp", "snapshot", a[len(a)-1].MaxTXID)
1016-
infos = append(infos, a[len(a)-1])
1022+
}
1023+
if snapshot != nil {
1024+
logger.Debug("found snapshot before target TXID or timestamp", "snapshot", snapshot.MaxTXID)
1025+
infos = append(infos, snapshot)
10171026
}
10181027

1019-
// Starting from the highest compaction level, collect all paths after the
1020-
// latest TXID for each level. Compactions are based on the previous level's
1021-
// TXID granularity so the TXIDs should align between compaction levels.
1028+
// Collect candidates across all compaction levels and pick the next file
1029+
// from any level that extends the longest contiguous TXID range.
10221030
const maxLevel = SnapshotLevel - 1
1031+
startTXID := infos.MaxTXID()
1032+
currentMax := startTXID
1033+
if txID != 0 && currentMax >= txID {
1034+
return infos, nil
1035+
}
1036+
1037+
cursors := make([]*restoreLevelCursor, 0, maxLevel+1)
10231038
for level := maxLevel; level >= 0; level-- {
10241039
logger.Debug("finding ltx files for level", "level", level)
1025-
1026-
// Pass useMetadata flag to enable accurate timestamp fetching for timestamp-based restore.
1027-
a, err := FindLTXFiles(ctx, client, level, !timestamp.IsZero(), func(info *ltx.FileInfo) (bool, error) {
1028-
if info.MaxTXID <= infos.MaxTXID() { // skip if already included in previous levels
1029-
return false, nil
1030-
}
1031-
1032-
// Filter by TXID or timestamp, if specified.
1033-
if txID != 0 {
1034-
return info.MaxTXID <= txID, nil
1035-
} else if !timestamp.IsZero() {
1036-
return info.CreatedAt.Before(timestamp), nil
1037-
}
1038-
return true, nil
1039-
})
1040+
itr, err := client.LTXFiles(ctx, level, 0, !timestamp.IsZero())
10401041
if err != nil {
10411042
return nil, err
10421043
}
1044+
cursors = append(cursors, &restoreLevelCursor{
1045+
itr: itr,
1046+
})
1047+
}
1048+
defer func() {
1049+
for _, cursor := range cursors {
1050+
if cursor != nil {
1051+
_ = cursor.itr.Close()
1052+
}
1053+
}
1054+
}()
10431055

1044-
// Append each storage path to the list
1045-
for _, info := range a {
1046-
// Skip if this file's range is already covered by previously added files.
1047-
// This can happen when a larger compacted file at the same level covers
1048-
// a smaller file's entire range (see issue #847).
1049-
if info.MaxTXID <= infos.MaxTXID() {
1056+
for {
1057+
var next *restoreLevelCursor
1058+
for _, cursor := range cursors {
1059+
if err := cursor.refresh(currentMax, txID, timestamp); err != nil {
1060+
return nil, err
1061+
}
1062+
if cursor.candidate == nil {
10501063
continue
10511064
}
1052-
1053-
// Ensure TXIDs are contiguous between each paths.
1054-
if !ltx.IsContiguous(infos.MaxTXID(), info.MinTXID, info.MaxTXID) {
1055-
return nil, fmt.Errorf("non-contiguous transaction files: prev=%s filename=%s",
1056-
infos.MaxTXID().String(), ltx.FormatFilename(info.MinTXID, info.MaxTXID))
1065+
if next == nil || restoreCandidateBetter(next.candidate, cursor.candidate) {
1066+
next = cursor
10571067
}
1068+
}
1069+
1070+
if next == nil || next.candidate == nil {
1071+
break
1072+
}
1073+
1074+
if next.candidate.MaxTXID <= currentMax {
1075+
next.candidate = nil
1076+
continue
1077+
}
1078+
1079+
logger.Debug("matching LTX file for restore",
1080+
"filename", ltx.FormatFilename(next.candidate.MinTXID, next.candidate.MaxTXID),
1081+
"level", next.candidate.Level)
1082+
infos = append(infos, next.candidate)
1083+
currentMax = next.candidate.MaxTXID
1084+
next.candidate = nil
10581085

1059-
logger.Debug("matching LTX file for restore",
1060-
"filename", ltx.FormatFilename(info.MinTXID, info.MaxTXID))
1061-
infos = append(infos, info)
1086+
if txID != 0 && currentMax >= txID {
1087+
break
10621088
}
10631089
}
10641090

@@ -1068,10 +1094,93 @@ func CalcRestorePlan(ctx context.Context, client ReplicaClient, txID ltx.TXID, t
10681094
if len(infos) == 0 {
10691095
return nil, ErrTxNotAvailable
10701096
}
1097+
if txID != 0 && infos.MaxTXID() < txID {
1098+
return nil, ErrTxNotAvailable
1099+
}
10711100

10721101
return infos, nil
10731102
}
10741103

1104+
type restoreLevelCursor struct {
1105+
// itr streams LTX file infos for a single level in filename order.
1106+
itr ltx.FileIterator
1107+
// current holds the last item read from itr but not yet evaluated.
1108+
current *ltx.FileInfo
1109+
// candidate is the best eligible file at this level for the currentMax.
1110+
candidate *ltx.FileInfo
1111+
// done indicates the iterator has been exhausted or errored.
1112+
done bool
1113+
}
1114+
1115+
func (c *restoreLevelCursor) refresh(currentMax, txID ltx.TXID, timestamp time.Time) error {
1116+
// Advance the iterator until we've evaluated all files that could be
1117+
// contiguous with currentMax. Keep the best eligible candidate.
1118+
if c.done {
1119+
return nil
1120+
}
1121+
if c.candidate != nil && c.candidate.MaxTXID <= currentMax {
1122+
c.candidate = nil
1123+
}
1124+
1125+
for {
1126+
if err := c.ensureCurrent(); err != nil {
1127+
return err
1128+
}
1129+
if c.done {
1130+
return nil
1131+
}
1132+
1133+
info := c.current
1134+
if info.MinTXID > currentMax+1 {
1135+
return nil
1136+
}
1137+
c.current = nil
1138+
1139+
if info.MaxTXID <= currentMax {
1140+
continue
1141+
}
1142+
if txID != 0 && info.MaxTXID > txID {
1143+
continue
1144+
}
1145+
if !timestamp.IsZero() && !info.CreatedAt.Before(timestamp) {
1146+
continue
1147+
}
1148+
1149+
if c.candidate == nil || restoreCandidateBetter(c.candidate, info) {
1150+
c.candidate = info
1151+
}
1152+
}
1153+
}
1154+
1155+
func (c *restoreLevelCursor) ensureCurrent() error {
1156+
// Ensure current is populated with the next iterator item, or mark done.
1157+
if c.done || c.current != nil {
1158+
return nil
1159+
}
1160+
if !c.itr.Next() {
1161+
if err := c.itr.Err(); err != nil {
1162+
return err
1163+
}
1164+
c.done = true
1165+
return nil
1166+
}
1167+
c.current = c.itr.Item()
1168+
return nil
1169+
}
1170+
1171+
func restoreCandidateBetter(curr, next *ltx.FileInfo) bool {
1172+
if next.MaxTXID != curr.MaxTXID {
1173+
return next.MaxTXID > curr.MaxTXID
1174+
}
1175+
if next.MinTXID != curr.MinTXID {
1176+
return next.MinTXID < curr.MinTXID
1177+
}
1178+
if next.Level != curr.Level {
1179+
return next.Level > curr.Level
1180+
}
1181+
return next.CreatedAt.Before(curr.CreatedAt)
1182+
}
1183+
10751184
// ValidationError represents a single validation issue.
10761185
type ValidationError struct {
10771186
Level int // compaction level

replica_test.go

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,44 @@ func TestReplica_CalcRestorePlan(t *testing.T) {
336336
}
337337
})
338338

339-
t.Run("ErrNonContiguousFiles", func(t *testing.T) {
339+
t.Run("SelectLongestAcrossLevels", func(t *testing.T) {
340+
var c mock.ReplicaClient
341+
r := litestream.NewReplicaWithClient(db, &c)
342+
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) {
343+
switch level {
344+
case litestream.SnapshotLevel:
345+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
346+
{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 5},
347+
}), nil
348+
case 2:
349+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
350+
{Level: 2, MinTXID: 6, MaxTXID: 12},
351+
}), nil
352+
case 0:
353+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
354+
{Level: 0, MinTXID: 6, MaxTXID: 20},
355+
}), nil
356+
default:
357+
return ltx.NewFileInfoSliceIterator(nil), nil
358+
}
359+
}
360+
361+
plan, err := litestream.CalcRestorePlan(context.Background(), r.Client, 20, time.Time{}, r.Logger())
362+
if err != nil {
363+
t.Fatalf("unexpected error: %v", err)
364+
}
365+
if got, want := len(plan), 2; got != want {
366+
t.Fatalf("n=%v, want %v", got, want)
367+
}
368+
if got, want := *plan[0], (ltx.FileInfo{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 5}); got != want {
369+
t.Fatalf("plan[0]=%#v, want %#v", got, want)
370+
}
371+
if got, want := *plan[1], (ltx.FileInfo{Level: 0, MinTXID: 6, MaxTXID: 20}); got != want {
372+
t.Fatalf("plan[1]=%#v, want %#v", got, want)
373+
}
374+
})
375+
376+
t.Run("GapInLevelResolvedByLowerLevel", func(t *testing.T) {
340377
var c mock.ReplicaClient
341378
r := litestream.NewReplicaWithClient(db, &c)
342379
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) {
@@ -347,16 +384,72 @@ func TestReplica_CalcRestorePlan(t *testing.T) {
347384
}), nil
348385
case 1:
349386
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
350-
{Level: 1, MinTXID: 8, MaxTXID: 9},
387+
{Level: 1, MinTXID: 6, MaxTXID: 7},
388+
{Level: 1, MinTXID: 9, MaxTXID: 10},
389+
}), nil
390+
case 0:
391+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
392+
{Level: 0, MinTXID: 8, MaxTXID: 8},
393+
{Level: 0, MinTXID: 9, MaxTXID: 9},
394+
{Level: 0, MinTXID: 10, MaxTXID: 10},
351395
}), nil
352396
default:
353397
return ltx.NewFileInfoSliceIterator(nil), nil
354398
}
355399
}
356400

357-
_, err := litestream.CalcRestorePlan(context.Background(), r.Client, 10, time.Time{}, r.Logger())
358-
if err == nil || err.Error() != `non-contiguous transaction files: prev=0000000000000005 filename=0000000000000008-0000000000000009.ltx` {
359-
t.Fatalf("unexpected error: %q", err)
401+
plan, err := litestream.CalcRestorePlan(context.Background(), r.Client, 10, time.Time{}, r.Logger())
402+
if err != nil {
403+
t.Fatalf("unexpected error: %v", err)
404+
}
405+
if got, want := len(plan), 4; got != want {
406+
t.Fatalf("n=%v, want %v", got, want)
407+
}
408+
if got, want := *plan[0], (ltx.FileInfo{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 5}); got != want {
409+
t.Fatalf("plan[0]=%#v, want %#v", got, want)
410+
}
411+
if got, want := *plan[1], (ltx.FileInfo{Level: 1, MinTXID: 6, MaxTXID: 7}); got != want {
412+
t.Fatalf("plan[1]=%#v, want %#v", got, want)
413+
}
414+
if got, want := *plan[2], (ltx.FileInfo{Level: 0, MinTXID: 8, MaxTXID: 8}); got != want {
415+
t.Fatalf("plan[2]=%#v, want %#v", got, want)
416+
}
417+
if got, want := *plan[3], (ltx.FileInfo{Level: 1, MinTXID: 9, MaxTXID: 10}); got != want {
418+
t.Fatalf("plan[3]=%#v, want %#v", got, want)
419+
}
420+
})
421+
422+
t.Run("SkipsDuplicateRangesAcrossLevels", func(t *testing.T) {
423+
var c mock.ReplicaClient
424+
r := litestream.NewReplicaWithClient(db, &c)
425+
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) {
426+
switch level {
427+
case litestream.SnapshotLevel:
428+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
429+
{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 1},
430+
}), nil
431+
case 1:
432+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
433+
{Level: 1, MinTXID: 1, MaxTXID: 1},
434+
}), nil
435+
case 0:
436+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
437+
{Level: 0, MinTXID: 1, MaxTXID: 1},
438+
}), nil
439+
default:
440+
return ltx.NewFileInfoSliceIterator(nil), nil
441+
}
442+
}
443+
444+
plan, err := litestream.CalcRestorePlan(context.Background(), r.Client, 1, time.Time{}, r.Logger())
445+
if err != nil {
446+
t.Fatalf("unexpected error: %v", err)
447+
}
448+
if got, want := len(plan), 1; got != want {
449+
t.Fatalf("n=%v, want %v", got, want)
450+
}
451+
if got, want := *plan[0], (ltx.FileInfo{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 1}); got != want {
452+
t.Fatalf("plan[0]=%#v, want %#v", got, want)
360453
}
361454
})
362455

0 commit comments

Comments
 (0)