Skip to content

Commit 62cdc8d

Browse files
Merge branch 'main' into fix-scalarvalue-null-partial-cmp
2 parents 552a1e9 + 9a9ff8d commit 62cdc8d

File tree

10 files changed

+404
-33
lines changed

10 files changed

+404
-33
lines changed

.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
4444
- name: Install cargo-audit
45-
uses: taiki-e/install-action@ff581034fb69296c525e51afd68cb9823bfbe4ed # v2.65.8
45+
uses: taiki-e/install-action@e0db384ad69f5ba2c6dd0129d8934e0d0ba465c1 # v2.65.10
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ jobs:
421421
sudo apt-get update -qq
422422
sudo apt-get install -y -qq clang
423423
- name: Setup wasm-pack
424-
uses: taiki-e/install-action@ff581034fb69296c525e51afd68cb9823bfbe4ed # v2.65.8
424+
uses: taiki-e/install-action@e0db384ad69f5ba2c6dd0129d8934e0d0ba465c1 # v2.65.10
425425
with:
426426
tool: wasm-pack
427427
- name: Run tests with headless mode
@@ -741,7 +741,7 @@ jobs:
741741
- name: Setup Rust toolchain
742742
uses: ./.github/actions/setup-builder
743743
- name: Install cargo-msrv
744-
uses: taiki-e/install-action@ff581034fb69296c525e51afd68cb9823bfbe4ed # v2.65.8
744+
uses: taiki-e/install-action@e0db384ad69f5ba2c6dd0129d8934e0d0ba465c1 # v2.65.10
745745
with:
746746
tool: cargo-msrv
747747

datafusion/functions/src/unicode/strpos.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -215,14 +215,37 @@ where
215215
)
216216
}
217217
} else {
218-
// The `find` method returns the byte index of the substring.
219-
// We count the number of chars up to that byte index.
220-
T::Native::from_usize(
221-
string
222-
.find(substring)
223-
.map(|x| string[..x].chars().count() + 1)
224-
.unwrap_or(0),
225-
)
218+
// For non-ASCII, use a single-pass search that tracks both
219+
// byte position and character position simultaneously
220+
if substring.is_empty() {
221+
return T::Native::from_usize(1);
222+
}
223+
224+
let substring_bytes = substring.as_bytes();
225+
let string_bytes = string.as_bytes();
226+
227+
if substring_bytes.len() > string_bytes.len() {
228+
return T::Native::from_usize(0);
229+
}
230+
231+
// Single pass: find substring while counting characters
232+
let mut char_pos = 0;
233+
for (byte_idx, _) in string.char_indices() {
234+
char_pos += 1;
235+
if byte_idx + substring_bytes.len() <= string_bytes.len() {
236+
// SAFETY: We just checked that byte_idx + substring_bytes.len() <= string_bytes.len()
237+
let slice = unsafe {
238+
string_bytes.get_unchecked(
239+
byte_idx..byte_idx + substring_bytes.len(),
240+
)
241+
};
242+
if slice == substring_bytes {
243+
return T::Native::from_usize(char_pos);
244+
}
245+
}
246+
}
247+
248+
T::Native::from_usize(0)
226249
}
227250
}
228251
_ => None,
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
20+
use arrow::array::ArrayRef;
21+
use arrow::compute::{DatePart, date_part};
22+
use arrow::datatypes::DataType;
23+
use datafusion_common::Result;
24+
use datafusion_common::utils::take_function_args;
25+
use datafusion_expr::{
26+
Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
27+
TypeSignatureClass, Volatility,
28+
};
29+
use datafusion_functions::utils::make_scalar_function;
30+
31+
/// Creates a signature for datetime extraction functions that accept timestamp types.
32+
fn extract_signature() -> Signature {
33+
Signature::coercible(
34+
vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
35+
Volatility::Immutable,
36+
)
37+
}
38+
39+
// -----------------------------------------------------------------------------
40+
// SparkHour
41+
// -----------------------------------------------------------------------------
42+
43+
#[derive(Debug, PartialEq, Eq, Hash)]
44+
pub struct SparkHour {
45+
signature: Signature,
46+
}
47+
48+
impl Default for SparkHour {
49+
fn default() -> Self {
50+
Self::new()
51+
}
52+
}
53+
54+
impl SparkHour {
55+
pub fn new() -> Self {
56+
Self {
57+
signature: extract_signature(),
58+
}
59+
}
60+
}
61+
62+
impl ScalarUDFImpl for SparkHour {
63+
fn as_any(&self) -> &dyn Any {
64+
self
65+
}
66+
67+
fn name(&self) -> &str {
68+
"hour"
69+
}
70+
71+
fn signature(&self) -> &Signature {
72+
&self.signature
73+
}
74+
75+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
76+
Ok(DataType::Int32)
77+
}
78+
79+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
80+
make_scalar_function(spark_hour, vec![])(&args.args)
81+
}
82+
}
83+
84+
fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> {
85+
let [ts_arg] = take_function_args("hour", args)?;
86+
let result = date_part(ts_arg.as_ref(), DatePart::Hour)?;
87+
Ok(result)
88+
}
89+
90+
// -----------------------------------------------------------------------------
91+
// SparkMinute
92+
// -----------------------------------------------------------------------------
93+
94+
#[derive(Debug, PartialEq, Eq, Hash)]
95+
pub struct SparkMinute {
96+
signature: Signature,
97+
}
98+
99+
impl Default for SparkMinute {
100+
fn default() -> Self {
101+
Self::new()
102+
}
103+
}
104+
105+
impl SparkMinute {
106+
pub fn new() -> Self {
107+
Self {
108+
signature: extract_signature(),
109+
}
110+
}
111+
}
112+
113+
impl ScalarUDFImpl for SparkMinute {
114+
fn as_any(&self) -> &dyn Any {
115+
self
116+
}
117+
118+
fn name(&self) -> &str {
119+
"minute"
120+
}
121+
122+
fn signature(&self) -> &Signature {
123+
&self.signature
124+
}
125+
126+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
127+
Ok(DataType::Int32)
128+
}
129+
130+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
131+
make_scalar_function(spark_minute, vec![])(&args.args)
132+
}
133+
}
134+
135+
fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> {
136+
let [ts_arg] = take_function_args("minute", args)?;
137+
let result = date_part(ts_arg.as_ref(), DatePart::Minute)?;
138+
Ok(result)
139+
}
140+
141+
// -----------------------------------------------------------------------------
142+
// SparkSecond
143+
// -----------------------------------------------------------------------------
144+
145+
#[derive(Debug, PartialEq, Eq, Hash)]
146+
pub struct SparkSecond {
147+
signature: Signature,
148+
}
149+
150+
impl Default for SparkSecond {
151+
fn default() -> Self {
152+
Self::new()
153+
}
154+
}
155+
156+
impl SparkSecond {
157+
pub fn new() -> Self {
158+
Self {
159+
signature: extract_signature(),
160+
}
161+
}
162+
}
163+
164+
impl ScalarUDFImpl for SparkSecond {
165+
fn as_any(&self) -> &dyn Any {
166+
self
167+
}
168+
169+
fn name(&self) -> &str {
170+
"second"
171+
}
172+
173+
fn signature(&self) -> &Signature {
174+
&self.signature
175+
}
176+
177+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
178+
Ok(DataType::Int32)
179+
}
180+
181+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
182+
make_scalar_function(spark_second, vec![])(&args.args)
183+
}
184+
}
185+
186+
fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> {
187+
let [ts_arg] = take_function_args("second", args)?;
188+
let result = date_part(ts_arg.as_ref(), DatePart::Second)?;
189+
Ok(result)
190+
}
191+
192+
#[cfg(test)]
193+
mod tests {
194+
use super::*;
195+
use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
196+
use arrow::datatypes::TimeUnit;
197+
use std::sync::Arc;
198+
199+
#[test]
200+
fn test_spark_hour() {
201+
// Create a timestamp array: 2024-01-15 14:30:45 UTC (in microseconds)
202+
// 14:30:45 -> hour = 14
203+
let ts_micros = 1_705_329_045_000_000_i64; // 2024-01-15 14:30:45 UTC
204+
let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
205+
let ts_array = Arc::new(ts_array) as ArrayRef;
206+
207+
let result = spark_hour(&[ts_array]).unwrap();
208+
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
209+
210+
assert_eq!(result.value(0), 14);
211+
assert!(result.is_null(1));
212+
}
213+
214+
#[test]
215+
fn test_spark_minute() {
216+
// 14:30:45 -> minute = 30
217+
let ts_micros = 1_705_329_045_000_000_i64;
218+
let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
219+
let ts_array = Arc::new(ts_array) as ArrayRef;
220+
221+
let result = spark_minute(&[ts_array]).unwrap();
222+
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
223+
224+
assert_eq!(result.value(0), 30);
225+
assert!(result.is_null(1));
226+
}
227+
228+
#[test]
229+
fn test_spark_second() {
230+
// 14:30:45 -> second = 45
231+
let ts_micros = 1_705_329_045_000_000_i64;
232+
let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
233+
let ts_array = Arc::new(ts_array) as ArrayRef;
234+
235+
let result = spark_second(&[ts_array]).unwrap();
236+
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
237+
238+
assert_eq!(result.value(0), 45);
239+
assert!(result.is_null(1));
240+
}
241+
242+
#[test]
243+
fn test_hour_return_type() {
244+
let func = SparkHour::new();
245+
let result = func
246+
.return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
247+
.unwrap();
248+
assert_eq!(result, DataType::Int32);
249+
}
250+
251+
#[test]
252+
fn test_minute_return_type() {
253+
let func = SparkMinute::new();
254+
let result = func
255+
.return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
256+
.unwrap();
257+
assert_eq!(result, DataType::Int32);
258+
}
259+
260+
#[test]
261+
fn test_second_return_type() {
262+
let func = SparkSecond::new();
263+
let result = func
264+
.return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
265+
.unwrap();
266+
assert_eq!(result, DataType::Int32);
267+
}
268+
}

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
pub mod date_add;
1919
pub mod date_sub;
20+
pub mod extract;
2021
pub mod last_day;
2122
pub mod make_dt_interval;
2223
pub mod make_interval;
@@ -28,6 +29,9 @@ use std::sync::Arc;
2829

2930
make_udf_function!(date_add::SparkDateAdd, date_add);
3031
make_udf_function!(date_sub::SparkDateSub, date_sub);
32+
make_udf_function!(extract::SparkHour, hour);
33+
make_udf_function!(extract::SparkMinute, minute);
34+
make_udf_function!(extract::SparkSecond, second);
3135
make_udf_function!(last_day::SparkLastDay, last_day);
3236
make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval);
3337
make_udf_function!(make_interval::SparkMakeInterval, make_interval);
@@ -46,6 +50,17 @@ pub mod expr_fn {
4650
"Returns the date that is days days before start. The function returns NULL if at least one of the input parameters is NULL.",
4751
arg1 arg2
4852
));
53+
export_functions!((hour, "Extracts the hour component of a timestamp.", arg1));
54+
export_functions!((
55+
minute,
56+
"Extracts the minute component of a timestamp.",
57+
arg1
58+
));
59+
export_functions!((
60+
second,
61+
"Extracts the second component of a timestamp.",
62+
arg1
63+
));
4964
export_functions!((
5065
last_day,
5166
"Returns the last day of the month which the date belongs to.",
@@ -74,6 +89,9 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
7489
vec![
7590
date_add(),
7691
date_sub(),
92+
hour(),
93+
minute(),
94+
second(),
7795
last_day(),
7896
make_dt_interval(),
7997
make_interval(),

0 commit comments

Comments
 (0)