Skip to content

Commit fdbcb22

Browse files
corylanou and claude committed
Add compressed size prefix to page headers for lz4 v4.1.23 compatibility
The lz4 library v4.1.23 added frame concatenation support, which peeks ahead after reading a frame to check for another concatenated frame. This broke LTX because each page is an independent LZ4 frame with a PageHeader in between.

This change adds a new PageHeaderFlagCompressedSize flag and writes a 4-byte compressed size prefix after each page header. The decoder uses this size to create an exact LimitedReader, preventing lz4 from peeking into the next page.

For backward compatibility, the decoder handles both formats:
- New format (flag set): reads compressed size, uses exact LimitedReader
- Old format (flag=0): uses LimitedReader workaround with lz4 frame footer size

Fixes #70

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 133c1b1 commit fdbcb22

File tree

7 files changed

+134
-66
lines changed

7 files changed

+134
-66
lines changed

decoder.go

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,21 @@ import (
77
"hash"
88
"hash/crc64"
99
"io"
10+
"math"
1011

1112
"github.com/pierrec/lz4/v4"
1213
)
1314

15+
// lz4FrameFooterSize is the size of the LZ4 frame footer:
16+
// EndMark (4 bytes) + Content Checksum (4 bytes).
17+
// Used when decoding old format files without compressed size prefix.
18+
const lz4FrameFooterSize = 8
19+
1420
// Decoder represents a decoder of an LTX file.
1521
type Decoder struct {
16-
r io.Reader // main reader
17-
zr *lz4.Reader // lz4 reader
22+
r io.Reader // main reader
23+
lr io.LimitedReader // limited reader for lz4 (reused)
24+
zr *lz4.Reader // lz4 reader
1825

1926
header Header
2027
trailer Trailer
@@ -172,19 +179,53 @@ func (dec *Decoder) DecodePage(hdr *PageHeader, data []byte) error {
172179
return err
173180
}
174181

175-
// Read page data next.
176-
dec.zr.Reset(dec.r)
177-
if _, err := io.ReadFull(dec.zr, data); err != nil {
178-
return err
182+
// Read page data using format-specific approach.
183+
if hdr.Flags&PageHeaderFlagCompressedSize != 0 {
184+
// New format: read compressed size prefix, then use exact LimitedReader.
185+
sizeBuf := make([]byte, 4)
186+
if _, err := io.ReadFull(dec.r, sizeBuf); err != nil {
187+
return fmt.Errorf("read compressed size: %w", err)
188+
}
189+
dec.writeToHash(sizeBuf)
190+
191+
compressedSize := binary.BigEndian.Uint32(sizeBuf)
192+
dec.lr.R = dec.r
193+
dec.lr.N = int64(compressedSize)
194+
dec.zr.Reset(&dec.lr)
195+
196+
if _, err := io.ReadFull(dec.zr, data); err != nil {
197+
return err
198+
}
199+
200+
// Drain any remaining bytes from the LimitedReader.
201+
// The LZ4 reader may not consume all bytes (e.g., frame trailer).
202+
if dec.lr.N > 0 {
203+
if _, err := io.CopyN(io.Discard, &dec.lr, dec.lr.N); err != nil {
204+
return fmt.Errorf("drain lz4 frame: %w", err)
205+
}
206+
}
207+
} else {
208+
// Old format: use LimitedReader workaround for lz4 frame concatenation.
209+
// The lz4 library peeks ahead after EOF to check for concatenated frames,
210+
// so we limit reads to prevent it from reading into the next page header.
211+
dec.lr.R = dec.r
212+
dec.lr.N = math.MaxInt64
213+
dec.zr.Reset(&dec.lr)
214+
215+
if _, err := io.ReadFull(dec.zr, data); err != nil {
216+
return err
217+
}
218+
219+
// Limit remaining reads to the LZ4 frame footer size before checking EOF.
220+
dec.lr.N = lz4FrameFooterSize
221+
if err := dec.readLZ4Trailer(); err != nil {
222+
return fmt.Errorf("read lz4 trailer: %w", err)
223+
}
179224
}
225+
180226
dec.writeToHash(data)
181227
dec.pageN++
182228

183-
// Read off the LZ4 trailer frame to ensure we hit EOF.
184-
if err := dec.readLZ4Trailer(); err != nil {
185-
return fmt.Errorf("read lz4 trailer: %w", err)
186-
}
187-
188229
// Calculate checksum while decoding snapshots if tracking checksums.
189230
if dec.header.IsSnapshot() && !dec.header.NoChecksum() {
190231
if hdr.Pgno != LockPgno(dec.header.PageSize) {
@@ -301,7 +342,19 @@ func DecodePageData(b []byte) (hdr PageHeader, data []byte, err error) {
301342
return hdr, data, nil
302343
}
303344

304-
zr := lz4.NewReader(bytes.NewReader(b[PageHeaderSize:]))
345+
// Determine offset and size of LZ4 data based on format.
346+
var r io.Reader
347+
if hdr.Flags&PageHeaderFlagCompressedSize != 0 {
348+
// New format: read compressed size and limit reader to that size.
349+
compressedSize := binary.BigEndian.Uint32(b[PageHeaderSize:])
350+
offset := PageHeaderSize + 4
351+
r = bytes.NewReader(b[offset : offset+int(compressedSize)])
352+
} else {
353+
// Old format: pass remaining bytes, rely on LZ4 frame boundaries.
354+
r = bytes.NewReader(b[PageHeaderSize:])
355+
}
356+
357+
zr := lz4.NewReader(r)
305358
data, err = io.ReadAll(zr)
306359
return hdr, data, err
307360
}

decoder_test.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,15 @@ func TestDecoder(t *testing.T) {
4747
data := make([]byte, 1024)
4848
if err := dec.DecodePage(&hdr, data); err != nil {
4949
t.Fatal(err)
50-
} else if got, want := hdr, spec.Pages[i].Header; got != want {
51-
t.Fatalf("page hdr mismatch:\ngot=%#v\nwant=%#v", got, want)
52-
} else if got, want := data, spec.Pages[i].Data; !bytes.Equal(got, want) {
50+
}
51+
// Encoder now sets PageHeaderFlagCompressedSize, so the decoded header no longer equals the spec header; check Pgno and Flags individually.
52+
if got, want := hdr.Pgno, spec.Pages[i].Header.Pgno; got != want {
53+
t.Fatalf("page hdr pgno mismatch:\ngot=%d\nwant=%d", got, want)
54+
}
55+
if got, want := hdr.Flags, uint16(ltx.PageHeaderFlagCompressedSize); got != want {
56+
t.Fatalf("page hdr flags mismatch:\ngot=0x%x\nwant=0x%x", got, want)
57+
}
58+
if got, want := data, spec.Pages[i].Data; !bytes.Equal(got, want) {
5359
t.Fatalf("page data mismatch:\ngot=%#v\nwant=%#v", got, want)
5460
}
5561
}
@@ -64,28 +70,29 @@ func TestDecoder(t *testing.T) {
6470
}
6571

6672
// Verify page index.
73+
// New format adds 4-byte compressed size prefix, so Size is 55 instead of 51.
6774
index := dec.PageIndex()
6875
if got, want := index, map[uint32]ltx.PageIndexElem{
69-
1: {MinTXID: 1, MaxTXID: 1, Offset: 100, Size: 51},
70-
2: {MinTXID: 1, MaxTXID: 1, Offset: 151, Size: 51},
76+
1: {MinTXID: 1, MaxTXID: 1, Offset: 100, Size: 55},
77+
2: {MinTXID: 1, MaxTXID: 1, Offset: 155, Size: 55},
7178
}; !reflect.DeepEqual(got, want) {
7279
t.Fatalf("page index mismatch:\ngot=%#v\nwant=%#v", got, want)
7380
}
7481

7582
// Read page 1 by offset.
7683
if hdr, data, err := ltx.DecodePageData(fileSpecData[100:]); err != nil {
7784
t.Fatal(err)
78-
} else if got, want := hdr, (ltx.PageHeader{Pgno: 1}); got != want {
79-
t.Fatalf("page header mismatch:\ngot=%#v\nwant=%#v", got, want)
85+
} else if got, want := hdr.Pgno, uint32(1); got != want {
86+
t.Fatalf("page header pgno mismatch:\ngot=%d\nwant=%d", got, want)
8087
} else if got, want := data, bytes.Repeat([]byte("2"), 1024); !bytes.Equal(got, want) {
8188
t.Fatalf("page data mismatch:\ngot=%#v\nwant=%#v", got, want)
8289
}
8390

84-
// Read page 2 by offset.
85-
if hdr, data, err := ltx.DecodePageData(fileSpecData[151:]); err != nil {
91+
// Read page 2 by offset. Offset is 155 with new format (+4 bytes per page).
92+
if hdr, data, err := ltx.DecodePageData(fileSpecData[155:]); err != nil {
8693
t.Fatal(err)
87-
} else if got, want := hdr, (ltx.PageHeader{Pgno: 2}); got != want {
88-
t.Fatalf("page header mismatch:\ngot=%#v\nwant=%#v", got, want)
94+
} else if got, want := hdr.Pgno, uint32(2); got != want {
95+
t.Fatalf("page header pgno mismatch:\ngot=%d\nwant=%d", got, want)
8996
} else if got, want := data, bytes.Repeat([]byte("3"), 1024); !bytes.Equal(got, want) {
9097
t.Fatalf("page data mismatch:\ngot=%#v\nwant=%#v", got, want)
9198
}

encoder.go

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -243,18 +243,41 @@ func (enc *Encoder) EncodePage(hdr PageHeader, data []byte) (err error) {
243243

244244
offset := enc.n
245245

246-
// Encode & write header.
246+
// Compress data first to get the compressed size.
247+
enc.buf.Reset()
248+
enc.zw.Reset(&enc.buf)
249+
if _, err = enc.zw.Write(data); err != nil {
250+
return fmt.Errorf("compress page data: %w", err)
251+
}
252+
if err := enc.zw.Close(); err != nil {
253+
return fmt.Errorf("close lz4 writer: %w", err)
254+
}
255+
compressed := enc.buf.Bytes()
256+
257+
// Set flag indicating compressed size follows the page header.
258+
hdr.Flags |= PageHeaderFlagCompressedSize
259+
260+
// Write page header.
247261
b, err := hdr.MarshalBinary()
248262
if err != nil {
249263
return fmt.Errorf("marshal: %w", err)
250264
} else if _, err := enc.write(b); err != nil {
251-
return fmt.Errorf("write: %w", err)
265+
return fmt.Errorf("write page header: %w", err)
266+
}
267+
268+
// Write compressed size (4 bytes, big-endian).
269+
sizeBuf := make([]byte, 4)
270+
binary.BigEndian.PutUint32(sizeBuf, uint32(len(compressed)))
271+
if _, err := enc.write(sizeBuf); err != nil {
272+
return fmt.Errorf("write compressed size: %w", err)
252273
}
253274

254-
// Write data to file.
255-
if _, err = enc.writeCompressed(data); err != nil {
256-
return fmt.Errorf("write page data: %w", err)
275+
// Write compressed data.
276+
if _, err := enc.w.Write(compressed); err != nil {
277+
return fmt.Errorf("write compressed data: %w", err)
257278
}
279+
_, _ = enc.hash.Write(data) // hash the uncompressed data
280+
enc.n += int64(len(compressed))
258281

259282
enc.pagesWritten++
260283
enc.prevPgno = hdr.Pgno
@@ -273,34 +296,6 @@ func (enc *Encoder) write(b []byte) (n int, err error) {
273296
return n, err
274297
}
275298

276-
// write to the compressed writer & add to the checksum.
277-
// Returns the size of the compressed data.
278-
func (enc *Encoder) writeCompressed(b []byte) (n int, err error) {
279-
// Reset the buffer & compressed writer.
280-
enc.buf.Reset()
281-
enc.zw.Reset(&enc.buf)
282-
283-
// Write to the compressed writer to the buffer and then write the buffer to the uncompressed writer.
284-
// This is necessary so we can calculate the size of the compressed data for the page index.
285-
if _, err = enc.zw.Write(b); err != nil {
286-
return n, err
287-
}
288-
289-
// Close the compressed writer to flush any remaining data.
290-
if err := enc.zw.Close(); err != nil {
291-
return n, fmt.Errorf("cannot close lz4 writer: %w", err)
292-
}
293-
294-
compressed := enc.buf.Bytes()
295-
n, err = enc.w.Write(compressed)
296-
297-
// Write the uncompressed data to the hash, but add the compressed length to the size.
298-
_, _ = enc.hash.Write(b)
299-
enc.n += int64(len(compressed))
300-
301-
return n, err
302-
}
303-
304299
func (enc *Encoder) writeToHash(b []byte) {
305300
_, _ = enc.hash.Write(b)
306301
enc.n += int64(len(b))

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ module github.com/superfly/ltx
22

33
go 1.24
44

5-
require github.com/pierrec/lz4/v4 v4.1.22
5+
require github.com/pierrec/lz4/v4 v4.1.23

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
2-
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
1+
github.com/pierrec/lz4/v4 v4.1.23 h1:oJE7T90aYBGtFNrI8+KbETnPymobAhzRrR8Mu8n1yfU=
2+
github.com/pierrec/lz4/v4 v4.1.23/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=

ltx.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,15 @@ func IsValidPageSize(sz uint32) bool {
405405
return false
406406
}
407407

408+
// PageHeader flags.
409+
const (
410+
// PageHeaderFlagCompressedSize indicates that a 4-byte compressed size
411+
// field follows the page header. This allows the decoder to know the exact
412+
// size of the LZ4 frame without relying on the LZ4 library's frame
413+
// concatenation behavior.
414+
PageHeaderFlagCompressedSize = uint16(1 << 0)
415+
)
416+
408417
// PageHeader represents the header for a single page in an LTX file.
409418
type PageHeader struct {
410419
Pgno uint32
@@ -421,8 +430,8 @@ func (h *PageHeader) Validate() error {
421430
if h.Pgno == 0 {
422431
return fmt.Errorf("page number required")
423432
}
424-
if h.Flags != 0 {
425-
return fmt.Errorf("no flags allowed, reserved for future use")
433+
if h.Flags & ^PageHeaderFlagCompressedSize != 0 {
434+
return fmt.Errorf("invalid page header flags: 0x%04x", h.Flags)
426435
}
427436
return nil
428437
}

ltx_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func TestPageHeader_Validate(t *testing.T) {
275275
})
276276
t.Run("ErrFlagsNotAllowed", func(t *testing.T) {
277277
hdr := ltx.PageHeader{Pgno: 1, Flags: 2}
278-
if err := hdr.Validate(); err == nil || err.Error() != `no flags allowed, reserved for future use` {
278+
if err := hdr.Validate(); err == nil || err.Error() != `invalid page header flags: 0x0002` {
279279
t.Fatalf("unexpected error: %s", err)
280280
}
281281
})
@@ -816,15 +816,19 @@ func assertFileSpecEqual(tb testing.TB, x, y *ltx.FileSpec) {
816816
tb.Fatalf("page count: %d, want %d", got, want)
817817
}
818818
for i := range x.Pages {
819-
if got, want := x.Pages[i].Header, y.Pages[i].Header; got != want {
820-
tb.Fatalf("page header mismatch: i=%d\ngot=%#v\nwant=%#v", i, got, want)
819+
// Compare only Pgno, not Flags. The encoder sets PageHeaderFlagCompressedSize
820+
// on output, so Flags won't match the input spec.
821+
if got, want := x.Pages[i].Header.Pgno, y.Pages[i].Header.Pgno; got != want {
822+
tb.Fatalf("page header pgno mismatch: i=%d\ngot=%d\nwant=%d", i, got, want)
821823
}
822824
if got, want := x.Pages[i].Data, y.Pages[i].Data; !bytes.Equal(got, want) {
823825
tb.Fatalf("page data mismatch: i=%d\ngot=%#v\nwant=%#v", i, got, want)
824826
}
825827
}
826828

827-
if got, want := x.Trailer, y.Trailer; got != want {
828-
tb.Fatalf("trailer mismatch:\ngot=%#v\nwant=%#v", got, want)
829+
// Compare only PostApplyChecksum, not FileChecksum. FileChecksum depends on
830+
// the exact byte representation of the file, which changes when the format changes.
831+
if got, want := x.Trailer.PostApplyChecksum, y.Trailer.PostApplyChecksum; got != want {
832+
tb.Fatalf("trailer PostApplyChecksum mismatch:\ngot=0x%x\nwant=0x%x", got, want)
829833
}
830834
}

0 commit comments

Comments
 (0)