Commit 6043be4

feat: spark crc32/sha1 (#17032)

1 parent 3472aa1 commit 6043be4

File tree

8 files changed: +455 −8 lines

Cargo.lock

Lines changed: 20 additions & 0 deletions
(Generated file; diff not rendered.)

datafusion/spark/Cargo.toml

Lines changed: 3 additions & 0 deletions

@@ -38,13 +38,16 @@ name = "datafusion_spark"
 
 [dependencies]
 arrow = { workspace = true }
 chrono = { workspace = true }
+crc32fast = "1.4"
 datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
 datafusion-macros = { workspace = true }
 log = { workspace = true }
+sha1 = "0.10"
+xxhash-rust = { version = "0.8", features = ["xxh3"] }
 
 [dev-dependencies]
 criterion = { workspace = true }
datafusion/spark/src/function/hash/crc32.rs

Lines changed: 134 additions & 0 deletions

@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use crc32fast::Hasher;
use datafusion_common::cast::{
    as_binary_array, as_binary_view_array, as_large_binary_array,
};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::utils::make_scalar_function;

/// <https://spark.apache.org/docs/latest/api/sql/index.html#crc32>
#[derive(Debug)]
pub struct SparkCrc32 {
    signature: Signature,
}

impl Default for SparkCrc32 {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkCrc32 {
    pub fn new() -> Self {
        Self {
            signature: Signature::user_defined(Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for SparkCrc32 {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "crc32"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_crc32, vec![])(&args.args)
    }

    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        if arg_types.len() != 1 {
            return exec_err!(
                "`crc32` function requires 1 argument, got {}",
                arg_types.len()
            );
        }
        match arg_types[0] {
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                Ok(vec![arg_types[0].clone()])
            }
            DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
            DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
            DataType::Null => Ok(vec![DataType::Binary]),
            _ => exec_err!("`crc32` function does not support type {}", arg_types[0]),
        }
    }
}

fn spark_crc32_digest(value: &[u8]) -> i64 {
    let mut hasher = Hasher::new();
    hasher.update(value);
    hasher.finalize() as i64
}

fn spark_crc32_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
    let result = input
        .map(|value| value.map(spark_crc32_digest))
        .collect::<Int64Array>();
    Arc::new(result)
}

fn spark_crc32(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [input] = args else {
        return internal_err!(
            "Spark `crc32` function requires 1 argument, got {}",
            args.len()
        );
    };

    match input.data_type() {
        DataType::Binary => {
            let input = as_binary_array(input)?;
            Ok(spark_crc32_impl(input.iter()))
        }
        DataType::LargeBinary => {
            let input = as_large_binary_array(input)?;
            Ok(spark_crc32_impl(input.iter()))
        }
        DataType::BinaryView => {
            let input = as_binary_view_array(input)?;
            Ok(spark_crc32_impl(input.iter()))
        }
        _ => {
            exec_err!(
                "Spark `crc32` function: argument must be binary or large binary, got {:?}",
                input.data_type()
            )
        }
    }
}
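For reference, a minimal standalone sketch (not part of the commit) of the digest logic above: CRC-32 of the input bytes, widened from u32 to i64 so the result stays non-negative, matching Spark's bigint return type. The expected value for "Spark" is taken from the crc32 example in the Spark SQL docs and should be treated as an assumption here.

// Sketch only: recomputes the crc32 digest with crc32fast, as in
// spark_crc32_digest above. Requires crc32fast = "1.4".
fn crc32_as_i64(value: &[u8]) -> i64 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(value);
    // finalize() yields a u32; the u32 -> i64 cast keeps it non-negative.
    hasher.finalize() as i64
}

fn main() {
    // 1557323817 is the crc32('Spark') example from the Spark SQL docs.
    assert_eq!(crc32_as_i64(b"Spark"), 1557323817);
}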

datafusion/spark/src/function/hash/mod.rs

Lines changed: 10 additions & 2 deletions

@@ -15,19 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod crc32;
+pub mod sha1;
 pub mod sha2;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
+make_udf_function!(crc32::SparkCrc32, crc32);
+make_udf_function!(sha1::SparkSha1, sha1);
 make_udf_function!(sha2::SparkSha2, sha2);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
-    export_functions!((sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2));
+    export_functions!(
+        (crc32, "crc32(expr) - Returns a cyclic redundancy check value of the expr as a bigint.", arg1),
+        (sha1, "sha1(expr) - Returns a SHA-1 hash value of the expr as a hex string.", arg1),
+        (sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2)
+    );
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![sha2()]
+    vec![crc32(), sha1(), sha2()]
 }
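
As a usage sketch (not part of the commit), the functions() accessor above can be handed to a SessionContext. SessionContext::register_udf is standard DataFusion API; the datafusion_spark::function::hash module path and the tokio runtime setup are assumptions here.

// Hypothetical wiring sketch: register the Spark hash UDFs and call them
// from SQL. Assumes the datafusion and tokio crates, and that the
// function::hash module path is publicly reachable.
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    for udf in datafusion_spark::function::hash::functions() {
        // functions() returns Vec<Arc<ScalarUDF>>; register_udf takes a ScalarUDF.
        ctx.register_udf(udf.as_ref().clone());
    }
    ctx.sql("SELECT crc32('Spark'), sha1('Spark')").await?.show().await?;
    Ok(())
}
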
datafusion/spark/src/function/hash/sha1.rs

Lines changed: 145 additions & 0 deletions

@@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::Write;
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::DataType;
use datafusion_common::cast::{
    as_binary_array, as_binary_view_array, as_large_binary_array,
};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::utils::make_scalar_function;
use sha1::{Digest, Sha1};

/// <https://spark.apache.org/docs/latest/api/sql/index.html#sha1>
#[derive(Debug)]
pub struct SparkSha1 {
    signature: Signature,
    aliases: Vec<String>,
}

impl Default for SparkSha1 {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkSha1 {
    pub fn new() -> Self {
        Self {
            signature: Signature::user_defined(Volatility::Immutable),
            aliases: vec!["sha".to_string()],
        }
    }
}

impl ScalarUDFImpl for SparkSha1 {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "sha1"
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Utf8)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_sha1, vec![])(&args.args)
    }

    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        if arg_types.len() != 1 {
            return exec_err!(
                "`sha1` function requires 1 argument, got {}",
                arg_types.len()
            );
        }
        match arg_types[0] {
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                Ok(vec![arg_types[0].clone()])
            }
            DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
            DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
            DataType::Null => Ok(vec![DataType::Binary]),
            _ => exec_err!("`sha1` function does not support type {}", arg_types[0]),
        }
    }
}

fn spark_sha1_digest(value: &[u8]) -> String {
    let result = Sha1::digest(value);
    let mut s = String::with_capacity(result.len() * 2);
    for b in result.as_slice() {
        #[allow(clippy::unwrap_used)]
        write!(&mut s, "{b:02x}").unwrap();
    }
    s
}

fn spark_sha1_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
    let result = input
        .map(|value| value.map(spark_sha1_digest))
        .collect::<StringArray>();
    Arc::new(result)
}

fn spark_sha1(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [input] = args else {
        return internal_err!(
            "Spark `sha1` function requires 1 argument, got {}",
            args.len()
        );
    };

    match input.data_type() {
        DataType::Binary => {
            let input = as_binary_array(input)?;
            Ok(spark_sha1_impl(input.iter()))
        }
        DataType::LargeBinary => {
            let input = as_large_binary_array(input)?;
            Ok(spark_sha1_impl(input.iter()))
        }
        DataType::BinaryView => {
            let input = as_binary_view_array(input)?;
            Ok(spark_sha1_impl(input.iter()))
        }
        _ => {
            exec_err!(
                "Spark `sha1` function: argument must be binary or large binary, got {:?}",
                input.data_type()
            )
        }
    }
}
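Again for reference, a minimal standalone sketch (not part of the commit) of the hex-digest helper above. The expected output is the sha1('Spark') example from the Spark SQL docs and should be treated as an assumption here.

// Sketch only: hex-encodes a SHA-1 digest as in spark_sha1_digest above.
// Requires sha1 = "0.10".
use sha1::{Digest, Sha1};
use std::fmt::Write;

fn sha1_hex(value: &[u8]) -> String {
    let digest = Sha1::digest(value);
    let mut s = String::with_capacity(digest.len() * 2);
    for b in digest.as_slice() {
        // Writing into a String cannot fail, so the unwrap is safe.
        write!(&mut s, "{b:02x}").unwrap();
    }
    s
}

fn main() {
    // Expected string per the sha1('Spark') example in the Spark SQL docs.
    assert_eq!(sha1_hex(b"Spark"), "85f5955f4b27a9a4c2aab9ffe5d9c5255e9ee58f");
}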
