Skip to content

Commit 8f918ea

Browse files
committed
Merge branch 'main' into tm/format-the-snake
2 parents 4b7b7e6 + 529da3d commit 8f918ea

File tree

8 files changed

+391
-68
lines changed

8 files changed

+391
-68
lines changed

.github/dependabot.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ updates:
88
open-pull-requests-limit: 10
99
# NOTE: groups are "match first", and "unmatched dependencies have individual PRs"
1010
groups:
11-
arrow-and-datafusion:
11+
apache:
1212
patterns:
1313
- "arrow"
1414
- "arrow-*"
1515
- "datafusion"
1616
- "datafusion-*"
17+
- "sqlparser"
1718
wasmtime:
1819
patterns:
1920
- "wasmtime"

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
run: pip install --requirement=.github/workflows/requirements.txt
2525

2626
- name: Install `cargo-deny` & `just`
27-
uses: taiki-e/install-action@81ee1d48d9194cdcab880cbdc7d36e87d39874cb # v2
27+
uses: taiki-e/install-action@f535147c22906d77695e11cb199e764aa610a4fc # v2
2828
with:
2929
tool: cargo-deny,just
3030

guests/python/requirements.txt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
1-
# That's https://github.com/urllib3/urllib3/pull/3593 as a wheel.
2-
urllib3 @ https://github.com/crepererum/urllib3/releases/download/2.5.100/urllib3-2.5.100-py3-none-any.whl --hash=sha256:b48be99c923c989e8db2a41a21f2511d830a7b7e99bfde24ae7214baadf7daaf
1+
certifi==2025.10.5 --hash=sha256:0f212c2744a9bb6de0c56639a6f68afe01ecd92d91f14ae897c4fe7bbeeef0de
2+
charset_normalizer==3.4.4 --hash=sha256:7a32c560861a02ff789ad905a2fe94e3f840803362c84fecf1851cb4cf3dc37f
3+
idna==3.11 --hash=sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea
4+
requests==2.32.5 --hash=sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6
5+
6+
# That's https://github.com/urllib3/urllib3/pull/3593 + https://github.com/golemcloud/urllib3/pull/2 as a wheel.
7+
urllib3 @ https://github.com/crepererum/urllib3/releases/download/2.5.101/urllib3-2.5.101-py3-none-any.whl --hash=sha256:2b2dcd0944a3b5d6ce7517cf068c232c1963a6702146695147454c46b4da71f1

guests/python/src/python_modules/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,6 +1147,12 @@ mod wit_world {
11471147
self.inner.take();
11481148
}
11491149

1150+
/// Finish the incoming body and return a handle for its trailers.
///
/// The body resource is taken out of `self.inner` with `take()`; if `require_resource`
/// reports an error it is propagated to the caller via `?`. The taken body is then passed
/// to `IncomingBody::finish` and the result is wrapped in a [`FutureTrailers`].
fn finish(&mut self) -> PyResult<FutureTrailers> {
1151+
let body = self.inner.take().require_resource()?;
1152+
let trailers = wasip2::http::types::IncomingBody::finish(body);
1153+
Ok(FutureTrailers { inner: trailers })
1154+
}
1155+
11501156
fn stream(&self) -> PyResult<InputStream> {
11511157
let stream = self.inner()?.stream().to_pyres()?;
11521158
Ok(InputStream {

host/src/lib.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! [DataFusion]: https://datafusion.apache.org/
55
use std::{any::Any, ops::DerefMut, sync::Arc};
66

7+
use ::http::HeaderName;
78
use arrow::datatypes::DataType;
89
use datafusion_common::{DataFusionError, Result as DataFusionResult};
910
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature};
@@ -17,7 +18,10 @@ use wasmtime_wasi_http::{
1718
HttpResult, WasiHttpCtx, WasiHttpView,
1819
bindings::http::types::ErrorCode as HttpErrorCode,
1920
body::HyperOutgoingBody,
20-
types::{HostFutureIncomingResponse, OutgoingRequestConfig, default_send_request_handler},
21+
types::{
22+
DEFAULT_FORBIDDEN_HEADERS, HostFutureIncomingResponse, OutgoingRequestConfig,
23+
default_send_request_handler,
24+
},
2125
};
2226

2327
use crate::{
@@ -26,7 +30,7 @@ use crate::{
2630
http::{HttpRequestValidator, RejectAllHttpRequests},
2731
linker::link,
2832
tokio_helpers::async_in_sync_context,
29-
vfs::{VfsCtxView, VfsState, VfsView},
33+
vfs::{VfsCtxView, VfsLimits, VfsState, VfsView},
3034
};
3135

3236
// unused-crate-dependencies false positives
@@ -43,7 +47,7 @@ mod error;
4347
pub mod http;
4448
mod linker;
4549
mod tokio_helpers;
46-
mod vfs;
50+
pub mod vfs;
4751

4852
/// State of the WASM payload.
4953
struct WasmStateImpl {
@@ -110,9 +114,12 @@ impl WasiHttpView for WasmStateImpl {
110114

111115
fn send_request(
112116
&mut self,
113-
request: hyper::Request<HyperOutgoingBody>,
117+
mut request: hyper::Request<HyperOutgoingBody>,
114118
config: OutgoingRequestConfig,
115119
) -> HttpResult<HostFutureIncomingResponse> {
120+
// Python `requests` sends this so we allow it but later drop it from the actual request.
121+
request.headers_mut().remove(hyper::header::CONNECTION);
122+
116123
// technically we could return an error straight away, but `urllib3` doesn't handle that super well, so we
117124
// create a future and validate the error in there (before actually starting the request of course)
118125

@@ -132,6 +139,15 @@ impl WasiHttpView for WasmStateImpl {
132139

133140
Ok(HostFutureIncomingResponse::pending(handle))
134141
}
142+
143+
/// Decide whether the guest is forbidden from setting the header `name`.
///
/// Returns `false` for `Connection`; for every other header the answer is whether
/// `DEFAULT_FORBIDDEN_HEADERS` contains it.
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
144+
// Python `requests` sends `Connection`, so it is allowed here; `send_request` removes it
// from the request headers again before the request is processed further.
145+
if name == hyper::header::CONNECTION {
146+
return false;
147+
}
148+
149+
DEFAULT_FORBIDDEN_HEADERS.contains(name)
150+
}
135151
}
136152

137153
impl VfsView for WasmStateImpl {
@@ -205,6 +221,9 @@ impl std::fmt::Debug for WasmComponentPrecompiled {
205221
pub struct WasmPermissions {
206222
/// Validator for HTTP requests.
207223
http: Arc<dyn HttpRequestValidator>,
224+
225+
/// Virtual file system limits.
226+
vfs: VfsLimits,
208227
}
209228

210229
impl WasmPermissions {
@@ -218,6 +237,7 @@ impl Default for WasmPermissions {
218237
fn default() -> Self {
219238
Self {
220239
http: Arc::new(RejectAllHttpRequests),
240+
vfs: VfsLimits::default(),
221241
}
222242
}
223243
}
@@ -230,6 +250,15 @@ impl WasmPermissions {
230250
{
231251
Self {
232252
http: Arc::new(http),
253+
..self
254+
}
255+
}
256+
257+
/// Set virtual filesystem limits.
///
/// Consumes `self` and returns it with the `vfs` field replaced by `limits`; all other
/// fields are carried over unchanged through the `..self` struct-update syntax.
258+
pub fn with_vfs_limits(self, limits: VfsLimits) -> Self {
259+
Self {
260+
vfs: limits,
261+
..self
233262
}
234263
}
235264
}
@@ -274,7 +303,7 @@ impl WasmScalarUdf {
274303
let WasmComponentPrecompiled { engine, component } = component;
275304

276305
// Create in-memory VFS
277-
let vfs_state = VfsState::new();
306+
let vfs_state = VfsState::new(&permissions.vfs);
278307

279308
let stderr = MemoryOutputPipe::new(1024);
280309
let wasi_ctx = WasiCtx::builder().stderr(stderr.clone()).build();

host/src/vfs.rs

Lines changed: 145 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! This provides a very crude, read-only in-mem virtual file system for the WASM guests.
44
//!
5-
//! The data gets [populated via a TAR container](VfsState::populate_from_tar).
5+
//! The data gets populated via a TAR container.
66
//!
77
//! While this implementation has rather limited functionality, it is sufficient to get a Python guest interpreter
88
//! running.
@@ -11,7 +11,10 @@ use std::{
1111
collections::HashMap,
1212
hash::Hash,
1313
io::{Cursor, Read},
14-
sync::{Arc, RwLock, Weak},
14+
sync::{
15+
Arc, RwLock, Weak,
16+
atomic::{AtomicU64, Ordering},
17+
},
1518
};
1619

1720
use rand::Rng;
@@ -58,7 +61,7 @@ enum VfsNodeKind {
5861
/// A directory containing child nodes.
5962
Directory {
6063
/// Child nodes indexed by name.
61-
children: HashMap<String, SharedVfsNode>,
64+
children: HashMap<Box<str>, SharedVfsNode>,
6265
},
6366
}
6467

@@ -188,6 +191,125 @@ impl VfsNode {
188191
}
189192
}
190193

194+
/// Tracked allocation of some resource.
///
/// Pairs an atomic counter with a fixed upper bound; the counter is only ever modified
/// through [`Allocation::inc`].
195+
#[derive(Debug)]
196+
struct Allocation {
197+
/// Amount currently allocated.
198+
n: AtomicU64,
199+
200+
/// Name of the resource, copied into [`FailedAllocation`] when an increase is rejected.
201+
name: &'static str,
202+
203+
/// Allocation limit (inclusive): `n` may reach but never exceed this value.
204+
limit: u64,
205+
}
206+
207+
impl Allocation {
208+
/// Create a new allocation tracker for the given resource.
///
/// The counter starts at zero; `name` and `limit` are stored as given.
209+
fn new(name: &'static str, limit: u64) -> Self {
210+
Self {
211+
n: AtomicU64::new(0),
212+
name,
213+
limit,
214+
}
215+
}
216+
217+
/// Increase the allocation by `n`.
///
/// The update is a single atomic `fetch_update`: it succeeds only if `old + n` neither
/// overflows `u64` nor exceeds `limit`. On failure the counter is left unchanged and a
/// [`FailedAllocation`] is returned that records the resource name, the limit, the
/// counter value observed at the time of the failed attempt, and the requested amount.
218+
fn inc(&self, n: u64) -> Result<(), FailedAllocation> {
219+
self.n
220+
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |old| {
221+
// Returning `None` (overflow here, or over the limit below) rejects the update.
let new = old.checked_add(n)?;
222+
(new <= self.limit).then_some(new)
223+
})
224+
// On success `fetch_update` yields the previous value, which callers do not need.
.map(|_| ())
225+
.map_err(|current| FailedAllocation {
226+
name: self.name,
227+
limit: self.limit,
228+
current,
229+
requested: n,
230+
})
231+
}
232+
}
233+
234+
/// Failed allocation error.
///
/// Produced by [`Allocation::inc`] when the requested increase is rejected.
235+
#[derive(Debug)]
236+
struct FailedAllocation {
237+
/// Name of the allocation type/resource.
238+
name: &'static str,
239+
240+
/// Allocation limit that the request would have violated.
241+
limit: u64,
242+
243+
/// Allocation size observed when the request was rejected.
244+
current: u64,
245+
246+
/// Requested/additional allocation, i.e. the amount that was to be added to `current`.
247+
requested: u64,
248+
}
249+
250+
/// Human-readable rendering of a failed allocation.
///
/// The output has the shape
/// `<name> limit reached: limit<=<limit> current==<current> requested+=<requested>`.
impl std::fmt::Display for FailedAllocation {
251+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252+
// Exhaustive destructuring: adding a field to the struct makes this pattern fail to
// compile until the message is revisited.
let Self {
253+
name,
254+
limit,
255+
current,
256+
requested,
257+
} = self;
258+
259+
write!(
260+
f,
261+
"{name} limit reached: limit<={limit} current=={current} requested+={requested}"
262+
)
263+
}
264+
}
265+
266+
/// Convert a failed allocation into an I/O error so it can be propagated with `?` from
/// functions returning `std::io::Result`.
///
/// The resulting error has kind `QuotaExceeded` and carries the `Display` rendering of the
/// failure as its message.
///
/// NOTE(review): the failure is flattened to a `String` here, so the structured fields are
/// not recoverable from the `io::Error`. If `FailedAllocation` implemented
/// `std::error::Error`, it could be passed to `io::Error::new` directly — consider whether
/// callers would benefit.
impl From<FailedAllocation> for std::io::Error {
267+
fn from(e: FailedAllocation) -> Self {
268+
Self::new(std::io::ErrorKind::QuotaExceeded, e.to_string())
269+
}
270+
}
271+
272+
/// Limits for virtual filesystems.
///
/// Both limits are enforced while entries are added to the VFS; exceeding either one
/// surfaces as an I/O error.
273+
#[derive(Debug, Clone)]
274+
#[expect(missing_copy_implementations, reason = "allow later extensions")]
275+
pub struct VfsLimits {
276+
/// Maximum number of inodes, where one inode is charged per entry added to the VFS.
277+
pub inodes: u64,
278+
279+
/// Maximum number of bytes the VFS may account for.
///
/// File contents and entry names are charged against this budget, plus a small
/// per-entry amount.
280+
pub bytes: u64,
281+
}
282+
283+
/// Default limits: 10,000 inodes and 100 MiB of accounted bytes.
impl Default for VfsLimits {
284+
fn default() -> Self {
285+
Self {
286+
inodes: 10_000,
287+
// 100 MiB (100 * 1024 * 1024 = 104,857,600 bytes)
288+
bytes: 100 * 1024 * 1024,
289+
}
290+
}
291+
}
292+
293+
/// Current virtual filesystem allocation.
///
/// Holds one [`Allocation`] tracker per limited resource in [`VfsLimits`].
294+
#[derive(Debug)]
295+
struct VfsAllocation {
296+
/// Number of inodes allocated so far, bounded by `VfsLimits::inodes`.
297+
inodes: Allocation,
298+
299+
/// Number of bytes allocated so far, bounded by `VfsLimits::bytes`.
300+
bytes: Allocation,
301+
}
302+
303+
impl VfsAllocation {
304+
/// Create a new, empty allocation tracker.
///
/// Each tracker starts at zero and takes its limit from the matching field of `limits`;
/// the string passed as the tracker name ("inodes" / "bytes") is what appears in error
/// messages when that limit is hit.
305+
fn new(limits: &VfsLimits) -> Self {
306+
Self {
307+
inodes: Allocation::new("inodes", limits.inodes),
308+
bytes: Allocation::new("bytes", limits.bytes),
309+
}
310+
}
311+
}
312+
191313
/// State for the virtual filesystem.
192314
#[derive(Debug)]
193315
pub(crate) struct VfsState {
@@ -196,11 +318,14 @@ pub(crate) struct VfsState {
196318

197319
/// Hash key for metadata hashes.
198320
metadata_hash_key: [u8; 16],
321+
322+
/// Current allocation.
323+
allocation: VfsAllocation,
199324
}
200325

201326
impl VfsState {
202327
/// Create a new empty VFS.
203-
pub(crate) fn new() -> Self {
328+
pub(crate) fn new(limits: &VfsLimits) -> Self {
204329
Self {
205330
root: Arc::new(RwLock::new(VfsNode {
206331
kind: VfsNodeKind::Directory {
@@ -209,6 +334,7 @@ impl VfsState {
209334
parent: None,
210335
})),
211336
metadata_hash_key: rand::rng().random(),
337+
allocation: VfsAllocation::new(limits),
212338
}
213339
}
214340

@@ -228,6 +354,8 @@ impl VfsState {
228354
tar::EntryType::Regular => {
229355
let mut content = Vec::new();
230356
entry.read_to_end(&mut content)?;
357+
content.shrink_to_fit();
358+
self.allocation.bytes.inc(content.capacity() as u64)?;
231359
VfsNodeKind::File { content }
232360
}
233361
other => {
@@ -252,6 +380,17 @@ impl VfsState {
252380
VfsNode::resolve_path(Arc::clone(&self.root), Arc::clone(&self.root), path_str)
253381
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
254382

383+
let child = Arc::new(RwLock::new(VfsNode {
384+
kind,
385+
parent: Some(Arc::downgrade(&node)),
386+
}));
387+
388+
self.allocation.inodes.inc(1)?;
389+
self.allocation.bytes.inc(name.len() as u64)?;
390+
self.allocation
391+
.bytes
392+
.inc(std::mem::size_of_val(&child) as u64)?;
393+
255394
match &mut node.write().unwrap().kind {
256395
VfsNodeKind::File { .. } => {
257396
return Err(std::io::Error::new(
@@ -260,13 +399,7 @@ impl VfsState {
260399
));
261400
}
262401
VfsNodeKind::Directory { children } => {
263-
children.insert(
264-
name.to_owned(),
265-
Arc::new(RwLock::new(VfsNode {
266-
kind,
267-
parent: Some(Arc::downgrade(&node)),
268-
})),
269-
);
402+
children.insert(name.into(), child);
270403
}
271404
}
272405
}
@@ -461,7 +594,7 @@ impl<'a> filesystem::types::HostDescriptor for VfsCtxView<'a> {
461594
};
462595

463596
DirectoryEntry {
464-
name: name.clone(),
597+
name: name.as_ref().to_owned(),
465598
type_,
466599
}
467600
})

0 commit comments

Comments
 (0)