Skip to content

Commit c63aca7

Browse files
committed
test: guest file system
1 parent 9e3d58a commit c63aca7

File tree

2 files changed

+228
-0
lines changed

2 files changed

+228
-0
lines changed
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
use std::sync::Arc;
2+
3+
use arrow::{
4+
array::{Array, StringArray},
5+
datatypes::{DataType, Field},
6+
};
7+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
8+
9+
use crate::integration_tests::{
10+
python::test_utils::python_scalar_udf, test_utils::ColumnarValueExt,
11+
};
12+
13+
#[tokio::test(flavor = "multi_thread")]
14+
async fn test_listdir() {
15+
const CODE: &str = r#"
16+
import os
17+
18+
def listdir(cwd: str | None, dir: str) -> str:
19+
if cwd:
20+
os.chdir(cwd)
21+
22+
return ", ".join(os.listdir(dir))
23+
"#;
24+
25+
let udf = python_scalar_udf(CODE).await.unwrap();
26+
27+
struct TestCase {
28+
cwd: Option<&'static str>,
29+
dir: &'static str,
30+
results: &'static [&'static str],
31+
}
32+
const CASES: &[TestCase] = &[
33+
TestCase {
34+
cwd: None,
35+
dir: "/",
36+
results: &["lib"],
37+
},
38+
TestCase {
39+
cwd: None,
40+
dir: "/lib",
41+
results: &["python3.14"],
42+
},
43+
TestCase {
44+
cwd: None,
45+
dir: "/lib/python3.14/compression",
46+
results: &[
47+
"__init__.py",
48+
"_common",
49+
"bz2.py",
50+
"gzip.py",
51+
"lzma.py",
52+
"zlib.py",
53+
"zstd",
54+
],
55+
},
56+
TestCase {
57+
cwd: None,
58+
dir: "/lib/../../lib",
59+
results: &["python3.14"],
60+
},
61+
TestCase {
62+
cwd: None,
63+
dir: "lib",
64+
results: &["python3.14"],
65+
},
66+
TestCase {
67+
cwd: None,
68+
dir: "./lib",
69+
results: &["python3.14"],
70+
},
71+
TestCase {
72+
cwd: Some("/lib"),
73+
dir: ".",
74+
results: &["python3.14"],
75+
},
76+
TestCase {
77+
cwd: Some("/lib"),
78+
dir: "..",
79+
results: &["lib"],
80+
},
81+
];
82+
83+
let array = udf
84+
.invoke_with_args(ScalarFunctionArgs {
85+
args: vec![
86+
ColumnarValue::Array(Arc::new(StringArray::from_iter(
87+
CASES.iter().map(|c| c.cwd),
88+
))),
89+
ColumnarValue::Array(Arc::new(StringArray::from_iter(
90+
CASES.iter().map(|c| Some(c.dir)),
91+
))),
92+
],
93+
arg_fields: vec![
94+
Arc::new(Field::new("cwd", DataType::Utf8, true)),
95+
Arc::new(Field::new("dir", DataType::Utf8, true)),
96+
],
97+
number_rows: CASES.len(),
98+
return_field: Arc::new(Field::new("r", DataType::Utf8, true)),
99+
})
100+
.unwrap()
101+
.unwrap_array();
102+
103+
assert_eq!(
104+
array.as_ref(),
105+
&StringArray::from_iter(CASES.iter().map(|c| Some(c.results.join(", ")))) as &dyn Array,
106+
);
107+
}
108+
109+
#[tokio::test(flavor = "multi_thread")]
110+
async fn test_read() {
111+
const CODE: &str = r#"
112+
def read(path: str) -> str:
113+
try:
114+
with open(path, "r") as fp:
115+
data = fp.read()
116+
return f"OK: {data}"
117+
except Exception as e:
118+
return f"ERR: {e}"
119+
"#;
120+
121+
let udf = python_scalar_udf(CODE).await.unwrap();
122+
123+
struct TestCase {
124+
path: &'static str,
125+
result: Result<&'static str, &'static str>,
126+
}
127+
const CASES: &[TestCase] = &[
128+
TestCase {
129+
path: "/",
130+
result: Err("[Errno 31] Is a directory: '/'"),
131+
},
132+
TestCase {
133+
path: "/lib",
134+
result: Err("[Errno 31] Is a directory: '/lib'"),
135+
},
136+
TestCase {
137+
path: "/test",
138+
result: Err("[Errno 44] No such file or directory: '/test'"),
139+
},
140+
TestCase {
141+
path: "/lib/python3.14/__phello__/__init__.py",
142+
result: Ok(r#"initialized = True
143+
144+
def main():
145+
print("Hello world!")
146+
147+
if __name__ == '__main__':
148+
main()
149+
"#),
150+
},
151+
];
152+
153+
let array = udf
154+
.invoke_with_args(ScalarFunctionArgs {
155+
args: vec![ColumnarValue::Array(Arc::new(StringArray::from_iter(
156+
CASES.iter().map(|c| Some(c.path)),
157+
)))],
158+
arg_fields: vec![Arc::new(Field::new("path", DataType::Utf8, true))],
159+
number_rows: CASES.len(),
160+
return_field: Arc::new(Field::new("r", DataType::Utf8, true)),
161+
})
162+
.unwrap()
163+
.unwrap_array();
164+
165+
assert_eq!(
166+
array.as_ref(),
167+
&StringArray::from_iter(CASES.iter().map(|c| {
168+
let out = match c.result {
169+
Ok(s) => format!("OK: {s}"),
170+
Err(s) => format!("ERR: {s}"),
171+
};
172+
Some(out)
173+
})) as &dyn Array,
174+
);
175+
}
176+
177+
#[tokio::test(flavor = "multi_thread")]
178+
async fn test_write() {
179+
const CODE: &str = r#"
180+
def write(path: str) -> str:
181+
try:
182+
open(path, "w")
183+
except Exception as e:
184+
return f"ERR: {e}"
185+
186+
raise Exception("unreachable")
187+
"#;
188+
189+
let udf = python_scalar_udf(CODE).await.unwrap();
190+
191+
struct TestCase {
192+
path: &'static str,
193+
err: &'static str,
194+
}
195+
const CASES: &[TestCase] = &[
196+
TestCase {
197+
path: "/",
198+
err: "[Errno 69] Read-only file system: '/'",
199+
},
200+
TestCase {
201+
path: "/lib",
202+
err: "[Errno 69] Read-only file system: '/lib'",
203+
},
204+
TestCase {
205+
path: "/test",
206+
err: "[Errno 69] Read-only file system: '/test'",
207+
},
208+
];
209+
210+
let array = udf
211+
.invoke_with_args(ScalarFunctionArgs {
212+
args: vec![ColumnarValue::Array(Arc::new(StringArray::from_iter(
213+
CASES.iter().map(|c| Some(c.path)),
214+
)))],
215+
arg_fields: vec![Arc::new(Field::new("path", DataType::Utf8, true))],
216+
number_rows: CASES.len(),
217+
return_field: Arc::new(Field::new("r", DataType::Utf8, true)),
218+
})
219+
.unwrap()
220+
.unwrap_array();
221+
222+
assert_eq!(
223+
array.as_ref(),
224+
&StringArray::from_iter(CASES.iter().map(|c| Some(format!("ERR: {}", c.err))))
225+
as &dyn Array,
226+
);
227+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod dependencies;
22
mod errors;
3+
mod fs;
34
mod http;
45
mod null_handling;

0 commit comments

Comments
 (0)