Skip to content

Commit 69379a1

Browse files
authored
executor: add ffi for spawning (#48283)
1 parent 6b1f42f commit 69379a1

File tree

5 files changed

+78
-32
lines changed

5 files changed

+78
-32
lines changed

src/core/executor.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,41 @@
1717
#include "executor.h"
1818

1919
Executor::Executor(int tasksMax) :
20-
inner_(ffi::executor_create(tasksMax))
20+
inner_(ffi::executor_create(tasksMax))
2121
{
2222
}
2323

2424
Executor::~Executor()
2525
{
26-
ffi::executor_destroy(inner_);
26+
ffi::executor_destroy(inner_);
2727
}
2828

2929
int Executor::park_cb(void *ctx, int ms)
3030
{
31-
std::function<bool (std::optional<int>)> *park = (std::function<bool (std::optional<int>)> *)ctx;
31+
std::function<bool (std::optional<int>)> *park = (std::function<bool (std::optional<int>)> *)ctx;
3232

33-
std::optional<int> x;
34-
if(ms >= 0)
35-
x = ms;
33+
std::optional<int> x;
34+
if(ms >= 0)
35+
x = ms;
3636

37-
if(!(*park)(x))
38-
return -1;
37+
if(!(*park)(x))
38+
return -1;
3939

40-
return 0;
40+
return 0;
4141
}
4242

4343
bool Executor::run(std::function<bool (std::optional<int>)> park)
4444
{
45-
if(ffi::executor_run(inner_, park_cb, &park) != 0)
46-
return false;
45+
if(ffi::executor_run(inner_, park_cb, &park) != 0)
46+
return false;
4747

48-
return true;
48+
return true;
49+
}
50+
51+
bool Executor::currentSpawn(ffi::UnitFuture *fut)
52+
{
53+
if(ffi::executor_current_spawn(fut) != 0)
54+
return false;
55+
56+
return true;
4957
}

src/core/executor.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,21 @@
2424
class Executor
2525
{
2626
public:
27-
Executor(int tasksMax);
28-
~Executor();
27+
Executor(int tasksMax);
28+
~Executor();
2929

30-
bool run(std::function<bool (std::optional<int>)> park);
30+
bool run(std::function<bool (std::optional<int>)> park);
31+
32+
/// Spawns `fut` on the executor in the current thread. Returns true on
33+
/// success or false on error. An error can occur if there is no executor in
34+
/// the current thread or if the executor is at capacity. This function takes
35+
/// ownership of `fut` regardless of whether spawning is successful.
36+
static bool currentSpawn(ffi::UnitFuture *fut);
3137

3238
private:
33-
ffi::Executor *inner_;
39+
ffi::Executor *inner_;
3440

35-
static int park_cb(void *ctx, int ms);
41+
static int park_cb(void *ctx, int ms);
3642
};
3743

3844
#endif

src/core/executor.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
use crate::core::future::SizedFuture;
1718
use crate::core::list;
1819
use crate::core::waker;
1920
use log::debug;
@@ -69,7 +70,7 @@ fn poll_fut(fut: &mut BoxFuture, waker: Waker) -> bool {
6970
}
7071

7172
struct Task {
72-
fut: Option<Pin<Box<dyn Future<Output = ()>>>>,
73+
fut: Option<BoxFuture>,
7374
wakeable: bool,
7475
low: bool,
7576
}
@@ -124,10 +125,12 @@ impl Tasks {
124125
!self.data.borrow().next.is_empty() || !self.data.borrow().next_low.is_empty()
125126
}
126127

127-
fn add<F>(&self, fut: F) -> Result<(), ()>
128+
fn add<T>(&self, get_fut: T, size: usize) -> Result<(), ()>
128129
where
129-
F: Future<Output = ()> + 'static,
130+
T: FnOnce() -> BoxFuture,
130131
{
132+
debug!("spawning future with size {size}");
133+
131134
let data = &mut *self.data.borrow_mut();
132135

133136
if data.nodes.len() == data.nodes.capacity() {
@@ -138,7 +141,7 @@ impl Tasks {
138141
let nkey = entry.key();
139142

140143
let task = Task {
141-
fut: Some(Box::pin(fut)),
144+
fut: Some(get_fut()),
142145
wakeable: false,
143146
low: false,
144147
};
@@ -340,9 +343,15 @@ impl Executor {
340343
where
341344
F: Future<Output = ()> + 'static,
342345
{
343-
debug!("spawning future with size {}", mem::size_of::<F>());
346+
self.tasks.add(move || Box::pin(fut), mem::size_of::<F>())
347+
}
348+
349+
#[allow(clippy::result_unit_err)]
350+
pub fn spawn_boxed(&self, fut: Pin<Box<dyn SizedFuture<Output = ()>>>) -> Result<(), ()> {
351+
let size = (*fut).size();
352+
let fut = fut.into_future();
344353

345-
self.tasks.add(fut)
354+
self.tasks.add(move || fut, size)
346355
}
347356

348357
pub fn set_pre_poll<F>(&self, pre_poll_fn: F)
@@ -473,6 +482,7 @@ impl Spawner {
473482

474483
mod ffi {
475484
use super::*;
485+
use crate::core::future::ffi::UnitFuture;
476486
use std::ffi::c_int;
477487

478488
#[no_mangle]
@@ -514,6 +524,28 @@ mod ffi {
514524

515525
0
516526
}
527+
528+
/// Spawns `fut` on the executor in the current thread. Returns 0 on
529+
/// success or non-zero on error. An error can occur if there is no
530+
/// executor in the current thread or if the executor is at capacity.
531+
/// This function takes ownership of `fut` regardless of whether spawning
532+
/// is successful.
533+
///
534+
/// SAFETY: `fut` must point to a valid `UnitFuture`.
535+
#[no_mangle]
536+
pub unsafe extern "C" fn executor_current_spawn(fut: *mut UnitFuture) -> c_int {
537+
let Some(executor) = Executor::current() else {
538+
return -1;
539+
};
540+
541+
let fut = Box::from_raw(fut).0;
542+
543+
if executor.spawn_boxed(fut).is_err() {
544+
return -1;
545+
}
546+
547+
0
548+
}
517549
}
518550

519551
#[cfg(test)]

src/core/reactor.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,19 @@
1717
#include "reactor.h"
1818

1919
Reactor::Reactor(int registrationsMax) :
20-
inner_(ffi::reactor_create(registrationsMax))
20+
inner_(ffi::reactor_create(registrationsMax))
2121
{
2222
}
2323

2424
Reactor::~Reactor()
2525
{
26-
ffi::reactor_destroy(inner_);
26+
ffi::reactor_destroy(inner_);
2727
}
2828

2929
bool Reactor::poll(std::optional<int> ms)
3030
{
31-
if(ffi::reactor_poll(inner_, ms.value_or(-1)) != 0)
32-
return false;
31+
if(ffi::reactor_poll(inner_, ms.value_or(-1)) != 0)
32+
return false;
3333

34-
return true;
34+
return true;
3535
}

src/core/reactor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
class Reactor
2424
{
2525
public:
26-
Reactor(int registrationsMax);
27-
~Reactor();
26+
Reactor(int registrationsMax);
27+
~Reactor();
2828

29-
bool poll(std::optional<int> ms);
29+
bool poll(std::optional<int> ms);
3030

3131
private:
32-
ffi::Reactor *inner_;
32+
ffi::Reactor *inner_;
3333
};
3434

3535
#endif

0 commit comments

Comments
 (0)