Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions src/daft-core/src/array/extension_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::sync::Arc;

use arrow::{array::ArrayRef, buffer::NullBuffer};
use common_error::DaftResult;
use daft_schema::{dtype::DataType, field::Field};

use crate::{datatypes::DaftArrayType, series::Series};

#[derive(Clone, Debug)]
pub struct ExtensionArray {
field: Arc<Field>,
/// Extension type name (e.g. "geoarrow.point")
extension_name: Arc<str>,
/// Extension metadata (e.g. '{"crs": "WGS84"}')
metadata: Option<Arc<str>>,
/// The underlying storage data
pub physical: Series,
}

impl ExtensionArray {
pub fn new(field: Arc<Field>, physical: Series) -> Self {
let DataType::Extension(ext_name, _, ext_metadata) = &field.dtype else {
panic!(
"ExtensionArray field must have Extension dtype, got {}",
field.dtype
);
};
Self {
extension_name: Arc::from(ext_name.as_str()),
metadata: ext_metadata.as_deref().map(Arc::from),
field,
physical,
}
}

pub fn name(&self) -> &str {
self.field.name.as_ref()
}

pub fn data_type(&self) -> &DataType {
&self.field.dtype
}

pub fn extension_name(&self) -> &str {
&self.extension_name
}

pub fn extension_metadata(&self) -> Option<&str> {
self.metadata.as_deref()
}

pub fn field(&self) -> &Field {
&self.field
}

pub fn len(&self) -> usize {
self.physical.len()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn rename(&self, name: &str) -> Self {
Self {
field: Arc::new(Field::new(name, self.field.dtype.clone())),
extension_name: self.extension_name.clone(),
metadata: self.metadata.clone(),
physical: self.physical.rename(name),
}
}

/// Replace the underlying physical `Series` of this `ExtensionArray`.
pub fn with_physical(&self, physical: Series) -> Self {
Self {
field: self.field.clone(),
extension_name: self.extension_name.clone(),
metadata: self.metadata.clone(),
physical,
}
}

pub fn nulls(&self) -> Option<&NullBuffer> {
self.physical.inner.nulls()
}

pub fn to_arrow(&self) -> DaftResult<ArrayRef> {
let arr = self.physical.to_arrow()?;
let target_field = self.field.to_arrow()?;
if arr.data_type() != target_field.data_type() {
Ok(arrow::compute::cast(&arr, target_field.data_type())?)
} else {
Ok(arr)
}
}

pub fn slice(&self, start: usize, end: usize) -> DaftResult<Self> {
Ok(self.with_physical(self.physical.slice(start, end)?))
}

pub fn concat(arrays: &[&Self]) -> DaftResult<Self> {
if arrays.is_empty() {
return Err(common_error::DaftError::ValueError(
"Cannot concat empty list of ExtensionArrays".to_string(),
));
}
let first = arrays[0];
let physical_arrays: Vec<&Series> = arrays.iter().map(|a| &a.physical).collect();
let physical = Series::concat(&physical_arrays)?;
Ok(first.with_physical(physical))
}
}

impl DaftArrayType for ExtensionArray {
fn data_type(&self) -> &DataType {
&self.field.dtype
}
}

impl crate::array::ops::from_arrow::FromArrow for ExtensionArray {
fn from_arrow<F: Into<daft_schema::field::FieldRef>>(
field: F,
arrow_arr: ArrayRef,
) -> DaftResult<Self> {
let field: daft_schema::field::FieldRef = field.into();
let DataType::Extension(_, storage_type, _) = &field.dtype else {
return Err(common_error::DaftError::TypeError(format!(
"Expected Extension dtype for ExtensionArray, got {}",
field.dtype
)));
};
let storage_field = Arc::new(Field::new(field.name.as_ref(), *storage_type.clone()));
let physical = Series::from_arrow(storage_field, arrow_arr)?;
Ok(Self::new(field, physical))
}
}

impl crate::array::ops::full::FullNull for ExtensionArray {
fn full_null(name: &str, dtype: &DataType, length: usize) -> Self {
let DataType::Extension(_, storage_type, _) = dtype else {
panic!("Expected Extension dtype for ExtensionArray::full_null, got {dtype}");
};
let physical = Series::full_null(name, storage_type, length);
Self::new(Arc::new(Field::new(name, dtype.clone())), physical)
}

fn empty(name: &str, dtype: &DataType) -> Self {
let DataType::Extension(_, storage_type, _) = dtype else {
panic!("Expected Extension dtype for ExtensionArray::empty, got {dtype}");
};
let physical = Series::empty(name, storage_type);
Self::new(Arc::new(Field::new(name, dtype.clone())), physical)
}
}
54 changes: 54 additions & 0 deletions src/daft-core/src/array/growable/extension_growable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_schema::{dtype::DataType, field::Field};

use super::Growable;
use crate::{
array::extension_array::ExtensionArray,
series::{IntoSeries, Series},
};

pub struct ExtensionGrowable<'a> {
name: String,
dtype: DataType,
physical_growable: Box<dyn Growable + 'a>,
}

impl<'a> ExtensionGrowable<'a> {
pub fn new(
name: &str,
dtype: &DataType,
arrays: Vec<&'a ExtensionArray>,
use_validity: bool,
capacity: usize,
) -> Self {
let DataType::Extension(_, storage_type, _) = dtype else {
panic!("Expected Extension dtype for ExtensionGrowable, got {dtype}");
};
let physical_series: Vec<&Series> = arrays.iter().map(|a| &a.physical).collect();
let physical_growable =
super::make_growable(name, storage_type, physical_series, use_validity, capacity);
Self {
name: name.to_string(),
dtype: dtype.clone(),
physical_growable,
}
}
}

impl Growable for ExtensionGrowable<'_> {
fn extend(&mut self, index: usize, start: usize, len: usize) {
self.physical_growable.extend(index, start, len);
}

fn add_nulls(&mut self, additional: usize) {
self.physical_growable.add_nulls(additional);
}

fn build(&mut self) -> DaftResult<Series> {
let physical = self.physical_growable.build()?;
let field = Arc::new(Field::new(self.name.as_str(), self.dtype.clone()));
Ok(ExtensionArray::new(field, physical).into_series())
}
}
23 changes: 18 additions & 5 deletions src/daft-core/src/array/growable/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use common_error::DaftResult;
use extension_growable::ExtensionGrowable;

use crate::{
array::{FixedSizeListArray, ListArray, StructArray, prelude::*},
array::{
FixedSizeListArray, ListArray, StructArray, extension_array::ExtensionArray, prelude::*,
},
datatypes::{FileArray, prelude::*},
file::DaftMediaType,
series::Series,
Expand All @@ -10,6 +13,7 @@ use crate::{

mod arrow_growable;
mod bitmap_growable;
mod extension_growable;
mod fixed_size_list_growable;
mod list_growable;
mod logical_growable;
Expand Down Expand Up @@ -170,10 +174,19 @@ impl_growable_array!(
arrow_growable::ArrowGrowable<'a, FixedSizeBinaryType>
);
impl_growable_array!(Utf8Array, arrow_growable::ArrowGrowable<'a, Utf8Type>);
impl_growable_array!(
ExtensionArray,
arrow_growable::ArrowGrowable<'a, ExtensionType>
);
impl GrowableArray for ExtensionArray {
type GrowableType<'a> = ExtensionGrowable<'a>;

fn make_growable<'a>(
name: &str,
dtype: &DataType,
arrays: Vec<&'a Self>,
use_validity: bool,
capacity: usize,
) -> Self::GrowableType<'a> {
ExtensionGrowable::new(name, dtype, arrays, use_validity, capacity)
}
}
impl_growable_array!(
FixedSizeListArray,
fixed_size_list_growable::FixedSizeListGrowable<'a>
Expand Down
1 change: 1 addition & 0 deletions src/daft-core/src/array/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod extension_array;
pub mod file_array;
mod fixed_size_list_array;
pub mod from;
Expand Down
2 changes: 1 addition & 1 deletion src/daft-core/src/array/ops/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ macro_rules! impl_broadcast_via_concat {

impl_broadcast_via_concat!(FixedSizeListArray);
impl_broadcast_via_concat!(ListArray);
impl_broadcast_via_concat!(ExtensionArray);
#[cfg(feature = "python")]
impl_broadcast_via_concat!(PythonArray);
impl_broadcast_via_concat!(ExtensionArray);

impl Broadcastable for StructArray {
fn broadcast(&self, num: usize) -> DaftResult<Self> {
Expand Down
5 changes: 2 additions & 3 deletions src/daft-core/src/array/ops/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,9 @@ impl ExtensionArray {
idx,
self.len()
);
let is_valid = self.is_valid(idx);
if is_valid {
if self.physical.is_valid(idx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential panic on to_arrow() failure

to_arrow() on ExtensionArray is now fallible — it performs a field lookup (which can fail if the extension type has no Arrow representation) and potentially a cast. The previous implementation was infallible (it returned the inner Arrow array directly). Using unwrap() here will panic at runtime if the cast fails, rather than returning None gracefully.

Consider propagating the error or returning None:

Suggested change
if self.physical.is_valid(idx) {
let scalar = Scalar::new(scalar.to_arrow().ok()?);

let scalar = self.slice(idx, idx + 1).unwrap();
let scalar = Scalar::new(scalar.to_arrow());
let scalar = Scalar::new(scalar.to_arrow().unwrap());
Some(scalar)
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion src/daft-core/src/array/ops/get_lit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl ExtensionArray {
self.len()
);

if self.is_valid(idx) {
if self.physical.is_valid(idx) {
Literal::Extension(self.slice(idx, idx + 1).unwrap().into_series())
} else {
Literal::Null
Expand Down
9 changes: 9 additions & 0 deletions src/daft-core/src/array/ops/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,12 @@ impl StructArray {
}
}
}
impl ExtensionArray {
#[inline]
pub fn is_valid(&self, idx: usize) -> bool {
match self.nulls() {
None => true,
Some(nulls) => nulls.is_valid(idx),
}
}
}
16 changes: 4 additions & 12 deletions src/daft-core/src/array/serdes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use super::{DataArray, FixedSizeListArray, ListArray, StructArray};
use crate::prelude::PythonArray;
use crate::{
datatypes::{
BinaryArray, BooleanArray, DaftLogicalType, DaftPrimitiveType, DataType, ExtensionArray,
Field, FixedSizeBinaryArray, Int64Array, IntervalArray, NullArray, Utf8Array,
BinaryArray, BooleanArray, DaftLogicalType, DaftPrimitiveType, ExtensionArray,
FixedSizeBinaryArray, Int64Array, IntervalArray, NullArray, Utf8Array,
logical::LogicalArray,
},
series::{IntoSeries, Series},
series::IntoSeries,
};

pub struct IterSer<I>
Expand Down Expand Up @@ -102,15 +102,7 @@ impl serde::Serialize for ExtensionArray {
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field())?;
let DataType::Extension(_, inner, _) = self.data_type() else {
panic!("Expected Extension Type!")
};
let values = Series::from_arrow(
Field::new("physical", inner.as_ref().clone()),
self.to_arrow(),
)
.unwrap();
s.serialize_entry("values", &values)?;
s.serialize_entry("values", &self.physical)?;
s.end()
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/daft-core/src/datatypes/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ macro_rules! with_match_physical_daft_types {
DataType::FixedSizeList(_, _) => __with_ty__! { FixedSizeListType },
DataType::List(_) => __with_ty__! { ListType },
DataType::Struct(_) => __with_ty__! { StructType },
DataType::Extension(_, _, _) => __with_ty__! { ExtensionType },
DataType::Interval => __with_ty__! { IntervalType },
#[cfg(feature = "python")]
DataType::Python => __with_ty__! { PythonType },
Expand Down Expand Up @@ -129,7 +128,6 @@ macro_rules! with_match_arrow_daft_types {
DataType::Float64 => __with_ty__! { Float64Type },
DataType::Decimal128(..) => __with_ty__! { Decimal128Type },
DataType::List(_) => __with_ty__! { ListType },
DataType::Extension(_, _, _) => __with_ty__! { ExtensionType },
DataType::Utf8 => __with_ty__! { Utf8Type },

_ => panic!("{:?} not implemented", $key_type)
Expand Down
15 changes: 13 additions & 2 deletions src/daft-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,18 @@ impl_daft_arrow_datatype!(Float64Type, Float64);
impl_daft_arrow_datatype!(BinaryType, Binary);
impl_daft_arrow_datatype!(FixedSizeBinaryType, Unknown);
impl_daft_arrow_datatype!(Utf8Type, Utf8);
impl_daft_arrow_datatype!(ExtensionType, Unknown);
// ExtensionType is a logical type backed by a variable physical type (stored as Series).
// It is neither DaftPhysicalType nor DaftArrowBackedType.
#[derive(Clone, Debug)]
pub struct ExtensionType {}

impl DaftDataType for ExtensionType {
#[inline]
fn get_dtype() -> DataType {
DataType::Unknown
}
type ArrayType = crate::array::extension_array::ExtensionArray;
}
impl_daft_arrow_datatype!(Decimal128Type, Unknown);

impl_nested_datatype!(FixedSizeListType, FixedSizeListArray);
Expand Down Expand Up @@ -461,6 +472,6 @@ pub type Float64Array = DataArray<Float64Type>;
pub type BinaryArray = DataArray<BinaryType>;
pub type FixedSizeBinaryArray = DataArray<FixedSizeBinaryType>;
pub type Utf8Array = DataArray<Utf8Type>;
pub type ExtensionArray = DataArray<ExtensionType>;
pub use crate::array::extension_array::ExtensionArray;
pub type IntervalArray = DataArray<IntervalType>;
pub type Decimal128Array = DataArray<Decimal128Type>;
13 changes: 0 additions & 13 deletions src/daft-core/src/series/array_impl/data_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,3 @@ impl_series_like_for_data_array!(Float64Array);
impl_series_like_for_data_array!(Utf8Array);
impl_series_like_for_data_array!(IntervalArray);
impl_series_like_for_data_array!(Decimal128Array);
impl_series_like_for_data_array!(ExtensionArray, {
fn to_arrow(&self) -> DaftResult<ArrayRef> {
let arr: ArrayRef = self.0.to_arrow();
// Reverse the coercion applied during from_arrow (e.g. LargeBinary → Binary)
// so callers see the original storage type.
let target_field = self.0.field.to_arrow()?;
if arr.data_type() != target_field.data_type() {
Ok(arrow::compute::cast(&arr, target_field.data_type())?)
} else {
Ok(arr)
}
}
});
Loading
Loading