Skip to content

Commit dcb85bf

Browse files
fix(parquet): bss encoding and tests on big endian systems (#663)
### Rationale for this change

To ensure the Arrow and Parquet Go libraries work correctly on big-endian architectures.

### What changes are included in this PR?

Added endianness-aware BYTE_STREAM_SPLIT decoding in the parquet/encoding package. Fixed tests in the parquet package to handle byte order correctly on big-endian systems.

### Are these changes tested?

Yes, all affected unit tests now pass on both little-endian and big-endian machines. The changes specifically address some of the previously failing tests on big-endian systems.

### Are there any user-facing changes?

No user-facing API changes. The changes are internal and ensure correct behavior on supported architectures.
1 parent f7d74a6 commit dcb85bf

File tree

6 files changed

+160
-51
lines changed

6 files changed

+160
-51
lines changed

parquet/file/file_writer_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/apache/arrow-go/v18/arrow"
2929
"github.com/apache/arrow-go/v18/arrow/array"
3030
"github.com/apache/arrow-go/v18/arrow/memory"
31+
"github.com/apache/arrow-go/v18/internal/utils"
3132
"github.com/apache/arrow-go/v18/parquet"
3233
"github.com/apache/arrow-go/v18/parquet/compress"
3334
"github.com/apache/arrow-go/v18/parquet/file"
@@ -498,9 +499,11 @@ type errCloseWriter struct {
498499
func (c *errCloseWriter) Write(p []byte) (n int, err error) {
499500
return c.sink.Write(p)
500501
}
502+
501503
func (c *errCloseWriter) Close() error {
502504
return fmt.Errorf("error during close")
503505
}
506+
504507
func (c *errCloseWriter) Bytes() []byte {
505508
return c.sink.Bytes()
506509
}
@@ -669,6 +672,7 @@ func NewColumnIndexObject(colIdx metadata.ColumnIndex) (ret ColumnIndexObject) {
669672
}
670673

671674
func simpleEncode[T int32 | int64 | float32 | float64](val T) []byte {
675+
val = utils.ToLE(val)
672676
return unsafe.Slice((*byte)(unsafe.Pointer(&val)), unsafe.Sizeof(val))
673677
}
674678

@@ -987,10 +991,16 @@ func (t *PageIndexRoundTripSuite) TestMultiplePages() {
987991
t.Equal(t.columnIndexes, []ColumnIndexObject{
988992
{
989993
nullPages: []bool{false, false, false, true},
990-
minValues: [][]byte{simpleEncode(int64(1)), simpleEncode(int64(3)),
991-
simpleEncode(int64(6)), {}},
992-
maxValues: [][]byte{simpleEncode(int64(2)), simpleEncode(int64(4)),
993-
simpleEncode(int64(6)), {}},
994+
minValues: [][]byte{
995+
simpleEncode(int64(1)), simpleEncode(int64(3)),
996+
simpleEncode(int64(6)),
997+
{},
998+
},
999+
maxValues: [][]byte{
1000+
simpleEncode(int64(2)), simpleEncode(int64(4)),
1001+
simpleEncode(int64(6)),
1002+
{},
1003+
},
9941004
boundaryOrder: metadata.Ascending, nullCounts: []int64{0, 0, 1, 2},
9951005
},
9961006
{

parquet/internal/encoding/byte_stream_split.go

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -88,38 +88,6 @@ func encodeByteStreamSplitWidth8(data []byte, in []byte) {
8888
}
8989
}
9090

91-
// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data',
92-
// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
93-
// 'out' must have space for at least len(data) bytes.
94-
func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) {
95-
const width = 4
96-
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
97-
for element := 0; element < nValues; element++ {
98-
out[width*element] = data[element]
99-
out[width*element+1] = data[stride+element]
100-
out[width*element+2] = data[2*stride+element]
101-
out[width*element+3] = data[3*stride+element]
102-
}
103-
}
104-
105-
// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data',
106-
// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
107-
// 'out' must have space for at least len(data) bytes.
108-
func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) {
109-
const width = 8
110-
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
111-
for element := 0; element < nValues; element++ {
112-
out[width*element] = data[element]
113-
out[width*element+1] = data[stride+element]
114-
out[width*element+2] = data[2*stride+element]
115-
out[width*element+3] = data[3*stride+element]
116-
out[width*element+4] = data[4*stride+element]
117-
out[width*element+5] = data[5*stride+element]
118-
out[width*element+6] = data[6*stride+element]
119-
out[width*element+7] = data[7*stride+element]
120-
}
121-
}
122-
12391
// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data',
12492
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
12593
// 'out' must have space for at least nValues slices.
@@ -303,12 +271,15 @@ func (dec *ByteStreamSplitDecoder[T]) Decode(out []T) (int, error) {
303271
return 0, xerrors.New("parquet: eof exception")
304272
}
305273

274+
// reinterpret the output slice as bytes so that we can decode directly into it without an intermediate copy
275+
// however, the byte stream split encoding is defined in little-endian order, so we need to decode the bytes
276+
// into the output slice in the correct order based on the machine's endianness
306277
outBytes := arrow.GetBytes(out)
307278
switch typeLen {
308279
case 4:
309-
decodeByteStreamSplitBatchWidth4(dec.data, toRead, dec.stride, outBytes)
280+
decodeByteStreamSplitBatchWidth4InByteOrder(dec.data, toRead, dec.stride, outBytes)
310281
case 8:
311-
decodeByteStreamSplitBatchWidth8(dec.data, toRead, dec.stride, outBytes)
282+
decodeByteStreamSplitBatchWidth8InByteOrder(dec.data, toRead, dec.stride, outBytes)
312283
default:
313284
return 0, fmt.Errorf("encoding ByteStreamSplit is only defined for numeric type of width 4 or 8, found: %d", typeLen)
314285
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
//go:build armbe || arm64be || m68k || mips || mips64 || mips64p32 || ppc || ppc64 || s390 || s390x || shbe || sparc || sparc64
18+
19+
package encoding
20+
21+
import (
22+
"fmt"
23+
24+
"github.com/apache/arrow-go/v18/parquet/internal/debug"
25+
)
26+
27+
// decodeByteStreamSplitBatchWidth4InByteOrder decodes the batch of nValues raw bytes representing a 4-byte datatype provided
28+
// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian
29+
// byte order and are decoded into the 'out' array in the machine's native endianness.
30+
// 'out' must have space for at least len(data) bytes.
31+
func decodeByteStreamSplitBatchWidth4InByteOrder(data []byte, nValues, stride int, out []byte) {
32+
const width = 4
33+
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
34+
for element := 0; element < nValues; element++ {
35+
// Big Endian: most significant byte first
36+
out[width*element+0] = data[3*stride+element]
37+
out[width*element+1] = data[2*stride+element]
38+
out[width*element+2] = data[stride+element]
39+
out[width*element+3] = data[element]
40+
}
41+
}
42+
43+
// decodeByteStreamSplitBatchWidth8InByteOrder decodes the batch of nValues raw bytes representing an 8-byte datatype provided
44+
// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian
45+
// byte order and are decoded into the 'out' array in the machine's native endianness.
46+
// 'out' must have space for at least len(data) bytes.
47+
func decodeByteStreamSplitBatchWidth8InByteOrder(data []byte, nValues, stride int, out []byte) {
48+
const width = 8
49+
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
50+
for element := 0; element < nValues; element++ {
51+
// Big Endian: most significant byte first
52+
out[width*element+0] = data[7*stride+element]
53+
out[width*element+1] = data[6*stride+element]
54+
out[width*element+2] = data[5*stride+element]
55+
out[width*element+3] = data[4*stride+element]
56+
out[width*element+4] = data[3*stride+element]
57+
out[width*element+5] = data[2*stride+element]
58+
out[width*element+6] = data[stride+element]
59+
out[width*element+7] = data[element]
60+
}
61+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
//go:build 386 || amd64 || amd64p32 || alpha || arm || arm64 || loong64 || mipsle || mips64le || mips64p32le || nios2 || ppc64le || riscv || riscv64 || sh || wasm
18+
19+
package encoding
20+
21+
import (
22+
"fmt"
23+
24+
"github.com/apache/arrow-go/v18/parquet/internal/debug"
25+
)
26+
27+
// decodeByteStreamSplitBatchWidth4InByteOrder decodes the batch of nValues raw bytes representing a 4-byte datatype provided
28+
// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian
29+
// byte order and are decoded into the 'out' array in the machine's native endianness.
30+
// 'out' must have space for at least len(data) bytes.
31+
func decodeByteStreamSplitBatchWidth4InByteOrder(data []byte, nValues, stride int, out []byte) {
32+
const width = 4
33+
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
34+
for element := 0; element < nValues; element++ {
35+
// Little Endian: least significant byte first
36+
out[width*element+0] = data[element]
37+
out[width*element+1] = data[stride+element]
38+
out[width*element+2] = data[2*stride+element]
39+
out[width*element+3] = data[3*stride+element]
40+
}
41+
}
42+
43+
// decodeByteStreamSplitBatchWidth8InByteOrder decodes the batch of nValues raw bytes representing an 8-byte datatype provided
44+
// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian
45+
// byte order and are decoded into the 'out' array in the machine's native endianness.
46+
// 'out' must have space for at least len(data) bytes.
47+
func decodeByteStreamSplitBatchWidth8InByteOrder(data []byte, nValues, stride int, out []byte) {
48+
const width = 8
49+
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
50+
for element := 0; element < nValues; element++ {
51+
// Little Endian: least significant byte first
52+
out[width*element+0] = data[element]
53+
out[width*element+1] = data[stride+element]
54+
out[width*element+2] = data[2*stride+element]
55+
out[width*element+3] = data[3*stride+element]
56+
out[width*element+4] = data[4*stride+element]
57+
out[width*element+5] = data[5*stride+element]
58+
out[width*element+6] = data[6*stride+element]
59+
out[width*element+7] = data[7*stride+element]
60+
}
61+
}

parquet/internal/encoding/encoding_utils_big_endian.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ func writeLE[T fixedLenTypes](enc *encoder, in []T) {
3131
case parquet.Int96:
3232
enc.append(getBytes(in))
3333
default:
34-
binary.Write(enc.sink, binary.LittleEndian, in)
34+
if err := binary.Write(enc.sink, binary.LittleEndian, in); err != nil {
35+
panic(err)
36+
}
3537
}
3638
}
3739

@@ -42,6 +44,8 @@ func copyFrom[T fixedLenTypes](dst []T, src []byte) {
4244
copy(dst, fromBytes[T](src))
4345
default:
4446
r := bytes.NewReader(src)
45-
binary.Read(r, binary.LittleEndian, dst)
47+
if err := binary.Read(r, binary.LittleEndian, dst); err != nil {
48+
panic(err)
49+
}
4650
}
4751
}

parquet/internal/encoding/plain_encoding_types.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (dec *PlainDecoder[T]) Decode(out []T) (int, error) {
8989
dec.Type(), max, nbytes, len(dec.data))
9090
}
9191

92-
copyFrom(out, dec.data[:nbytes])
92+
copyFrom(out[:max], dec.data[:nbytes])
9393
dec.data = dec.data[nbytes:]
9494
dec.nvals -= max
9595
return max, nil
@@ -130,13 +130,15 @@ func (dec *PlainDecoder[T]) DecodeSpaced(out []T, nullCount int, validBits []byt
130130
return nvalues, nil
131131
}
132132

133-
type PlainInt32Encoder = PlainEncoder[int32]
134-
type PlainInt32Decoder = PlainDecoder[int32]
135-
type PlainInt64Encoder = PlainEncoder[int64]
136-
type PlainInt64Decoder = PlainDecoder[int64]
137-
type PlainFloat32Encoder = PlainEncoder[float32]
138-
type PlainFloat32Decoder = PlainDecoder[float32]
139-
type PlainFloat64Encoder = PlainEncoder[float64]
140-
type PlainFloat64Decoder = PlainDecoder[float64]
141-
type PlainInt96Encoder = PlainEncoder[parquet.Int96]
142-
type PlainInt96Decoder = PlainDecoder[parquet.Int96]
133+
type (
134+
PlainInt32Encoder = PlainEncoder[int32]
135+
PlainInt32Decoder = PlainDecoder[int32]
136+
PlainInt64Encoder = PlainEncoder[int64]
137+
PlainInt64Decoder = PlainDecoder[int64]
138+
PlainFloat32Encoder = PlainEncoder[float32]
139+
PlainFloat32Decoder = PlainDecoder[float32]
140+
PlainFloat64Encoder = PlainEncoder[float64]
141+
PlainFloat64Decoder = PlainDecoder[float64]
142+
PlainInt96Encoder = PlainEncoder[parquet.Int96]
143+
PlainInt96Decoder = PlainDecoder[parquet.Int96]
144+
)

0 commit comments

Comments
 (0)