Skip to content

Commit 53d8990

Browse files
committed
Change the columnar buffer interface
1 parent 2cd41ef commit 53d8990

File tree

4 files changed

+94
-11
lines changed

4 files changed

+94
-11
lines changed

app/server/paging/columnar_buffer_arrow_ipc_streaming_default.go

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ type columnarBufferArrowIPCStreamingDefault[T Acceptor] struct {
2020
builders []array.Builder
2121
schema *arrow.Schema
2222
logger *zap.Logger
23+
arrowRecord arrow.Record // Store the Arrow Record directly
24+
rowsAdded bool // Track if rows were added via addRow
2325
}
2426

2527
// AddRow saves a row obtained from the datasource into the buffer
@@ -30,23 +32,62 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) addRow(transformer RowTrans
3032
return fmt.Errorf("append values to arrow builders: %w", err)
3133
}
3234

35+
cb.rowsAdded = true
36+
3337
return nil
3438
}
3539

36-
// ToResponse returns all the accumulated data and clears buffer
37-
func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
38-
// chunk consists of columns
39-
chunk := make([]arrow.Array, 0, len(cb.builders))
40+
// addArrowRecord stores an Arrow record obtained from the datasource in the columnar buffer, replacing (and releasing) any previously stored record
41+
func (cb *columnarBufferArrowIPCStreamingDefault[T]) addArrowRecord(record arrow.Record) error {
42+
// Reject records whose schema does not match the buffer schema
43+
if !cb.schema.Equal(record.Schema()) {
44+
return fmt.Errorf("record schema does not match buffer schema")
45+
}
4046

41-
// prepare arrow record
42-
for _, builder := range cb.builders {
43-
chunk = append(chunk, builder.NewArray())
47+
// Store the record directly
48+
if cb.arrowRecord != nil {
49+
// Release the previous record if it exists
50+
cb.arrowRecord.Release()
4451
}
4552

46-
record := array.NewRecord(cb.schema, chunk, -1)
53+
// Retain the record (take our own reference) so it stays valid while the buffer holds it
54+
record.Retain()
55+
cb.arrowRecord = record
4756

48-
for _, col := range chunk {
49-
col.Release()
57+
return nil
58+
}
59+
60+
// ToResponse returns all the accumulated data and clears buffer
61+
func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
62+
var record arrow.Record
63+
var releaseRecord bool
64+
65+
// If we have a stored Arrow Record, use it directly
66+
if cb.arrowRecord != nil {
67+
record = cb.arrowRecord
68+
// The stored record remains owned by the buffer (it is released in Release), so it must not be released here
69+
releaseRecord = false
70+
} else if cb.rowsAdded {
71+
// If rows were added, create a new record from the builders
72+
chunk := make([]arrow.Array, 0, len(cb.builders))
73+
74+
// prepare arrow record
75+
for _, builder := range cb.builders {
76+
chunk = append(chunk, builder.NewArray())
77+
}
78+
79+
record = array.NewRecord(cb.schema, chunk, -1)
80+
81+
// We need to release the arrays after creating the record
82+
for _, col := range chunk {
83+
col.Release()
84+
}
85+
86+
// We'll need to release this record after writing it
87+
releaseRecord = true
88+
} else {
89+
// No data to return
90+
return &api_service_protos.TReadSplitsResponse{}, nil
5091
}
5192

5293
// prepare arrow writer
@@ -55,13 +96,24 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_
5596
writer := ipc.NewWriter(&buf, ipc.WithSchema(cb.schema), ipc.WithAllocator(cb.arrowAllocator))
5697

5798
if err := writer.Write(record); err != nil {
99+
if releaseRecord {
100+
record.Release()
101+
}
58102
return nil, fmt.Errorf("write record: %w", err)
59103
}
60104

61105
if err := writer.Close(); err != nil {
106+
if releaseRecord {
107+
record.Release()
108+
}
62109
return nil, fmt.Errorf("close arrow writer: %w", err)
63110
}
64111

112+
// Release the record if we created it
113+
if releaseRecord {
114+
record.Release()
115+
}
116+
65117
out := &api_service_protos.TReadSplitsResponse{
66118
Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
67119
ArrowIpcStreaming: buf.Bytes(),
@@ -71,12 +123,26 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_
71123
return out, nil
72124
}
73125

74-
func (cb *columnarBufferArrowIPCStreamingDefault[T]) TotalRows() int { return cb.builders[0].Len() }
126+
func (cb *columnarBufferArrowIPCStreamingDefault[T]) TotalRows() int {
127+
if cb.arrowRecord != nil {
128+
return int(cb.arrowRecord.NumRows())
129+
}
130+
if len(cb.builders) > 0 {
131+
return cb.builders[0].Len()
132+
}
133+
return 0
134+
}
75135

76136
// Frees resources if buffer is no longer used
77137
func (cb *columnarBufferArrowIPCStreamingDefault[T]) Release() {
78138
// cleanup builders
79139
for _, b := range cb.builders {
80140
b.Release()
81141
}
142+
143+
// Release the stored Arrow Record if it exists
144+
if cb.arrowRecord != nil {
145+
cb.arrowRecord.Release()
146+
cb.arrowRecord = nil
147+
}
82148
}

app/server/paging/columnar_buffer_arrow_ipc_streaming_empty_columns.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ func (cb *columnarBufferArrowIPCStreamingEmptyColumns[T]) addRow(transformer Row
3434
return nil
3535
}
3636

37+
// addArrowRecord saves an Arrow Block obtained from the datasource into the columnar buffer
38+
func (cb *columnarBufferArrowIPCStreamingEmptyColumns[T]) addArrowRecord(record arrow.Record) error {
39+
// For empty columns buffer, we just need to count the number of rows
40+
cb.rowsAdded += int(record.NumRows())
41+
return nil
42+
}
43+
3744
// ToResponse returns all the accumulated data and clears buffer
3845
func (cb *columnarBufferArrowIPCStreamingEmptyColumns[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
3946
columns := make([]arrow.Array, 0)

app/server/paging/interface.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type RowTransformer[T Acceptor] interface {
2424
type ColumnarBuffer[T Acceptor] interface {
2525
// addRow saves a row obtained from the datasource into the columnar buffer
2626
addRow(rowTransformer RowTransformer[T]) error
27+
// addArrowRecord saves an Arrow Block obtained from the datasource into the columnar buffer
28+
addArrowRecord(record arrow.Record) error
2729
// ToResponse returns all the accumulated data and clears buffer
2830
ToResponse() (*api_service_protos.TReadSplitsResponse, error)
2931
// Release frees resources if buffer is no longer used
@@ -54,6 +56,9 @@ type Sink[T Acceptor] interface {
5456
// AddRow saves the row obtained from a stream incoming from an external data source.
5557
AddRow(rowTransformer RowTransformer[T]) error
5658

59+
// AddArrowRecord saves the Arrow block obtained from a stream incoming from an external data source.
60+
// AddArrowRecord(record arrow.Record) error
61+
5762
// Finish reports the successful (!) completion of data stream reading.
5863
// Never call this method if the request has failed.
5964
// This method can be called only once.

app/server/paging/mock.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package paging
22

33
import (
4+
"github.com/apache/arrow/go/v13/arrow"
45
"github.com/stretchr/testify/mock"
56
"go.uber.org/zap"
67

@@ -66,6 +67,10 @@ func (*ColumnarBufferMock) addRow(_ RowTransformer[any]) error {
6667
panic("not implemented") // TODO: Implement
6768
}
6869

70+
func (*ColumnarBufferMock) addArrowRecord(_ arrow.Record) error {
71+
panic("not implemented") // TODO: Implement
72+
}
73+
6974
func (m *ColumnarBufferMock) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
7075
args := m.Called()
7176

0 commit comments

Comments
 (0)