Skip to content

Commit 4eb2933

Browse files
authored
Refactor crypto functions code (#18664)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

N/A

## Rationale for this change

<!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. -->

Deduplicate & simplify code in the crypto functions.

## What changes are included in this PR?

<!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. -->

- Fold Sha224/Sha256/Sha384/Sha512 into a common struct.
- Clean up signature & return types.
- Simplify code in `datafusion/functions/src/crypto/basic.rs`.

## Are these changes tested?

<!-- We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->

Existing tests.

## Are there any user-facing changes?

<!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. -->

Some public methods were removed, though I don't believe they were intended to be used outside of other DataFusion crates.

<!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 473cbda commit 4eb2933

File tree

11 files changed

+312
-609
lines changed

11 files changed

+312
-609
lines changed

datafusion/functions/src/crypto/basic.rs

Lines changed: 47 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
//! "crypto" DataFusion functions
1919
2020
use arrow::array::{
21-
Array, ArrayRef, BinaryArray, BinaryArrayType, BinaryViewArray, GenericBinaryArray,
22-
OffsetSizeTrait,
21+
Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType, StringViewArray,
2322
};
24-
use arrow::array::{AsArray, GenericStringArray, StringViewArray};
2523
use arrow::datatypes::DataType;
2624
use blake2::{Blake2b512, Blake2s256, Digest};
2725
use blake3::Hasher as Blake3;
@@ -84,18 +82,7 @@ define_digest_function!(
8482
"computes blake3 hash digest of the given input"
8583
);
8684

87-
macro_rules! digest_to_scalar {
88-
($METHOD: ident, $INPUT:expr) => {{
89-
ScalarValue::Binary($INPUT.as_ref().map(|v| {
90-
let mut digest = $METHOD::default();
91-
digest.update(v);
92-
#[allow(deprecated)]
93-
digest.finalize().as_slice().to_vec()
94-
}))
95-
}};
96-
}
97-
98-
#[derive(Debug, Copy, Clone)]
85+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
9986
pub enum DigestAlgorithm {
10087
Md5,
10188
Sha224,
@@ -107,23 +94,6 @@ pub enum DigestAlgorithm {
10794
Blake3,
10895
}
10996

110-
/// Digest computes a binary hash of the given data, accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`].
111-
/// Second argument is the algorithm to use.
112-
/// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512.
113-
pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
114-
let [data, digest_algorithm] = take_function_args("digest", args)?;
115-
let digest_algorithm = match digest_algorithm {
116-
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
117-
Some(Some(method)) => method.parse::<DigestAlgorithm>(),
118-
_ => exec_err!("Unsupported data type {scalar:?} for function digest"),
119-
},
120-
ColumnarValue::Array(_) => {
121-
internal_err!("Digest using dynamically decided method is not yet supported")
122-
}
123-
}?;
124-
digest_process(data, digest_algorithm)
125-
}
126-
12797
impl FromStr for DigestAlgorithm {
12898
type Err = DataFusionError;
12999
fn from_str(name: &str) -> Result<DigestAlgorithm> {
@@ -183,7 +153,7 @@ pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
183153
ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
184154
ColumnarValue::Scalar(ScalarValue::Utf8View(opt.map(hex_encode::<_>)))
185155
}
186-
_ => return exec_err!("Impossibly got invalid results from digest"),
156+
_ => return internal_err!("Impossibly got invalid results from digest"),
187157
})
188158
}
189159

@@ -198,25 +168,7 @@ fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
198168
}
199169
s
200170
}
201-
pub fn utf8_or_binary_to_binary_type(
202-
arg_type: &DataType,
203-
name: &str,
204-
) -> Result<DataType> {
205-
Ok(match arg_type {
206-
DataType::Utf8View
207-
| DataType::LargeUtf8
208-
| DataType::Utf8
209-
| DataType::Binary
210-
| DataType::BinaryView
211-
| DataType::LargeBinary => DataType::Binary,
212-
DataType::Null => DataType::Null,
213-
_ => {
214-
return plan_err!(
215-
"The {name:?} function can only accept strings or binary arrays."
216-
);
217-
}
218-
})
219-
}
171+
220172
macro_rules! digest_to_array {
221173
($METHOD:ident, $INPUT:expr) => {{
222174
let binary_array: BinaryArray = $INPUT
@@ -232,9 +184,20 @@ macro_rules! digest_to_array {
232184
Arc::new(binary_array)
233185
}};
234186
}
187+
188+
macro_rules! digest_to_scalar {
189+
($METHOD: ident, $INPUT:expr) => {{
190+
ScalarValue::Binary($INPUT.as_ref().map(|v| {
191+
let mut digest = $METHOD::default();
192+
digest.update(v);
193+
digest.finalize().as_slice().to_vec()
194+
}))
195+
}};
196+
}
197+
235198
impl DigestAlgorithm {
236199
/// digest an optional string to its hash value, null values are returned as is
237-
pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
200+
fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
238201
ColumnarValue::Scalar(match self {
239202
Self::Md5 => digest_to_scalar!(Md5, value),
240203
Self::Sha224 => digest_to_scalar!(Sha224, value),
@@ -251,49 +214,7 @@ impl DigestAlgorithm {
251214
})
252215
}
253216

254-
/// digest a binary array to their hash values
255-
pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
256-
where
257-
T: OffsetSizeTrait,
258-
{
259-
let array = match value.data_type() {
260-
DataType::Binary | DataType::LargeBinary => {
261-
let v = value.as_binary::<T>();
262-
self.digest_binary_array_impl::<&GenericBinaryArray<T>>(&v)
263-
}
264-
DataType::BinaryView => {
265-
let v = value.as_binary_view();
266-
self.digest_binary_array_impl::<&BinaryViewArray>(&v)
267-
}
268-
other => {
269-
return exec_err!("unsupported type for digest_utf_array: {other:?}")
270-
}
271-
};
272-
Ok(ColumnarValue::Array(array))
273-
}
274-
275-
/// digest a string array to their hash values
276-
pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
277-
where
278-
T: OffsetSizeTrait,
279-
{
280-
let array = match value.data_type() {
281-
DataType::Utf8 | DataType::LargeUtf8 => {
282-
let v = value.as_string::<T>();
283-
self.digest_utf8_array_impl::<&GenericStringArray<T>>(&v)
284-
}
285-
DataType::Utf8View => {
286-
let v = value.as_string_view();
287-
self.digest_utf8_array_impl::<&StringViewArray>(&v)
288-
}
289-
other => {
290-
return exec_err!("unsupported type for digest_utf_array: {other:?}")
291-
}
292-
};
293-
Ok(ColumnarValue::Array(array))
294-
}
295-
296-
pub fn digest_utf8_array_impl<'a, StringArrType>(
217+
fn digest_utf8_array_impl<'a, StringArrType>(
297218
self,
298219
input_value: &StringArrType,
299220
) -> ArrayRef
@@ -324,7 +245,7 @@ impl DigestAlgorithm {
324245
}
325246
}
326247

327-
pub fn digest_binary_array_impl<'a, BinaryArrType>(
248+
fn digest_binary_array_impl<'a, BinaryArrType>(
328249
self,
329250
input_value: &BinaryArrType,
330251
) -> ArrayRef
@@ -355,26 +276,40 @@ impl DigestAlgorithm {
355276
}
356277
}
357278
}
279+
358280
pub fn digest_process(
359281
value: &ColumnarValue,
360282
digest_algorithm: DigestAlgorithm,
361283
) -> Result<ColumnarValue> {
362284
match value {
363-
ColumnarValue::Array(a) => match a.data_type() {
364-
DataType::Utf8View => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
365-
DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
366-
DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()),
367-
DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()),
368-
DataType::LargeBinary => {
369-
digest_algorithm.digest_binary_array::<i64>(a.as_ref())
370-
}
371-
DataType::BinaryView => {
372-
digest_algorithm.digest_binary_array::<i32>(a.as_ref())
373-
}
374-
other => exec_err!(
375-
"Unsupported data type {other:?} for function {digest_algorithm}"
376-
),
377-
},
285+
ColumnarValue::Array(a) => {
286+
let output = match a.data_type() {
287+
DataType::Utf8View => {
288+
digest_algorithm.digest_utf8_array_impl(&a.as_string_view())
289+
}
290+
DataType::Utf8 => {
291+
digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>())
292+
}
293+
DataType::LargeUtf8 => {
294+
digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>())
295+
}
296+
DataType::Binary => {
297+
digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>())
298+
}
299+
DataType::LargeBinary => {
300+
digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>())
301+
}
302+
DataType::BinaryView => {
303+
digest_algorithm.digest_binary_array_impl(&a.as_binary_view())
304+
}
305+
other => {
306+
return exec_err!(
307+
"Unsupported data type {other:?} for function {digest_algorithm}"
308+
)
309+
}
310+
};
311+
Ok(ColumnarValue::Array(output))
312+
}
378313
ColumnarValue::Scalar(scalar) => {
379314
match scalar {
380315
ScalarValue::Utf8View(a)

datafusion/functions/src/crypto/digest.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! "crypto" DataFusion functions
19-
use super::basic::{digest, utf8_or_binary_to_binary_type};
18+
use crate::crypto::basic::{digest_process, DigestAlgorithm};
19+
2020
use arrow::datatypes::DataType;
2121
use datafusion_common::{
22+
exec_err, not_impl_err,
2223
types::{logical_binary, logical_string},
24+
utils::take_function_args,
2325
Result,
2426
};
2527
use datafusion_expr::{
@@ -36,16 +38,16 @@ use std::any::Any;
3638
syntax_example = "digest(expression, algorithm)",
3739
sql_example = r#"```sql
3840
> select digest('foo', 'sha256');
39-
+------------------------------------------+
40-
| digest(Utf8("foo"), Utf8("sha256")) |
41-
+------------------------------------------+
42-
| <binary_hash_result> |
43-
+------------------------------------------+
41+
+------------------------------------------------------------------+
42+
| digest(Utf8("foo"),Utf8("sha256")) |
43+
+------------------------------------------------------------------+
44+
| 2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae |
45+
+------------------------------------------------------------------+
4446
```"#,
4547
standard_argument(name = "expression", prefix = "String"),
4648
argument(
4749
name = "algorithm",
48-
description = "String expression specifying algorithm to use. Must be one of:
50+
description = "String expression specifying algorithm to use. Must be one of:
4951
- md5
5052
- sha224
5153
- sha256
@@ -60,6 +62,7 @@ use std::any::Any;
6062
pub struct DigestFunc {
6163
signature: Signature,
6264
}
65+
6366
impl Default for DigestFunc {
6467
fn default() -> Self {
6568
Self::new()
@@ -85,6 +88,7 @@ impl DigestFunc {
8588
}
8689
}
8790
}
91+
8892
impl ScalarUDFImpl for DigestFunc {
8993
fn as_any(&self) -> &dyn Any {
9094
self
@@ -98,14 +102,35 @@ impl ScalarUDFImpl for DigestFunc {
98102
&self.signature
99103
}
100104

101-
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
102-
utf8_or_binary_to_binary_type(&arg_types[0], self.name())
105+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
106+
Ok(DataType::Binary)
103107
}
108+
104109
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
105-
digest(&args.args)
110+
let [data, digest_algorithm] = take_function_args(self.name(), &args.args)?;
111+
digest(data, digest_algorithm)
106112
}
107113

108114
fn documentation(&self) -> Option<&Documentation> {
109115
self.doc()
110116
}
111117
}
118+
119+
/// Compute binary hash of the given `data` (String or Binary array), according
120+
/// to the specified `digest_algorithm`. See [`DigestAlgorithm`] for supported
121+
/// algorithms.
122+
fn digest(
123+
data: &ColumnarValue,
124+
digest_algorithm: &ColumnarValue,
125+
) -> Result<ColumnarValue> {
126+
let digest_algorithm = match digest_algorithm {
127+
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
128+
Some(Some(method)) => method.parse::<DigestAlgorithm>(),
129+
_ => exec_err!("Unsupported data type {scalar:?} for function digest"),
130+
},
131+
ColumnarValue::Array(_) => {
132+
not_impl_err!("Digest using dynamically decided method is not yet supported")
133+
}
134+
}?;
135+
digest_process(data, digest_algorithm)
136+
}

0 commit comments

Comments
 (0)