Skip to content

Commit 9a8f69b

Browse files
authored
Merge pull request #165 from influxdata/crepererum/env-vars
feat: environment variables for guests
2 parents 041ea17 + 379a720 commit 9a8f69b

File tree

4 files changed

+95
-3
lines changed

4 files changed

+95
-3
lines changed

guests/python/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ def compute(x: int) -> int:
197197
## I/O
198198
All I/O operations go through the host, there is no direct interaction with the host operating system.
199199

200+
### Environment Variables
201+
Hosts can pass environment variables to the guest if they want. By default, NO variables are available for the guest though (i.e. there is NO implicit pass-through). The standard Python library can read these environment variables, e.g. via [`os.environ`].
202+
200203
### Filesystem
201204
The [Python Standard Library] is mounted as a read-only filesystem. The host file system (incl. special paths like `/proc`) are NOT exposed to the guest.
202205

@@ -234,6 +237,7 @@ There is NO other I/O available that escapes the sandbox.
234237
[`timedelta`]: https://docs.python.org/3/library/datetime.html#datetime.timedelta
235238
[`Duration`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Duration
236239
[`Microsecond`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.TimeUnit.html#variant.Microsecond
240+
[`os.environ`]: https://docs.python.org/3/library/os.html#os.environ
237241
[Python 3.14.0]: https://www.python.org/downloads/release/python-3140
238242
[Python Standard Library]: https://docs.python.org/3/library/index.html
239243
[`requests`]: https://pypi.org/project/requests/

host/src/lib.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//!
44
//! [DataFusion]: https://datafusion.apache.org/
5-
use std::{any::Any, ops::DerefMut, sync::Arc};
5+
use std::{any::Any, collections::BTreeMap, ops::DerefMut, sync::Arc};
66

77
use ::http::HeaderName;
88
use arrow::datatypes::DataType;
@@ -247,6 +247,9 @@ pub struct WasmPermissions {
247247

248248
/// Virtual file system limits.
249249
vfs: VfsLimits,
250+
251+
/// Environment variables.
252+
envs: BTreeMap<String, String>,
250253
}
251254

252255
impl WasmPermissions {
@@ -261,6 +264,7 @@ impl Default for WasmPermissions {
261264
Self {
262265
http: Arc::new(RejectAllHttpRequests),
263266
vfs: VfsLimits::default(),
267+
envs: BTreeMap::default(),
264268
}
265269
}
266270
}
@@ -284,6 +288,12 @@ impl WasmPermissions {
284288
..self
285289
}
286290
}
291+
292+
/// Add environment variable.
293+
pub fn with_env(mut self, key: String, value: String) -> Self {
294+
self.envs.insert(key, value);
295+
self
296+
}
287297
}
288298

289299
/// A [`ScalarUDFImpl`] that wraps a WebAssembly payload.
@@ -329,12 +339,18 @@ impl WasmScalarUdf {
329339
// Create in-memory VFS
330340
let vfs_state = VfsState::new(&permissions.vfs);
331341

342+
// set up WASI p2 context
332343
let stderr = MemoryOutputPipe::new(1024);
333-
let wasi_ctx = WasiCtx::builder().stderr(stderr.clone()).build();
344+
let mut wasi_ctx_builder = WasiCtx::builder();
345+
wasi_ctx_builder.stderr(stderr.clone());
346+
permissions.envs.iter().for_each(|(k, v)| {
347+
wasi_ctx_builder.env(k, v);
348+
});
349+
334350
let state = WasmStateImpl {
335351
vfs_state,
336352
stderr,
337-
wasi_ctx,
353+
wasi_ctx: wasi_ctx_builder.build(),
338354
wasi_http_ctx: WasiHttpCtx::new(),
339355
resource_table: ResourceTable::new(),
340356
http_validator: Arc::clone(&permissions.http),
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use std::sync::Arc;
2+
3+
use arrow::datatypes::{DataType, Field};
4+
use datafusion_common::{cast::as_string_array, config::ConfigOptions};
5+
use datafusion_expr::{ScalarFunctionArgs, async_udf::AsyncScalarUDFImpl};
6+
use datafusion_udf_wasm_host::{WasmPermissions, WasmScalarUdf};
7+
use tokio::runtime::Handle;
8+
9+
use crate::integration_tests::python::test_utils::python_component;
10+
11+
#[tokio::test(flavor = "multi_thread")]
12+
async fn test_env() {
13+
assert_env_roundrip(&[]).await;
14+
assert_env_roundrip(&[("FOO", "BAR")]).await;
15+
assert_env_roundrip(&[("foo", "bar")]).await;
16+
assert_env_roundrip(&[("", "")]).await;
17+
assert_env_roundrip(&[("FOO", "BAR"), ("X", "Y")]).await;
18+
}
19+
20+
pub(crate) async fn assert_env_roundrip(env: &[(&'static str, &'static str)]) {
21+
const CODE: &str = r#"
22+
import os
23+
24+
def env() -> str:
25+
return ",".join((
26+
f"{k}:{v}"
27+
for k, v in os.environ.items()
28+
))
29+
"#;
30+
31+
let component = python_component().await;
32+
33+
let mut permissions = WasmPermissions::default();
34+
for (k, v) in env {
35+
permissions = permissions.with_env((*k).to_owned(), (*v).to_owned());
36+
}
37+
38+
let udfs = WasmScalarUdf::new(component, &permissions, Handle::current(), CODE.to_owned())
39+
.await
40+
.unwrap();
41+
assert_eq!(udfs.len(), 1);
42+
let udf = udfs.into_iter().next().unwrap();
43+
44+
let array = udf
45+
.invoke_async_with_args(
46+
ScalarFunctionArgs {
47+
args: vec![],
48+
arg_fields: vec![],
49+
number_rows: 1,
50+
return_field: Arc::new(Field::new("r", DataType::Utf8, true)),
51+
},
52+
&ConfigOptions::default(),
53+
)
54+
.await
55+
.unwrap();
56+
57+
let actual = as_string_array(&array)
58+
.unwrap()
59+
.into_iter()
60+
.next()
61+
.unwrap()
62+
.unwrap();
63+
64+
let expected = env
65+
.iter()
66+
.map(|(k, v)| format!("{k}:{v}"))
67+
.collect::<Vec<_>>();
68+
let expected = expected.join(",");
69+
70+
assert_eq!(actual, expected);
71+
}

host/tests/integration_tests/python/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod dependencies;
2+
mod env;
23
mod errors;
34
mod fs;
45
mod http;

0 commit comments

Comments
 (0)