8 changes: 2 additions & 6 deletions parquet-variant/src/variant.rs
@@ -474,12 +474,8 @@ impl<'m, 'v> VariantObject<'m, 'v> {
let field_name = self.metadata.get_field_by(field_id)?;

let start_offset = field_offsets[i];
let end_offset = field_offsets[i + 1];
let value_bytes = slice_from_slice(
self.value,
self.header.values_start_byte + start_offset
..self.header.values_start_byte + end_offset,
)?;
let value_bytes =
Author:

The spec says that the offsets may not be monotonically increasing, so the correct slice is all the subsequent bytes (even though fewer may be used).

https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types

This implies that the field_offset values may not be monotonically increasing. For example, for the following object:

If I didn't make this change, the tests failed their assertions.
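
To make the failure mode concrete, here is a minimal sketch (hypothetical buffer and offsets, entirely made up — not the parquet-variant internals) of why slicing `offsets[i]..offsets[i + 1]` breaks when offsets are ordered by field name rather than by physical position:

```rust
// Hypothetical layout: field_offsets are sorted by field name, but the
// value bytes they point at may appear in any physical order. Suppose
// field "b" (4 bytes) is written before field "a" (2 bytes):
//
//   value buffer:  b b b b a a
//   field_offsets: a -> 4, b -> 0   (name order)
//
// Slicing 4..0 for "a" would be an invalid range; slicing from the start
// offset to the end of the buffer always succeeds, and the value's own
// header says how many of those bytes it actually uses.
fn slice_value<'a>(buffer: &'a [u8], field_offsets: &[usize], i: usize) -> &'a [u8] {
    &buffer[field_offsets[i]..]
}

fn main() {
    let buffer: &[u8] = b"bbbbaa";
    let field_offsets = [4usize, 0];
    assert_eq!(slice_value(buffer, &field_offsets, 0), &b"aa"[..]);
    // For "b" the slice also carries "aa"; a decoder ignores the tail.
    assert_eq!(slice_value(buffer, &field_offsets, 1), &b"bbbbaa"[..]);
}
```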

Owner:

Good catch. And the problem would arise if the "next" offset (by field name) corresponds to a sub-object that's physically earlier in the layout. So we can't compute the size of the sub-object using offsets. Unless we're willing to search/sort the whole offset list to find the upper bound (= 🙀).

I suppose we could at least limit the slice (in the common case) by using the "next" offset only when it's not smaller than the starting offset. In that case, it should be a safe upper bound. But I'm not sure how helpful it would actually be, given that no invariant is reliably enforced?
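
A minimal sketch of that idea (hypothetical free function, not a proposed API — `offsets` stands in for field_offsets, which carries one trailing entry past the last field, and `buffer` for the bytes after values_start_byte):

```rust
// Sketch of the "bound the slice when possible" suggestion above.
fn bounded_value_slice<'a>(buffer: &'a [u8], offsets: &[usize], i: usize) -> &'a [u8] {
    let start = offsets[i];
    match offsets.get(i + 1) {
        // The next offset is used as an upper bound only when it does not
        // precede the start; otherwise nothing reliable is known about it.
        Some(&end) if end >= start => &buffer[start..end],
        _ => &buffer[start..],
    }
}
```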

Owner:

Actually... a buggy/malicious variant buffer could potentially lead to some "fun" here, where one sub-object refers to bytes shared by another sub-object. Hopefully at least one of the "overlapping" sub-objects would be obviously invalid in a way we could detect, but there's no guarantee of that.

Owner:

Bonus points if somebody can craft an overlapping pair of sub-objects that are completely valid. I suspect with nested objects it should be possible -- one sub-object would seem to have the other sub-object as both a child and a sibling.

Author:

> I suppose we could at least limit the slice (in the common case) by using the "next" offset only when it's not smaller than the starting offset. In that case, it should be a safe upper bound. But I'm not sure how helpful it would actually be, given that no invariant is reliably enforced?

Yeah, and I am not sure what limiting the value slice would achieve anyways -- all the code that interprets variant values looks at the value header to know how many bytes of the value to look at. So if the slice is longer than

> Actually... a buggy/malicious variant buffer could potentially lead to some "fun" here, where one sub-object refers to bytes shared by another sub-object. Hopefully at least one of the "overlapping" sub-objects would be obviously invalid in a way we could detect, but there's no guarantee of that.

FWIW I don't think the spec prevents variant values from being reused (i.e., the values of two sibling fields pointing to the same offset within the value).

The only requirement from what I can see is that the values pointed to by the value header are valid variants.
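
To illustrate the first point (a toy header encoding, invented for this sketch — not the real variant encoding): the decoder derives the value's length from its own header byte, so trailing bytes in an over-long slice are simply never read:

```rust
// Toy encoding for illustration only: the low two bits of the header byte
// select a width, so the decoder knows where a value ends no matter how
// long the slice it was handed happens to be.
fn value_len(bytes: &[u8]) -> Option<usize> {
    let header = *bytes.first()?;
    Some(match header & 0b11 {
        0 => 1,                          // header-only value (e.g. null)
        1 => 1 + 4,                      // 4-byte primitive
        2 => 1 + 8,                      // 8-byte primitive
        _ => 1 + (header >> 2) as usize, // short string: length in high bits
    })
}

fn main() {
    // A 4-byte primitive followed by unrelated bytes: only 5 bytes are used.
    let over_long = [0b01u8, 1, 2, 3, 4, 0xAA, 0xBB];
    assert_eq!(value_len(&over_long), Some(5));
}
```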

Owner:

> So if the slice is longer than

Incomplete sentence?

> The only requirement from what I can see is that the values pointed to by the value header are valid variants.

It does seem that way, yes.

Author:

> So if the slice is longer than
>
> Incomplete sentence?

Sorry -- what I meant was "if the slice is longer than needed, any remaining bytes will be ignored"

slice_from_slice(self.value, self.header.values_start_byte + start_offset..)?;
let variant = Variant::try_new_with_metadata(self.metadata, value_bytes)?;

fields.push((field_name, variant));
145 changes: 104 additions & 41 deletions parquet-variant/tests/variant_interop.rs
@@ -23,9 +23,8 @@
use std::fs;
use std::path::{Path, PathBuf};

use arrow_schema::ArrowError;
use chrono::NaiveDate;
use parquet_variant::{Variant, VariantMetadata};
use parquet_variant::Variant;

fn cases_dir() -> PathBuf {
Path::new(env!("CARGO_MANIFEST_DIR"))
@@ -34,11 +34,24 @@ fn cases_dir() -> PathBuf {
.join("variant")
}

fn load_case(name: &str) -> Result<(Vec<u8>, Vec<u8>), ArrowError> {
let root = cases_dir();
let meta = fs::read(root.join(format!("{name}.metadata")))?;
let val = fs::read(root.join(format!("{name}.value")))?;
Ok((meta, val))
struct Case {
metadata: Vec<u8>,
value: Vec<u8>,
}

impl Case {
/// Load the case with the given name from the parquet testing repository.
fn load(name: &str) -> Self {
let root = cases_dir();
let metadata = fs::read(root.join(format!("{name}.metadata"))).unwrap();
let value = fs::read(root.join(format!("{name}.value"))).unwrap();
Self { metadata, value }
}

/// Return the Variant for this case.
fn variant(&self) -> Variant<'_, '_> {
Variant::try_new(&self.metadata, &self.value).expect("Failed to parse variant")
}
}

/// Return a list of the values from the parquet testing repository:
@@ -67,47 +79,98 @@ fn get_primitive_cases() -> Vec<(&'static str, Variant<'static, 'static>)> {
("short_string", Variant::ShortString("Less than 64 bytes (❤\u{fe0f} with utf8)")),
]
}

fn get_non_primitive_cases() -> Vec<&'static str> {
vec!["object_primitive", "array_primitive"]
}

#[test]
fn variant_primitive() -> Result<(), ArrowError> {
fn variant_primitive() {
let cases = get_primitive_cases();
for (case, want) in cases {
let (metadata, value) = load_case(case)?;
let got = Variant::try_new(&metadata, &value)?;
let case = Case::load(case);
let got = case.variant();
assert_eq!(got, want);
}
Ok(())
}

#[test]
fn variant_non_primitive() -> Result<(), ArrowError> {
let cases = get_non_primitive_cases();
for case in cases {
let (metadata, value) = load_case(case)?;
let variant_metadata = VariantMetadata::try_new(&metadata)?;
let variant = Variant::try_new(&metadata, &value)?;
match case {
"object_primitive" => {
assert!(matches!(variant, Variant::Object(_)));
assert_eq!(variant_metadata.dictionary_size(), 7);
let dict_val = variant_metadata.get_field_by(0)?;
assert_eq!(dict_val, "int_field");
}
"array_primitive" => match variant {
Variant::List(arr) => {
let v = arr.get(0)?;
assert!(matches!(v, Variant::Int8(2)));
let v = arr.get(1)?;
assert!(matches!(v, Variant::Int8(1)));
}
_ => panic!("expected an array"),
fn variant_object_empty() {
let case = Case::load("object_empty");
let Variant::Object(variant_object) = case.variant() else {
panic!("expected an object");
};
assert_eq!(variant_object.len(), 0);
assert!(variant_object.is_empty());
}
#[test]
fn variant_object_primitive() {
Author:

The point of the PR was to add these tests

// the data is defined in
// https://github.com/apache/parquet-testing/blob/84d525a8731cec345852fb4ea2e7c581fbf2ef29/variant/data_dictionary.json#L46-L53
//
// ```json
// " "object_primitive": {
// "boolean_false_field": false,
// "boolean_true_field": true,
// "double_field": 1.23456789,
// "int_field": 1,
// "null_field": null,
// "string_field": "Apache Parquet",
// "timestamp_field": "2025-04-16T12:34:56.78"
// },
// ```
let case = Case::load("object_primitive");
let Variant::Object(variant_object) = case.variant() else {
panic!("expected an object");
};
let expected_fields = vec![
("boolean_false_field", Variant::BooleanFalse),
("boolean_true_field", Variant::BooleanTrue),
// spark wrote this as a decimal4 (not a double)
(
"double_field",
Variant::Decimal4 {
integer: 123456789,
scale: 8,
},
),
("int_field", Variant::Int8(1)),
("null_field", Variant::Null),
("string_field", Variant::ShortString("Apache Parquet")),
(
// apparently spark wrote this as a string (not a timestamp)
"timestamp_field",
Variant::ShortString("2025-04-16T12:34:56.78"),
),
];
let actual_fields: Vec<_> = variant_object.fields().unwrap().collect();
assert_eq!(actual_fields, expected_fields);
}
#[test]
fn variant_array_primitive() {
// The data is defined in
// https://github.com/apache/parquet-testing/blob/84d525a8731cec345852fb4ea2e7c581fbf2ef29/variant/data_dictionary.json#L24-L29
//
// ```json
// "array_primitive": [
// 2,
// 1,
// 5,
// 9
// ],
// ```
let case = Case::load("array_primitive");
let Variant::List(list) = case.variant() else {
panic!("expected an array");
};
let expected = vec![
Variant::Int8(2),
Variant::Int8(1),
Variant::Int8(5),
Variant::Int8(9),
];
let actual: Vec<_> = list.values().unwrap().collect();
assert_eq!(actual, expected);

// Call `get` for each individual element
for (i, expected_value) in expected.iter().enumerate() {
let got = list.get(i).unwrap();
assert_eq!(&got, expected_value);
}
}

// TODO: Add tests for object_nested and array_nested