Skip to content

Commit f2b4adc

Browse files
committed
Split columnar buffer implementations
1 parent 53d8990 commit f2b4adc

File tree

5 files changed

+239
-23
lines changed

5 files changed

+239
-23
lines changed

app/server/data_source_collection.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,8 @@ func doReadSplit[T paging.Acceptor](
318318
logger,
319319
memoryAllocator,
320320
request.Format,
321-
split.Select.What)
321+
split.Select.What,
322+
false) // Default to row-based approach for now
322323
if err != nil {
323324
return fmt.Errorf("new columnar buffer factory: %w", err)
324325
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package paging
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
7+
"github.com/apache/arrow/go/v13/arrow"
8+
"github.com/apache/arrow/go/v13/arrow/ipc"
9+
"github.com/apache/arrow/go/v13/arrow/memory"
10+
"go.uber.org/zap"
11+
12+
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
13+
)
14+
15+
var _ ColumnarBuffer[any] = (*columnarBufferArrowIPCStreamingRecords[any])(nil)
16+
17+
// columnarBufferArrowIPCStreamingRecords is a specialized implementation for Arrow-based data
18+
// that stores Arrow records directly without deconstructing them
19+
type columnarBufferArrowIPCStreamingRecords[T Acceptor] struct {
20+
arrowAllocator memory.Allocator
21+
schema *arrow.Schema
22+
logger *zap.Logger
23+
arrowRecord arrow.Record // Store the Arrow Record directly
24+
}
25+
26+
// addRow is not the primary method for this implementation
27+
// It returns an error since this implementation is optimized for Arrow records
28+
func (cb *columnarBufferArrowIPCStreamingRecords[T]) addRow(transformer RowTransformer[T]) error {
29+
return fmt.Errorf("this implementation is optimized for Arrow records, use columnarBufferArrowIPCStreamingRows for row-based data")
30+
}
31+
32+
// addArrowRecord saves an Arrow Block obtained from the datasource into the columnar buffer
33+
func (cb *columnarBufferArrowIPCStreamingRecords[T]) addArrowRecord(record arrow.Record) error {
34+
// Verify schema compatibility
35+
if !cb.schema.Equal(record.Schema()) {
36+
return fmt.Errorf("record schema does not match buffer schema")
37+
}
38+
39+
// Store the record directly
40+
if cb.arrowRecord != nil {
41+
// Release the previous record if it exists
42+
cb.arrowRecord.Release()
43+
}
44+
45+
// Retain the record to prevent it from being garbage collected
46+
record.Retain()
47+
cb.arrowRecord = record
48+
49+
return nil
50+
}
51+
52+
// ToResponse returns all the accumulated data and clears buffer
53+
func (cb *columnarBufferArrowIPCStreamingRecords[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
54+
// If no record was added, return an empty response
55+
if cb.arrowRecord == nil {
56+
return &api_service_protos.TReadSplitsResponse{}, nil
57+
}
58+
59+
// prepare arrow writer
60+
var buf bytes.Buffer
61+
62+
writer := ipc.NewWriter(&buf, ipc.WithSchema(cb.schema), ipc.WithAllocator(cb.arrowAllocator))
63+
64+
if err := writer.Write(cb.arrowRecord); err != nil {
65+
return nil, fmt.Errorf("write record: %w", err)
66+
}
67+
68+
if err := writer.Close(); err != nil {
69+
return nil, fmt.Errorf("close arrow writer: %w", err)
70+
}
71+
72+
out := &api_service_protos.TReadSplitsResponse{
73+
Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
74+
ArrowIpcStreaming: buf.Bytes(),
75+
},
76+
}
77+
78+
return out, nil
79+
}
80+
81+
// TotalRows returns the number of rows in the buffer
82+
func (cb *columnarBufferArrowIPCStreamingRecords[T]) TotalRows() int {
83+
if cb.arrowRecord == nil {
84+
return 0
85+
}
86+
return int(cb.arrowRecord.NumRows())
87+
}
88+
89+
// Release frees resources if buffer is no longer used
90+
func (cb *columnarBufferArrowIPCStreamingRecords[T]) Release() {
91+
// Release the stored Arrow Record if it exists
92+
if cb.arrowRecord != nil {
93+
cb.arrowRecord.Release()
94+
cb.arrowRecord = nil
95+
}
96+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package paging
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
7+
"github.com/apache/arrow/go/v13/arrow"
8+
"github.com/apache/arrow/go/v13/arrow/array"
9+
"github.com/apache/arrow/go/v13/arrow/ipc"
10+
"github.com/apache/arrow/go/v13/arrow/memory"
11+
"go.uber.org/zap"
12+
13+
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
14+
)
15+
16+
var _ ColumnarBuffer[any] = (*columnarBufferArrowIPCStreamingRows[any])(nil)
17+
18+
// columnarBufferArrowIPCStreamingRows is a specialized implementation for row-based data
19+
// that builds Arrow arrays from individual rows
20+
type columnarBufferArrowIPCStreamingRows[T Acceptor] struct {
21+
arrowAllocator memory.Allocator
22+
builders []array.Builder
23+
schema *arrow.Schema
24+
logger *zap.Logger
25+
}
26+
27+
// addRow saves a row obtained from the datasource into the buffer
28+
func (cb *columnarBufferArrowIPCStreamingRows[T]) addRow(transformer RowTransformer[T]) error {
29+
if err := transformer.AppendToArrowBuilders(cb.schema, cb.builders); err != nil {
30+
return fmt.Errorf("append values to arrow builders: %w", err)
31+
}
32+
33+
return nil
34+
}
35+
36+
// addArrowRecord is not the primary method for this implementation
37+
// It returns an error since this implementation is optimized for row-based data
38+
func (cb *columnarBufferArrowIPCStreamingRows[T]) addArrowRecord(record arrow.Record) error {
39+
return fmt.Errorf("this implementation is optimized for row-based data, use columnarBufferArrowIPCStreamingRecords for Arrow records")
40+
}
41+
42+
// ToResponse returns all the accumulated data and clears buffer
43+
func (cb *columnarBufferArrowIPCStreamingRows[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
44+
// If no rows were added, return an empty response
45+
if cb.TotalRows() == 0 {
46+
return &api_service_protos.TReadSplitsResponse{}, nil
47+
}
48+
49+
// chunk consists of columns
50+
chunk := make([]arrow.Array, 0, len(cb.builders))
51+
52+
// prepare arrow record
53+
for _, builder := range cb.builders {
54+
chunk = append(chunk, builder.NewArray())
55+
}
56+
57+
record := array.NewRecord(cb.schema, chunk, -1)
58+
59+
// We need to release the arrays after creating the record
60+
for _, col := range chunk {
61+
col.Release()
62+
}
63+
64+
// prepare arrow writer
65+
var buf bytes.Buffer
66+
67+
writer := ipc.NewWriter(&buf, ipc.WithSchema(cb.schema), ipc.WithAllocator(cb.arrowAllocator))
68+
69+
if err := writer.Write(record); err != nil {
70+
record.Release()
71+
return nil, fmt.Errorf("write record: %w", err)
72+
}
73+
74+
if err := writer.Close(); err != nil {
75+
record.Release()
76+
return nil, fmt.Errorf("close arrow writer: %w", err)
77+
}
78+
79+
// Release the record after writing
80+
record.Release()
81+
82+
out := &api_service_protos.TReadSplitsResponse{
83+
Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
84+
ArrowIpcStreaming: buf.Bytes(),
85+
},
86+
}
87+
88+
return out, nil
89+
}
90+
91+
// TotalRows returns the number of rows in the buffer
92+
func (cb *columnarBufferArrowIPCStreamingRows[T]) TotalRows() int {
93+
if len(cb.builders) == 0 {
94+
return 0
95+
}
96+
return cb.builders[0].Len()
97+
}
98+
99+
// Release frees resources if buffer is no longer used
100+
func (cb *columnarBufferArrowIPCStreamingRows[T]) Release() {
101+
// cleanup builders
102+
for _, b := range cb.builders {
103+
b.Release()
104+
}
105+
}

app/server/paging/columnar_buffer_factory.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,18 @@ import (
1414
)
1515

1616
type columnarBufferFactoryImpl[T Acceptor] struct {
17-
arrowAllocator memory.Allocator
18-
logger *zap.Logger
19-
format api_service_protos.TReadSplitsRequest_EFormat
20-
schema *arrow.Schema
21-
ydbTypes []*Ydb.Type
17+
arrowAllocator memory.Allocator
18+
logger *zap.Logger
19+
format api_service_protos.TReadSplitsRequest_EFormat
20+
schema *arrow.Schema
21+
ydbTypes []*Ydb.Type
22+
useArrowRecords bool // Whether to use Arrow records directly
2223
}
2324

2425
func (cbf *columnarBufferFactoryImpl[T]) MakeBuffer() (ColumnarBuffer[T], error) {
2526
switch cbf.format {
2627
case api_service_protos.TReadSplitsRequest_ARROW_IPC_STREAMING:
27-
builders, err := common.YdbTypesToArrowBuilders(cbf.ydbTypes, cbf.arrowAllocator)
28-
if err != nil {
29-
return nil, fmt.Errorf("convert Select.What to arrow.Schema: %w", err)
30-
}
31-
28+
// Special case for empty columns
3229
if len(cbf.ydbTypes) == 0 {
3330
return &columnarBufferArrowIPCStreamingEmptyColumns[T]{
3431
arrowAllocator: cbf.arrowAllocator,
@@ -37,12 +34,26 @@ func (cbf *columnarBufferFactoryImpl[T]) MakeBuffer() (ColumnarBuffer[T], error)
3734
}, nil
3835
}
3936

40-
return &columnarBufferArrowIPCStreamingDefault[T]{
41-
arrowAllocator: cbf.arrowAllocator,
42-
builders: builders,
43-
schema: cbf.schema,
44-
logger: cbf.logger,
45-
}, nil
37+
// Choose implementation based on whether we're using Arrow records directly
38+
if cbf.useArrowRecords {
39+
return &columnarBufferArrowIPCStreamingRecords[T]{
40+
arrowAllocator: cbf.arrowAllocator,
41+
schema: cbf.schema,
42+
logger: cbf.logger,
43+
}, nil
44+
} else {
45+
builders, err := common.YdbTypesToArrowBuilders(cbf.ydbTypes, cbf.arrowAllocator)
46+
if err != nil {
47+
return nil, fmt.Errorf("convert Select.What to arrow.Schema: %w", err)
48+
}
49+
50+
return &columnarBufferArrowIPCStreamingRows[T]{
51+
arrowAllocator: cbf.arrowAllocator,
52+
builders: builders,
53+
schema: cbf.schema,
54+
logger: cbf.logger,
55+
}, nil
56+
}
4657
default:
4758
return nil, fmt.Errorf("unknown format: %v", cbf.format)
4859
}
@@ -53,6 +64,7 @@ func NewColumnarBufferFactory[T Acceptor](
5364
arrowAllocator memory.Allocator,
5465
format api_service_protos.TReadSplitsRequest_EFormat,
5566
selectWhat *api_service_protos.TSelect_TWhat,
67+
useArrowRecords bool,
5668
) (ColumnarBufferFactory[T], error) {
5769
ydbTypes, err := common.SelectWhatToYDBTypes(selectWhat)
5870
if err != nil {
@@ -65,11 +77,12 @@ func NewColumnarBufferFactory[T Acceptor](
6577
}
6678

6779
cbf := &columnarBufferFactoryImpl[T]{
68-
logger: logger,
69-
arrowAllocator: arrowAllocator,
70-
format: format,
71-
schema: schema,
72-
ydbTypes: ydbTypes,
80+
logger: logger,
81+
arrowAllocator: arrowAllocator,
82+
format: format,
83+
schema: schema,
84+
ydbTypes: ydbTypes,
85+
useArrowRecords: useArrowRecords,
7386
}
7487

7588
return cbf, nil

app/server/streaming/read_splits_streamer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,8 @@ func (tc testCaseStreaming) execute(t *testing.T) {
271271
logger,
272272
memory.NewGoAllocator(),
273273
api_service_protos.TReadSplitsRequest_ARROW_IPC_STREAMING,
274-
split.Select.What)
274+
split.Select.What,
275+
false) // Use row-based approach for tests
275276
require.NoError(t, err)
276277

277278
pagingCfg := &config.TPagingConfig{RowsPerPage: uint64(tc.rowsPerPage)}

0 commit comments

Comments
 (0)