Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 90 additions & 13 deletions decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@ import (
"hash"
"hash/crc64"
"io"
"math"

"github.com/pierrec/lz4/v4"
)

// lz4FrameFooterSize is the size of the LZ4 frame footer:
// EndMark (4 bytes) + Content Checksum (4 bytes).
// Used when decoding old format files without compressed size prefix.
const lz4FrameFooterSize = 8

// Decoder represents a decoder of an LTX file.
type Decoder struct {
r io.Reader // main reader
zr *lz4.Reader // lz4 reader
r io.Reader // main reader
lr io.LimitedReader // limited reader for lz4 (reused)
zr *lz4.Reader // lz4 reader

header Header
trailer Trailer
Expand Down Expand Up @@ -172,19 +179,56 @@ func (dec *Decoder) DecodePage(hdr *PageHeader, data []byte) error {
return err
}

// Read page data next.
dec.zr.Reset(dec.r)
if _, err := io.ReadFull(dec.zr, data); err != nil {
return err
// Read page data using format-specific approach.
if hdr.Flags&PageHeaderFlagCompressedSize != 0 {
// New block format: read size prefix, then block data.
sizeBuf := make([]byte, 4)
if _, err := io.ReadFull(dec.r, sizeBuf); err != nil {
return fmt.Errorf("read data size: %w", err)
}
dec.writeToHash(sizeBuf)
dataSize := binary.BigEndian.Uint32(sizeBuf)

if hdr.Flags&PageHeaderFlagUncompressed != 0 {
// Data stored uncompressed - validate size matches page size.
if dataSize != dec.header.PageSize {
return fmt.Errorf("uncompressed data size mismatch: got %d, expected %d", dataSize, dec.header.PageSize)
}
if _, err := io.ReadFull(dec.r, data); err != nil {
return fmt.Errorf("read uncompressed data: %w", err)
}
} else {
// LZ4 block compressed data.
compressed := make([]byte, dataSize)
if _, err := io.ReadFull(dec.r, compressed); err != nil {
return fmt.Errorf("read compressed data: %w", err)
}
if _, err := lz4.UncompressBlock(compressed, data); err != nil {
return fmt.Errorf("decompress block: %w", err)
}
}
} else {
// Old format: use LimitedReader workaround for lz4 frame concatenation.
// The lz4 library peeks ahead after EOF to check for concatenated frames,
// so we limit reads to prevent it from reading into the next page header.
dec.lr.R = dec.r
dec.lr.N = math.MaxInt64
dec.zr.Reset(&dec.lr)

if _, err := io.ReadFull(dec.zr, data); err != nil {
return err
}

// Limit remaining reads to the LZ4 frame footer size before checking EOF.
dec.lr.N = lz4FrameFooterSize
if err := dec.readLZ4Trailer(); err != nil {
return fmt.Errorf("read lz4 trailer: %w", err)
}
}

dec.writeToHash(data)
dec.pageN++

// Read off the LZ4 trailer frame to ensure we hit EOF.
if err := dec.readLZ4Trailer(); err != nil {
return fmt.Errorf("read lz4 trailer: %w", err)
}

// Calculate checksum while decoding snapshots if tracking checksums.
if dec.header.IsSnapshot() && !dec.header.NoChecksum() {
if hdr.Pgno != LockPgno(dec.header.PageSize) {
Expand Down Expand Up @@ -301,8 +345,41 @@ func DecodePageData(b []byte) (hdr PageHeader, data []byte, err error) {
return hdr, data, nil
}

zr := lz4.NewReader(bytes.NewReader(b[PageHeaderSize:]))
data, err = io.ReadAll(zr)
if hdr.Flags&PageHeaderFlagCompressedSize != 0 {
// New block format: read size and decompress.
if len(b) < PageHeaderSize+4 {
return hdr, nil, fmt.Errorf("buffer too small for size prefix")
}
dataSize := binary.BigEndian.Uint32(b[PageHeaderSize:])
offset := PageHeaderSize + 4

if len(b) < offset+int(dataSize) {
return hdr, nil, fmt.Errorf("buffer too small for data: need %d, have %d", offset+int(dataSize), len(b))
}

if hdr.Flags&PageHeaderFlagUncompressed != 0 {
// Data stored uncompressed.
data = make([]byte, dataSize)
copy(data, b[offset:offset+int(dataSize)])
} else {
// LZ4 block compressed data.
compressed := b[offset : offset+int(dataSize)]
// Estimate uncompressed size - pages are typically 512-65536 bytes.
// We'll use a reasonable upper bound and resize if needed.
data = make([]byte, 65536)
n, err := lz4.UncompressBlock(compressed, data)
if err != nil {
return hdr, nil, fmt.Errorf("decompress block: %w", err)
}
data = data[:n]
}
} else {
// Old frame format: use LZ4 reader.
r := bytes.NewReader(b[PageHeaderSize:])
zr := lz4.NewReader(r)
data, err = io.ReadAll(zr)
}

return hdr, data, err
}

Expand Down
176 changes: 165 additions & 11 deletions decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ltx_test

import (
"bytes"
"crypto/rand"
"io"
"reflect"
"testing"
Expand Down Expand Up @@ -47,9 +48,15 @@ func TestDecoder(t *testing.T) {
data := make([]byte, 1024)
if err := dec.DecodePage(&hdr, data); err != nil {
t.Fatal(err)
} else if got, want := hdr, spec.Pages[i].Header; got != want {
t.Fatalf("page hdr mismatch:\ngot=%#v\nwant=%#v", got, want)
} else if got, want := data, spec.Pages[i].Data; !bytes.Equal(got, want) {
}
// Encoder now sets PageHeaderFlagCompressedSize, so compare only Pgno.
if got, want := hdr.Pgno, spec.Pages[i].Header.Pgno; got != want {
t.Fatalf("page hdr pgno mismatch:\ngot=%d\nwant=%d", got, want)
}
if got, want := hdr.Flags, uint16(ltx.PageHeaderFlagCompressedSize); got != want {
t.Fatalf("page hdr flags mismatch:\ngot=0x%x\nwant=0x%x", got, want)
}
if got, want := data, spec.Pages[i].Data; !bytes.Equal(got, want) {
t.Fatalf("page data mismatch:\ngot=%#v\nwant=%#v", got, want)
}
}
Expand All @@ -64,28 +71,29 @@ func TestDecoder(t *testing.T) {
}

// Verify page index.
// Block format: PageHeader(6) + Size(4) + compressed block data (~26 bytes for repetitive data).
index := dec.PageIndex()
if got, want := index, map[uint32]ltx.PageIndexElem{
1: {MinTXID: 1, MaxTXID: 1, Offset: 100, Size: 51},
2: {MinTXID: 1, MaxTXID: 1, Offset: 151, Size: 51},
1: {MinTXID: 1, MaxTXID: 1, Offset: 100, Size: 36},
2: {MinTXID: 1, MaxTXID: 1, Offset: 136, Size: 36},
}; !reflect.DeepEqual(got, want) {
t.Fatalf("page index mismatch:\ngot=%#v\nwant=%#v", got, want)
}

// Read page 1 by offset.
if hdr, data, err := ltx.DecodePageData(fileSpecData[100:]); err != nil {
t.Fatal(err)
} else if got, want := hdr, (ltx.PageHeader{Pgno: 1}); got != want {
t.Fatalf("page header mismatch:\ngot=%#v\nwant=%#v", got, want)
} else if got, want := hdr.Pgno, uint32(1); got != want {
t.Fatalf("page header pgno mismatch:\ngot=%d\nwant=%d", got, want)
} else if got, want := data, bytes.Repeat([]byte("2"), 1024); !bytes.Equal(got, want) {
t.Fatalf("page data mismatch:\ngot=%#v\nwant=%#v", got, want)
}

// Read page 2 by offset.
if hdr, data, err := ltx.DecodePageData(fileSpecData[151:]); err != nil {
// Read page 2 by offset. Offset is 136 with block format.
if hdr, data, err := ltx.DecodePageData(fileSpecData[136:]); err != nil {
t.Fatal(err)
} else if got, want := hdr, (ltx.PageHeader{Pgno: 2}); got != want {
t.Fatalf("page header mismatch:\ngot=%#v\nwant=%#v", got, want)
} else if got, want := hdr.Pgno, uint32(2); got != want {
t.Fatalf("page header pgno mismatch:\ngot=%d\nwant=%d", got, want)
} else if got, want := data, bytes.Repeat([]byte("3"), 1024); !bytes.Equal(got, want) {
t.Fatalf("page data mismatch:\ngot=%#v\nwant=%#v", got, want)
}
Expand Down Expand Up @@ -232,3 +240,149 @@ func TestDecoder_DecodeDatabaseTo(t *testing.T) {
}
})
}

// TestDecoder_64KBPageSize round-trips pages at the maximum SQLite page
// size (64KB) through the encoder and decoder. It exercises both storage
// paths of the block format: LZ4-compressed blocks for compressible input
// and raw pass-through for incompressible input.
func TestDecoder_64KBPageSize(t *testing.T) {
	const pageSize = 65536 // 64KB - maximum SQLite page size

	t.Run("Compressible", func(t *testing.T) {
		// Repetitive byte patterns compress well, so both pages are
		// expected to be stored as LZ4 blocks.
		pg1 := bytes.Repeat([]byte("A"), pageSize)
		pg2 := bytes.Repeat([]byte("B"), pageSize)

		// Fold each page checksum into the post-apply checksum.
		postApply := ltx.ChecksumFlag
		postApply = ltx.ChecksumFlag | (postApply ^ ltx.ChecksumPage(1, pg1))
		postApply = ltx.ChecksumFlag | (postApply ^ ltx.ChecksumPage(2, pg2))

		var out bytes.Buffer
		enc, err := ltx.NewEncoder(&out)
		if err != nil {
			t.Fatal(err)
		}
		hdr := ltx.Header{
			Version:   ltx.Version,
			PageSize:  pageSize,
			Commit:    2,
			MinTXID:   1,
			MaxTXID:   1,
			Timestamp: 1000,
		}
		if err = enc.EncodeHeader(hdr); err != nil {
			t.Fatal(err)
		}
		if err = enc.EncodePage(ltx.PageHeader{Pgno: 1}, pg1); err != nil {
			t.Fatal(err)
		}
		if err = enc.EncodePage(ltx.PageHeader{Pgno: 2}, pg2); err != nil {
			t.Fatal(err)
		}
		enc.SetPostApplyChecksum(postApply)
		if err = enc.Close(); err != nil {
			t.Fatal(err)
		}

		// Decode everything back and compare against the inputs.
		dec := ltx.NewDecoder(&out)
		if err = dec.DecodeHeader(); err != nil {
			t.Fatal(err)
		}

		var pageHdr ltx.PageHeader
		pageBuf := make([]byte, pageSize)

		if err = dec.DecodePage(&pageHdr, pageBuf); err != nil {
			t.Fatal(err)
		}
		if pageHdr.Pgno != 1 {
			t.Fatalf("expected pgno 1, got %d", pageHdr.Pgno)
		}
		if !bytes.Equal(pageBuf, pg1) {
			t.Fatal("page 1 data mismatch")
		}
		// Compressible data: size-prefix flag set, uncompressed bit clear.
		if pageHdr.Flags != ltx.PageHeaderFlagCompressedSize {
			t.Fatalf("expected compressed flag only, got 0x%x", pageHdr.Flags)
		}

		if err = dec.DecodePage(&pageHdr, pageBuf); err != nil {
			t.Fatal(err)
		}
		if pageHdr.Pgno != 2 {
			t.Fatalf("expected pgno 2, got %d", pageHdr.Pgno)
		}
		if !bytes.Equal(pageBuf, pg2) {
			t.Fatal("page 2 data mismatch")
		}

		// No third page was written; the next decode must report EOF.
		if err = dec.DecodePage(&pageHdr, pageBuf); err != io.EOF {
			t.Fatalf("expected EOF, got %v", err)
		}
		if err = dec.Close(); err != nil {
			t.Fatal(err)
		}
	})

	t.Run("Incompressible", func(t *testing.T) {
		// Random bytes are effectively incompressible, which should force
		// the encoder to store the page uncompressed.
		pg1 := make([]byte, pageSize)
		if _, err := rand.Read(pg1); err != nil {
			t.Fatal(err)
		}

		// Single-page transaction; post-apply checksum covers page 1 only.
		postApply := ltx.ChecksumFlag | ltx.ChecksumPage(1, pg1)

		var out bytes.Buffer
		enc, err := ltx.NewEncoder(&out)
		if err != nil {
			t.Fatal(err)
		}
		hdr := ltx.Header{
			Version:   ltx.Version,
			PageSize:  pageSize,
			Commit:    1,
			MinTXID:   1,
			MaxTXID:   1,
			Timestamp: 1000,
		}
		if err = enc.EncodeHeader(hdr); err != nil {
			t.Fatal(err)
		}
		if err = enc.EncodePage(ltx.PageHeader{Pgno: 1}, pg1); err != nil {
			t.Fatal(err)
		}
		enc.SetPostApplyChecksum(postApply)
		if err = enc.Close(); err != nil {
			t.Fatal(err)
		}

		// Decode and verify the page round-trips byte-for-byte.
		dec := ltx.NewDecoder(&out)
		if err = dec.DecodeHeader(); err != nil {
			t.Fatal(err)
		}

		var pageHdr ltx.PageHeader
		pageBuf := make([]byte, pageSize)

		if err = dec.DecodePage(&pageHdr, pageBuf); err != nil {
			t.Fatal(err)
		}
		if pageHdr.Pgno != 1 {
			t.Fatalf("expected pgno 1, got %d", pageHdr.Pgno)
		}
		if !bytes.Equal(pageBuf, pg1) {
			t.Fatal("page 1 data mismatch")
		}
		// Incompressible data: both the size-prefix and uncompressed
		// flags must be present.
		expectedFlags := ltx.PageHeaderFlagCompressedSize | ltx.PageHeaderFlagUncompressed
		if pageHdr.Flags != expectedFlags {
			t.Fatalf("expected uncompressed flags 0x%x, got 0x%x", expectedFlags, pageHdr.Flags)
		}

		if err = dec.DecodePage(&pageHdr, pageBuf); err != io.EOF {
			t.Fatalf("expected EOF, got %v", err)
		}
		if err = dec.Close(); err != nil {
			t.Fatal(err)
		}
	})
}
Loading