Skip to content

Commit 83a5359

Browse files
authored
Merge pull request #148 from influxdata/crepererum/issue140
fix: run HTTP in I/O runtime
2 parents cd78a7e + df4a4e9 commit 83a5359

File tree

7 files changed

+143
-13
lines changed

7 files changed

+143
-13
lines changed

host/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use ::http::HeaderName;
88
use arrow::datatypes::DataType;
99
use datafusion_common::{DataFusionError, Result as DataFusionResult};
1010
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature};
11-
use tokio::sync::Mutex;
11+
use tokio::{runtime::Handle, sync::Mutex};
1212
use wasmtime::{
1313
Engine, Store,
1414
component::{Component, ResourceAny},
@@ -72,6 +72,9 @@ struct WasmStateImpl {
7272

7373
/// HTTP request validator.
7474
http_validator: Arc<dyn HttpRequestValidator>,
75+
76+
/// Handle to tokio I/O runtime.
77+
io_rt: Handle,
7578
}
7679

7780
impl std::fmt::Debug for WasmStateImpl {
@@ -83,13 +86,15 @@ impl std::fmt::Debug for WasmStateImpl {
8386
wasi_http_ctx: _,
8487
resource_table,
8588
http_validator,
89+
io_rt,
8690
} = self;
8791
f.debug_struct("WasmStateImpl")
8892
.field("vfs_state", vfs_state)
8993
.field("stderr", stderr)
9094
.field("wasi_ctx", &"<WASI_CTX>")
9195
.field("resource_table", resource_table)
9296
.field("http_validator", http_validator)
97+
.field("io_rt", io_rt)
9398
.finish()
9499
}
95100
}
@@ -117,6 +122,8 @@ impl WasiHttpView for WasmStateImpl {
117122
mut request: hyper::Request<HyperOutgoingBody>,
118123
config: OutgoingRequestConfig,
119124
) -> HttpResult<HostFutureIncomingResponse> {
125+
let _guard = self.io_rt.enter();
126+
120127
// Python `requests` sends this so we allow it but later drop it from the actual request.
121128
request.headers_mut().remove(hyper::header::CONNECTION);
122129

@@ -298,6 +305,7 @@ impl WasmScalarUdf {
298305
pub async fn new(
299306
component: &WasmComponentPrecompiled,
300307
permissions: &WasmPermissions,
308+
io_rt: Handle,
301309
source: String,
302310
) -> DataFusionResult<Vec<Self>> {
303311
let WasmComponentPrecompiled { engine, component } = component;
@@ -314,6 +322,7 @@ impl WasmScalarUdf {
314322
wasi_http_ctx: WasiHttpCtx::new(),
315323
resource_table: ResourceTable::new(),
316324
http_validator: Arc::clone(&permissions.http),
325+
io_rt,
317326
};
318327
let (bindings, mut store) = link(engine, component, state)
319328
.await

host/tests/integration_tests/python/runtime/fs.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use arrow::{
66
};
77
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
88
use datafusion_udf_wasm_host::{WasmPermissions, WasmScalarUdf, vfs::VfsLimits};
9+
use tokio::runtime::Handle;
910

1011
use crate::integration_tests::{
1112
python::test_utils::{python_component, python_scalar_udf},
@@ -239,6 +240,7 @@ async fn test_limit_inodes() {
239240
inodes: 42,
240241
..Default::default()
241242
}),
243+
Handle::current(),
242244
"".to_owned(),
243245
)
244246
.await
@@ -260,6 +262,7 @@ async fn test_limit_bytes() {
260262
bytes: 1337,
261263
..Default::default()
262264
}),
265+
Handle::current(),
263266
"".to_owned(),
264267
)
265268
.await

host/tests/integration_tests/python/runtime/http.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use arrow::{
44
array::{Array, StringArray, StringBuilder},
@@ -10,6 +10,7 @@ use datafusion_udf_wasm_host::{
1010
WasmPermissions, WasmScalarUdf,
1111
http::{AllowCertainHttpRequests, HttpRequestValidator, Matcher},
1212
};
13+
use tokio::runtime::Handle;
1314
use wasmtime_wasi_http::types::DEFAULT_FORBIDDEN_HEADERS;
1415
use wiremock::{Mock, MockServer, ResponseTemplate, matchers};
1516

@@ -568,6 +569,7 @@ where
568569
WasmScalarUdf::new(
569570
python_component().await,
570571
&WasmPermissions::new().with_http(permissions),
572+
Handle::current(),
571573
code.to_owned(),
572574
)
573575
.await
@@ -582,3 +584,79 @@ where
582584
assert_eq!(udfs.len(), 1);
583585
udfs.into_iter().next().unwrap()
584586
}
587+
588+
#[test]
589+
fn test_io_runtime() {
590+
const CODE: &str = r#"
591+
import urllib3
592+
593+
def perform_request(url: str) -> str:
594+
resp = urllib3.request("GET", url)
595+
return resp.data.decode("utf-8")
596+
"#;
597+
598+
let rt_tmp = tokio::runtime::Builder::new_current_thread()
599+
.build()
600+
.unwrap();
601+
let rt_cpu = tokio::runtime::Builder::new_multi_thread()
602+
.worker_threads(1)
603+
// It would be nice if all the timeout-related timers would also run within the I/O runtime, but
604+
// that requires some larger intervention (either upstream or with a custom WASI HTTP implementation).
605+
// Hence, we don't do that yet.
606+
.enable_time()
607+
.build()
608+
.unwrap();
609+
let rt_io = tokio::runtime::Builder::new_multi_thread()
610+
.worker_threads(1)
611+
.enable_all()
612+
.build()
613+
.unwrap();
614+
615+
let server = rt_io.block_on(async {
616+
let server = MockServer::start().await;
617+
Mock::given(matchers::any())
618+
.respond_with(ResponseTemplate::new(200).set_body_string("hello world!"))
619+
.expect(1)
620+
.mount(&server)
621+
.await;
622+
server
623+
});
624+
625+
// deliberately use a runtime that we are going to throw away later to prevent tricks like `Handle::current`
626+
let udf = rt_tmp.block_on(async {
627+
let mut permissions = AllowCertainHttpRequests::new();
628+
permissions.allow(Matcher {
629+
method: http::Method::GET,
630+
host: server.address().ip().to_string().into(),
631+
port: server.address().port(),
632+
});
633+
634+
let udfs = WasmScalarUdf::new(
635+
python_component().await,
636+
&WasmPermissions::new().with_http(permissions),
637+
rt_io.handle().clone(),
638+
CODE.to_owned(),
639+
)
640+
.await
641+
.unwrap();
642+
assert_eq!(udfs.len(), 1);
643+
udfs.into_iter().next().unwrap()
644+
});
645+
rt_tmp.shutdown_timeout(Duration::from_secs(1));
646+
647+
let array = rt_cpu.block_on(async {
648+
udf.invoke_with_args(ScalarFunctionArgs {
649+
args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some(server.uri())))],
650+
arg_fields: vec![Arc::new(Field::new("uri", DataType::Utf8, true))],
651+
number_rows: 1,
652+
return_field: Arc::new(Field::new("r", DataType::Utf8, true)),
653+
})
654+
.unwrap()
655+
.unwrap_array()
656+
});
657+
658+
assert_eq!(
659+
array.as_ref(),
660+
&StringArray::from_iter([Some("hello world!".to_owned()),]) as &dyn Array,
661+
);
662+
}

host/tests/integration_tests/python/test_utils.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use datafusion_common::DataFusionError;
22
use datafusion_udf_wasm_host::{WasmComponentPrecompiled, WasmScalarUdf};
3-
use tokio::sync::OnceCell;
3+
use tokio::{runtime::Handle, sync::OnceCell};
44

55
/// Static precompiled Python WASM component for tests
66
static COMPONENT: OnceCell<WasmComponentPrecompiled> = OnceCell::const_new();
@@ -20,7 +20,13 @@ pub(crate) async fn python_component() -> &'static WasmComponentPrecompiled {
2020
pub(crate) async fn python_scalar_udfs(code: &str) -> Result<Vec<WasmScalarUdf>, DataFusionError> {
2121
let component = python_component().await;
2222

23-
WasmScalarUdf::new(component, &Default::default(), code.to_owned()).await
23+
WasmScalarUdf::new(
24+
component,
25+
&Default::default(),
26+
Handle::current(),
27+
code.to_owned(),
28+
)
29+
.await
2430
}
2531

2632
/// Compiles the provided Python UDF code into a single WasmScalarUdf instance.

host/tests/integration_tests/rust.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use arrow::{
77
use datafusion_common::ScalarValue;
88
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
99
use datafusion_udf_wasm_host::{WasmComponentPrecompiled, WasmScalarUdf};
10+
use tokio::runtime::Handle;
1011

1112
use crate::integration_tests::test_utils::ColumnarValueExt;
1213

@@ -15,9 +16,14 @@ async fn test_add_one() {
1516
let component = WasmComponentPrecompiled::new(datafusion_udf_wasm_bundle::BIN_EXAMPLE.into())
1617
.await
1718
.unwrap();
18-
let mut udfs = WasmScalarUdf::new(&component, &Default::default(), "".to_owned())
19-
.await
20-
.unwrap();
19+
let mut udfs = WasmScalarUdf::new(
20+
&component,
21+
&Default::default(),
22+
Handle::current(),
23+
"".to_owned(),
24+
)
25+
.await
26+
.unwrap();
2127
assert_eq!(udfs.len(), 1);
2228
let udf = udfs.pop().unwrap();
2329

query/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use sqlparser::ast::{CreateFunctionBody, Expr, Statement as SqlStatement, Value}
1010
use sqlparser::dialect::dialect_from_str;
1111

1212
use datafusion_udf_wasm_host::{WasmComponentPrecompiled, WasmPermissions, WasmScalarUdf};
13+
use tokio::runtime::Handle;
1314

1415
/// A [ParsedQuery] contains the extracted UDFs and SQL query string
1516
#[derive(Debug)]
@@ -48,6 +49,7 @@ impl<'a> UdfQueryParser<'a> {
4849
&self,
4950
udf_query: &str,
5051
permissions: &WasmPermissions,
52+
io_rt: Handle,
5153
task_ctx: &TaskContext,
5254
) -> DataFusionResult<ParsedQuery> {
5355
let (code, sql) = self.parse_inner(udf_query, task_ctx)?;
@@ -62,7 +64,7 @@ impl<'a> UdfQueryParser<'a> {
6264
})?;
6365

6466
for code in blocks {
65-
udfs.extend(WasmScalarUdf::new(component, permissions, code).await?);
67+
udfs.extend(WasmScalarUdf::new(component, permissions, io_rt.clone(), code).await?);
6668
}
6769
}
6870

query/tests/integration.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use datafusion::{
1414
use datafusion_common::{Result as DataFusionResult, test_util::batches_to_string};
1515
use datafusion_udf_wasm_host::WasmPermissions;
1616
use datafusion_udf_wasm_query::{ParsedQuery, UdfQueryParser};
17+
use tokio::runtime::Handle;
1718

1819
mod integration_tests;
1920

@@ -54,7 +55,12 @@ SELECT add_one(1);
5455

5556
let parser = UdfQueryParser::new(HashMap::from_iter([("python".to_string(), component)]));
5657
let parsed_query = parser
57-
.parse(query, &WasmPermissions::new(), ctx.task_ctx().as_ref())
58+
.parse(
59+
query,
60+
&WasmPermissions::new(),
61+
Handle::current(),
62+
ctx.task_ctx().as_ref(),
63+
)
5864
.await
5965
.unwrap();
6066

@@ -98,7 +104,12 @@ SELECT add_one(1), multiply_two(3);
98104

99105
let parser = UdfQueryParser::new(HashMap::from_iter([("python".to_string(), component)]));
100106
let parsed_query = parser
101-
.parse(query, &WasmPermissions::new(), ctx.task_ctx().as_ref())
107+
.parse(
108+
query,
109+
&WasmPermissions::new(),
110+
Handle::current(),
111+
ctx.task_ctx().as_ref(),
112+
)
102113
.await
103114
.unwrap();
104115

@@ -138,7 +149,12 @@ SELECT add_one(1), multiply_two(3);
138149

139150
let parser = UdfQueryParser::new(HashMap::from_iter([("python".to_string(), component)]));
140151
let parsed_query = parser
141-
.parse(query, &WasmPermissions::new(), ctx.task_ctx().as_ref())
152+
.parse(
153+
query,
154+
&WasmPermissions::new(),
155+
Handle::current(),
156+
ctx.task_ctx().as_ref(),
157+
)
142158
.await
143159
.unwrap();
144160

@@ -172,7 +188,12 @@ SELECT add_one(1)
172188

173189
let parser = UdfQueryParser::new(HashMap::from_iter([("python".to_string(), component)]));
174190
let parsed_query = parser
175-
.parse(query, &WasmPermissions::new(), ctx.task_ctx().as_ref())
191+
.parse(
192+
query,
193+
&WasmPermissions::new(),
194+
Handle::current(),
195+
ctx.task_ctx().as_ref(),
196+
)
176197
.await
177198
.unwrap();
178199

@@ -201,7 +222,12 @@ EXPLAIN SELECT add_one(1);
201222

202223
let parser = UdfQueryParser::new(HashMap::from_iter([("python".to_string(), component)]));
203224
let parsed_query = parser
204-
.parse(query, &WasmPermissions::new(), ctx.task_ctx().as_ref())
225+
.parse(
226+
query,
227+
&WasmPermissions::new(),
228+
Handle::current(),
229+
ctx.task_ctx().as_ref(),
230+
)
205231
.await
206232
.unwrap();
207233

0 commit comments

Comments
 (0)