Skip to content

Commit 9bf9f31

Browse files
authored
Merge pull request #143 from influxdata/crepererum/vfs-limits
feat: virtual filesystem limits
2 parents 7dc25c4 + 47e1fa5 commit 9bf9f31

File tree

3 files changed

+206
-16
lines changed

3 files changed

+206
-16
lines changed

host/src/lib.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
http::{HttpRequestValidator, RejectAllHttpRequests},
2727
linker::link,
2828
tokio_helpers::async_in_sync_context,
29-
vfs::{VfsCtxView, VfsState, VfsView},
29+
vfs::{VfsCtxView, VfsLimits, VfsState, VfsView},
3030
};
3131

3232
// unused-crate-dependencies false positives
@@ -44,7 +44,7 @@ pub mod http;
4444
mod linker;
4545
mod tokio_helpers;
4646
pub mod udf_query;
47-
mod vfs;
47+
pub mod vfs;
4848

4949
/// State of the WASM payload.
5050
struct WasmStateImpl {
@@ -206,6 +206,9 @@ impl std::fmt::Debug for WasmComponentPrecompiled {
206206
pub struct WasmPermissions {
207207
/// Validator for HTTP requests.
208208
http: Arc<dyn HttpRequestValidator>,
209+
210+
/// Virtual file system limits.
211+
vfs: VfsLimits,
209212
}
210213

211214
impl WasmPermissions {
@@ -219,6 +222,7 @@ impl Default for WasmPermissions {
219222
fn default() -> Self {
220223
Self {
221224
http: Arc::new(RejectAllHttpRequests),
225+
vfs: VfsLimits::default(),
222226
}
223227
}
224228
}
@@ -231,6 +235,15 @@ impl WasmPermissions {
231235
{
232236
Self {
233237
http: Arc::new(http),
238+
..self
239+
}
240+
}
241+
242+
/// Set virtual filesystem limits.
243+
pub fn with_vfs_limits(self, limits: VfsLimits) -> Self {
244+
Self {
245+
vfs: limits,
246+
..self
234247
}
235248
}
236249
}
@@ -275,7 +288,7 @@ impl WasmScalarUdf {
275288
let WasmComponentPrecompiled { engine, component } = component;
276289

277290
// Create in-memory VFS
278-
let vfs_state = VfsState::new();
291+
let vfs_state = VfsState::new(&permissions.vfs);
279292

280293
let stderr = MemoryOutputPipe::new(1024);
281294
let wasi_ctx = WasiCtx::builder().stderr(stderr.clone()).build();

host/src/vfs.rs

Lines changed: 145 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! This provides a very crude, read-only in-mem virtual file system for the WASM guests.
44
//!
5-
//! The data gets [populated via a TAR container](VfsState::populate_from_tar).
5+
//! The data gets populated via a TAR container.
66
//!
77
//! While this implementation has rather limited functionality, it is sufficient to get a Python guest interpreter
88
//! running.
@@ -11,7 +11,10 @@ use std::{
1111
collections::HashMap,
1212
hash::Hash,
1313
io::{Cursor, Read},
14-
sync::{Arc, RwLock, Weak},
14+
sync::{
15+
Arc, RwLock, Weak,
16+
atomic::{AtomicU64, Ordering},
17+
},
1518
};
1619

1720
use rand::Rng;
@@ -58,7 +61,7 @@ enum VfsNodeKind {
5861
/// A directory containing child nodes.
5962
Directory {
6063
/// Child nodes indexed by name.
61-
children: HashMap<String, SharedVfsNode>,
64+
children: HashMap<Box<str>, SharedVfsNode>,
6265
},
6366
}
6467

@@ -188,6 +191,125 @@ impl VfsNode {
188191
}
189192
}
190193

194+
/// Tracked allocation of some resource.
///
/// Holds a running total together with a hard upper bound. The total is changed
/// through [`inc`](Self::inc), which rejects any increase that would exceed the bound.
#[derive(Debug)]
struct Allocation {
    /// Current amount of allocation.
    n: AtomicU64,

    /// Name of the resource.
    name: &'static str,

    /// Allocation limit.
    limit: u64,
}

impl Allocation {
    /// Create new allocation tracker for given resource.
    ///
    /// The tracker starts at zero; `name` is only used for error reporting.
    fn new(name: &'static str, limit: u64) -> Self {
        Self {
            n: AtomicU64::new(0),
            name,
            limit,
        }
    }

    /// Increase allocation by given amount.
    ///
    /// # Errors
    /// Returns [`FailedAllocation`] and leaves the counter untouched if adding `n`
    /// would overflow `u64` or push the total above the limit.
    fn inc(&self, n: u64) -> Result<(), FailedAllocation> {
        self.n
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |old| {
                // `None` aborts the update: either the sum overflowed or it is over the limit.
                let new = old.checked_add(n)?;
                (new <= self.limit).then_some(new)
            })
            .map(|_| ())
            // On failure `fetch_update` yields the value it observed, i.e. the current total.
            .map_err(|current| FailedAllocation {
                name: self.name,
                limit: self.limit,
                current,
                requested: n,
            })
    }
}

/// Failed allocation error.
#[derive(Debug)]
struct FailedAllocation {
    /// Name of the allocation type/resource.
    name: &'static str,

    /// Allocation limit.
    limit: u64,

    /// Current allocation size.
    current: u64,

    /// Requested/additional allocation.
    requested: u64,
}

impl std::fmt::Display for FailedAllocation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            name,
            limit,
            current,
            requested,
        } = self;

        write!(
            f,
            "{name} limit reached: limit<={limit} current=={current} requested+={requested}"
        )
    }
}

// Being a proper error type lets the value travel inside `std::io::Error` (and any
// other `dyn Error` container) without being flattened into a string first.
impl std::error::Error for FailedAllocation {}

impl From<FailedAllocation> for std::io::Error {
    /// Wrap the failure as a [`QuotaExceeded`](std::io::ErrorKind::QuotaExceeded) I/O error.
    ///
    /// The typed error is kept as the inner payload, so callers can recover it via
    /// `get_ref()` + `downcast_ref`; the rendered message is the same as the
    /// `Display` output of [`FailedAllocation`].
    fn from(e: FailedAllocation) -> Self {
        Self::new(std::io::ErrorKind::QuotaExceeded, e)
    }
}
271+
272+
/// Limits for virtual filesystems.
///
/// Use [`Default`] for the stock limits and struct-update syntax to override
/// individual fields, e.g. `VfsLimits { inodes: 42, ..Default::default() }`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[expect(missing_copy_implementations, reason = "allow later extensions")]
pub struct VfsLimits {
    /// Maximum number of inodes.
    pub inodes: u64,

    /// Maximum number of bytes in size.
    pub bytes: u64,
}

impl Default for VfsLimits {
    /// Stock limits: 10 000 inodes and 100 MiB.
    fn default() -> Self {
        Self {
            inodes: 10_000,
            // 100 MiB
            bytes: 100 * 1024 * 1024,
        }
    }
}
292+
293+
/// Current virtual filesystem allocation.
294+
#[derive(Debug)]
295+
struct VfsAllocation {
296+
/// Number of inodes.
297+
inodes: Allocation,
298+
299+
/// Number of bytes.
300+
bytes: Allocation,
301+
}
302+
303+
impl VfsAllocation {
304+
/// Create new empty allocation tracker.
305+
fn new(limits: &VfsLimits) -> Self {
306+
Self {
307+
inodes: Allocation::new("inodes", limits.inodes),
308+
bytes: Allocation::new("bytes", limits.bytes),
309+
}
310+
}
311+
}
312+
191313
/// State for the virtual filesystem.
192314
#[derive(Debug)]
193315
pub(crate) struct VfsState {
@@ -196,11 +318,14 @@ pub(crate) struct VfsState {
196318

197319
/// Hash key for metadata hashes.
198320
metadata_hash_key: [u8; 16],
321+
322+
/// Current allocation.
323+
allocation: VfsAllocation,
199324
}
200325

201326
impl VfsState {
202327
/// Create a new empty VFS.
203-
pub(crate) fn new() -> Self {
328+
pub(crate) fn new(limits: &VfsLimits) -> Self {
204329
Self {
205330
root: Arc::new(RwLock::new(VfsNode {
206331
kind: VfsNodeKind::Directory {
@@ -209,6 +334,7 @@ impl VfsState {
209334
parent: None,
210335
})),
211336
metadata_hash_key: rand::rng().random(),
337+
allocation: VfsAllocation::new(limits),
212338
}
213339
}
214340

@@ -228,6 +354,8 @@ impl VfsState {
228354
tar::EntryType::Regular => {
229355
let mut content = Vec::new();
230356
entry.read_to_end(&mut content)?;
357+
content.shrink_to_fit();
358+
self.allocation.bytes.inc(content.capacity() as u64)?;
231359
VfsNodeKind::File { content }
232360
}
233361
other => {
@@ -252,6 +380,17 @@ impl VfsState {
252380
VfsNode::resolve_path(Arc::clone(&self.root), Arc::clone(&self.root), path_str)
253381
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
254382

383+
let child = Arc::new(RwLock::new(VfsNode {
384+
kind,
385+
parent: Some(Arc::downgrade(&node)),
386+
}));
387+
388+
self.allocation.inodes.inc(1)?;
389+
self.allocation.bytes.inc(name.len() as u64)?;
390+
self.allocation
391+
.bytes
392+
.inc(std::mem::size_of_val(&child) as u64)?;
393+
255394
match &mut node.write().unwrap().kind {
256395
VfsNodeKind::File { .. } => {
257396
return Err(std::io::Error::new(
@@ -260,13 +399,7 @@ impl VfsState {
260399
));
261400
}
262401
VfsNodeKind::Directory { children } => {
263-
children.insert(
264-
name.to_owned(),
265-
Arc::new(RwLock::new(VfsNode {
266-
kind,
267-
parent: Some(Arc::downgrade(&node)),
268-
})),
269-
);
402+
children.insert(name.into(), child);
270403
}
271404
}
272405
}
@@ -461,7 +594,7 @@ impl<'a> filesystem::types::HostDescriptor for VfsCtxView<'a> {
461594
};
462595

463596
DirectoryEntry {
464-
name: name.clone(),
597+
name: name.as_ref().to_owned(),
465598
type_,
466599
}
467600
})

host/tests/integration_tests/python/runtime/fs.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use arrow::{
55
datatypes::{DataType, Field},
66
};
77
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
8+
use datafusion_udf_wasm_host::{WasmPermissions, WasmScalarUdf, vfs::VfsLimits};
89

910
use crate::integration_tests::{
10-
python::test_utils::python_scalar_udf, test_utils::ColumnarValueExt,
11+
python::test_utils::{python_component, python_scalar_udf},
12+
test_utils::ColumnarValueExt,
1113
};
1214

1315
#[tokio::test(flavor = "multi_thread")]
@@ -225,3 +227,45 @@ def write(path: str) -> str:
225227
as &dyn Array,
226228
);
227229
}
230+
231+
#[tokio::test(flavor = "multi_thread")]
232+
async fn test_limit_inodes() {
233+
let component = python_component().await;
234+
235+
// since the VFS is immutable, we have to use the limit that is too small for the root FS
236+
let err = WasmScalarUdf::new(
237+
component,
238+
&WasmPermissions::new().with_vfs_limits(VfsLimits {
239+
inodes: 42,
240+
..Default::default()
241+
}),
242+
"".to_owned(),
243+
)
244+
.await
245+
.unwrap_err();
246+
247+
insta::assert_snapshot!(
248+
err,
249+
@"IO error: inodes limit reached: limit<=42 current==42 requested+=1");
250+
}
251+
252+
#[tokio::test(flavor = "multi_thread")]
253+
async fn test_limit_bytes() {
254+
let component = python_component().await;
255+
256+
// since the VFS is immutable, we have to use the limit that is too small for the root FS
257+
let err = WasmScalarUdf::new(
258+
component,
259+
&WasmPermissions::new().with_vfs_limits(VfsLimits {
260+
bytes: 1337,
261+
..Default::default()
262+
}),
263+
"".to_owned(),
264+
)
265+
.await
266+
.unwrap_err();
267+
268+
insta::assert_snapshot!(
269+
err,
270+
@"IO error: bytes limit reached: limit<=1337 current==1021 requested+=12355");
271+
}

0 commit comments

Comments
 (0)