Skip to content

Commit aabd096

Browse files
authored
Add accessor for Binary data (#1383)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-incubator/delta-kernel-rs/blob/main/CONTRIBUTING.md 2. Run `cargo t --all-features --all-targets` to get started testing, and run `cargo fmt`. 3. Ensure you have added or run the appropriate tests for your PR. 4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 5. Be sure to keep the PR description updated to reflect all changes. --> <!-- PR title formatting: This project uses conventional commits: https://www.conventionalcommits.org/ Each PR corresponds to a commit on the `main` branch, with the title of the PR (typically) being used for the commit message on main. In order to ensure proper formatting in the CHANGELOG please ensure your PR title adheres to the conventional commit specification. Examples: - new feature PR: "feat: new API for snapshot.update()" - bugfix PR: "fix: correctly apply DV in read-table example" --> ## What changes are proposed in this pull request? Adds an accessor for Binary blobs Resolves #1382 <!-- Please clarify what changes you are proposing and why the changes are needed. The purpose of this section is to outline the changes, why they are needed, and how this PR fixes the issue. If the reason for the change is already explained clearly in an issue, then it does not need to be restated here. 1. If you propose a new API or feature, clarify the use case for a new API or feature. 2. If you fix a bug, you can clarify why it is a bug. --> <!-- Uncomment this section if there are any changes affecting public APIs: ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested? <!-- Please make sure to add test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested, ideally via a reproducible test documented in the PR description. -->
1 parent e010b95 commit aabd096

File tree

3 files changed

+227
-2
lines changed

3 files changed

+227
-2
lines changed

kernel/src/engine/arrow_data.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,10 @@ impl ArrowEngineData {
311311
debug!("Pushing string array for {}", ColumnName::new(path));
312312
col.as_string_opt().map(|a| a as _).ok_or("string")
313313
}
314+
&DataType::BINARY => {
315+
debug!("Pushing binary array for {}", ColumnName::new(path));
316+
col.as_binary_opt().map(|a| a as _).ok_or("binary")
317+
}
314318
&DataType::INTEGER => {
315319
debug!("Pushing int32 array for {}", ColumnName::new(path));
316320
col.as_primitive_opt::<Int32Type>()
@@ -734,4 +738,138 @@ mod tests {
734738

735739
Ok(())
736740
}
741+
742+
#[test]
743+
fn test_binary_column_extraction() -> DeltaResult<()> {
744+
use crate::arrow::array::BinaryArray;
745+
use crate::engine_data::{GetData, RowVisitor};
746+
use crate::schema::ColumnName;
747+
use std::sync::LazyLock;
748+
749+
// Create a RecordBatch with binary data
750+
let binary_data: Vec<Option<&[u8]>> = vec![
751+
Some(b"hello"),
752+
Some(b"world"),
753+
None,
754+
Some(b"\x00\x01\x02\x03"),
755+
];
756+
let binary_array = BinaryArray::from(binary_data.clone());
757+
758+
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
759+
"data",
760+
ArrowDataType::Binary,
761+
true,
762+
)]));
763+
764+
let batch = RecordBatch::try_new(schema, vec![Arc::new(binary_array)])?;
765+
let arrow_data = ArrowEngineData::new(batch);
766+
767+
// Create a visitor to extract binary data
768+
struct BinaryVisitor {
769+
values: Vec<Option<Vec<u8>>>,
770+
}
771+
772+
impl RowVisitor for BinaryVisitor {
773+
fn selected_column_names_and_types(
774+
&self,
775+
) -> (&'static [ColumnName], &'static [DataType]) {
776+
static NAMES: LazyLock<Vec<ColumnName>> =
777+
LazyLock::new(|| vec![ColumnName::new(["data"])]);
778+
static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::BINARY]);
779+
(&NAMES, &TYPES)
780+
}
781+
782+
fn visit<'a>(
783+
&mut self,
784+
row_count: usize,
785+
getters: &[&'a dyn GetData<'a>],
786+
) -> DeltaResult<()> {
787+
assert_eq!(getters.len(), 1);
788+
let getter = getters[0];
789+
790+
for i in 0..row_count {
791+
self.values
792+
.push(getter.get_binary(i, "data")?.map(|b| b.to_vec()));
793+
}
794+
Ok(())
795+
}
796+
}
797+
798+
let mut visitor = BinaryVisitor { values: vec![] };
799+
arrow_data.visit_rows(&[ColumnName::new(["data"])], &mut visitor)?;
800+
801+
// Verify the extracted values
802+
assert_eq!(visitor.values.len(), 4);
803+
assert_eq!(visitor.values[0].as_deref(), Some(b"hello".as_ref()));
804+
assert_eq!(visitor.values[1].as_deref(), Some(b"world".as_ref()));
805+
assert_eq!(visitor.values[2], None);
806+
assert_eq!(
807+
visitor.values[3].as_deref(),
808+
Some(b"\x00\x01\x02\x03".as_ref())
809+
);
810+
811+
Ok(())
812+
}
813+
814+
#[test]
815+
fn test_binary_column_extraction_type_mismatch() -> DeltaResult<()> {
816+
use crate::engine_data::{GetData, RowVisitor};
817+
use crate::schema::ColumnName;
818+
use std::sync::LazyLock;
819+
820+
// Create a RecordBatch with Int32 data (not binary)
821+
let data: Vec<Option<i32>> = vec![Some(123)];
822+
let int_array = Int32Array::from(data);
823+
824+
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
825+
"data",
826+
ArrowDataType::Int32,
827+
true,
828+
)]));
829+
830+
let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)])?;
831+
let arrow_data = ArrowEngineData::new(batch);
832+
833+
// Create a visitor that tries to extract binary data from an int column
834+
struct BinaryVisitor {
835+
values: Vec<Option<Vec<u8>>>,
836+
}
837+
838+
impl RowVisitor for BinaryVisitor {
839+
fn selected_column_names_and_types(
840+
&self,
841+
) -> (&'static [ColumnName], &'static [DataType]) {
842+
static NAMES: LazyLock<Vec<ColumnName>> =
843+
LazyLock::new(|| vec![ColumnName::new(["data"])]);
844+
static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::BINARY]);
845+
(&NAMES, &TYPES)
846+
}
847+
848+
fn visit<'a>(
849+
&mut self,
850+
row_count: usize,
851+
getters: &[&'a dyn GetData<'a>],
852+
) -> DeltaResult<()> {
853+
assert_eq!(getters.len(), 1);
854+
let getter = getters[0];
855+
856+
for i in 0..row_count {
857+
self.values
858+
.push(getter.get_binary(i, "data")?.map(|b| b.to_vec()));
859+
}
860+
Ok(())
861+
}
862+
}
863+
864+
let mut visitor = BinaryVisitor { values: vec![] };
865+
let result = arrow_data.visit_rows(&[ColumnName::new(["data"])], &mut visitor);
866+
867+
// Verify that we get a type mismatch error
868+
assert_result_error_with_message(
869+
result,
870+
"Type mismatch on data: expected binary, got Int32",
871+
);
872+
873+
Ok(())
874+
}
737875
}

kernel/src/engine/arrow_get_data.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::arrow::array::{
2-
types::{GenericStringType, Int32Type, Int64Type},
2+
types::{GenericBinaryType, GenericStringType, Int32Type, Int64Type},
33
Array, BooleanArray, GenericByteArray, GenericListArray, MapArray, OffsetSizeTrait,
44
PrimitiveArray,
55
};
@@ -51,6 +51,16 @@ impl<'a> GetData<'a> for GenericByteArray<GenericStringType<i32>> {
5151
}
5252
}
5353

54+
impl<'a> GetData<'a> for GenericByteArray<GenericBinaryType<i32>> {
55+
fn get_binary(&'a self, row_index: usize, _field_name: &str) -> DeltaResult<Option<&'a [u8]>> {
56+
if self.is_valid(row_index) {
57+
Ok(Some(self.value(row_index)))
58+
} else {
59+
Ok(None)
60+
}
61+
}
62+
}
63+
5464
impl<'a, OffsetSize> GetData<'a> for GenericListArray<OffsetSize>
5565
where
5666
OffsetSize: OffsetSizeTrait,

kernel/src/engine_data.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{AsAny, DeltaResult, Error};
1313
///
1414
/// A value of `true` in the selection vector means the corresponding row is selected (i.e., not deleted),
1515
/// while `false` means the row is logically deleted and should be ignored. If the selection vector is shorter
16-
/// then the number of rows in `data` then all rows not covered by the selection vector are assumed to be selected.
16+
/// than the number of rows in `data` then all rows not covered by the selection vector are assumed to be selected.
1717
///
1818
/// Interpreting unselected (`false`) rows will result in incorrect/undefined behavior.
1919
pub struct FilteredEngineData {
@@ -196,6 +196,7 @@ pub trait GetData<'a> {
196196
(get_int, i32),
197197
(get_long, i64),
198198
(get_str, &'a str),
199+
(get_binary, &'a [u8]),
199200
(get_list, ListItem<'a>),
200201
(get_map, MapItem<'a>)
201202
);
@@ -217,6 +218,7 @@ impl<'a> GetData<'a> for () {
217218
(get_int, i32),
218219
(get_long, i64),
219220
(get_str, &'a str),
221+
(get_binary, &'a [u8]),
220222
(get_list, ListItem<'a>),
221223
(get_map, MapItem<'a>)
222224
);
@@ -251,6 +253,7 @@ impl_typed_get_data!(
251253
(get_int, i32),
252254
(get_long, i64),
253255
(get_str, &'a str),
256+
(get_binary, &'a [u8]),
254257
(get_list, ListItem<'a>),
255258
(get_map, MapItem<'a>)
256259
);
@@ -513,6 +516,80 @@ mod tests {
513516
}
514517
}
515518

519+
#[test]
520+
521+
fn test_get_binary_some_value() {
522+
use crate::arrow::array::BinaryArray;
523+
524+
// Use Arrow's BinaryArray implementation
525+
let binary_data: Vec<Option<&[u8]>> = vec![Some(b"hello"), Some(b"world"), None];
526+
let binary_array = BinaryArray::from(binary_data);
527+
528+
// Cast to dyn GetData to use TypedGetData trait
529+
let getter: &dyn GetData<'_> = &binary_array;
530+
531+
// Test getting first row
532+
let result: Option<&[u8]> = getter.get_opt(0, "binary_field").unwrap();
533+
assert_eq!(result, Some(b"hello".as_ref()));
534+
535+
// Test getting second row
536+
let result: Option<&[u8]> = getter.get_opt(1, "binary_field").unwrap();
537+
assert_eq!(result, Some(b"world".as_ref()));
538+
539+
// Test getting None value
540+
let result: Option<&[u8]> = getter.get_opt(2, "binary_field").unwrap();
541+
assert_eq!(result, None);
542+
}
543+
544+
#[test]
545+
fn test_get_binary_required() {
546+
use crate::arrow::array::BinaryArray;
547+
548+
let binary_data: Vec<Option<&[u8]>> = vec![Some(b"hello")];
549+
let binary_array = BinaryArray::from(binary_data);
550+
551+
// Cast to dyn GetData to use TypedGetData trait
552+
let getter: &dyn GetData<'_> = &binary_array;
553+
554+
// Test using get() for required field
555+
let result: &[u8] = getter.get(0, "binary_field").unwrap();
556+
assert_eq!(result, b"hello");
557+
}
558+
559+
#[test]
560+
fn test_get_binary_required_missing() {
561+
use crate::arrow::array::BinaryArray;
562+
563+
let binary_data: Vec<Option<&[u8]>> = vec![None];
564+
let binary_array = BinaryArray::from(binary_data);
565+
566+
// Cast to dyn GetData to use TypedGetData trait
567+
let getter: &dyn GetData<'_> = &binary_array;
568+
569+
// Test using get() for missing required field should error
570+
let result: DeltaResult<&[u8]> = getter.get(0, "binary_field");
571+
assert!(result.is_err());
572+
if let Err(e) = result {
573+
assert!(e.to_string().contains("Data missing for field"));
574+
}
575+
}
576+
577+
#[test]
578+
fn test_get_binary_empty_bytes() {
579+
use crate::arrow::array::BinaryArray;
580+
581+
let binary_data: Vec<Option<&[u8]>> = vec![Some(b"")];
582+
let binary_array = BinaryArray::from(binary_data);
583+
584+
// Cast to dyn GetData to use TypedGetData trait
585+
let getter: &dyn GetData<'_> = &binary_array;
586+
587+
// Test getting empty bytes
588+
let result: Option<&[u8]> = getter.get_opt(0, "binary_field").unwrap();
589+
assert_eq!(result, Some([].as_ref()));
590+
assert_eq!(result.unwrap().len(), 0);
591+
}
592+
516593
#[test]
517594
fn test_from_engine_data() {
518595
let data = get_engine_data(3);

0 commit comments

Comments
 (0)