
Commit a9b0be4

pixelherodev and Noam Preil authored
Batch of small optimizations (apache#556)
This primarily consists of reducing allocations. For example:

- 03be6d3 adds a specific case to TypeEqual for ListView types, which avoids reflect.DeepEqual. l.elem.Equal does not allocate; reflect.DeepEqual does, so this reduces allocations when comparing a ListViewType.
- 8c7729f changes references to flatbuf internal types from by-reference to by-value, which avoids letting them escape to the heap.
- e340658 switches from reflect.DeepEqual to slices.Equal when comparing the child IDs of a Union type, which is functionally equivalent for slices and, again, does not allocate.
- 9eba6d2 does the same for an arrowflight cookie test, replacing reflect.DeepEqual with maps.Equal; I noticed it while grepping for reflect.DeepEqual.
- 7557d15 is probably the most objectionable; it replaces a *float64 with a plain float64 for minSpaceSavings, since the current nil value behaves identically to zero anyway. It's a tiny cleanup IMO, but doesn't really have any practical value.
- 2d3593f is a bit ugly; it inlines a copy of NewReaderFromMessageReader and a copy of NewMessageReader into NewReader. By implementing the two directly, it avoids allocating two copies of the `config` structure and avoids looping over and invoking the config functions twice. It's kind of ugly though, and the `// types: make(dictTypeMap)` comment that was copied over suggests there's further room for improvement. I didn't drop the two APIs, as they are exposed to users, but IMO the performance win is probably worth it, and it can be cleaned up later if needed.
- 0273ffe is a small optimization to the allocator: the Go allocator branches on each allocation, but both branches are functionally equivalent, so I just merged the two. I doubt this has a _major_ impact, since the cost of a branch is negligible compared to the cost of a heap allocation, but performance is often won through a lot of small improvements IMO :)
- Lastly, for now at least, f1a6a33 statically allocates 4 bytes per IPC messageReader (I did _not_ touch the ArrowFlight implementation) and reuses them on every message read, rather than heap-allocating 4 bytes each time Message is called.

There's more to be done, but this should be a much more practically sized chunk to review, and I can add more later :)

---------

Co-authored-by: Noam Preil <[email protected]>
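
As a rough illustration of the allocation difference described above (a standalone sketch, not part of this commit; the values are made up and it needs Go 1.21+ for slices/maps): reflect.DeepEqual boxes its arguments into interface{} and walks them reflectively, while slices.Equal and maps.Equal compare elements directly with ==.

```go
package main

import (
	"fmt"
	"maps"
	"reflect"
	"slices"
	"testing"
)

func main() {
	childIDs := []int{0, 1, 2, 3}
	other := []int{0, 1, 2, 3}
	cookies := map[string]string{"session": "abc"}
	got := map[string]string{"session": "abc"}

	// reflect.DeepEqual converts both arguments to interface{} and walks them
	// reflectively, which allocates on every comparison.
	deep := testing.AllocsPerRun(1000, func() {
		_ = reflect.DeepEqual(childIDs, other)
	})

	// slices.Equal and maps.Equal are generic and compare elements with ==,
	// so they do not allocate.
	direct := testing.AllocsPerRun(1000, func() {
		_ = slices.Equal(childIDs, other)
		_ = maps.Equal(cookies, got)
	})

	fmt.Printf("reflect.DeepEqual: %.0f allocs/op\n", deep)
	fmt.Printf("slices.Equal + maps.Equal: %.0f allocs/op\n", direct)
}
```
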
1 parent fac7a53 · commit a9b0be4

File tree

11 files changed: +65, -37 lines


arrow/compare.go

Lines changed: 4 additions & 1 deletion
@@ -18,6 +18,7 @@ package arrow

 import (
     "reflect"
+    "slices"
 )

 type typeEqualsConfig struct {
@@ -122,7 +123,7 @@ func TypeEqual(left, right DataType, opts ...TypeEqualOption) bool {
             return false
         }

-        if !reflect.DeepEqual(l.ChildIDs(), r.ChildIDs()) {
+        if !slices.Equal(l.ChildIDs(), r.ChildIDs()) {
             return false
         }

@@ -149,6 +150,8 @@ func TypeEqual(left, right DataType, opts ...TypeEqualOption) bool {
         r := right.(*RunEndEncodedType)
         return TypeEqual(l.Encoded(), r.Encoded(), opts...) &&
             TypeEqual(l.runEnds, r.runEnds, opts...)
+    case *ListViewType:
+        return l.elem.Equal(right.(*ListViewType).elem)
     default:
         return reflect.DeepEqual(left, right)
     }

arrow/flight/basic_auth_flight_test.go

Lines changed: 8 additions & 2 deletions
@@ -73,11 +73,17 @@ func (*validator) IsValid(bearerToken string) (interface{}, error) {
 func TestErrorAuths(t *testing.T) {
     unary, stream := flight.CreateServerBearerTokenAuthInterceptors(&validator{})
     s := flight.NewFlightServer(grpc.UnaryInterceptor(unary), grpc.StreamInterceptor(stream))
-    s.Init("localhost:0")
+    if err := s.Init("localhost:0"); err != nil {
+        panic(err)
+    }
     f := &HeaderAuthTestFlight{}
     s.RegisterFlightService(f)

-    go s.Serve()
+    go func() {
+        if err := s.Serve(); err != nil {
+            panic(err)
+        }
+    }()
     defer s.Shutdown()

     client, err := flight.NewFlightClient(s.Addr().String(), nil, grpc.WithTransportCredentials(insecure.NewCredentials()))

arrow/flight/cookie_middleware_test.go

Lines changed: 2 additions & 2 deletions
@@ -21,9 +21,9 @@ import (
     "errors"
     "fmt"
     "io"
+    "maps"
     "net/http"
     "net/textproto"
-    "reflect"
     "strings"
     "testing"
     "time"
@@ -85,7 +85,7 @@ func (s *serverAddCookieMiddleware) StartCall(ctx context.Context) context.Conte
         }
     }

-    if !reflect.DeepEqual(s.expectedCookies, got) {
+    if !maps.Equal(s.expectedCookies, got) {
         panic(fmt.Sprintf("did not get expected cookies, expected %+v, got %+v", s.expectedCookies, got))
     }

arrow/ipc/file_reader.go

Lines changed: 4 additions & 4 deletions
@@ -531,12 +531,12 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
     return raw
 }

-func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
+func (src *ipcSource) fieldMetadata(i int) flatbuf.FieldNode {
     var node flatbuf.FieldNode
     if !src.meta.Nodes(&node, i) {
         panic("arrow/ipc: field metadata out of bound")
     }
-    return &node
+    return node
 }

 func (src *ipcSource) variadicCount(i int) int64 {
@@ -553,7 +553,7 @@ type arrayLoaderContext struct {
     version MetadataVersion
 }

-func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode {
+func (ctx *arrayLoaderContext) field() flatbuf.FieldNode {
     field := ctx.src.fieldMetadata(ctx.ifield)
     ctx.ifield++
     return field
@@ -647,7 +647,7 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) arrow.ArrayData {
     }
 }

-func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int) (*flatbuf.FieldNode, []*memory.Buffer) {
+func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int) (flatbuf.FieldNode, []*memory.Buffer) {
     buffers := make([]*memory.Buffer, 0, nbufs)
     field := ctx.field()
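
The change above returns flatbuf.FieldNode by value instead of by pointer. As a rough standalone sketch of why that matters (hypothetical stand-in type, not the Arrow code): returning a pointer to a local makes escape analysis move it to the heap, while returning the small struct by value keeps it on the stack.

```go
package main

import "fmt"

// fieldNode is a hypothetical stand-in for a small flatbuffers table struct.
type fieldNode struct {
	length    int64
	nullCount int64
}

// byPointer returns the address of a local, so escape analysis reports
// "moved to heap: node" (visible with `go build -gcflags=-m`) and every
// call performs a heap allocation.
func byPointer(i int64) *fieldNode {
	node := fieldNode{length: i}
	return &node
}

// byValue copies the small struct into the caller's frame; nothing
// escapes and no allocation happens per call.
func byValue(i int64) fieldNode {
	return fieldNode{length: i}
}

func main() {
	p := byPointer(3)
	v := byValue(3)
	fmt.Println(p.length, v.length, v.nullCount)
}
```
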

arrow/ipc/file_writer.go

Lines changed: 1 addition & 1 deletion
@@ -247,7 +247,7 @@ type FileWriter struct {
     codec flatbuf.CompressionType
     compressNP int
     compressors []compressor
-    minSpaceSavings *float64
+    minSpaceSavings float64

     // map of the last written dictionaries by id
     // so we can avoid writing the same dictionary over and over

arrow/ipc/ipc.go

Lines changed: 2 additions & 2 deletions
@@ -71,7 +71,7 @@ type config struct {
     ensureNativeEndian bool
     noAutoSchema bool
     emitDictDeltas bool
-    minSpaceSavings *float64
+    minSpaceSavings float64
 }

 func newConfig(opts ...Option) *config {
@@ -189,7 +189,7 @@ func WithDictionaryDeltas(v bool) Option {
 // Go and C++ versions prior to 12.0.0.
 func WithMinSpaceSavings(savings float64) Option {
     return func(cfg *config) {
-        cfg.minSpaceSavings = &savings
+        cfg.minSpaceSavings = savings
     }
 }

arrow/ipc/message.go

Lines changed: 3 additions & 2 deletions
@@ -149,7 +149,8 @@ type messageReader struct {
     refCount atomic.Int64
     msg *Message

-    mem memory.Allocator
+    mem    memory.Allocator
+    header [4]byte
 }

 // NewMessageReader returns a reader that reads messages from an input stream.
@@ -188,7 +189,7 @@ func (r *messageReader) Release() {
 // underlying stream.
 // It is valid until the next call to Message.
 func (r *messageReader) Message() (*Message, error) {
-    buf := make([]byte, 4)
+    buf := r.header[:]
     _, err := io.ReadFull(r.r, buf)
     if err != nil {
         return nil, fmt.Errorf("arrow/ipc: could not read continuation indicator: %w", err)
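
The header [4]byte field added above is scratch space that lives inside the messageReader, so Message can slice it instead of calling make([]byte, 4) on every read. A minimal standalone sketch of the pattern (hypothetical reader type, not the Arrow implementation):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// reader is a hypothetical stand-in for the IPC messageReader.
type reader struct {
	r      io.Reader
	header [4]byte // embedded scratch space, allocated once with the struct
}

// next reads a 4-byte prefix. Slicing the embedded array (r.header[:])
// reuses the same backing storage on every call; a `make([]byte, 4)` here
// would instead allocate on each message.
func (r *reader) next() (uint32, error) {
	buf := r.header[:]
	if _, err := io.ReadFull(r.r, buf); err != nil {
		return 0, err
	}
	return binary.LittleEndian.Uint32(buf), nil
}

func main() {
	src := bytes.NewReader([]byte{0xff, 0xff, 0xff, 0xff, 0x10, 0x00, 0x00, 0x00})
	r := &reader{r: src}
	for {
		v, err := r.next()
		if err != nil {
			break
		}
		fmt.Printf("%#x\n", v)
	}
}
```
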

arrow/ipc/reader.go

Lines changed: 26 additions & 2 deletions
@@ -89,8 +89,32 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (reader *Reader
 }

 // NewReader returns a reader that reads records from an input stream.
-func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
-    return NewReaderFromMessageReader(NewMessageReader(r, opts...), opts...)
+func NewReader(r io.Reader, opts ...Option) (rr *Reader, err error) {
+    defer func() {
+        if pErr := recover(); pErr != nil {
+            err = utils.FormatRecoveredError("arrow/ipc: unknown error while reading", pErr)
+        }
+    }()
+    cfg := newConfig(opts...)
+    mr := &messageReader{r: r, mem: cfg.alloc}
+    mr.refCount.Add(1)
+    rr = &Reader{
+        r:        mr,
+        refCount: atomic.Int64{},
+        // types: make(dictTypeMap),
+        memo:               dictutils.NewMemo(),
+        mem:                cfg.alloc,
+        ensureNativeEndian: cfg.ensureNativeEndian,
+        expectedSchema:     cfg.schema,
+    }
+    rr.refCount.Add(1)
+
+    if !cfg.noAutoSchema {
+        if err := rr.readSchema(cfg.schema); err != nil {
+            return nil, err
+        }
+    }
+    return rr, nil
 }

 // Err returns the last error encountered during the iteration over the
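
The inlined NewReader above exists to avoid building the config twice: the old version called NewMessageReader(r, opts...) and then NewReaderFromMessageReader(..., opts...), and each constructor allocated its own config and re-applied every option. A rough sketch of that shape with hypothetical names (not the actual arrow/ipc types):

```go
package main

import "fmt"

// Hypothetical functional-options setup, loosely mirroring arrow/ipc.
type config struct {
	alloc          string
	emitDictDeltas bool
}

type Option func(*config)

func withDictDeltas(c *config) { c.emitDictDeltas = true }

func newConfig(opts ...Option) *config {
	cfg := &config{alloc: "default"} // one allocation per call
	for _, opt := range opts {
		opt(cfg) // option closures invoked on every call
	}
	return cfg
}

type messageReader struct{ alloc string }

type recordReader struct {
	mr    *messageReader
	alloc string
}

func newMessageReader(opts ...Option) *messageReader {
	cfg := newConfig(opts...) // config #1
	return &messageReader{alloc: cfg.alloc}
}

// Old shape: wrap the inner constructor, which builds a second config.
func newRecordReaderWrapping(opts ...Option) *recordReader {
	mr := newMessageReader(opts...)
	cfg := newConfig(opts...) // config #2, options walked again
	return &recordReader{mr: mr, alloc: cfg.alloc}
}

// New shape: build one config and share it, as the inlined NewReader does.
func newRecordReaderInlined(opts ...Option) *recordReader {
	cfg := newConfig(opts...) // single config, options walked once
	return &recordReader{mr: &messageReader{alloc: cfg.alloc}, alloc: cfg.alloc}
}

func main() {
	fmt.Println(newRecordReaderWrapping(withDictDeltas).alloc)
	fmt.Println(newRecordReaderInlined(withDictDeltas).alloc)
}
```
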

arrow/ipc/writer.go

Lines changed: 9 additions & 12 deletions
@@ -87,7 +87,7 @@ type Writer struct {
     codec flatbuf.CompressionType
     compressNP int
     compressors []compressor
-    minSpaceSavings *float64
+    minSpaceSavings float64

     // map of the last written dictionaries by id
     // so we can avoid writing the same dictionary over and over
@@ -328,7 +328,7 @@ type recordEncoder struct {
     codec flatbuf.CompressionType
     compressNP int
     compressors []compressor
-    minSpaceSavings *float64
+    minSpaceSavings float64
 }

 func newRecordEncoder(
@@ -338,7 +338,7 @@ func newRecordEncoder(
     allow64b bool,
     codec flatbuf.CompressionType,
     compressNP int,
-    minSpaceSavings *float64,
+    minSpaceSavings float64,
     compressors []compressor,
 ) *recordEncoder {
     return &recordEncoder{
@@ -355,12 +355,12 @@

 func (w *recordEncoder) shouldCompress(uncompressed, compressed int) bool {
     debug.Assert(uncompressed > 0, "uncompressed size is 0")
-    if w.minSpaceSavings == nil {
+    if w.minSpaceSavings == 0 {
         return true
     }

     savings := 1.0 - float64(compressed)/float64(uncompressed)
-    return savings >= *w.minSpaceSavings
+    return savings >= w.minSpaceSavings
 }

 func (w *recordEncoder) reset() {
@@ -477,13 +477,10 @@ func (w *recordEncoder) encode(p *Payload, rec arrow.RecordBatch) error {
     }

     if w.codec != -1 {
-        if w.minSpaceSavings != nil {
-            pct := *w.minSpaceSavings
-            if pct < 0 || pct > 1 {
-                p.Release()
-                return fmt.Errorf("%w: minSpaceSavings not in range [0,1]. Provided %.05f",
-                    arrow.ErrInvalid, pct)
-            }
+        if w.minSpaceSavings < 0 || w.minSpaceSavings > 1 {
+            p.Release()
+            return fmt.Errorf("%w: minSpaceSavings not in range [0,1]. Provided %.05f",
+                arrow.ErrInvalid, w.minSpaceSavings)
         }
         w.compressBodyBuffers(p)
     }
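
The shouldCompress change above leans on Go's zero value: a minSpaceSavings of 0 now means "no threshold configured", which the commit message argues is behaviorally identical to the old nil pointer. A small worked example of that logic (standalone sketch, not the Arrow code):

```go
package main

import "fmt"

// config uses the zero value of float64 as "no threshold configured",
// mirroring the minSpaceSavings change above; names are illustrative.
type config struct {
	minSpaceSavings float64 // 0 means "always compress", as a nil *float64 did before
}

func shouldCompress(cfg config, uncompressed, compressed int) bool {
	if cfg.minSpaceSavings == 0 {
		return true // unset threshold behaves like the old nil pointer
	}
	savings := 1.0 - float64(compressed)/float64(uncompressed)
	return savings >= cfg.minSpaceSavings
}

func main() {
	fmt.Println(shouldCompress(config{}, 100, 99))                     // true: no threshold set
	fmt.Println(shouldCompress(config{minSpaceSavings: 0.5}, 100, 40)) // true: 60% savings
	fmt.Println(shouldCompress(config{minSpaceSavings: 0.5}, 100, 80)) // false: only 20% savings
}
```
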

arrow/ipc/writer_test.go

Lines changed: 4 additions & 4 deletions
@@ -195,7 +195,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {

     for _, codec := range []flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME, flatbuf.CompressionTypeZSTD} {
         compressors := []compressor{getCompressor(codec)}
-        enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil, compressors)
+        enc := newRecordEncoder(mem, 0, 5, true, codec, 1, 0, compressors)
         var payload Payload
         require.NoError(t, enc.encode(&payload, batch))
         assert.Len(t, payload.body, 2)
@@ -207,7 +207,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
         assert.Greater(t, compressedSize, int64(0))
         expectedSavings := 1.0 - float64(compressedSize)/float64(uncompressedSize)

-        compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, &expectedSavings, compressors)
+        compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, expectedSavings, compressors)
         payload.Release()
         payload.body = payload.body[:0]
         require.NoError(t, compressEncoder.encode(&payload, batch))
@@ -220,7 +220,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
         // slightly bump the threshold. the body buffer should now be prefixed
         // with -1 and its content left uncompressed
         minSavings := math.Nextafter(expectedSavings, 1.0)
-        compressEncoder.minSpaceSavings = &minSavings
+        compressEncoder.minSpaceSavings = minSavings
         require.NoError(t, compressEncoder.encode(&payload, batch))
         assert.Len(t, payload.body, 2)
         assert.EqualValues(t, -1, prefixedSize(payload.body[1]))
@@ -229,7 +229,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
         payload.body = payload.body[:0]

         for _, outOfRange := range []float64{math.Nextafter(1.0, 2.0), math.Nextafter(0, -1)} {
-            compressEncoder.minSpaceSavings = &outOfRange
+            compressEncoder.minSpaceSavings = outOfRange
             err := compressEncoder.encode(&payload, batch)
             assert.ErrorIs(t, err, arrow.ErrInvalid)
             assert.ErrorContains(t, err, "minSpaceSavings not in range [0,1]")
