Skip to content

Commit 6497598

Browse files
committed
Avoid putting Arrow Records into the buckets
1 parent f2b4adc commit 6497598

17 files changed: 360 additions and 242 deletions.

app/server/data_source_collection.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,7 @@ func doReadSplit[T paging.Acceptor](
318 318   logger,
319 319   memoryAllocator,
320 320   request.Format,
321     - split.Select.What,
322     - false) // Default to row-based approach for now
    321 + split.Select.What)
323 322   if err != nil {
324 323   return fmt.Errorf("new columnar buffer factory: %w", err)
325 324   }

app/server/datasource/rdbms/clickhouse/connection_http.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.Qu
73  73    }
74  74
75  75    rows := &rows{Rows: out}
    76  +
76  77    return &rdbms_utils.QueryResult{
77  78    Rows: rows,
78  79    }, nil

app/server/datasource/rdbms/clickhouse/connection_native.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.
73  73    }
74  74
75  75    rows := &rowsNative{Rows: out}
    76  +
76  77    return &rdbms_utils.QueryResult{
77  78    Rows: rows,
78  79    }, nil

app/server/datasource/rdbms/datasource.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
219 219
220 220   rows := queryResult.Rows
221 221   transformer, err := rows.MakeTransformer(query.YdbColumns, ds.converterCollection)
    222 +
222 223   if err != nil {
223 224   return 0, fmt.Errorf("make transformer: %w", err)
224 225   }

app/server/datasource/rdbms/utils/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package utils
3   3     import (
4   4     "context"
5   5
    6   + "github.com/apache/arrow/go/v13/arrow"
6   7     "go.uber.org/zap"
7   8
8       - "github.com/apache/arrow/go/v13/arrow"
9   9     "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
10  10
11  11    api_common "github.com/ydb-platform/fq-connector-go/api/common"

app/server/datasource/rdbms/ydb/connection_database_sql.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func (c *connectionDatabaseSQL) Query(params *rdbms_utils.QueryParams) (*rdbms_u
74  74    }
75  75
76  76    rows := rowsDatabaseSQL{Rows: out}
    77  +
77  78    return &rdbms_utils.QueryResult{
78  79    Rows: rows,
79  80    }, nil

app/server/datasource/rdbms/ydb/connection_native.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@ import (
8   8     "io"
9   9     "time"
10  10
    11  + "github.com/apache/arrow/go/v13/arrow"
    12  + "github.com/apache/arrow/go/v13/arrow/ipc"
11  13    "go.uber.org/zap"
12  14
13  15    "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
14  16    ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3"
15  17    ydb_sdk_query "github.com/ydb-platform/ydb-go-sdk/v3/query"
16  18
17      - "github.com/apache/arrow/go/v13/arrow"
18      - "github.com/apache/arrow/go/v13/arrow/ipc"
19      -
20  19    api_common "github.com/ydb-platform/fq-connector-go/api/common"
21  20    "github.com/ydb-platform/fq-connector-go/app/config"
22  21    "github.com/ydb-platform/fq-connector-go/app/server/conversion"
@@ -126,6 +125,7 @@ func (c *columnsNative) Close() error {
126 125   if err := c.arrowResult.Close(c.ctx); err != nil {
127 126   return fmt.Errorf("arrow result close: %w", err)
128 127   }
    128 +
129 129   return nil
130 130   }
131 131

@@ -142,6 +142,7 @@ func (c *columnsNative) Next() bool {
142 142
143 143   // Try to get the next part
144 144   var part io.Reader
    145 +
145 146   var err error
146 147
147 148   for p, e := range c.arrowResult.Parts(c.ctx) {
@@ -167,10 +168,12 @@ func (c *columnsNative) Next() bool {
167 168   // Create a new reader for this part
168 169   c.currentPart = part
169 170   reader, err := ipc.NewReader(part)
    171 +
170 172   if err != nil {
171 173   c.err = fmt.Errorf("create arrow reader: %w", err)
172 174   return false
173 175   }
    176 +
174 177   c.reader = reader
175 178
176 179   // Get the first record from this part
@@ -180,6 +183,7 @@ func (c *columnsNative) Next() bool {
180 183   }
181 184
182 185   c.record = c.reader.Record()
    186 +
183 187   return true
184 188   }
185 189

app/server/paging/columnar_buffer_arrow_ipc_streaming_default.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) addArrowRecord(record arrow
60  60    // ToResponse returns all the accumulated data and clears buffer
61  61    func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
62  62    var record arrow.Record
    63  +
63  64    var releaseRecord bool
64  65
65  66    // If we have a stored Arrow Record, use it directly
@@ -99,13 +100,15 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_
99  100   if releaseRecord {
100 101   record.Release()
101 102   }
    103 +
102 104   return nil, fmt.Errorf("write record: %w", err)
103 105   }
104 106
105 107   if err := writer.Close(); err != nil {
106 108   if releaseRecord {
107 109   record.Release()
108 110   }
    111 +
109 112   return nil, fmt.Errorf("close arrow writer: %w", err)
110 113   }
111 114

@@ -127,9 +130,11 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) TotalRows() int {
127 130   if cb.arrowRecord != nil {
128 131   return int(cb.arrowRecord.NumRows())
129 132   }
    133 +
130 134   if len(cb.builders) > 0 {
131 135   return cb.builders[0].Len()
132 136   }
    137 +
133 138   return 0
134 139   }
135 140

app/server/paging/columnar_buffer_arrow_ipc_streaming_records.go

Lines changed: 0 additions & 96 deletions
This file was deleted.

app/server/paging/columnar_buffer_arrow_ipc_streaming_rows.go

Lines changed: 0 additions & 105 deletions
This file was deleted.

0 commit comments

Comments
 (0)