Skip to content

Commit 5813316

Browse files
committed
add variants of functions optimized for sorted keys
1 parent 1620453 commit 5813316

File tree

16 files changed

+580
-100
lines changed

16 files changed

+580
-100
lines changed

benches/main.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use codspeed_criterion_compat::{criterion_group, criterion_main, Bencher, Criter
22

33
use datafusion::common::ScalarValue;
44
use datafusion::logical_expr::ColumnarValue;
5-
use datafusion_functions_json::udfs::{json_contains_udf, json_get_str_udf};
5+
use datafusion_functions_json::udfs::{json_contains_udf, json_get_str_top_level_sorted_udf, json_get_str_udf};
66

77
fn bench_json_contains(b: &mut Bencher) {
88
let json_contains = json_contains_udf();
@@ -30,9 +30,54 @@ fn bench_json_get_str(b: &mut Bencher) {
3030
b.iter(|| json_get_str.invoke_batch(args, 1).unwrap());
3131
}
3232

33+
/// Builds the JSON document used by the "negative" (key not found) benchmarks.
///
/// The result is a single object with 100 keys, `"b0"` through `"b99"`, each mapped to a
/// 1024-character string of `a`s.
///
/// Note: keys are emitted in numeric order, which is not strictly lexicographic
/// (`"b10"` is emitted after `"b9"`). The benchmarks in this file only look up `"a"`,
/// which compares less than every generated key.
fn make_json_negative_testcase() -> String {
    // The value is the same for every key, so build it once rather than once per key.
    let value = "a".repeat(1024);
    let kvs = (0..100)
        .map(|i| format!(r#""b{}": "{}""#, i, value))
        .collect::<Vec<_>>()
        .join(",");
    format!(r#"{{ {} }}"#, kvs)
}
41+
42+
fn bench_json_get_str_negative(b: &mut Bencher) {
43+
let json_get_str = json_get_str_udf();
44+
let args = &[
45+
ColumnarValue::Scalar(ScalarValue::Utf8(Some(make_json_negative_testcase()))),
46+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("a".to_string()))), // lexicographically less than "b1"
47+
];
48+
49+
b.iter(|| json_get_str.invoke_batch(args, 1).unwrap());
50+
}
51+
52+
fn bench_json_get_str_sorted(b: &mut Bencher) {
53+
let json_get_str = json_get_str_top_level_sorted_udf();
54+
let args = &[
55+
ColumnarValue::Scalar(ScalarValue::Utf8(Some(
56+
r#"{"a": {"aa": "x", "ab: "y"}, "b": []}"#.to_string(),
57+
))),
58+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("a".to_string()))),
59+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("aa".to_string()))),
60+
];
61+
62+
b.iter(|| json_get_str.invoke_batch(args, 1).unwrap());
63+
}
64+
65+
fn bench_json_get_str_sorted_negative(b: &mut Bencher) {
66+
let json_get_str = json_get_str_top_level_sorted_udf();
67+
let args = &[
68+
ColumnarValue::Scalar(ScalarValue::Utf8(Some(make_json_negative_testcase()))),
69+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("a".to_string()))), // lexicographically less than "b1"
70+
];
71+
72+
b.iter(|| json_get_str.invoke_batch(args, 1).unwrap());
73+
}
74+
3375
fn criterion_benchmark(c: &mut Criterion) {
3476
c.bench_function("json_contains", bench_json_contains);
3577
c.bench_function("json_get_str", bench_json_get_str);
78+
c.bench_function("json_get_str_negative", bench_json_get_str_negative);
79+
c.bench_function("json_get_str_sorted", bench_json_get_str_sorted);
80+
c.bench_function("json_get_str_sorted_negative", bench_json_get_str_sorted_negative);
3681
}
3782

3883
criterion_group!(benches, criterion_benchmark);

src/common.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,11 @@ fn wrap_as_large_dictionary(original: &dyn AnyDictionaryArray, new_values: Array
511511
DictionaryArray::new(keys, new_values)
512512
}
513513

514-
pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Option<(Jiter<'j>, Peek)> {
514+
pub fn jiter_json_find<'j>(
515+
opt_json: Option<&'j str>,
516+
path: &[JsonPath],
517+
mut sorted: Sortedness,
518+
) -> Option<(Jiter<'j>, Peek)> {
515519
let json_str = opt_json?;
516520
let mut jiter = Jiter::new(json_str.as_bytes());
517521
let mut peek = jiter.peek().ok()?;
@@ -521,6 +525,11 @@ pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Opti
521525
let mut next_key = jiter.known_object().ok()??;
522526

523527
while next_key != *key {
528+
if next_key > *key && matches!(sorted, Sortedness::Recursive | Sortedness::TopLevel) {
529+
// The current object is sorted and next_key is lexicographically greater than the key
530+
// we are looking for, so the key cannot appear later in this object; stop early.
531+
return None;
532+
}
524533
jiter.next_skip().ok()?;
525534
next_key = jiter.next_key().ok()??;
526535
}
@@ -541,6 +550,9 @@ pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Opti
541550
return None;
542551
}
543552
}
553+
if sorted == Sortedness::TopLevel {
554+
sorted = Sortedness::Unspecified;
555+
}
544556
}
545557
Some((jiter, peek))
546558
}
@@ -585,3 +597,27 @@ fn mask_dictionary_keys(keys: &PrimitiveArray<Int64Type>, type_ids: &[i8]) -> Pr
585597
}
586598
PrimitiveArray::new(keys.values().clone(), Some(null_mask.into()))
587599
}
600+
601+
/// Information about the sortedness of a JSON object.
///
/// This is used to optimize key lookups: when an object is known to be sorted, the search can
/// stop as soon as the current key is lexicographically greater than the key being looked for.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Sortedness {
    /// No guarantees about the order of the elements.
    Unspecified,
    /// Only the outermost object is known to be sorted.
    /// If the outermost item is not an object, this is equivalent to `Unspecified`.
    TopLevel,
    /// All objects are known to be sorted, including objects nested within arrays.
    Recursive,
}

impl Sortedness {
    /// Returns the suffix that identifies this sortedness in a function name.
    ///
    /// `Unspecified` yields an empty suffix; the other variants yield a suffix that starts
    /// with an underscore, so it can be concatenated directly onto a base name.
    #[must_use]
    pub(crate) const fn function_name_suffix(self) -> &'static str {
        match self {
            Sortedness::Unspecified => "",
            Sortedness::TopLevel => "_top_level_sorted",
            Sortedness::Recursive => "_recursive_sorted",
        }
    }
}

src/common_macros.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
///
1616
/// [`ScalarUDFImpl`]: datafusion_expr::ScalarUDFImpl
1717
macro_rules! make_udf_function {
18-
($udf_impl:ty, $expr_fn_name:ident, $($arg:ident)*, $doc:expr) => {
18+
($udf_impl:ty, $expr_fn_name:ident, $($arg:ident)*, $doc:expr, $sorted:expr) => {
1919
paste::paste! {
2020
#[doc = $doc]
2121
#[must_use] pub fn $expr_fn_name($($arg: datafusion::logical_expr::Expr),*) -> datafusion::logical_expr::Expr {
@@ -37,7 +37,7 @@ macro_rules! make_udf_function {
3737
[< STATIC_ $expr_fn_name:upper >]
3838
.get_or_init(|| {
3939
std::sync::Arc::new(datafusion::logical_expr::ScalarUDF::new_from_impl(
40-
<$udf_impl>::default(),
40+
<$udf_impl>::new($sorted),
4141
))
4242
})
4343
.clone()

src/json_as_text.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,48 @@ use datafusion::common::{Result as DataFusionResult, ScalarValue};
77
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
88
use jiter::Peek;
99

10-
use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, InvokeResult, JsonPath};
10+
use crate::common::{
11+
get_err, invoke, jiter_json_find, return_type_check, GetError, InvokeResult, JsonPath, Sortedness,
12+
};
1113
use crate::common_macros::make_udf_function;
1214

1315
make_udf_function!(
1416
JsonAsText,
1517
json_as_text,
1618
json_data path,
17-
r#"Get any value from a JSON string by its "path", represented as a string"#
19+
r#"Get any value from a JSON string by its "path", represented as a string"#,
20+
Sortedness::Unspecified
21+
);
22+
23+
make_udf_function!(
24+
JsonAsText,
25+
json_as_text_top_level_sorted,
26+
json_data path,
27+
r#"Get any value from a JSON string by its "path", represented as a string; assumes the JSON string's top level object's keys are sorted."#,
28+
Sortedness::TopLevel
29+
);
30+
31+
make_udf_function!(
32+
JsonAsText,
33+
json_as_text_recursive_sorted,
34+
json_data path,
35+
r#"Get any value from a JSON string by its "path", represented as a string; assumes all json object's keys are sorted."#,
36+
Sortedness::Recursive
1837
);
1938

2039
#[derive(Debug)]
2140
pub(super) struct JsonAsText {
2241
signature: Signature,
2342
aliases: [String; 1],
43+
sorted: Sortedness,
2444
}
2545

26-
impl Default for JsonAsText {
27-
fn default() -> Self {
46+
impl JsonAsText {
47+
pub fn new(sorted: Sortedness) -> Self {
2848
Self {
2949
signature: Signature::variadic_any(Volatility::Immutable),
30-
aliases: ["json_as_text".to_string()],
50+
aliases: [format!("json_as_text{}", sorted.function_name_suffix())],
51+
sorted,
3152
}
3253
}
3354
}
@@ -50,7 +71,7 @@ impl ScalarUDFImpl for JsonAsText {
5071
}
5172

5273
fn invoke(&self, args: &[ColumnarValue]) -> DataFusionResult<ColumnarValue> {
53-
invoke::<StringArray>(args, jiter_json_as_text)
74+
invoke::<StringArray>(args, |json, path| jiter_json_as_text(json, path, self.sorted))
5475
}
5576

5677
fn aliases(&self) -> &[String] {
@@ -82,8 +103,8 @@ impl InvokeResult for StringArray {
82103
}
83104
}
84105

85-
fn jiter_json_as_text(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
86-
if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
106+
fn jiter_json_as_text(opt_json: Option<&str>, path: &[JsonPath], sorted: Sortedness) -> Result<String, GetError> {
107+
if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path, sorted) {
87108
match peek {
88109
Peek::Null => {
89110
jiter.known_null()?;

src/json_contains.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,39 @@ make_udf_function!(
1414
JsonContains,
1515
json_contains,
1616
json_data path,
17-
r#"Does the key/index exist within the JSON value as the specified "path"?"#
17+
r#"Does the key/index exist within the JSON value as the specified "path"?"#,
18+
crate::common::Sortedness::Unspecified
19+
);
20+
21+
make_udf_function!(
22+
JsonContains,
23+
json_contains_top_level_sorted,
24+
json_data path,
25+
r#"Does the key/index exist within the JSON value as the specified "path"; assumes the JSON string's top level object's keys are sorted?"#,
26+
crate::common::Sortedness::TopLevel
27+
);
28+
29+
make_udf_function!(
30+
JsonContains,
31+
json_contains_recursive_sorted,
32+
json_data path,
33+
r#"Does the key/index exist within the JSON value as the specified "path"; assumes all json object's keys are sorted?"#,
34+
crate::common::Sortedness::Recursive
1835
);
1936

2037
#[derive(Debug)]
2138
pub(super) struct JsonContains {
2239
signature: Signature,
2340
aliases: [String; 1],
41+
sorted: crate::common::Sortedness,
2442
}
2543

26-
impl Default for JsonContains {
27-
fn default() -> Self {
44+
impl JsonContains {
45+
pub fn new(sorted: crate::common::Sortedness) -> Self {
2846
Self {
2947
signature: Signature::variadic_any(Volatility::Immutable),
30-
aliases: ["json_contains".to_string()],
48+
aliases: [format!("json_contains{}", sorted.function_name_suffix())],
49+
sorted,
3150
}
3251
}
3352
}
@@ -54,7 +73,7 @@ impl ScalarUDFImpl for JsonContains {
5473
}
5574

5675
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
57-
invoke::<BooleanArray>(args, jiter_json_contains)
76+
invoke::<BooleanArray>(args, |json, path| jiter_json_contains(json, path, self.sorted))
5877
}
5978

6079
fn aliases(&self) -> &[String] {
@@ -88,6 +107,10 @@ impl InvokeResult for BooleanArray {
88107
}
89108

90109
#[allow(clippy::unnecessary_wraps)]
91-
fn jiter_json_contains(json_data: Option<&str>, path: &[JsonPath]) -> Result<bool, GetError> {
92-
Ok(jiter_json_find(json_data, path).is_some())
110+
fn jiter_json_contains(
111+
json_data: Option<&str>,
112+
path: &[JsonPath],
113+
sorted: crate::common::Sortedness,
114+
) -> Result<bool, GetError> {
115+
Ok(jiter_json_find(json_data, path, sorted).is_some())
93116
}

src/json_get.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use datafusion::scalar::ScalarValue;
1010
use jiter::{Jiter, NumberAny, NumberInt, Peek};
1111

1212
use crate::common::InvokeResult;
13+
use crate::common::Sortedness;
1314
use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, JsonPath};
1415
use crate::common_macros::make_udf_function;
1516
use crate::common_union::{JsonUnion, JsonUnionField};
@@ -18,22 +19,39 @@ make_udf_function!(
1819
JsonGet,
1920
json_get,
2021
json_data path,
21-
r#"Get a value from a JSON string by its "path""#
22+
r#"Get a value from a JSON string by its "path""#,
23+
Sortedness::Unspecified
2224
);
2325

24-
// build_typed_get!(JsonGet, "json_get", Union, Float64Array, jiter_json_get_float);
26+
make_udf_function!(
27+
JsonGet,
28+
json_get_top_level_sorted,
29+
json_data path,
30+
r#"Get a value from a JSON string by its "path"; assumes the JSON string's top level object's keys are sorted."#,
31+
Sortedness::TopLevel
32+
);
33+
34+
make_udf_function!(
35+
JsonGet,
36+
json_get_recursive_sorted,
37+
json_data path,
38+
r#"Get a value from a JSON string by its "path"; assumes all object's keys are sorted."#,
39+
Sortedness::Recursive
40+
);
2541

2642
#[derive(Debug)]
2743
pub(super) struct JsonGet {
2844
signature: Signature,
2945
aliases: [String; 1],
46+
sorted: Sortedness,
3047
}
3148

32-
impl Default for JsonGet {
33-
fn default() -> Self {
49+
impl JsonGet {
50+
pub fn new(sorted: Sortedness) -> Self {
3451
Self {
3552
signature: Signature::variadic_any(Volatility::Immutable),
36-
aliases: ["json_get".to_string()],
53+
aliases: [format!("json_get{}", sorted.function_name_suffix())],
54+
sorted,
3755
}
3856
}
3957
}
@@ -56,7 +74,7 @@ impl ScalarUDFImpl for JsonGet {
5674
}
5775

5876
fn invoke(&self, args: &[ColumnarValue]) -> DataFusionResult<ColumnarValue> {
59-
invoke::<JsonUnion>(args, jiter_json_get_union)
77+
invoke::<JsonUnion>(args, |json, path| jiter_json_get_union(json, path, self.sorted))
6078
}
6179

6280
fn aliases(&self) -> &[String] {
@@ -93,8 +111,12 @@ impl InvokeResult for JsonUnion {
93111
}
94112
}
95113

96-
fn jiter_json_get_union(opt_json: Option<&str>, path: &[JsonPath]) -> Result<JsonUnionField, GetError> {
97-
if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
114+
fn jiter_json_get_union(
115+
opt_json: Option<&str>,
116+
path: &[JsonPath],
117+
sorted: Sortedness,
118+
) -> Result<JsonUnionField, GetError> {
119+
if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path, sorted) {
98120
build_union(&mut jiter, peek)
99121
} else {
100122
get_err!()

0 commit comments

Comments
 (0)