Skip to content

Commit 87bdac7

Browse files
committed
Use Arrow Records in the datasource
1 parent 6497598 commit 87bdac7

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

app/server/datasource/rdbms/datasource.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,37 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
217217

218218
defer common.LogCloserError(logger, queryResult, "close query result")
219219

220-
rows := queryResult.Rows
221-
transformer, err := rows.MakeTransformer(query.YdbColumns, ds.converterCollection)
220+
var rowsRead int64
221+
222+
var processErr error
223+
224+
// Choose the appropriate processing method based on which field is filled
225+
if queryResult.Rows != nil {
226+
rowsRead, processErr = ds.processRowBasedResult(query, queryResult.Rows, sink)
227+
} else if queryResult.Columns != nil {
228+
rowsRead, processErr = ds.processArrowBasedResult(queryResult.Columns, sink)
229+
} else {
230+
return 0, fmt.Errorf("query result contains neither Rows nor Columns")
231+
}
232+
233+
if processErr != nil {
234+
return 0, processErr
235+
}
236+
237+
// Notify sink that there will be no more data from this connection.
238+
// Hours lost in attempts to move this call into defer: 2
239+
sink.Finish()
240+
241+
return rowsRead, nil
242+
}
222243

244+
// processRowBasedResult builds a transformer for the query's YDB columns, reads the row-based result, and returns the number of rows read
245+
func (ds *dataSourceImpl) processRowBasedResult(
246+
query *rdbms_utils.SelectQuery,
247+
rows rdbms_utils.Rows,
248+
sink paging.Sink[any],
249+
) (int64, error) {
250+
transformer, err := rows.MakeTransformer(query.YdbColumns, ds.converterCollection)
223251
if err != nil {
224252
return 0, fmt.Errorf("make transformer: %w", err)
225253
}
@@ -250,9 +278,28 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
250278
return 0, fmt.Errorf("rows error: %w", err)
251279
}
252280

253-
// Notify sink that there will be no more data from this connection.
254-
// Hours lost in attempts to move this call into defer: 2
255-
sink.Finish()
281+
return rowsRead, nil
282+
}
283+
284+
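// processArrowBasedResult passes each Arrow record from columns to the sink and returns the total row count, or an error if the sink rejects a record or the column iteration fails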
285+
func (dataSourceImpl) processArrowBasedResult(
286+
columns rdbms_utils.Columns,
287+
sink paging.Sink[any],
288+
) (int64, error) {
289+
rowsRead := int64(0)
290+
291+
for columns.Next() {
292+
record := columns.Record()
293+
rowsRead += record.NumRows()
294+
295+
if err := sink.AddArrowRecord(record); err != nil {
296+
return 0, fmt.Errorf("add arrow record to paging writer: %w", err)
297+
}
298+
}
299+
300+
if err := columns.Err(); err != nil {
301+
return 0, fmt.Errorf("columns error: %w", err)
302+
}
256303

257304
return rowsRead, nil
258305
}

app/server/paging/interface.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ type RowTransformer[T Acceptor] interface {
2424
type ColumnarBuffer[T Acceptor] interface {
2525
// addRow saves a row obtained from the datasource into the columnar buffer
2626
addRow(rowTransformer RowTransformer[T]) error
27-
// addArrowRecord saves an Arrow Block obtained from the datasource into the columnar buffer
28-
addArrowRecord(record arrow.Record) error
2927
// ToResponse returns all the accumulated data and clears buffer
3028
ToResponse() (*api_service_protos.TReadSplitsResponse, error)
3129
// Release frees resources if buffer is no longer used

0 commit comments

Comments
 (0)