Skip to content

Commit 6925b3e

Browse files
committed
feat(compactor): allow header timestamp override
1 parent d017048 commit 6925b3e

File tree

2 files changed

+68
-2
lines changed

2 files changed

+68
-2
lines changed

compactor.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ type Compactor struct {
3636
total atomic.Uint32 // total pages (from last input's Commit)
3737

3838
// These flags will be set when encoding the header.
39-
HeaderFlags uint32
39+
HeaderFlags uint32
40+
HeaderTimestamp *int64
4041

4142
// If true, the compactor will not validate that input files have contiguous
4243
// transaction IDs. This is false by default but can be enabled when
@@ -106,6 +107,10 @@ func (c *Compactor) Compact(ctx context.Context) (retErr error) {
106107
// Fetch the first and last headers from the sorted readers.
107108
minHdr := c.inputs[0].dec.Header()
108109
maxHdr := c.inputs[len(c.inputs)-1].dec.Header()
110+
timestamp := maxHdr.Timestamp
111+
if c.HeaderTimestamp != nil {
112+
timestamp = *c.HeaderTimestamp
113+
}
109114

110115
// Generate output header. Skip NodeID as it's not meaningful after compaction.
111116
if err := c.enc.EncodeHeader(Header{
@@ -115,7 +120,7 @@ func (c *Compactor) Compact(ctx context.Context) (retErr error) {
115120
Commit: maxHdr.Commit,
116121
MinTXID: minHdr.MinTXID,
117122
MaxTXID: maxHdr.MaxTXID,
118-
Timestamp: maxHdr.Timestamp,
123+
Timestamp: timestamp,
119124
PreApplyChecksum: minHdr.PreApplyChecksum,
120125
}); err != nil {
121126
return fmt.Errorf("write header: %w", err)

compactor_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,67 @@ func TestCompactor_Compact(t *testing.T) {
120120
},
121121
})
122122
})
123+
t.Run("HeaderTimestampOverride", func(t *testing.T) {
124+
input0 := &ltx.FileSpec{
125+
Header: ltx.Header{
126+
Version: ltx.Version,
127+
PageSize: 512,
128+
Commit: 1,
129+
MinTXID: 2,
130+
MaxTXID: 2,
131+
Timestamp: 1000,
132+
PreApplyChecksum: ltx.ChecksumFlag | 2,
133+
},
134+
Pages: []ltx.PageSpec{
135+
{
136+
Header: ltx.PageHeader{Pgno: 1},
137+
Data: bytes.Repeat([]byte{0x81}, 512),
138+
},
139+
},
140+
Trailer: ltx.Trailer{
141+
PostApplyChecksum: ltx.ChecksumFlag | 2,
142+
},
143+
}
144+
input1 := &ltx.FileSpec{
145+
Header: ltx.Header{
146+
Version: ltx.Version,
147+
PageSize: 512,
148+
Commit: 1,
149+
MinTXID: 3,
150+
MaxTXID: 3,
151+
Timestamp: 2000,
152+
PreApplyChecksum: ltx.ChecksumFlag | 3,
153+
},
154+
Pages: []ltx.PageSpec{
155+
{
156+
Header: ltx.PageHeader{Pgno: 1},
157+
Data: bytes.Repeat([]byte{0x91}, 512),
158+
},
159+
},
160+
Trailer: ltx.Trailer{
161+
PostApplyChecksum: ltx.ChecksumFlag | 3,
162+
},
163+
}
164+
var buf0, buf1 bytes.Buffer
165+
writeFileSpec(t, &buf0, input0)
166+
writeFileSpec(t, &buf1, input1)
167+
168+
var output bytes.Buffer
169+
c, err := ltx.NewCompactor(&output, []io.Reader{&buf0, &buf1})
170+
if err != nil {
171+
t.Fatal(err)
172+
}
173+
timestamp := int64(1000)
174+
c.HeaderTimestamp = &timestamp
175+
if err := c.Compact(context.Background()); err != nil {
176+
t.Fatal(err)
177+
}
178+
179+
spec := readFileSpec(t, &output)
180+
if got, want := spec.Header.Timestamp, int64(1000); got != want {
181+
t.Fatalf("Timestamp=%d, want %d", got, want)
182+
}
183+
})
123184
t.Run("NonSnapshotPageDataOnly", func(t *testing.T) {
124185
spec, err := compactFileSpecs(t,
125186
&ltx.FileSpec{

0 commit comments

Comments
 (0)