Skip to content

Commit e655bba

Browse files
committed
replace builders
1 parent 055958e commit e655bba

File tree

1 file changed

+27
-154
lines changed

1 file changed

+27
-154
lines changed

native/core/src/execution/planner.rs

Lines changed: 27 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,11 @@ use datafusion::physical_expr::LexOrdering;
8686

8787
use crate::parquet::parquet_exec::init_datasource_exec;
8888
use arrow::array::{
89-
BinaryArray, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Array, Decimal128Builder,
90-
Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder,
91-
NullArray, StringBuilder, TimestampMicrosecondBuilder,
89+
BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
90+
Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder,
91+
TimestampMicrosecondArray,
9292
};
93+
use arrow::buffer::{BooleanBuffer, Buffer, MutableBuffer, OffsetBuffer};
9394
use datafusion::common::utils::SingleRowListArrayBuilder;
9495
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
9596
use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
@@ -490,181 +491,64 @@ impl PhysicalPlanner {
490491
}
491492
DataType::Boolean => {
492493
let vals = values.clone();
493-
let len = vals.boolean_values.len();
494-
let mut arr = BooleanBuilder::with_capacity(len);
495-
496-
for i in 0 .. len {
497-
if !vals.null_mask[i] {
498-
arr.append_value(vals.boolean_values[i]);
499-
} else {
500-
arr.append_null();
501-
}
502-
}
503-
504-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
494+
SingleRowListArrayBuilder::new(Arc::new(BooleanArray::new(BooleanBuffer::from(vals.boolean_values), Some(vals.null_mask.into()))))
505495
.build_list_scalar()
506496
}
507497
DataType::Int8 => {
508498
let vals = values.clone();
509-
let len = vals.byte_values.len();
510-
let mut arr = Int8Builder::with_capacity(len);
511-
512-
for i in 0 .. len {
513-
if !vals.null_mask[i] {
514-
arr.append_value(vals.byte_values[i] as i8);
515-
} else {
516-
arr.append_null();
517-
}
518-
}
519-
520-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
499+
SingleRowListArrayBuilder::new(Arc::new(Int8Array::new(vals.byte_values.iter().map(|&x| x as i8).collect::<Vec<_>>().into(), Some(vals.null_mask.into()))))
521500
.build_list_scalar()
522501
}
523502
DataType::Int16 => {
524503
let vals = values.clone();
525-
let len = vals.short_values.len();
526-
let mut arr = Int16Builder::with_capacity(len);
527-
528-
for i in 0 .. len {
529-
if !vals.null_mask[i] {
530-
arr.append_value(vals.short_values[i] as i16);
531-
} else {
532-
arr.append_null();
533-
}
534-
}
535-
536-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
504+
SingleRowListArrayBuilder::new(Arc::new(Int16Array::new(vals.short_values.iter().map(|&x| x as i16).collect::<Vec<_>>().into(), Some(vals.null_mask.into()))))
537505
.build_list_scalar()
538506
}
539507
DataType::Int32 => {
540508
let vals = values.clone();
541-
let len = vals.int_values.len();
542-
let mut arr = Int32Builder::with_capacity(len);
543-
544-
for i in 0 .. len {
545-
if !vals.null_mask[i] {
546-
arr.append_value(vals.int_values[i]);
547-
} else {
548-
arr.append_null();
549-
}
550-
}
551-
552-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
509+
SingleRowListArrayBuilder::new(Arc::new(Int32Array::new(vals.int_values.into(), Some(vals.null_mask.into()))))
553510
.build_list_scalar()
554511
}
555512
DataType::Int64 => {
556513
let vals = values.clone();
557-
let len = vals.long_values.len();
558-
let mut arr = Int64Builder::with_capacity(len);
559-
560-
for i in 0 .. len {
561-
if !vals.null_mask[i] {
562-
arr.append_value(vals.long_values[i]);
563-
} else {
564-
arr.append_null();
565-
}
566-
}
567-
568-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
514+
SingleRowListArrayBuilder::new(Arc::new(Int64Array::new(vals.long_values.into(), Some(vals.null_mask.into()))))
569515
.build_list_scalar()
570516
}
571517
DataType::Float32 => {
572518
let vals = values.clone();
573-
let len = vals.float_values.len();
574-
let mut arr = Float32Builder::with_capacity(len);
575-
576-
for i in 0 .. len {
577-
if !vals.null_mask[i] {
578-
arr.append_value(vals.float_values[i]);
579-
} else {
580-
arr.append_null();
581-
}
582-
}
583-
584-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
519+
SingleRowListArrayBuilder::new(Arc::new(Float32Array::new(vals.float_values.into(), Some(vals.null_mask.into()))))
585520
.build_list_scalar()
586521
}
587522
DataType::Float64 => {
588523
let vals = values.clone();
589-
let len = vals.double_values.len();
590-
let mut arr = Float64Builder::with_capacity(len);
591-
592-
for i in 0 .. len {
593-
if !vals.null_mask[i] {
594-
arr.append_value(vals.double_values[i]);
595-
} else {
596-
arr.append_null();
597-
}
598-
}
599-
600-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
524+
SingleRowListArrayBuilder::new(Arc::new(Float64Array::new(vals.double_values.into(), Some(vals.null_mask.into()))))
601525
.build_list_scalar()
602526
}
603527
DataType::Timestamp(TimeUnit::Microsecond, None) => {
604528
let vals = values.clone();
605-
let len = vals.long_values.len();
606-
let mut arr = TimestampMicrosecondBuilder::with_capacity(len);
607-
608-
for i in 0 .. len {
609-
if !vals.null_mask[i] {
610-
arr.append_value(vals.long_values[i]);
611-
} else {
612-
arr.append_null();
613-
}
614-
}
615-
616-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
529+
SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into()))))
617530
.build_list_scalar()
618531
}
619532
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
620533
let vals = values.clone();
621-
let len = vals.long_values.len();
622-
let mut arr = TimestampMicrosecondBuilder::with_capacity(len);
623-
624-
for i in 0 .. len {
625-
if !vals.null_mask[i] {
626-
arr.append_value(vals.long_values[i]);
627-
} else {
628-
arr.append_null();
629-
}
630-
}
631-
632-
SingleRowListArrayBuilder::new(Arc::new(arr.finish().with_timezone(Arc::clone(tz))))
534+
SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into())).with_timezone(Arc::clone(tz))))
633535
.build_list_scalar()
634536
}
635537
DataType::Date32 => {
636538
let vals = values.clone();
637-
let len = vals.int_values.len();
638-
let mut arr = Date32Builder::with_capacity(len);
639-
640-
for i in 0 .. len {
641-
if !vals.null_mask[i] {
642-
arr.append_value(vals.int_values[i]);
643-
} else {
644-
arr.append_null();
645-
}
646-
}
647-
648-
SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
539+
SingleRowListArrayBuilder::new(Arc::new(Date32Array::new(vals.int_values.into(), Some(vals.null_mask.into()))))
649540
.build_list_scalar()
650541
}
651542
DataType::Binary => {
652543
let vals = values.clone();
653-
let mut arr = BinaryBuilder::new();
654-
655-
for (i, v) in vals.bytes_values.into_iter().enumerate() {
656-
if !vals.null_mask[i] {
657-
arr.append_value(v);
658-
} else {
659-
arr.append_null();
660-
}
661-
}
662-
663-
let binary_array: BinaryArray = arr.finish();
664-
SingleRowListArrayBuilder::new(Arc::new(binary_array))
544+
let offsets = MutableBuffer::new((vals.bytes_values.len() + 1) * size_of::<i32>());
545+
let offsets = Buffer::from(offsets);
546+
let value_offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) };
547+
SingleRowListArrayBuilder::new(Arc::new(BinaryArray::new(value_offsets, vals.int_values.into(), Some(vals.null_mask.into()))))
665548
.build_list_scalar()
666549
}
667550
DataType::Utf8 => {
551+
// Using a builder here as it is quite complicated to create StringArray from vector of string with nulls
668552
let vals = values.clone();
669553
let len = vals.string_values.len();
670554
let mut arr = StringBuilder::with_capacity(len, len);
@@ -682,25 +566,14 @@ impl PhysicalPlanner {
682566
}
683567
DataType::Decimal128(p, s) => {
684568
let vals = values.clone();
685-
let mut arr = Decimal128Builder::new().with_precision_and_scale(*p, *s)?;
686-
687-
for (i, v) in vals.decimal_values.into_iter().enumerate() {
688-
if !vals.null_mask[i] {
689-
let big_integer = BigInt::from_signed_bytes_be(&v);
690-
let integer = big_integer.to_i128().ok_or_else(|| {
691-
GeneralError(format!(
692-
"Cannot parse {big_integer:?} as i128 for Decimal literal"
693-
))
694-
})?;
695-
arr.append_value(integer);
696-
} else {
697-
arr.append_null();
698-
}
699-
}
700-
701-
let decimal_array: Decimal128Array = arr.finish();
702-
SingleRowListArrayBuilder::new(Arc::new(decimal_array))
703-
.build_list_scalar()
569+
SingleRowListArrayBuilder::new(Arc::new(Decimal128Array::new(vals.decimal_values.into_iter().map(|v| {
570+
let big_integer = BigInt::from_signed_bytes_be(&v);
571+
big_integer.to_i128().ok_or_else(|| {
572+
return GeneralError(format!(
573+
"Cannot parse {big_integer:?} as i128 for Decimal literal"
574+
))
575+
}).unwrap()
576+
}).collect::<Vec<_>>().into(), Some(vals.null_mask.into())).with_precision_and_scale(*p, *s)?)).build_list_scalar()
704577
}
705578
dt => {
706579
return Err(GeneralError(format!(

0 commit comments

Comments
 (0)