Skip to content

Commit 0c61ede

Browse files
committed
wip
Signed-off-by: Onur Satici <[email protected]>
1 parent 89b236b commit 0c61ede

File tree

3 files changed

+140
-0
lines changed

3 files changed

+140
-0
lines changed

vortex-io/src/runtime/handle.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::file::IoRequestStream;
2121
use crate::runtime::AbortHandleRef;
2222
use crate::runtime::Executor;
2323
use crate::runtime::IoTask;
24+
use crate::runtime::LocalExecutor;
2425

2526
/// A handle to an active Vortex runtime.
2627
///
@@ -45,6 +46,11 @@ impl Handle {
4546
})
4647
}
4748

49+
/// Returns a [`LocalExecutor`] if the underlying runtime supports spawning `!Send` futures.
50+
pub fn as_local_executor(&self) -> Option<Arc<dyn LocalExecutor>> {
51+
self.runtime().as_local_executor()
52+
}
53+
4854
/// Returns a handle to the current runtime, if such a reasonable choice exists.
4955
///
5056
/// For example, if called from within a Tokio context this will return a

vortex-io/src/runtime/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
//! * Tokio: work is driven on a Tokio runtime provided by the caller.
1414
//!
1515
16+
use std::future::Future;
17+
use std::pin::Pin;
18+
use std::sync::Arc;
19+
1620
use futures::future::BoxFuture;
1721
use futures::stream::BoxStream;
1822

@@ -34,6 +38,8 @@ pub mod single;
3438
mod smol;
3539
#[cfg(feature = "tokio")]
3640
pub mod tokio;
41+
#[cfg(not(target_arch = "wasm32"))]
42+
pub mod uring;
3743
#[cfg(target_arch = "wasm32")]
3844
pub mod wasm;
3945

@@ -65,8 +71,29 @@ pub(crate) trait Executor: Send + Sync {
6571
///
6672
/// Cancellation is implied by termination of the request stream.
6773
fn spawn_io(&self, task: IoTask);
74+
75+
/// Returns a [`LocalExecutor`] view if the runtime supports spawning `!Send` futures.
76+
///
77+
/// Default implementation returns `None` for runtimes that only support `Send` futures.
78+
fn as_local_executor(&self) -> Option<Arc<dyn LocalExecutor>> {
79+
None
80+
}
6881
}
6982

83+
/// Extension trait for runtimes that can build and drive `!Send` futures on a single thread.
84+
///
85+
/// The factory is `Send` so it may be sent to the runtime's thread; the produced future can be
86+
/// `!Send` because it never leaves that thread after creation.
87+
pub(crate) trait LocalExecutor: Executor {
88+
fn spawn_local(&self, f: LocalSpawn) -> AbortHandleRef;
89+
}
90+
91+
/// A boxed future that may be `!Send`.
92+
pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
93+
94+
/// A boxed factory for building a local future on the target runtime thread.
95+
pub(crate) type LocalSpawn = Box<dyn FnOnce() -> LocalBoxFuture<'static, ()> + Send + 'static>;
96+
7097
/// A handle that may be used to optimistically abort a spawned task.
7198
///
7299
/// If dropped, the task should continue to completion.

vortex-io/src/runtime/uring.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! Per-core runtime helpers and dispatching executor.
5+
6+
use std::sync::Arc;
7+
use std::sync::atomic::AtomicUsize;
8+
use std::sync::atomic::Ordering;
9+
10+
use futures::future::BoxFuture;
11+
use vortex_error::vortex_panic;
12+
13+
use crate::runtime::AbortHandleRef;
14+
use crate::runtime::Executor;
15+
use crate::runtime::Handle;
16+
use crate::runtime::IoTask;
17+
use crate::runtime::LocalExecutor;
18+
use crate::runtime::LocalSpawn;
19+
20+
/// An executor that dispatches work across a fixed set of underlying executors.
21+
///
22+
/// Tasks are assigned round-robin; there is no work stealing. This is intended to pair with
23+
/// per-core runtimes where each executor owns a single thread/reactor.
24+
pub(crate) struct HandleSetExecutor {
25+
executors: Arc<[Arc<dyn Executor>]>,
26+
picker: AtomicUsize,
27+
}
28+
29+
impl HandleSetExecutor {
30+
pub(crate) fn new(executors: Vec<Arc<dyn Executor>>) -> Self {
31+
assert!(!executors.is_empty());
32+
Self {
33+
executors: executors.into(),
34+
picker: AtomicUsize::new(0),
35+
}
36+
}
37+
38+
fn pick(&self) -> &Arc<dyn Executor> {
39+
let idx = self.picker.fetch_add(1, Ordering::Relaxed);
40+
// Relaxed is sufficient: we only need uniqueness, not ordering guarantees.
41+
&self.executors[idx % self.executors.len()]
42+
}
43+
}
44+
45+
/// A thin wrapper around a set of executors that produces a dispatching [`Handle`].
46+
///
47+
/// This is intended to be backed by per-core runtimes (e.g., io_uring reactors), but it can be
48+
/// constructed from any set of executors for now.
49+
pub(crate) struct HandleSet {
50+
executors: Arc<[Arc<dyn Executor>]>,
51+
dispatcher: Arc<HandleSetExecutor>,
52+
}
53+
54+
impl HandleSet {
55+
pub(crate) fn new(executors: Vec<Arc<dyn Executor>>) -> Self {
56+
let executors: Arc<[Arc<dyn Executor>]> = executors.into();
57+
let dispatcher = Arc::new(HandleSetExecutor::new(
58+
executors.iter().cloned().collect(),
59+
));
60+
Self {
61+
executors,
62+
dispatcher,
63+
}
64+
}
65+
66+
/// Returns a handle that round-robins spawned work across the underlying executors.
67+
pub(crate) fn dispatching_handle(&self) -> Handle {
68+
let exec: Arc<dyn Executor> = self.dispatcher.clone();
69+
Handle::new(Arc::downgrade(&exec))
70+
}
71+
72+
/// Access to the underlying executors, useful for building per-core pools later.
73+
pub(crate) fn executors(&self) -> &[Arc<dyn Executor>] {
74+
&self.executors
75+
}
76+
}
77+
78+
impl Executor for HandleSetExecutor {
79+
fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
80+
self.pick().spawn(fut)
81+
}
82+
83+
fn spawn_cpu(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
84+
self.pick().spawn_cpu(task)
85+
}
86+
87+
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
88+
self.pick().spawn_blocking(task)
89+
}
90+
91+
fn spawn_io(&self, task: IoTask) {
92+
self.pick().spawn_io(task)
93+
}
94+
95+
fn as_local_executor(&self) -> Option<Arc<dyn LocalExecutor>> {
96+
self.pick().as_local_executor()
97+
}
98+
}
99+
100+
impl LocalExecutor for HandleSetExecutor {
101+
fn spawn_local(&self, f: LocalSpawn) -> AbortHandleRef {
102+
match self.pick().as_local_executor() {
103+
Some(exec) => exec.spawn_local(f),
104+
None => vortex_panic!("LocalExecutor requested but not supported by any underlying executor"),
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)