Skip to content

Commit 7bc9906

Browse files
authored
Implement Spark url function parse_url (#16937)
* implement Spark url function parse_url
  Signed-off-by: Alan Tang <[email protected]>
* feat: Support only when all three arguments are StringViewArray
  Signed-off-by: Alan Tang <[email protected]>
* feat: only support three types on parse_url function
  Signed-off-by: Alan Tang <[email protected]>
* chore: fix clippy error
  Signed-off-by: Alan Tang <[email protected]>
---------
Signed-off-by: Alan Tang <[email protected]>
1 parent a83237f commit 7bc9906

File tree

5 files changed

+374
-22
lines changed

5 files changed

+374
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/spark/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
4747
datafusion-macros = { workspace = true }
4848
log = { workspace = true }
4949
sha1 = "0.10"
50+
url = { workspace = true }
5051
xxhash-rust = { version = "0.8", features = ["xxh3"] }
5152

5253
[dev-dependencies]

datafusion/spark/src/function/url/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,19 @@
1616
// under the License.
1717

1818
use datafusion_expr::ScalarUDF;
19+
use datafusion_functions::make_udf_function;
1920
use std::sync::Arc;
2021

21-
pub mod expr_fn {}
22+
// Implementation of the Spark-compatible `parse_url` function.
pub mod parse_url;
23+
24+
// NOTE(review): presumably generates the `parse_url()` constructor that
// `functions()` calls to obtain the UDF — confirm against `make_udf_function!`.
make_udf_function!(parse_url::ParseUrl, parse_url);
25+
26+
// NOTE(review): presumably exposes a `parse_url(args)` expression helper carrying
// the doc string below — confirm against the `export_functions!` macro definition.
pub mod expr_fn {
27+
use datafusion_functions::export_functions;
28+
29+
export_functions!((parse_url, "Extracts a part from a URL.", args));
30+
}
2231

2332
/// Returns the scalar UDFs provided by this module.
///
/// Before this change the list was empty; it now contains `parse_url`.
pub fn functions() -> Vec<Arc<ScalarUDF>> {
24-
vec![]
33+
vec![parse_url()]
2534
}
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{
22+
Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray, StringArrayType,
23+
};
24+
use arrow::datatypes::DataType;
25+
use datafusion_common::cast::{
26+
as_large_string_array, as_string_array, as_string_view_array,
27+
};
28+
use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result};
29+
use datafusion_expr::{
30+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
31+
Volatility,
32+
};
33+
use datafusion_functions::utils::make_scalar_function;
34+
use url::Url;
35+
36+
/// Scalar UDF implementing `parse_url`, which extracts a component
/// (host, path, query, ...) from a URL string.
#[derive(Debug)]
37+
pub struct ParseUrl {
38+
/// Argument signatures accepted by the function; built in `ParseUrl::new`.
signature: Signature,
39+
}
40+
41+
impl Default for ParseUrl {
42+
fn default() -> Self {
43+
Self::new()
44+
}
45+
}
46+
47+
impl ParseUrl {
48+
pub fn new() -> Self {
49+
Self {
50+
signature: Signature::one_of(
51+
vec![
52+
TypeSignature::Uniform(
53+
1,
54+
vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
55+
),
56+
TypeSignature::Uniform(
57+
2,
58+
vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
59+
),
60+
TypeSignature::Uniform(
61+
3,
62+
vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
63+
),
64+
],
65+
Volatility::Immutable,
66+
),
67+
}
68+
}
69+
/// Parses a URL and extracts the specified component.
70+
///
71+
/// This function takes a URL string and extracts different parts of it based on the
72+
/// `part` parameter. For query parameters, an optional `key` can be specified to
73+
/// extract a specific query parameter value.
74+
///
75+
/// # Arguments
76+
///
77+
/// * `value` - The URL string to parse
78+
/// * `part` - The component of the URL to extract. Valid values are:
79+
/// - `"HOST"` - The hostname (e.g., "example.com")
80+
/// - `"PATH"` - The path portion (e.g., "/path/to/resource")
81+
/// - `"QUERY"` - The query string or a specific query parameter
82+
/// - `"REF"` - The fragment/anchor (the part after #)
83+
/// - `"PROTOCOL"` - The URL scheme (e.g., "https", "http")
84+
/// - `"FILE"` - The path with query string (e.g., "/path?query=value")
85+
/// - `"AUTHORITY"` - The authority component (host:port)
86+
/// - `"USERINFO"` - The user information (username:password)
87+
/// * `key` - Optional parameter used only with `"QUERY"`. When provided, extracts
88+
/// the value of the specific query parameter with this key name.
89+
///
90+
/// # Returns
91+
///
92+
/// * `Ok(Some(String))` - The extracted URL component as a string
93+
/// * `Ok(None)` - If the requested component doesn't exist or is empty
94+
/// * `Err(DataFusionError)` - If the URL is malformed and cannot be parsed
95+
///
96+
fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> {
97+
Url::parse(value)
98+
.map_err(|e| exec_datafusion_err!("{e:?}"))
99+
.map(|url| match part {
100+
"HOST" => url.host_str().map(String::from),
101+
"PATH" => Some(url.path().to_string()),
102+
"QUERY" => match key {
103+
None => url.query().map(String::from),
104+
Some(key) => url
105+
.query_pairs()
106+
.find(|(k, _)| k == key)
107+
.map(|(_, v)| v.into_owned()),
108+
},
109+
"REF" => url.fragment().map(String::from),
110+
"PROTOCOL" => Some(url.scheme().to_string()),
111+
"FILE" => {
112+
let path = url.path();
113+
match url.query() {
114+
Some(query) => Some(format!("{path}?{query}")),
115+
None => Some(path.to_string()),
116+
}
117+
}
118+
"AUTHORITY" => Some(url.authority().to_string()),
119+
"USERINFO" => {
120+
let username = url.username();
121+
if username.is_empty() {
122+
return None;
123+
}
124+
match url.password() {
125+
Some(password) => Some(format!("{username}:{password}")),
126+
None => Some(username.to_string()),
127+
}
128+
}
129+
_ => None,
130+
})
131+
}
132+
}
133+
134+
impl ScalarUDFImpl for ParseUrl {
135+
fn as_any(&self) -> &dyn Any {
136+
self
137+
}
138+
139+
fn name(&self) -> &str {
140+
"parse_url"
141+
}
142+
143+
fn signature(&self) -> &Signature {
144+
&self.signature
145+
}
146+
147+
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
148+
if arg_types.len() < 2 || arg_types.len() > 3 {
149+
return plan_err!(
150+
"{} expects 2 or 3 arguments, but got {}",
151+
self.name(),
152+
arg_types.len()
153+
);
154+
}
155+
match arg_types.len() {
156+
2 | 3 => {
157+
if arg_types
158+
.iter()
159+
.any(|arg| matches!(arg, DataType::LargeUtf8))
160+
{
161+
Ok(DataType::LargeUtf8)
162+
} else if arg_types
163+
.iter()
164+
.any(|arg| matches!(arg, DataType::Utf8View))
165+
{
166+
Ok(DataType::Utf8View)
167+
} else {
168+
Ok(DataType::Utf8)
169+
}
170+
}
171+
_ => plan_err!(
172+
"`{}` expects 2 or 3 arguments, got {}",
173+
&self.name(),
174+
arg_types.len()
175+
),
176+
}
177+
}
178+
179+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
180+
let ScalarFunctionArgs { args, .. } = args;
181+
make_scalar_function(spark_parse_url, vec![])(&args)
182+
}
183+
}
184+
185+
/// Core implementation of URL parsing function.
186+
///
187+
/// # Arguments
188+
///
189+
/// * `args` - A slice of ArrayRef containing the input arrays:
190+
/// - `args[0]` - URL array: The URLs to parse
191+
/// - `args[1]` - Part array: The URL components to extract (HOST, PATH, QUERY, etc.)
192+
/// - `args[2]` - Key array (optional): For QUERY part, the specific parameter names to extract
193+
///
194+
/// # Return Value
195+
///
196+
/// Returns `Result<ArrayRef>` containing:
197+
/// - A string array with extracted URL components
198+
/// - `None` values where extraction failed or component doesn't exist
199+
/// - The output array type (StringArray or LargeStringArray) is determined by input types
200+
///
201+
fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
202+
if args.len() < 2 || args.len() > 3 {
203+
return exec_err!(
204+
"{} expects 2 or 3 arguments, but got {}",
205+
"`parse_url`",
206+
args.len()
207+
);
208+
}
209+
// Required arguments
210+
let url = &args[0];
211+
let part = &args[1];
212+
213+
let result = if args.len() == 3 {
214+
let key = &args[2];
215+
216+
match (url.data_type(), part.data_type(), key.data_type()) {
217+
(DataType::Utf8, DataType::Utf8, DataType::Utf8) => {
218+
process_parse_url::<_, _, _, StringArray>(
219+
as_string_array(url)?,
220+
as_string_array(part)?,
221+
as_string_array(key)?,
222+
)
223+
}
224+
(DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
225+
process_parse_url::<_, _, _, StringArray>(
226+
as_string_view_array(url)?,
227+
as_string_view_array(part)?,
228+
as_string_view_array(key)?,
229+
)
230+
}
231+
(DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => {
232+
process_parse_url::<_, _, _, LargeStringArray>(
233+
as_large_string_array(url)?,
234+
as_large_string_array(part)?,
235+
as_large_string_array(key)?,
236+
)
237+
}
238+
_ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args),
239+
}
240+
} else {
241+
// The 'key' argument is omitted, assume all values are null
242+
// Create 'null' string array for 'key' argument
243+
let mut builder: GenericStringBuilder<i32> = GenericStringBuilder::new();
244+
for _ in 0..args[0].len() {
245+
builder.append_null();
246+
}
247+
let key = builder.finish();
248+
249+
match (url.data_type(), part.data_type()) {
250+
(DataType::Utf8, DataType::Utf8) => {
251+
process_parse_url::<_, _, _, StringArray>(
252+
as_string_array(url)?,
253+
as_string_array(part)?,
254+
&key,
255+
)
256+
}
257+
(DataType::Utf8View, DataType::Utf8View) => {
258+
process_parse_url::<_, _, _, StringArray>(
259+
as_string_view_array(url)?,
260+
as_string_view_array(part)?,
261+
&key,
262+
)
263+
}
264+
(DataType::LargeUtf8, DataType::LargeUtf8) => {
265+
process_parse_url::<_, _, _, LargeStringArray>(
266+
as_large_string_array(url)?,
267+
as_large_string_array(part)?,
268+
&key,
269+
)
270+
}
271+
_ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args),
272+
}
273+
};
274+
result
275+
}
276+
277+
fn process_parse_url<'a, A, B, C, T>(
278+
url_array: &'a A,
279+
part_array: &'a B,
280+
key_array: &'a C,
281+
) -> Result<ArrayRef>
282+
where
283+
&'a A: StringArrayType<'a>,
284+
&'a B: StringArrayType<'a>,
285+
&'a C: StringArrayType<'a>,
286+
T: Array + FromIterator<Option<String>> + 'static,
287+
{
288+
url_array
289+
.iter()
290+
.zip(part_array.iter())
291+
.zip(key_array.iter())
292+
.map(|((url, part), key)| {
293+
if let (Some(url), Some(part), key) = (url, part, key) {
294+
ParseUrl::parse(url, part, key)
295+
} else {
296+
Ok(None)
297+
}
298+
})
299+
.collect::<Result<T>>()
300+
.map(|array| Arc::new(array) as ArrayRef)
301+
}

0 commit comments

Comments
 (0)