Skip to content

Commit 8c65988

Browse files
committed
Add snapshot completeness validation
- Implement validation to ensure snapshots contain all expected pages - Track lastPgno during page decoding - Add comprehensive tests for snapshot completeness validation - Handle special case where commit equals lock page number - Resolves TODO: Ensure last read page is equal to the commit for snapshot LTX files
1 parent d017048 commit 8c65988

File tree

2 files changed

+217
-5
lines changed

2 files changed

+217
-5
lines changed

decoder.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ type Decoder struct {
2828
pageIndex map[uint32]PageIndexElem
2929
state string
3030

31-
chksum Checksum
32-
hash hash.Hash64
33-
pageN int // pages read
34-
n int64 // bytes read
31+
chksum Checksum
32+
hash hash.Hash64
33+
pageN int // pages read
34+
n int64 // bytes read
35+
lastPgno uint32 // last page number read (for snapshot validation)
3536
}
3637

3738
// NewDecoder returns a new instance of Decoder.
@@ -102,8 +103,23 @@ func (dec *Decoder) Close() error {
102103
return fmt.Errorf("unmarshal trailer: %w", err)
103104
}
104105

105-
// TODO: Ensure last read page is equal to the commit for snapshot LTX files
106+
// Ensure last read page is equal to the commit for snapshot LTX files
107+
if dec.header.IsSnapshot() && dec.lastPgno != 0 {
108+
// For snapshots, we expect all pages from 1 to commit (excluding lock page)
109+
expectedLastPage := dec.header.Commit
110+
lockPgno := LockPgno(dec.header.PageSize)
106111

112+
// If commit is past the lock page, the last page should be commit
113+
// If commit is the lock page, the last page should be commit-1
114+
// If commit is before the lock page, the last page should be commit
115+
if dec.header.Commit == lockPgno {
116+
expectedLastPage = dec.header.Commit - 1
117+
}
118+
119+
if dec.lastPgno != expectedLastPage {
120+
return fmt.Errorf("snapshot incomplete: expected last page %d, got %d", expectedLastPage, dec.lastPgno)
121+
}
122+
}
107123
// Compare file checksum with checksum in trailer.
108124
if chksum := ChecksumFlag | Checksum(dec.hash.Sum64()); chksum != dec.trailer.FileChecksum {
109125
return ErrChecksumMismatch
@@ -218,6 +234,9 @@ func (dec *Decoder) DecodePage(hdr *PageHeader, data []byte) error {
218234
dec.writeToHash(data)
219235
dec.pageN++
220236

237+
// Track the last page number read
238+
dec.lastPgno = hdr.Pgno
239+
221240
// Calculate checksum while decoding snapshots if tracking checksums.
222241
if dec.header.IsSnapshot() && !dec.header.NoChecksum() {
223242
if hdr.Pgno != LockPgno(dec.header.PageSize) {

decoder_test.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,196 @@ func TestDecoder_64KBPageSize(t *testing.T) {
384384
}
385385
})
386386
}
387+
388+
// TestDecoder_SnapshotCompleteness validates that the decoder properly checks
389+
// for snapshot completeness when closing. For snapshot LTX files (MinTXID=1),
390+
// the decoder must verify that all pages from 1 to Commit have been read,
391+
// excluding the lock page which is always zero and never written.
392+
func TestDecoder_SnapshotCompleteness(t *testing.T) {
393+
t.Run("CompleteSnapshot", func(t *testing.T) {
394+
spec := &ltx.FileSpec{
395+
Header: ltx.Header{
396+
Version: ltx.Version,
397+
Flags: ltx.HeaderFlagNoChecksum,
398+
PageSize: 512,
399+
Commit: 3,
400+
MinTXID: 1,
401+
MaxTXID: 1,
402+
Timestamp: 1000,
403+
},
404+
Pages: []ltx.PageSpec{
405+
{Header: ltx.PageHeader{Pgno: 1}, Data: bytes.Repeat([]byte("1"), 512)},
406+
{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
407+
{Header: ltx.PageHeader{Pgno: 3}, Data: bytes.Repeat([]byte("3"), 512)},
408+
},
409+
Trailer: ltx.Trailer{},
410+
}
411+
412+
var buf bytes.Buffer
413+
writeFileSpec(t, &buf, spec)
414+
dec := ltx.NewDecoder(&buf)
415+
416+
if err := dec.DecodeHeader(); err != nil {
417+
t.Fatal(err)
418+
}
419+
420+
for i := 0; i < 3; i++ {
421+
var hdr ltx.PageHeader
422+
data := make([]byte, 512)
423+
if err := dec.DecodePage(&hdr, data); err != nil {
424+
t.Fatal(err)
425+
}
426+
}
427+
428+
var hdr ltx.PageHeader
429+
data := make([]byte, 512)
430+
if err := dec.DecodePage(&hdr, data); err != io.EOF {
431+
t.Fatalf("expected EOF, got: %v", err)
432+
}
433+
434+
if err := dec.Close(); err != nil {
435+
t.Fatal(err)
436+
}
437+
})
438+
439+
t.Run("IncompleteSnapshot", func(t *testing.T) {
440+
spec := &ltx.FileSpec{
441+
Header: ltx.Header{
442+
Version: ltx.Version,
443+
Flags: ltx.HeaderFlagNoChecksum,
444+
PageSize: 512,
445+
Commit: 3,
446+
MinTXID: 1,
447+
MaxTXID: 1,
448+
Timestamp: 1000,
449+
},
450+
Pages: []ltx.PageSpec{
451+
{Header: ltx.PageHeader{Pgno: 1}, Data: bytes.Repeat([]byte("1"), 512)},
452+
{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
453+
},
454+
Trailer: ltx.Trailer{},
455+
}
456+
457+
var buf bytes.Buffer
458+
writeFileSpec(t, &buf, spec)
459+
dec := ltx.NewDecoder(&buf)
460+
461+
if err := dec.DecodeHeader(); err != nil {
462+
t.Fatal(err)
463+
}
464+
465+
for i := 0; i < 2; i++ {
466+
var hdr ltx.PageHeader
467+
data := make([]byte, 512)
468+
if err := dec.DecodePage(&hdr, data); err != nil {
469+
t.Fatal(err)
470+
}
471+
}
472+
473+
var hdr ltx.PageHeader
474+
data := make([]byte, 512)
475+
if err := dec.DecodePage(&hdr, data); err != io.EOF {
476+
t.Fatalf("expected EOF, got: %v", err)
477+
}
478+
479+
if err := dec.Close(); err == nil || err.Error() != "snapshot incomplete: expected last page 3, got 2" {
480+
t.Fatalf("expected snapshot incomplete error, got: %v", err)
481+
}
482+
})
483+
484+
t.Run("SnapshotWithLockPage", func(t *testing.T) {
485+
if testing.Short() {
486+
t.Skip("skipping in short mode")
487+
}
488+
489+
lockPgno := ltx.LockPgno(4096)
490+
491+
spec := &ltx.FileSpec{
492+
Header: ltx.Header{
493+
Version: ltx.Version,
494+
Flags: ltx.HeaderFlagNoChecksum,
495+
PageSize: 4096,
496+
Commit: lockPgno,
497+
MinTXID: 1,
498+
MaxTXID: 1,
499+
Timestamp: 1000,
500+
},
501+
Pages: []ltx.PageSpec{},
502+
Trailer: ltx.Trailer{},
503+
}
504+
505+
for pgno := uint32(1); pgno < lockPgno; pgno++ {
506+
spec.Pages = append(spec.Pages, ltx.PageSpec{
507+
Header: ltx.PageHeader{Pgno: pgno},
508+
Data: bytes.Repeat([]byte{byte(pgno % 256)}, 4096),
509+
})
510+
}
511+
512+
var buf bytes.Buffer
513+
writeFileSpec(t, &buf, spec)
514+
dec := ltx.NewDecoder(&buf)
515+
516+
if err := dec.DecodeHeader(); err != nil {
517+
t.Fatal(err)
518+
}
519+
520+
for i := 0; i < len(spec.Pages); i++ {
521+
var hdr ltx.PageHeader
522+
data := make([]byte, 4096)
523+
if err := dec.DecodePage(&hdr, data); err != nil {
524+
t.Fatal(err)
525+
}
526+
}
527+
528+
var hdr ltx.PageHeader
529+
data := make([]byte, 4096)
530+
if err := dec.DecodePage(&hdr, data); err != io.EOF {
531+
t.Fatalf("expected EOF, got: %v", err)
532+
}
533+
534+
if err := dec.Close(); err != nil {
535+
t.Fatal(err)
536+
}
537+
})
538+
539+
t.Run("NonSnapshot", func(t *testing.T) {
540+
spec := &ltx.FileSpec{
541+
Header: ltx.Header{
542+
Version: ltx.Version,
543+
Flags: 0,
544+
PageSize: 512,
545+
Commit: 3,
546+
MinTXID: 2,
547+
MaxTXID: 2,
548+
Timestamp: 1000,
549+
PreApplyChecksum: ltx.ChecksumFlag | 1,
550+
},
551+
Pages: []ltx.PageSpec{
552+
{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
553+
},
554+
Trailer: ltx.Trailer{PostApplyChecksum: ltx.ChecksumFlag | 2},
555+
}
556+
557+
var buf bytes.Buffer
558+
writeFileSpec(t, &buf, spec)
559+
dec := ltx.NewDecoder(&buf)
560+
561+
if err := dec.DecodeHeader(); err != nil {
562+
t.Fatal(err)
563+
}
564+
565+
var hdr ltx.PageHeader
566+
data := make([]byte, 512)
567+
if err := dec.DecodePage(&hdr, data); err != nil {
568+
t.Fatal(err)
569+
}
570+
571+
if err := dec.DecodePage(&hdr, data); err != io.EOF {
572+
t.Fatalf("expected EOF, got: %v", err)
573+
}
574+
575+
if err := dec.Close(); err != nil {
576+
t.Fatal(err)
577+
}
578+
})
579+
}

0 commit comments

Comments
 (0)