Skip to content

Commit be9c708

Browse files
authored
Use operators for RecordBatch conversion (#5771)
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 4b0b39c commit be9c708

File tree

2 files changed

+26
-6
lines changed

2 files changed

+26
-6
lines changed

vortex-scan/src/arrow.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ use arrow_schema::SchemaRef;
1010
use futures::Stream;
1111
use futures::TryStreamExt;
1212
use vortex_array::ArrayRef;
13+
use vortex_array::arrow::ArrowArrayExecutor;
1314
use vortex_array::arrow::IntoArrowArray;
1415
use vortex_error::VortexResult;
1516
use vortex_io::runtime::BlockingRuntime;
17+
use vortex_layout::layouts::USE_VORTEX_OPERATORS;
18+
use vortex_session::VortexSession;
1619

1720
use crate::ScanBuilder;
1821

@@ -28,9 +31,10 @@ impl ScanBuilder<ArrayRef> {
2831
runtime: &B,
2932
) -> VortexResult<impl RecordBatchReader + 'static> {
3033
let data_type = DataType::Struct(schema.fields().clone());
34+
let session = self.session().clone();
3135

3236
let iter = self
33-
.map(move |chunk| to_record_batch(chunk, &data_type))
37+
.map(move |chunk| to_record_batch(chunk, &data_type, &session))
3438
.into_iter(runtime)?
3539
.map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
3640

@@ -42,19 +46,29 @@ impl ScanBuilder<ArrayRef> {
4246
schema: SchemaRef,
4347
) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
4448
let data_type = DataType::Struct(schema.fields().clone());
49+
let session = self.session().clone();
4550

4651
let stream = self
47-
.map(move |chunk| to_record_batch(chunk, &data_type))
52+
.map(move |chunk| to_record_batch(chunk, &data_type, &session))
4853
.into_stream()?
4954
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
5055

5156
Ok(stream)
5257
}
5358
}
5459

55-
fn to_record_batch(chunk: ArrayRef, data_type: &DataType) -> VortexResult<RecordBatch> {
56-
let arrow = chunk.into_arrow(data_type)?;
57-
Ok(RecordBatch::from(arrow.as_struct().clone()))
60+
fn to_record_batch(
61+
chunk: ArrayRef,
62+
data_type: &DataType,
63+
session: &VortexSession,
64+
) -> VortexResult<RecordBatch> {
65+
if *USE_VORTEX_OPERATORS {
66+
let arrow = chunk.execute_arrow(data_type, session)?;
67+
Ok(RecordBatch::from(arrow.as_struct().clone()))
68+
} else {
69+
let arrow = chunk.into_arrow(data_type)?;
70+
Ok(RecordBatch::from(arrow.as_struct().clone()))
71+
}
5872
}
5973

6074
/// We create an adapter for record batch iterators that supports clone.
@@ -114,6 +128,7 @@ mod tests {
114128
use vortex_error::VortexResult;
115129

116130
use super::*;
131+
use crate::test::SESSION;
117132

118133
fn create_test_struct_array() -> VortexResult<ArrayRef> {
119134
// Create Arrow arrays
@@ -152,7 +167,7 @@ mod tests {
152167
let schema = create_arrow_schema();
153168
let data_type = DataType::Struct(schema.fields().clone());
154169

155-
let batch = to_record_batch(vortex_array, &data_type)?;
170+
let batch = to_record_batch(vortex_array, &data_type, &SESSION)?;
156171
assert_eq!(batch.num_columns(), 2);
157172
assert_eq!(batch.num_rows(), 4);
158173

vortex-scan/src/scan_builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ impl<A: 'static + Send> ScanBuilder<A> {
180180
self.projection.return_dtype(self.layout_reader.dtype())
181181
}
182182

183+
/// The session used by the scan.
184+
pub fn session(&self) -> &VortexSession {
185+
&self.session
186+
}
187+
183188
/// Map each split of the scan. The function will be run on the spawned task.
184189
pub fn map<B: 'static>(
185190
self,

0 commit comments

Comments
 (0)