
Commit 10e29e4

Fix snapshot read (#22684)
Approved by: @XuPeng-SH
1 parent 784d8f9 commit 10e29e4

2 files changed, +248 −19 lines

pkg/vm/engine/tae/db/checkpoint/snapshot.go

Lines changed: 46 additions & 19 deletions
@@ -150,34 +150,55 @@ func loadCheckpointMeta(
     metaFiles []ioutil.TSRangeFile,
     fs fileservice.FileService,
 ) (entries []*CheckpointEntry, err error) {
-    // Collect all meta file names
-    fileNames := make([]string, len(metaFiles))
-    for i, metaFile := range metaFiles {
-        fileNames[i] = metaFile.GetCKPFullName()
-    }
+    allEntries := make([]*CheckpointEntry, 0)
 
-    // Create reader using NewCKPMetaReader
-    reader := NewCKPMetaReader(sid, "", fileNames, 0, fs)
-    getter := MetadataEntryGetter{reader: reader}
-    defer getter.Close()
+    // Process each meta file individually to apply the filtering logic
+    for _, metaFile := range metaFiles {
 
-    // Read all entries
-    allEntries := make([]*CheckpointEntry, 0)
-    for {
-        batchEntries, err := getter.NextBatch(ctx, nil, common.DebugAllocator)
-        if err != nil {
-            if moerr.IsMoErrCode(err, moerr.OkStopCurrRecur) {
-                break
+        // Create reader for this specific file
+        reader := NewCKPMetaReader(sid, "", []string{metaFile.GetCKPFullName()}, 0, fs)
+        getter := MetadataEntryGetter{reader: reader}
+
+        // Read entries from this file
+        fileEntries := make([]*CheckpointEntry, 0)
+        for {
+            batchEntries, err := getter.NextBatch(ctx, nil, common.DebugAllocator)
+            if err != nil {
+                if moerr.IsMoErrCode(err, moerr.OkStopCurrRecur) {
+                    break
+                }
+                getter.Close() // Close immediately on error
+                return nil, err
             }
-            return nil, err
+            fileEntries = append(fileEntries, batchEntries...)
         }
-        allEntries = append(allEntries, batchEntries...)
+        getter.Close() // Close after successful reading
+
+        // Filter entries that match the file's start and end timestamps
+        // This replicates the logic from appendCheckpointToBatch
+        fileStart := metaFile.GetStart()
+        fileEnd := metaFile.GetEnd()
+        filteredEntries := filterEntriesByTimestamp(fileEntries, fileStart, fileEnd)
+        allEntries = append(allEntries, filteredEntries...)
     }
 
     // Apply the same logic as ListSnapshotCheckpointWithMeta
     return filterSnapshotEntries(allEntries), nil
 }
 
+// filterEntriesByTimestamp filters checkpoint entries that match the given start and end timestamps
+// This function replicates the filtering logic from the original appendCheckpointToBatch function
+func filterEntriesByTimestamp(entries []*CheckpointEntry, fileStart, fileEnd *types.TS) []*CheckpointEntry {
+    filteredEntries := make([]*CheckpointEntry, 0)
+    for _, entry := range entries {
+        if entry != nil && fileStart != nil && fileEnd != nil &&
+            entry.start.EQ(fileStart) && entry.end.EQ(fileEnd) {
+            filteredEntries = append(filteredEntries, entry)
+        }
+    }
+    return filteredEntries
+}
+
 // filterSnapshotEntries implements the same logic as ListSnapshotCheckpointWithMeta
 func filterSnapshotEntries(entries []*CheckpointEntry) []*CheckpointEntry {
     if len(entries) == 0 {
@@ -187,7 +208,7 @@ func filterSnapshotEntries(entries []*CheckpointEntry) []*CheckpointEntry {
     // Find the maximum global end timestamp
     var maxGlobalEnd types.TS
     for _, entry := range entries {
-        if entry.entryType == ET_Global {
+        if entry != nil && entry.entryType == ET_Global {
             if entry.end.GT(&maxGlobalEnd) {
                 maxGlobalEnd = entry.end
             }
@@ -196,11 +217,17 @@ func filterSnapshotEntries(entries []*CheckpointEntry) []*CheckpointEntry {
 
     // Sort by end timestamp
     sort.Slice(entries, func(i, j int) bool {
+        if entries[i] == nil || entries[j] == nil {
+            return false
+        }
         return entries[i].end.LT(&entries[j].end)
     })
 
     // Find the appropriate truncation point
     for i := range entries {
+        if entries[i] == nil {
+            continue
+        }
         p := maxGlobalEnd.Prev()
         if entries[i].end.Equal(&p) || (entries[i].end.Equal(&maxGlobalEnd) &&
             entries[i].entryType == ET_Global) {
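The net effect of the change above is a two-stage read: entries pulled from each meta file are first matched against that file's own [start, end] range, and only then does the snapshot-level filtering run over everything collected. Below is a minimal in-package sketch of that flow; it is not part of the commit, the exampleSnapshotRead helper and the expected counts are illustrative assumptions, and because CheckpointEntry, filterEntriesByTimestamp, and filterSnapshotEntries are unexported it would only compile inside package checkpoint (with fmt and pkg/container/types imported).

// Illustrative only: assumes package checkpoint and the helpers added above.
func exampleSnapshotRead() {
    // Pretend one meta file covers the range [200, 300].
    fileStart := types.BuildTS(200, 0)
    fileEnd := types.BuildTS(300, 0)

    entries := []*CheckpointEntry{
        {start: types.BuildTS(100, 0), end: types.BuildTS(200, 0), entryType: ET_Incremental}, // stale: range mismatch
        {start: types.BuildTS(200, 0), end: types.BuildTS(300, 0), entryType: ET_Global},      // exact match: kept
    }

    // Stage 1: keep only entries whose range equals the file's range, so
    // checkpoints the file no longer backs are never read back.
    matched := filterEntriesByTimestamp(entries, &fileStart, &fileEnd)

    // Stage 2: the snapshot-level filter then runs over all matched entries.
    final := filterSnapshotEntries(matched)
    fmt.Println(len(matched), len(final)) // expected: 1 1
}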
New test file in package checkpoint · Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@ (new file)

// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoint

import (
    "testing"

    "github.com/matrixorigin/matrixone/pkg/container/types"
    "github.com/stretchr/testify/assert"
)

// TestFilterEntriesByTimestamp tests the filterEntriesByTimestamp function
func TestFilterEntriesByTimestamp(t *testing.T) {
    t.Run("BasicFiltering", func(t *testing.T) {
        // Create mock checkpoint entries with different timestamps
        entries := []*CheckpointEntry{
            createMockCheckpointEntry(100, 200),
            createMockCheckpointEntry(200, 300),
            createMockCheckpointEntry(300, 400),
            createMockCheckpointEntry(150, 250), // This should be filtered out
        }

        // Test filtering for file with timestamps 200-300
        fileStart := types.BuildTS(200, 0)
        fileEnd := types.BuildTS(300, 0)

        filteredEntries := filterEntriesByTimestamp(entries, &fileStart, &fileEnd)

        // Should only match the entry with timestamps 200-300
        assert.Equal(t, 1, len(filteredEntries))
        assert.True(t, filteredEntries[0].start.EQ(&fileStart))
        assert.True(t, filteredEntries[0].end.EQ(&fileEnd))
    })

    t.Run("EmptyEntries", func(t *testing.T) {
        entries := []*CheckpointEntry{}
        fileStart := types.BuildTS(100, 0)
        fileEnd := types.BuildTS(200, 0)

        filteredEntries := filterEntriesByTimestamp(entries, &fileStart, &fileEnd)
        assert.Equal(t, 0, len(filteredEntries))
    })

    t.Run("NoMatchingEntries", func(t *testing.T) {
        entries := []*CheckpointEntry{
            createMockCheckpointEntry(300, 400),
            createMockCheckpointEntry(500, 600),
        }
        fileStart := types.BuildTS(100, 0)
        fileEnd := types.BuildTS(200, 0)

        filteredEntries := filterEntriesByTimestamp(entries, &fileStart, &fileEnd)
        assert.Equal(t, 0, len(filteredEntries))
    })

    t.Run("MultipleMatchingEntries", func(t *testing.T) {
        entries := []*CheckpointEntry{
            createMockCheckpointEntry(200, 300),
            createMockCheckpointEntry(200, 300), // Duplicate
            createMockCheckpointEntry(100, 200), // Should be filtered out
        }
        fileStart := types.BuildTS(200, 0)
        fileEnd := types.BuildTS(300, 0)

        filteredEntries := filterEntriesByTimestamp(entries, &fileStart, &fileEnd)
        assert.Equal(t, 2, len(filteredEntries))
        for _, entry := range filteredEntries {
            assert.True(t, entry.start.EQ(&fileStart))
            assert.True(t, entry.end.EQ(&fileEnd))
        }
    })

    t.Run("PreventExpiredCheckpointReading", func(t *testing.T) {
        // This test verifies that the filtering prevents reading expired checkpoints
        // that would cause "is not found" errors
        allEntries := []*CheckpointEntry{
            createMockCheckpointEntry(100, 200), // Expired - should be filtered out
            createMockCheckpointEntry(200, 300), // Current - should be kept
            createMockCheckpointEntry(300, 400), // Future - should be filtered out
        }

        // File represents range 200-300
        fileStart := types.BuildTS(200, 0)
        fileEnd := types.BuildTS(300, 0)

        filteredEntries := filterEntriesByTimestamp(allEntries, &fileStart, &fileEnd)

        // Should only return the entry that matches the file's timestamp range
        assert.Equal(t, 1, len(filteredEntries))
        assert.Equal(t, int64(200), filteredEntries[0].start.Physical())
        assert.Equal(t, int64(300), filteredEntries[0].end.Physical())

        // Verify that expired entries are not included
        for _, entry := range filteredEntries {
            assert.NotEqual(t, int64(100), entry.start.Physical(), "Should not include expired checkpoint 100-200")
            assert.NotEqual(t, int64(300), entry.start.Physical(), "Should not include future checkpoint 300-400")
        }
    })
}

// TestFilterSnapshotEntries tests the filterSnapshotEntries function
func TestFilterSnapshotEntries(t *testing.T) {
    t.Run("BasicFiltering", func(t *testing.T) {
        // Create mock entries with different types and timestamps
        entries := []*CheckpointEntry{
            {
                start:     types.BuildTS(100, 0),
                end:       types.BuildTS(200, 0),
                entryType: ET_Incremental,
            },
            {
                start:     types.BuildTS(200, 0),
                end:       types.BuildTS(300, 0),
                entryType: ET_Global,
            },
            {
                start:     types.BuildTS(300, 0),
                end:       types.BuildTS(400, 0),
                entryType: ET_Incremental,
            },
        }

        result := filterSnapshotEntries(entries)

        // Should return entries from the global checkpoint onwards
        assert.Equal(t, 2, len(result))
        assert.Equal(t, ET_Global, result[0].entryType)
        assert.Equal(t, ET_Incremental, result[1].entryType)
    })

    t.Run("EmptyEntries", func(t *testing.T) {
        entries := []*CheckpointEntry{}
        result := filterSnapshotEntries(entries)
        assert.Equal(t, 0, len(result))
    })

    t.Run("NoGlobalCheckpoint", func(t *testing.T) {
        entries := []*CheckpointEntry{
            {
                start:     types.BuildTS(100, 0),
                end:       types.BuildTS(200, 0),
                entryType: ET_Incremental,
            },
            {
                start:     types.BuildTS(200, 0),
                end:       types.BuildTS(300, 0),
                entryType: ET_Incremental,
            },
        }

        result := filterSnapshotEntries(entries)
        // Should return all entries if no global checkpoint
        assert.Equal(t, 2, len(result))
    })

    t.Run("MultipleGlobalCheckpoints", func(t *testing.T) {
        entries := []*CheckpointEntry{
            {
                start:     types.BuildTS(100, 0),
                end:       types.BuildTS(200, 0),
                entryType: ET_Global,
            },
            {
                start:     types.BuildTS(200, 0),
                end:       types.BuildTS(300, 0),
                entryType: ET_Global,
            },
            {
                start:     types.BuildTS(300, 0),
                end:       types.BuildTS(400, 0),
                entryType: ET_Incremental,
            },
        }

        result := filterSnapshotEntries(entries)
        // Should return entries from the latest global checkpoint onwards
        assert.Equal(t, 2, len(result))
        assert.Equal(t, ET_Global, result[0].entryType)
        assert.Equal(t, ET_Incremental, result[1].entryType)
    })
}

func createMockCheckpointEntry(start, end int64) *CheckpointEntry {
    return &CheckpointEntry{
        start: types.BuildTS(start, 0),
        end:   types.BuildTS(end, 0),
        state: ST_Finished,
        doneC: make(chan struct{}),
    }
}
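The tests above pin down the timestamp matching, but the nil guards added to filterSnapshotEntries are only exercised indirectly. Below is a hedged sketch of an extra subtest one could drop into TestFilterSnapshotEntries; it is not part of the commit, the subtest name is an assumption, it relies on the same in-package access plus testify's assert.NotPanics, and it only asserts the absence of a panic rather than a specific result.

    t.Run("NilEntryDoesNotPanic", func(t *testing.T) {
        // A nil entry mixed into the slice should be skipped by the new
        // nil checks rather than dereferenced.
        entries := []*CheckpointEntry{
            nil,
            {
                start:     types.BuildTS(200, 0),
                end:       types.BuildTS(300, 0),
                entryType: ET_Global,
            },
        }
        assert.NotPanics(t, func() {
            filterSnapshotEntries(entries)
        })
    })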
