Skip to content

Commit 29c8f3c

Browse files
committed
wip
Signed-off-by: Onur Satici <[email protected]>
1 parent 0c61ede commit 29c8f3c

File tree

4 files changed

+86
-6
lines changed

4 files changed

+86
-6
lines changed

vortex-io/src/runtime/handle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl Handle {
4040
Self { runtime }
4141
}
4242

43-
fn runtime(&self) -> Arc<dyn Executor> {
43+
pub(crate) fn runtime(&self) -> Arc<dyn Executor> {
4444
self.runtime.upgrade().unwrap_or_else(|| {
4545
vortex_panic!("Attempted to use a Handle after its runtime was dropped")
4646
})

vortex-io/src/runtime/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub mod wasm;
4747
mod tests;
4848

4949
/// Trait used to abstract over different async runtimes.
50-
pub(crate) trait Executor: Send + Sync {
50+
pub trait Executor: Send + Sync {
5151
/// Spawns a future to be executed on the runtime.
5252
///
5353
/// The future should continue to be polled in the background by the runtime.
@@ -84,7 +84,7 @@ pub(crate) trait Executor: Send + Sync {
8484
///
8585
/// The factory is `Send` so it may be sent to the runtime's thread; the produced future can be
8686
/// `!Send` because it never leaves that thread after creation.
87-
pub(crate) trait LocalExecutor: Executor {
87+
pub trait LocalExecutor: Executor {
8888
fn spawn_local(&self, f: LocalSpawn) -> AbortHandleRef;
8989
}
9090

@@ -98,11 +98,11 @@ pub(crate) type LocalSpawn = Box<dyn FnOnce() -> LocalBoxFuture<'static, ()> + S
9898
///
9999
/// If dropped, the task should continue to completion.
100100
/// If explicitly aborted, the task should be cancelled if it has not yet started executing.
101-
pub(crate) trait AbortHandle: Send + Sync {
101+
pub trait AbortHandle: Send + Sync {
102102
fn abort(self: Box<Self>);
103103
}
104104

105-
pub(crate) type AbortHandleRef = Box<dyn AbortHandle>;
105+
pub type AbortHandleRef = Box<dyn AbortHandle>;
106106

107107
/// A task for driving I/O requests against a source.
108108
///
@@ -112,7 +112,7 @@ pub(crate) type AbortHandleRef = Box<dyn AbortHandle>;
112112
///
113113
// NOTE(ngates): We could in theory make IoSource support as_any if we wanted each runtime to implement the
114114
// actual read logic themselves? Not sure yet...
115-
pub(crate) struct IoTask {
115+
pub struct IoTask {
116116
pub(crate) source: ReadSourceRef,
117117
pub(crate) stream: BoxStream<'static, IoRequest>,
118118
}

vortex-io/src/runtime/uring.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,17 @@ use std::sync::atomic::Ordering;
1010
use futures::future::BoxFuture;
1111
use vortex_error::vortex_panic;
1212

13+
use crate::runtime::blocking::BlockingRuntime;
1314
use crate::runtime::AbortHandleRef;
15+
use crate::runtime::current::CurrentThreadRuntime;
16+
use crate::runtime::current::CurrentThreadWorkerPool;
1417
use crate::runtime::Executor;
1518
use crate::runtime::Handle;
1619
use crate::runtime::IoTask;
1720
use crate::runtime::LocalExecutor;
1821
use crate::runtime::LocalSpawn;
1922

23+
#[allow(dead_code)]
2024
/// An executor that dispatches work across a fixed set of underlying executors.
2125
///
2226
/// Tasks are assigned round-robin; there is no work stealing. This is intended to pair with
@@ -26,6 +30,7 @@ pub(crate) struct HandleSetExecutor {
2630
picker: AtomicUsize,
2731
}
2832

33+
#[allow(dead_code)]
2934
impl HandleSetExecutor {
3035
pub(crate) fn new(executors: Vec<Arc<dyn Executor>>) -> Self {
3136
assert!(!executors.is_empty());
@@ -42,6 +47,7 @@ impl HandleSetExecutor {
4247
}
4348
}
4449

50+
#[allow(dead_code)]
4551
/// A thin wrapper around a set of executors that produces a dispatching [`Handle`].
4652
///
4753
/// This is intended to be backed by per-core runtimes (e.g., io_uring reactors), but it can be
@@ -51,6 +57,7 @@ pub(crate) struct HandleSet {
5157
dispatcher: Arc<HandleSetExecutor>,
5258
}
5359

60+
#[allow(dead_code)]
5461
impl HandleSet {
5562
pub(crate) fn new(executors: Vec<Arc<dyn Executor>>) -> Self {
5663
let executors: Arc<[Arc<dyn Executor>]> = executors.into();
@@ -75,6 +82,71 @@ impl HandleSet {
7582
}
7683
}
7784

85+
/// Create a [`Handle`] that dispatches work round-robin across the provided handles.
86+
///
87+
/// This is useful for thread-per-core runtimes where each handle is tied to a single reactor.
88+
pub fn dispatching_handle(handles: &[Handle]) -> Handle {
89+
let executors = handles
90+
.iter()
91+
.map(|h| h.runtime())
92+
.collect::<Vec<_>>();
93+
let set = HandleSet::new(executors);
94+
set.dispatching_handle()
95+
}
96+
97+
/// A lightweight per-core pool using current-thread runtimes and background workers.
98+
///
99+
/// This is a stopgap until a true io_uring-backed runtime is wired in. Each core owns its own
100+
/// executor driven by a single worker thread, and the exposed handle dispatches round-robin
101+
/// across them.
102+
#[allow(dead_code)]
103+
pub struct PerCoreRuntimePool {
104+
cores: Vec<CurrentThreadCore>,
105+
handle: Handle,
106+
}
107+
108+
#[allow(dead_code)]
109+
impl PerCoreRuntimePool {
110+
/// Build a pool with `cores` runtimes (defaults to available_parallelism if None).
111+
pub fn new(cores: Option<usize>) -> Self {
112+
let core_count = cores
113+
.or_else(|| std::thread::available_parallelism().ok().map(|n| n.get()))
114+
.unwrap_or(1);
115+
116+
let cores: Vec<_> = (0..core_count).map(|_| CurrentThreadCore::new()).collect();
117+
let handles: Vec<_> = cores.iter().map(|c| c.handle()).collect();
118+
let handle = dispatching_handle(&handles);
119+
120+
Self { cores, handle }
121+
}
122+
123+
/// A handle that spreads work across the per-core runtimes.
124+
pub fn handle(&self) -> Handle {
125+
self.handle.clone()
126+
}
127+
}
128+
129+
struct CurrentThreadCore {
130+
runtime: CurrentThreadRuntime,
131+
_pool: CurrentThreadWorkerPool,
132+
}
133+
134+
impl CurrentThreadCore {
135+
fn new() -> Self {
136+
let runtime = CurrentThreadRuntime::new();
137+
let pool = runtime.new_pool();
138+
pool.set_workers(1);
139+
Self {
140+
runtime,
141+
_pool: pool,
142+
}
143+
}
144+
145+
fn handle(&self) -> Handle {
146+
self.runtime.handle()
147+
}
148+
}
149+
78150
impl Executor for HandleSetExecutor {
79151
fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
80152
self.pick().spawn(fut)

vortex-io/src/session.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,13 @@ pub trait RuntimeSessionExt: SessionExt {
5252
self.get_mut::<RuntimeSession>().handle = Some(handle);
5353
self
5454
}
55+
56+
/// Configure the runtime session to use a dispatching handle that round-robins across the
57+
/// provided handles. Useful for thread-per-core runtimes.
58+
fn with_dispatching_handles(self, handles: Vec<Handle>) -> Self {
59+
self.get_mut::<RuntimeSession>().handle =
60+
Some(crate::runtime::uring::dispatching_handle(&handles));
61+
self
62+
}
5563
}
5664
impl<S: SessionExt> RuntimeSessionExt for S {}

0 commit comments

Comments
 (0)