Skip to content

Commit 2160d79

Browse files
committed
Implement block encoder and decoder
1 parent d3602a0 commit 2160d79

13 files changed

Lines changed: 441 additions & 24 deletions

alp/alp.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package alp
22

33
import (
44
"encoding/binary"
5+
"errors"
56
"math"
67

78
"github.com/parquet-go/bitpack"
@@ -36,6 +37,8 @@ const (
3637
EncodingUncompressed EncodingType = 3
3738
)
3839

40+
var ErrInvalidEncoding = errors.New("invalid encoding")
41+
3942
// CompressionMetadata contains metadata about the compressed data
4043
type CompressionMetadata struct {
4144
EncodingType EncodingType
@@ -100,7 +103,7 @@ func Encode(dst []byte, src []float64) []byte {
100103
dst = make([]byte, packedSize+MetadataSize)
101104
}
102105
dst = dst[:packedSize]
103-
bitpack.PackInt64(dst[MetadataSize:], forValues, uint(bitWidth))
106+
bitpack.Pack(dst[MetadataSize:], forValues, uint(bitWidth))
104107

105108
// Create metadata
106109
metadata := CompressionMetadata{
@@ -138,7 +141,7 @@ func Decode(dst []float64, data []byte) []float64 {
138141
case EncodingALP:
139142
result := dst[:metadata.Count]
140143
ints := unsafecast.Slice[int64](result)
141-
bitpack.UnpackInt64(ints, data[MetadataSize:], uint(metadata.BitWidth))
144+
bitpack.Unpack(ints, data[MetadataSize:], uint(metadata.BitWidth))
142145

143146
minValue := metadata.FrameOfRef
144147
numValues := metadata.Count
@@ -258,7 +261,7 @@ func DecompressValues(result []float64, src []byte, metadata CompressionMetadata
258261
// Unpack src
259262
packedData := src[MetadataSize:]
260263
unpacked := unsafecast.Slice[int64](result)
261-
bitpack.UnpackInt64(unpacked, packedData, uint(metadata.BitWidth))
264+
bitpack.Unpack(unpacked, packedData, uint(metadata.BitWidth))
262265

263266
// Reverse frame-of-reference and convert back to float64 in one pass
264267
minValue := metadata.FrameOfRef

alp/alp_stream.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package alp
2+
3+
import (
4+
"io"
5+
6+
"github.com/parquet-go/bitpack"
7+
"github.com/parquet-go/bitpack/unsafecast"
8+
)
9+
10+
// StreamEncode encodes float64 values using ALP with block-based packing for streaming decode
11+
func StreamEncode(dst []byte, src []float64, blockSize int) []byte {
12+
if len(src) == 0 {
13+
if cap(dst) < MetadataSize {
14+
dst = make([]byte, MetadataSize)
15+
}
16+
dst = dst[:MetadataSize]
17+
18+
encodeMetadata(dst, CompressionMetadata{
19+
EncodingType: EncodingNone,
20+
Count: 0,
21+
})
22+
return dst
23+
}
24+
25+
// Find global encoding parameters
26+
exponent := findBestExponent(src)
27+
factor := powersOf10[exponent+10]
28+
29+
// Convert all to integers with global exponent
30+
forValues := encodeToIntegers(src, factor)
31+
32+
// Find global frame-of-reference
33+
minValue := forValues[0]
34+
for _, v := range forValues {
35+
minValue = min(minValue, v)
36+
}
37+
38+
// Apply frame-of-reference and find global bit-width
39+
bitWidth := 0
40+
for i, v := range forValues {
41+
forValues[i] = v - minValue
42+
bits := CalculateBitWidth(uint64(forValues[i]))
43+
bitWidth = max(bitWidth, bits)
44+
}
45+
46+
// Calculate total packed size.
47+
blockSizeBytes := bitpack.ByteCount(uint(blockSize * bitWidth))
48+
totalBlocks := (len(forValues) + blockSize - 1) / blockSize
49+
packedSize := blockSizeBytes*totalBlocks + bitpack.PaddingInt64
50+
51+
// Create output buffer: metadata + packed blocks
52+
totalSize := MetadataSize + packedSize
53+
if cap(dst) < totalSize {
54+
dst = make([]byte, totalSize)
55+
}
56+
dst = dst[:totalSize]
57+
58+
encodeMetadata(dst, CompressionMetadata{
59+
EncodingType: EncodingALP,
60+
Count: int32(len(src)),
61+
Exponent: int8(exponent),
62+
BitWidth: uint8(bitWidth),
63+
FrameOfRef: minValue,
64+
})
65+
66+
// Pack data in blocks continuously block after block.
67+
offset := MetadataSize
68+
for i := range totalBlocks {
69+
var (
70+
blockStart = i * blockSize
71+
blockEnd = min(blockStart+blockSize, len(forValues))
72+
blockData = forValues[blockStart:blockEnd]
73+
)
74+
// Pack this block.
75+
bitpack.Pack(dst[offset:offset+blockSizeBytes], blockData, uint(bitWidth))
76+
offset += blockSizeBytes
77+
}
78+
79+
return dst
80+
}
81+
82+
type StreamDecoder struct {
83+
buf []byte
84+
metadata CompressionMetadata
85+
blockSize int
86+
decodedBuf []float64 // Buffer for decoded block
87+
decodedBufOffset int // Current read position in decoded buffer
88+
valuesRead int32 // Total values read so far
89+
}
90+
91+
func (d *StreamDecoder) Reset(buf []byte, blockSize int) {
92+
d.buf = buf
93+
d.blockSize = blockSize
94+
if cap(d.decodedBuf) < blockSize {
95+
d.decodedBuf = make([]float64, 0, blockSize)
96+
}
97+
d.decodedBuf = d.decodedBuf[:0]
98+
d.decodedBufOffset = 0
99+
d.valuesRead = 0
100+
101+
// Read global metadata
102+
if len(buf) >= MetadataSize {
103+
d.metadata = DecodeMetadata(buf)
104+
d.buf = buf[MetadataSize:]
105+
}
106+
}
107+
108+
func (d *StreamDecoder) Decode(dst []float64) ([]float64, error) {
109+
if d.valuesRead >= d.metadata.Count {
110+
return dst[:0], io.EOF
111+
}
112+
113+
// If we've consumed all values from current block, decode next block
114+
if d.decodedBufOffset >= len(d.decodedBuf) {
115+
// Determine block size
116+
remaining := d.metadata.Count - d.valuesRead
117+
blockSize := min(int32(d.blockSize), remaining)
118+
119+
// Calculate size of packed data for this block (no per-block padding)
120+
packedSize := bitpack.ByteCount(uint(int(blockSize) * int(d.metadata.BitWidth)))
121+
122+
// Allocate buffer for decoded block
123+
if cap(d.decodedBuf) < int(blockSize) {
124+
d.decodedBuf = make([]float64, blockSize)
125+
}
126+
d.decodedBuf = d.decodedBuf[:blockSize]
127+
128+
// Unpack entire block
129+
ints := unsafecast.Slice[int64](d.decodedBuf)
130+
bitpack.Unpack(ints, d.buf, uint(d.metadata.BitWidth))
131+
d.buf = d.buf[packedSize:]
132+
133+
// Convert to float64
134+
minValue := d.metadata.FrameOfRef
135+
invFactor := powersOf10[(10-d.metadata.Exponent+21)%21]
136+
137+
for i := range d.decodedBuf {
138+
d.decodedBuf[i] = float64(ints[i]+minValue) * invFactor
139+
}
140+
141+
d.decodedBufOffset = 0
142+
}
143+
144+
// Return a chunk from the decoded buffer
145+
remaining := len(d.decodedBuf) - d.decodedBufOffset
146+
n := min(len(dst), remaining)
147+
148+
copy(dst[:n], d.decodedBuf[d.decodedBufOffset:d.decodedBufOffset+n])
149+
d.decodedBufOffset += n
150+
d.valuesRead += int32(n)
151+
152+
// Check if we're done
153+
var err error
154+
if d.valuesRead >= d.metadata.Count {
155+
err = io.EOF
156+
}
157+
158+
return dst[:n], err
159+
}

alp/alp_test.go

Lines changed: 155 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package alp
22

33
import (
4+
"io"
45
"math"
56
"testing"
67

@@ -238,17 +239,169 @@ func TestFindMaxBitWidth(t *testing.T) {
238239
// PackInt64Array packs an array of uint64 values with the same bit width
239240
func PackInt64Array(values []int64, bitWidth uint) []byte {
240241
dst := make([]byte, BitPackedSize(uint32(len(values)), bitWidth))
241-
bitpack.PackInt64(dst, values, bitWidth)
242+
bitpack.Pack(dst, values, bitWidth)
242243
return dst
243244
}
244245

245246
// UnpackInt64Array unpacks an array of int64 values
246247
func UnpackInt64Array(data []byte, count int, bitWidth uint) []int64 {
247248
dst := make([]int64, count)
248-
bitpack.UnpackInt64(dst, data, bitWidth)
249+
bitpack.Unpack(dst, data, bitWidth)
249250
return dst
250251
}
251252

252253
func BitPackedSize(numValues uint32, bitWidth uint) int {
253254
return bitpack.ByteCount(uint(numValues)*bitWidth) + bitpack.PaddingInt64
254255
}
256+
257+
func TestDecodeRange(t *testing.T) {
258+
tests := []struct {
259+
name string
260+
data []float64
261+
blockSize int
262+
bufSize int
263+
}{
264+
{
265+
name: "smaller read buffer than values",
266+
data: []float64{6, 2, 3, 4, 5, 6},
267+
blockSize: 120,
268+
bufSize: 3,
269+
},
270+
{
271+
name: "smaller read buffer than values",
272+
data: []float64{1, 2, 3, 4, 5, 6},
273+
blockSize: 120,
274+
bufSize: 3,
275+
},
276+
{
277+
name: "smaller read buffer than values",
278+
data: []float64{1.1, 2.2, 3.2, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.0},
279+
blockSize: 120,
280+
bufSize: 3,
281+
},
282+
{
283+
name: "read buffer multiple of values",
284+
data: []float64{1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8},
285+
blockSize: 120,
286+
bufSize: 4,
287+
},
288+
}
289+
290+
for _, tt := range tests {
291+
t.Run(tt.name, func(t *testing.T) {
292+
// Encode with StreamEncode
293+
compressed := StreamEncode(nil, tt.data, tt.blockSize)
294+
295+
// Decode with StreamDecoder
296+
decoder := StreamDecoder{}
297+
decoder.Reset(compressed, tt.blockSize)
298+
299+
fullDecoded := make([]float64, 0, len(tt.data))
300+
readBuf := make([]float64, tt.bufSize)
301+
302+
for {
303+
readBuf, err := decoder.Decode(readBuf)
304+
if err != io.EOF {
305+
if err != nil {
306+
t.Fatalf("unexpected error %v", err)
307+
}
308+
}
309+
fullDecoded = append(fullDecoded, readBuf...)
310+
if err == io.EOF {
311+
break
312+
}
313+
}
314+
// Verify length
315+
if len(fullDecoded) != len(tt.data) {
316+
t.Fatalf("length mismatch: got %d, want %d", len(fullDecoded), len(tt.data))
317+
}
318+
319+
// Verify values with tolerance
320+
for i := range tt.data {
321+
if math.Abs(fullDecoded[i]-tt.data[i]) > 1e-10 {
322+
t.Errorf("value mismatch at index %d: got %f, want %f", i, fullDecoded[i], tt.data[i])
323+
}
324+
}
325+
})
326+
}
327+
}
328+
329+
func TestStreamEncoderDecoder(t *testing.T) {
330+
tests := []struct {
331+
name string
332+
data []float64
333+
blockSize int
334+
}{
335+
{
336+
name: "small dataset",
337+
data: []float64{1, 2, 3, 4, 5, 6},
338+
blockSize: 3,
339+
},
340+
{
341+
name: "small dataset uneven block size",
342+
data: []float64{1, 2, 3, 4, 5},
343+
blockSize: 3,
344+
},
345+
{
346+
name: "single block",
347+
data: []float64{1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8},
348+
blockSize: 120,
349+
},
350+
{
351+
name: "multiple blocks different ranges",
352+
data: []float64{1, 2, 3, 100, 101, 102, 1000, 1001, 1002},
353+
blockSize: 3,
354+
},
355+
{
356+
name: "large dataset",
357+
data: func() []float64 {
358+
data := make([]float64, 1000)
359+
for i := range data {
360+
data[i] = float64(i) * 0.1
361+
}
362+
return data
363+
}(),
364+
blockSize: 120,
365+
},
366+
}
367+
368+
for _, tt := range tests {
369+
t.Run(tt.name, func(t *testing.T) {
370+
// Encode
371+
compressed := StreamEncode(nil, tt.data, tt.blockSize)
372+
373+
t.Logf("Original size: %d bytes, Compressed size: %d bytes, Ratio: %.2f%%",
374+
len(tt.data)*8, len(compressed), float64(len(compressed))/float64(len(tt.data)*8)*100)
375+
376+
// Decode
377+
decoder := StreamDecoder{}
378+
decoder.Reset(compressed, tt.blockSize)
379+
380+
decoded := make([]float64, 0, len(tt.data))
381+
readBuf := make([]float64, tt.blockSize)
382+
383+
for {
384+
result, err := decoder.Decode(readBuf)
385+
decoded = append(decoded, result...)
386+
if err == io.EOF {
387+
break
388+
}
389+
if err != nil {
390+
t.Fatalf("unexpected error: %v", err)
391+
}
392+
}
393+
394+
// Verify length
395+
if len(decoded) != len(tt.data) {
396+
t.Fatalf("length mismatch: got %d, want %d", len(decoded), len(tt.data))
397+
}
398+
399+
// Verify values
400+
for i := range tt.data {
401+
if math.Abs(decoded[i]-tt.data[i]) > 1e-10 {
402+
t.Errorf("value mismatch at index %d: got %f, want %f", i, decoded[i], tt.data[i])
403+
}
404+
}
405+
})
406+
}
407+
}

0 commit comments

Comments
 (0)