Skip to content

Commit f59e279

Browse files
committed
Use Arrow Records in the datasource
1 parent 687a9e6 commit f59e279

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

app/server/datasource/rdbms/datasource.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,37 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
248248

249249
defer common.LogCloserError(logger, queryResult, "close query result")
250250

251-
rows := queryResult.Rows
252-
transformer, err := rows.MakeTransformer(query.YdbColumns, ds.converterCollection)
251+
var rowsRead int64
252+
253+
var processErr error
254+
255+
// Choose the appropriate processing method based on which field is filled
256+
if queryResult.Rows != nil {
257+
rowsRead, processErr = ds.processRowBasedResult(query, queryResult.Rows, sink)
258+
} else if queryResult.Columns != nil {
259+
rowsRead, processErr = ds.processArrowBasedResult(queryResult.Columns, sink)
260+
} else {
261+
return 0, fmt.Errorf("query result contains neither Rows nor Columns")
262+
}
263+
264+
if processErr != nil {
265+
return 0, processErr
266+
}
267+
268+
// Notify sink that there will be no more data from this connection.
269+
// Hours lost in attempts to move this call into defer: 2
270+
sink.Finish()
271+
272+
return rowsRead, nil
273+
}
253274

275+
// processRowBasedResult processes row-based results from the database
276+
func (ds *dataSourceImpl) processRowBasedResult(
277+
query *rdbms_utils.SelectQuery,
278+
rows rdbms_utils.Rows,
279+
sink paging.Sink[any],
280+
) (int64, error) {
281+
transformer, err := rows.MakeTransformer(query.YdbColumns, ds.converterCollection)
254282
if err != nil {
255283
return 0, fmt.Errorf("make transformer: %w", err)
256284
}
@@ -275,9 +303,28 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
275303
return 0, fmt.Errorf("rows error: %w", err)
276304
}
277305

278-
// Notify sink that there will be no more data from this connection.
279-
// Hours lost in attempts to move this call into defer: 2
280-
sink.Finish()
306+
return rowsRead, nil
307+
}
308+
309+
// processArrowBasedResult processes Arrow-based results from the database
310+
func (dataSourceImpl) processArrowBasedResult(
311+
columns rdbms_utils.Columns,
312+
sink paging.Sink[any],
313+
) (int64, error) {
314+
rowsRead := int64(0)
315+
316+
for columns.Next() {
317+
record := columns.Record()
318+
rowsRead += record.NumRows()
319+
320+
if err := sink.AddArrowRecord(record); err != nil {
321+
return 0, fmt.Errorf("add arrow record to paging writer: %w", err)
322+
}
323+
}
324+
325+
if err := columns.Err(); err != nil {
326+
return 0, fmt.Errorf("columns error: %w", err)
327+
}
281328

282329
return rowsRead, nil
283330
}

app/server/paging/interface.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ type RowTransformer[T Acceptor] interface {
2424
type ColumnarBuffer[T Acceptor] interface {
2525
// addRow saves a row obtained from the datasource into the columnar buffer
2626
addRow(rowTransformer RowTransformer[T]) error
27-
// addArrowRecord saves an Arrow Block obtained from the datasource into the columnar buffer
28-
addArrowRecord(record arrow.Record) error
2927
// ToResponse returns all the accumulated data and clears buffer
3028
ToResponse() (*api_service_protos.TReadSplitsResponse, error)
3129
// Release frees resources if buffer is no longer used

0 commit comments

Comments
 (0)