Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ rust-version = "1.86.0"

[dependencies]
datafusion = { version = "50", default-features = false }
jiter = "0.10"
log = "0.4"
jiter = "0.11.0"
log = { version = "0.4" }
jsonpath-rust = "1.0.4"
paste = "1"

[dev-dependencies]
codspeed-criterion-compat = "2.6"
codspeed-criterion-compat = "3.0.5"
datafusion = { version = "50", default-features = false, features = [
"nested_expressions",
] }
tokio = { version = "1.43", features = ["full"] }
tokio = { version = "1.47.1", features = ["full"] }
rstest = "0.26.1"

[lints.clippy]
dbg_macro = "deny"
Expand Down
45 changes: 40 additions & 5 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::str::Utf8Error;
use std::sync::Arc;

use crate::common_union::{
is_json_union, json_from_union_scalar, nested_json_array, nested_json_array_ref, TYPE_ID_NULL,
};
use datafusion::arrow::array::{
downcast_array, AnyDictionaryArray, Array, ArrayAccessor, ArrayRef, AsArray, DictionaryArray, LargeStringArray,
PrimitiveArray, PrimitiveBuilder, RunArray, StringArray, StringViewArray,
Expand All @@ -11,10 +14,8 @@ use datafusion::arrow::datatypes::{ArrowNativeType, DataType, Int64Type, UInt64T
use datafusion::common::{exec_err, plan_err, Result as DataFusionResult, ScalarValue};
use datafusion::logical_expr::ColumnarValue;
use jiter::{Jiter, JiterError, Peek};

use crate::common_union::{
is_json_union, json_from_union_scalar, nested_json_array, nested_json_array_ref, TYPE_ID_NULL,
};
use jsonpath_rust::parser::model::{Segment, Selector};
use jsonpath_rust::parser::parse_json_path;

/// General implementation of `ScalarUDFImpl::return_type`.
///
Expand Down Expand Up @@ -68,7 +69,7 @@ fn dict_key_type(d: &DataType) -> Option<DataType> {
None
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum JsonPath<'s> {
Key(&'s str),
Index(usize),
Expand Down Expand Up @@ -141,6 +142,22 @@ impl<'s> JsonPathArgs<'s> {
}
}

/// Parses a JSONPath expression (e.g. `$.a.b[0]`) into a flat list of [`JsonPath`] steps.
///
/// Each parsed segment becomes one step:
/// * a name selector becomes [`JsonPath::Key`],
/// * a non-negative index selector becomes [`JsonPath::Index`],
/// * everything else (other selectors, other segment kinds, and indexes that do not fit in
///   `usize`, such as negative ones) becomes [`JsonPath::None`].
///
/// If `path` cannot be parsed, an empty vector is returned.
///
/// NOTE(review): the parse error is discarded, so callers cannot tell an invalid path apart from
/// a path with no segments — confirm that this is the intended behavior.
///
/// NOTE(review): key names are made `'static` with `Box::leak`, so every call leaks the key
/// strings of the parsed path for the rest of the process lifetime. Removing the leak requires
/// changing the return type, which is outside the scope of this function's current interface.
pub(crate) fn parse_jsonpath(path: &str) -> Vec<JsonPath<'static>> {
    let segments = parse_json_path(path).map(|query| query.segments).unwrap_or_default();

    segments
        .into_iter()
        .map(|segment| match segment {
            Segment::Selector(Selector::Name(name)) => JsonPath::Key(Box::leak(name.into_boxed_str())),
            // A checked conversion keeps a negative (or oversized) index from silently wrapping
            // into an unrelated array position, as a plain `as usize` cast would.
            Segment::Selector(Selector::Index(idx)) => usize::try_from(idx).map_or(JsonPath::None, JsonPath::Index),
            _ => JsonPath::None,
        })
        .collect()
}

pub trait InvokeResult {
type Item;
type Builder;
Expand Down Expand Up @@ -586,3 +603,21 @@ fn mask_dictionary_keys(keys: &PrimitiveArray<Int64Type>, type_ids: &[i8]) -> Pr
}
PrimitiveArray::new(keys.values().clone(), Some(null_mask.into()))
}

#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;

    /// Checks that `parse_jsonpath` turns dotted names and bracketed indexes into the expected
    /// `JsonPath` steps.
    ///
    /// `parse_jsonpath` is synchronous, so this is a plain `#[rstest]` and does not need an async
    /// runtime.
    ///
    /// The last case pins the current handling of bracket-quoted names: the surrounding double
    /// quotes are kept as part of the key.
    #[rstest]
    #[case("$.a.aa", vec![JsonPath::Key("a"), JsonPath::Key("aa")])]
    #[case("$.a.ab[0].ac", vec![JsonPath::Key("a"), JsonPath::Key("ab"), JsonPath::Index(0), JsonPath::Key("ac")])]
    #[case("$.a.ab[1].ad", vec![JsonPath::Key("a"), JsonPath::Key("ab"), JsonPath::Index(1), JsonPath::Key("ad")])]
    #[case(r#"$.a["a b"].ad"#, vec![JsonPath::Key("a"), JsonPath::Key("\"a b\""), JsonPath::Key("ad")])]
    fn test_parse_jsonpath(#[case] path: &str, #[case] expected: Vec<JsonPath<'static>>) {
        assert_eq!(parse_jsonpath(path), expected);
    }
}
2 changes: 1 addition & 1 deletion src/json_as_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl InvokeResult for StringArray {
}
}

fn jiter_json_as_text(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
pub(crate) fn jiter_json_as_text(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
match peek {
Peek::Null => {
Expand Down
84 changes: 84 additions & 0 deletions src/json_extract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use crate::common::{invoke, parse_jsonpath, return_type_check};
use crate::common_macros::make_udf_function;
use datafusion::arrow::datatypes::{DataType, DataType::Utf8};
use datafusion::common::{exec_err, Result as DataFusionResult, ScalarValue};
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use crate::common_union::JsonUnion;
use crate::json_get::jiter_json_get_union;

// Invokes the crate's UDF-declaration macro for the `JsonExtract` type declared below, under the
// function name `json_extract`, with arguments named `json_data` and `path`. The raw string is
// the function's description.
// NOTE(review): what `make_udf_function!` actually generates is defined in
// `crate::common_macros` and is not visible here — confirm there.
make_udf_function!(
JsonExtract,
json_extract,
json_data path,
r#"Get a value from a JSON string by its "path" in JSONPath format"#
);

/// Scalar UDF implementation behind `json_extract`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct JsonExtract {
/// Argument signature, returned by `ScalarUDFImpl::signature`.
signature: Signature,
/// Alias list; its single element is also what `ScalarUDFImpl::name` returns.
aliases: [String; 1],
}

impl Default for JsonExtract {
    /// Builds the UDF with an exact two-string signature and the single alias `json_extract`.
    fn default() -> Self {
        // First argument: the JSON data; second argument: the JSONPath, both as strings.
        let arg_types = vec![Utf8, Utf8];
        let signature = Signature::exact(arg_types, Volatility::Immutable);
        let aliases = [String::from("json_extract")];
        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for JsonExtract {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The function name is the first (and only) alias.
    fn name(&self) -> &str {
        &self.aliases[0]
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Validates the argument types via `return_type_check` and reports the JSON union type.
    fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
        return_type_check(arg_types, self.name(), JsonUnion::data_type())
    }

    /// Extracts the value addressed by a literal JSONPath from the JSON argument.
    ///
    /// # Errors
    /// Returns an execution error when the call does not have exactly two arguments, or when the
    /// second argument is not a non-null `Utf8` scalar.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
        // Guard: exactly (JSON data, path).
        let [json_arg, path_arg] = args.args.as_slice() else {
            return exec_err!(
                "'{}' expects exactly 2 arguments (JSON data, path), got {}",
                self.name(),
                args.args.len()
            );
        };

        // Guard: the path must be a literal string so it can be parsed once, up front.
        let ColumnarValue::Scalar(ScalarValue::Utf8(Some(path_str))) = path_arg else {
            return exec_err!(
                "'{}' expects a valid JSONPath string (e.g., '$.key[0]') as second argument",
                self.name()
            );
        };

        let path = parse_jsonpath(path_str);

        // Only the JSON argument is handed to `invoke`; the closure ignores the path slice that
        // `invoke` supplies and uses the pre-parsed JSONPath instead.
        invoke::<JsonUnion>(std::slice::from_ref(json_arg), |json, _| {
            jiter_json_get_union(json, &path)
        })
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}
83 changes: 83 additions & 0 deletions src/json_extract_scalar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::any::Any;

use datafusion::arrow::datatypes::DataType;
use datafusion::common::{exec_err, Result as DataFusionResult};
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
use datafusion::scalar::ScalarValue;

use crate::common::parse_jsonpath;
use crate::common::{invoke, return_type_check};
use crate::common_macros::make_udf_function;
use crate::common_union::JsonUnion;
use crate::json_get::jiter_json_get_union;

// Invokes the crate's UDF-declaration macro for the `JsonExtractScalar` type declared below,
// under the function name `json_extract_scalar`, with arguments named `json_data` and `path`.
// The raw string is the function's description.
// NOTE(review): what `make_udf_function!` actually generates is defined in
// `crate::common_macros` and is not visible here — confirm there.
make_udf_function!(
JsonExtractScalar,
json_extract_scalar,
json_data path,
r#"Get a value from a JSON string by its "path""#
);

/// Scalar UDF implementation behind `json_extract_scalar`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct JsonExtractScalar {
/// Argument signature, returned by `ScalarUDFImpl::signature`.
signature: Signature,
/// Alias list; its single element is also what `ScalarUDFImpl::name` returns.
aliases: [String; 1],
}

impl Default for JsonExtractScalar {
    /// Builds the UDF with a variadic-any signature and the single alias `json_extract_scalar`.
    fn default() -> Self {
        let signature = Signature::variadic_any(Volatility::Immutable);
        let aliases = [String::from("json_extract_scalar")];
        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for JsonExtractScalar {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The function name is the first (and only) alias.
    fn name(&self) -> &str {
        self.aliases[0].as_str()
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Validates the argument types via `return_type_check` and reports the JSON union type.
    fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
        return_type_check(arg_types, self.name(), JsonUnion::data_type())
    }

    /// Extracts the value addressed by a literal JSONPath from the JSON argument.
    ///
    /// The path must be a non-null string scalar. The signature is `variadic_any`, so this
    /// function does not assume the path has been coerced to `Utf8`: `Utf8`, `LargeUtf8` and
    /// `Utf8View` literals are all accepted.
    ///
    /// # Errors
    /// Returns an execution error when the call does not have exactly two arguments, or when the
    /// second argument is not a non-null string scalar.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
        if args.args.len() != 2 {
            return exec_err!(
                "'{}' expects exactly 2 arguments (JSON data, path), got {}",
                self.name(),
                args.args.len()
            );
        }

        let json_arg = &args.args[0];
        let path_arg = &args.args[1];

        let path_str = match path_arg {
            ColumnarValue::Scalar(
                ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) | ScalarValue::Utf8View(Some(s)),
            ) => s,
            _ => {
                return exec_err!(
                    "'{}' expects a valid JSONPath string (e.g., '$.key[0]') as second argument",
                    self.name()
                )
            }
        };

        // Parse once, outside the per-row closure.
        let path = parse_jsonpath(path_str);

        // Only the JSON argument is handed to `invoke`; the closure ignores the path slice that
        // `invoke` supplies and uses the pre-parsed JSONPath instead.
        invoke::<JsonUnion>(&[json_arg.clone()], |json, _| jiter_json_get_union(json, &path))
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}
61 changes: 61 additions & 0 deletions src/json_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::any::Any;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::DataType::Utf8;
use datafusion::common::Result as DataFusionResult;
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};

use crate::common::{invoke, return_type_check};
use crate::common_macros::make_udf_function;
use crate::json_as_text::jiter_json_as_text;

// Invokes the crate's UDF-declaration macro for the `JsonFormat` type declared below, under the
// function name `json_format`, with a single argument named `json_data`. The raw string is the
// function's description; it no longer mentions a "path" because this function takes none.
// NOTE(review): what `make_udf_function!` actually generates is defined in
// `crate::common_macros` and is not visible here — confirm there.
make_udf_function!(
    JsonFormat,
    json_format,
    json_data,
    r#"Get the top-level value of a JSON string, represented as a string"#
);

/// Scalar UDF implementation behind `json_format`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct JsonFormat {
    /// Argument signature, returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alias list; its single element is also what `ScalarUDFImpl::name` returns.
    aliases: [String; 1],
}

impl Default for JsonFormat {
    /// Builds the UDF with an exact single-`Utf8` signature and the single alias `json_format`.
    fn default() -> Self {
        let signature = Signature::exact(vec![Utf8], Volatility::Immutable);
        let aliases = [String::from("json_format")];
        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for JsonFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The function name is the first (and only) alias.
    fn name(&self) -> &str {
        &self.aliases[0]
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Validates the argument types via `return_type_check` and reports `Utf8`.
    fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
        return_type_check(arg_types, self.name(), Utf8)
    }

    /// Renders the JSON argument as text by calling `jiter_json_as_text` with an empty path.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
        // The closure ignores the path slice supplied by `invoke` and always passes an empty
        // path (no key/index steps) — presumably selecting the top-level value; confirm against
        // `jiter_json_find`.
        invoke::<StringArray>(&args.args, |json, _| jiter_json_as_text(json, &[]))
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}

2 changes: 1 addition & 1 deletion src/json_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl InvokeResult for JsonUnion {
}
}

fn jiter_json_get_union(opt_json: Option<&str>, path: &[JsonPath]) -> Result<JsonUnionField, GetError> {
pub(crate) fn jiter_json_get_union(opt_json: Option<&str>, path: &[JsonPath]) -> Result<JsonUnionField, GetError> {
if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
build_union(&mut jiter, peek)
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/json_get_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl ScalarUDFImpl for JsonGetJson {
}
}

fn jiter_json_get_json(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
pub(crate) fn jiter_json_get_json(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
let start = jiter.current_index();
jiter.known_skip(peek)?;
Expand Down
Loading
Loading