Skip to content

Commit 3087ca8

Browse files
authored
perf: optimize NthValue when ignore_nulls is true (#19496)
## Rationale for this change

The `PartitionEvaluator` implementation for `NthValue` in DataFusion has a few shortcomings:

* When nulls are ignored (meaning the count should skip over them), the evaluation collects an array of all valid indices, to select at most one index according to the `First`/`Last`/`Nth` case.
* The `memoize` implementation gives up in the same condition, even after performing part of the logic!

## What changes are included in this PR?

Use only as much iteration over the valid indices as needed for the function case, without collecting all indices. The `memoize` implementation does the right thing for `FirstValue` with `ignore_nulls` set to true, or returns early for other function cases.

## Are these changes tested?

All existing tests pass for `FirstValue`/`LastValue`/`NthValue`.

## Are there any user-facing changes?

No.
1 parent 821d410 commit 3087ca8

File tree

4 files changed

+372
-82
lines changed

4 files changed

+372
-82
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/functions-window/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,11 @@ datafusion-physical-expr = { workspace = true }
5151
datafusion-physical-expr-common = { workspace = true }
5252
log = { workspace = true }
5353
paste = { workspace = true }
54+
55+
[dev-dependencies]
56+
arrow = { workspace = true, features = ["test_utils"] }
57+
criterion = { workspace = true }
58+
59+
[[bench]]
60+
name = "nth_value"
61+
harness = false
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hint::black_box;
19+
use std::ops::Range;
20+
use std::slice;
21+
use std::sync::Arc;
22+
23+
use arrow::array::ArrayRef;
24+
use arrow::datatypes::{DataType, Field, FieldRef, Int64Type};
25+
use arrow::util::bench_util::create_primitive_array;
26+
27+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
28+
use datafusion_common::ScalarValue;
29+
use datafusion_expr::{PartitionEvaluator, WindowUDFImpl};
30+
use datafusion_functions_window::nth_value::{NthValue, NthValueKind};
31+
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
32+
use datafusion_physical_expr::expressions::{Column, Literal};
33+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
34+
35+
const ARRAY_SIZE: usize = 8192;
36+
37+
/// Creates a partition evaluator for FIRST_VALUE, LAST_VALUE, or NTH_VALUE
38+
fn create_evaluator(
39+
kind: NthValueKind,
40+
ignore_nulls: bool,
41+
n: Option<i64>,
42+
) -> Box<dyn PartitionEvaluator> {
43+
let expr = Arc::new(Column::new("c", 0)) as Arc<dyn PhysicalExpr>;
44+
let input_field: FieldRef = Field::new("c", DataType::Int64, true).into();
45+
let input_fields = vec![input_field];
46+
47+
let (nth_value, exprs): (NthValue, Vec<Arc<dyn PhysicalExpr>>) = match kind {
48+
NthValueKind::First => (NthValue::first(), vec![expr]),
49+
NthValueKind::Last => (NthValue::last(), vec![expr]),
50+
NthValueKind::Nth => {
51+
let n_value =
52+
Arc::new(Literal::new(ScalarValue::Int64(n))) as Arc<dyn PhysicalExpr>;
53+
(NthValue::nth(), vec![expr, n_value])
54+
}
55+
};
56+
57+
let args = PartitionEvaluatorArgs::new(&exprs, &input_fields, false, ignore_nulls);
58+
nth_value.partition_evaluator(args).unwrap()
59+
}
60+
61+
fn bench_nth_value_ignore_nulls(c: &mut Criterion) {
62+
let mut group = c.benchmark_group("nth_value_ignore_nulls");
63+
64+
// Test different null densities
65+
let null_densities = [0.0, 0.3, 0.5, 0.8];
66+
67+
for null_density in null_densities {
68+
let values = Arc::new(create_primitive_array::<Int64Type>(
69+
ARRAY_SIZE,
70+
null_density,
71+
)) as ArrayRef;
72+
let null_pct = (null_density * 100.0) as u32;
73+
74+
// FIRST_VALUE with ignore_nulls - expanding window
75+
group.bench_function(
76+
BenchmarkId::new("first_value_expanding", format!("{null_pct}%_nulls")),
77+
|b| {
78+
b.iter(|| {
79+
let mut evaluator = create_evaluator(NthValueKind::First, true, None);
80+
let values_slice = slice::from_ref(&values);
81+
for i in 0..values.len() {
82+
let range = Range {
83+
start: 0,
84+
end: i + 1,
85+
};
86+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
87+
}
88+
})
89+
},
90+
);
91+
92+
// LAST_VALUE with ignore_nulls - expanding window
93+
group.bench_function(
94+
BenchmarkId::new("last_value_expanding", format!("{null_pct}%_nulls")),
95+
|b| {
96+
b.iter(|| {
97+
let mut evaluator = create_evaluator(NthValueKind::Last, true, None);
98+
let values_slice = slice::from_ref(&values);
99+
for i in 0..values.len() {
100+
let range = Range {
101+
start: 0,
102+
end: i + 1,
103+
};
104+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
105+
}
106+
})
107+
},
108+
);
109+
110+
// NTH_VALUE(col, 10) with ignore_nulls - get 10th non-null value
111+
group.bench_function(
112+
BenchmarkId::new("nth_value_10_expanding", format!("{null_pct}%_nulls")),
113+
|b| {
114+
b.iter(|| {
115+
let mut evaluator =
116+
create_evaluator(NthValueKind::Nth, true, Some(10));
117+
let values_slice = slice::from_ref(&values);
118+
for i in 0..values.len() {
119+
let range = Range {
120+
start: 0,
121+
end: i + 1,
122+
};
123+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
124+
}
125+
})
126+
},
127+
);
128+
129+
// NTH_VALUE(col, -10) with ignore_nulls - get 10th from last non-null value
130+
group.bench_function(
131+
BenchmarkId::new("nth_value_neg10_expanding", format!("{null_pct}%_nulls")),
132+
|b| {
133+
b.iter(|| {
134+
let mut evaluator =
135+
create_evaluator(NthValueKind::Nth, true, Some(-10));
136+
let values_slice = slice::from_ref(&values);
137+
for i in 0..values.len() {
138+
let range = Range {
139+
start: 0,
140+
end: i + 1,
141+
};
142+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
143+
}
144+
})
145+
},
146+
);
147+
148+
// Sliding window benchmarks with 100-row window
149+
let window_size: usize = 100;
150+
151+
group.bench_function(
152+
BenchmarkId::new("first_value_sliding_100", format!("{null_pct}%_nulls")),
153+
|b| {
154+
b.iter(|| {
155+
let mut evaluator = create_evaluator(NthValueKind::First, true, None);
156+
let values_slice = slice::from_ref(&values);
157+
for i in 0..values.len() {
158+
let start = i.saturating_sub(window_size - 1);
159+
let range = Range { start, end: i + 1 };
160+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
161+
}
162+
})
163+
},
164+
);
165+
166+
group.bench_function(
167+
BenchmarkId::new("last_value_sliding_100", format!("{null_pct}%_nulls")),
168+
|b| {
169+
b.iter(|| {
170+
let mut evaluator = create_evaluator(NthValueKind::Last, true, None);
171+
let values_slice = slice::from_ref(&values);
172+
for i in 0..values.len() {
173+
let start = i.saturating_sub(window_size - 1);
174+
let range = Range { start, end: i + 1 };
175+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
176+
}
177+
})
178+
},
179+
);
180+
}
181+
182+
group.finish();
183+
184+
// Comparison benchmarks: ignore_nulls vs respect_nulls
185+
let mut comparison_group = c.benchmark_group("nth_value_nulls_comparison");
186+
let values_with_nulls =
187+
Arc::new(create_primitive_array::<Int64Type>(ARRAY_SIZE, 0.5)) as ArrayRef;
188+
189+
// FIRST_VALUE comparison
190+
comparison_group.bench_function(
191+
BenchmarkId::new("first_value", "ignore_nulls"),
192+
|b| {
193+
b.iter(|| {
194+
let mut evaluator = create_evaluator(NthValueKind::First, true, None);
195+
let values_slice = slice::from_ref(&values_with_nulls);
196+
for i in 0..values_with_nulls.len() {
197+
let range = Range {
198+
start: 0,
199+
end: i + 1,
200+
};
201+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
202+
}
203+
})
204+
},
205+
);
206+
207+
comparison_group.bench_function(
208+
BenchmarkId::new("first_value", "respect_nulls"),
209+
|b| {
210+
b.iter(|| {
211+
let mut evaluator = create_evaluator(NthValueKind::First, false, None);
212+
let values_slice = slice::from_ref(&values_with_nulls);
213+
for i in 0..values_with_nulls.len() {
214+
let range = Range {
215+
start: 0,
216+
end: i + 1,
217+
};
218+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
219+
}
220+
})
221+
},
222+
);
223+
224+
// NTH_VALUE comparison
225+
comparison_group.bench_function(
226+
BenchmarkId::new("nth_value_10", "ignore_nulls"),
227+
|b| {
228+
b.iter(|| {
229+
let mut evaluator = create_evaluator(NthValueKind::Nth, true, Some(10));
230+
let values_slice = slice::from_ref(&values_with_nulls);
231+
for i in 0..values_with_nulls.len() {
232+
let range = Range {
233+
start: 0,
234+
end: i + 1,
235+
};
236+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
237+
}
238+
})
239+
},
240+
);
241+
242+
comparison_group.bench_function(
243+
BenchmarkId::new("nth_value_10", "respect_nulls"),
244+
|b| {
245+
b.iter(|| {
246+
let mut evaluator = create_evaluator(NthValueKind::Nth, false, Some(10));
247+
let values_slice = slice::from_ref(&values_with_nulls);
248+
for i in 0..values_with_nulls.len() {
249+
let range = Range {
250+
start: 0,
251+
end: i + 1,
252+
};
253+
black_box(evaluator.evaluate(values_slice, &range).unwrap());
254+
}
255+
})
256+
},
257+
);
258+
259+
comparison_group.finish();
260+
}
261+
262+
criterion_group!(benches, bench_nth_value_ignore_nulls);
263+
criterion_main!(benches);

0 commit comments

Comments
 (0)