Skip to content

Commit 3e7796d

Browse files
scheduler: Fix bug in scheduling safety task
Fixed bugs related to handling and scheduling safety task. #18 #20 #39
1 parent dd9f123 commit 3e7796d

File tree

13 files changed

+275
-60
lines changed

13 files changed

+275
-60
lines changed

src/kyron/BUILD

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,15 @@ rust_binary(
100100
visibility = ["//visibility:public"],
101101
deps = _EXAMPLE_DEPS,
102102
)
103+
104+
rust_binary(
105+
name = "safety_task",
106+
srcs = [
107+
"examples/safety_task.rs",
108+
],
109+
proc_macro_deps = [
110+
"//src/kyron-macros:runtime_macros",
111+
],
112+
visibility = ["//visibility:public"],
113+
deps = _EXAMPLE_DEPS,
114+
)

src/kyron/examples/safety_task.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//
2+
// Copyright (c) 2025 Contributors to the Eclipse Foundation
3+
//
4+
// See the NOTICE file(s) distributed with this work for additional
5+
// information regarding copyright ownership.
6+
//
7+
// This program and the accompanying materials are made available under the
8+
// terms of the Apache License Version 2.0 which is available at
9+
// <https://www.apache.org/licenses/LICENSE-2.0>
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
14+
use kyron::prelude::*;
15+
use kyron::safety;
16+
use kyron::spawn_on_dedicated;
17+
use kyron_foundation::prelude::*;
18+
19+
async fn failing_safety_task() -> Result<(), String> {
20+
info!("Worker-N: failing_safety_task");
21+
Err("Intentional failure".to_string())
22+
}
23+
24+
async fn passing_safety_task() -> Result<(), String> {
25+
info!("Worker-N: passing_safety_task");
26+
Ok(())
27+
}
28+
29+
async fn passing_non_safety_task() -> Result<(), String> {
30+
info!("Dedicated worker (dw1): passing_non_safety_task");
31+
Ok(())
32+
}
33+
34+
fn main() {
35+
tracing_subscriber::fmt()
36+
.with_target(false) // Optional: Remove module path
37+
.with_max_level(Level::DEBUG)
38+
.with_thread_ids(true)
39+
.with_thread_names(true)
40+
.init();
41+
42+
// Create runtime
43+
let (builder, _engine_id) = kyron::runtime::RuntimeBuilder::new().with_engine(
44+
ExecutionEngineBuilder::new()
45+
.task_queue_size(256)
46+
.enable_safety_worker(ThreadParameters::default())
47+
.with_dedicated_worker("dw1".into(), ThreadParameters::default())
48+
.workers(2),
49+
);
50+
51+
let mut runtime = builder.build().unwrap();
52+
// Put programs into runtime and run them
53+
runtime.block_on(async move {
54+
let handle1 = safety::spawn(failing_safety_task());
55+
let handle2 = safety::spawn(passing_safety_task());
56+
let handle3 = spawn_on_dedicated(passing_non_safety_task(), "dw1".into());
57+
58+
info!("=============================== Spawned all tasks ===============================");
59+
60+
let _ = handle1.await;
61+
info!("Safety worker: Since safety task fails, safety worker executes parent task from this statement onwards.");
62+
let _ = handle2.await;
63+
let _ = handle3.await;
64+
65+
info!("Safety worker: Program finished running.");
66+
});
67+
68+
info!("Exit.");
69+
}

src/kyron/src/scheduler/context.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::core::types::FutureBox;
1616
use crate::futures::reusable_box_future::ReusableBoxFuture;
1717
use crate::safety::SafetyResult;
1818
use crate::scheduler::driver::Drivers;
19+
use crate::scheduler::task::async_task::TaskHeader;
1920
use ::core::future::Future;
2021
use core::cell::Cell;
2122
use core::cell::RefCell;
@@ -318,6 +319,9 @@ pub(crate) struct WorkerContext {
318319
/// Helper flag to check if safety was enabled in runtime builder
319320
is_safety_enabled: bool,
320321

322+
/// The task that is currently run by worker
323+
running_task: Cell<Option<*const TaskHeader>>,
324+
321325
wakeup_time: Cell<Option<u64>>,
322326
}
323327

@@ -399,6 +403,7 @@ impl ContextBuilder {
399403
worker_id: Cell::new(self.worker_id.expect("Worker type must be set in context builder!")),
400404
handler: RefCell::new(Some(Rc::new(self.handle.expect("Handler type must be set in context builder!")))),
401405
is_safety_enabled: self.is_with_safety,
406+
running_task: Cell::new(None),
402407
wakeup_time: Cell::new(None),
403408
drivers: Some(self.drivers),
404409
}
@@ -452,6 +457,33 @@ pub(crate) fn ctx_is_with_safety() -> bool {
452457
.unwrap_or_default()
453458
}
454459

460+
///
461+
/// Set currently running task of the worker
462+
///
463+
pub(crate) fn ctx_set_running_task(task: *const TaskHeader) {
464+
CTX.try_with(|ctx| {
465+
ctx.borrow().as_ref().expect("Called before CTX init?").running_task.set(Some(task));
466+
})
467+
.unwrap_or_default();
468+
}
469+
470+
#[allow(dead_code)]
471+
///
472+
/// Check whether there is any safety error for the running task of the worker and clear it
473+
///
474+
pub(crate) fn ctx_check_running_task_safety_error() -> bool {
475+
CTX.try_with(|ctx| {
476+
let mut binding = ctx.borrow_mut();
477+
let ctx = binding.as_mut().expect("Called before CTX init?");
478+
let mut res = false;
479+
if let Some(task) = ctx.running_task.take() {
480+
res = unsafe { (*(task as *mut TaskHeader)).check_safety_error_and_clear() };
481+
}
482+
res
483+
})
484+
.unwrap_or_default()
485+
}
486+
455487
pub(crate) fn ctx_set_wakeup_time(time: u64) {
456488
CTX.try_with(|ctx| ctx.borrow().as_ref().expect("Called before CTX init?").wakeup_time.set(Some(time)))
457489
.unwrap_or_default();

src/kyron/src/scheduler/join_handle.rs

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use kyron_foundation::prelude::*;
1414
use kyron_foundation::{not_recoverable_error, prelude::CommonErrors};
1515

16+
use crate::scheduler::context::ctx_set_running_task;
1617
use crate::{
1718
futures::{FutureInternalReturn, FutureState},
1819
TaskRef,
@@ -74,34 +75,52 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
7475
if was_set {
7576
FutureInternalReturn::default()
7677
} else {
78+
// Check whether there is safety error for the completed task
79+
let task_hdr = TaskRef::into_raw(self.for_task.clone());
80+
if unsafe { (*task_hdr).get_safety_error() } {
81+
// Change the running task to safety task to schedule the current task into safety worker
82+
ctx_set_running_task(task_hdr);
83+
waker.wake_by_ref();
84+
FutureInternalReturn::polled()
85+
} else {
86+
let mut ret: Result<T, CommonErrors> = Err(CommonErrors::NoData);
87+
let ret_as_ptr = &mut ret as *mut _;
88+
self.for_task.get_return_val(ret_as_ptr as *mut u8);
89+
90+
match ret {
91+
Ok(v) => FutureInternalReturn::ready(Ok(v)),
92+
Err(CommonErrors::OperationAborted) => FutureInternalReturn::ready(Err(CommonErrors::OperationAborted)),
93+
Err(e) => {
94+
not_recoverable_error!(with e, "There has been an error in a task that is not recoverable ({})!");
95+
}
96+
}
97+
}
98+
}
99+
}
100+
FutureState::Polled => {
101+
let waker = cx.waker();
102+
103+
// Set the waker, return values tells what have happen and took care about correct synchronization
104+
let was_set = self.for_task.set_join_handle_waker(waker.clone());
105+
106+
if was_set {
107+
FutureInternalReturn::default()
108+
} else {
109+
// Safety belows forms AqrRel so waker is really written before we do marking
77110
let mut ret: Result<T, CommonErrors> = Err(CommonErrors::NoData);
78111
let ret_as_ptr = &mut ret as *mut _;
79112
self.for_task.get_return_val(ret_as_ptr as *mut u8);
80113

81114
match ret {
82115
Ok(v) => FutureInternalReturn::ready(Ok(v)),
116+
Err(CommonErrors::NoData) => FutureInternalReturn::polled(),
83117
Err(CommonErrors::OperationAborted) => FutureInternalReturn::ready(Err(CommonErrors::OperationAborted)),
84118
Err(e) => {
85119
not_recoverable_error!(with e, "There has been an error in a task that is not recoverable ({})!");
86120
}
87121
}
88122
}
89123
}
90-
FutureState::Polled => {
91-
// Safety belows forms AqrRel so waker is really written before we do marking
92-
let mut ret: Result<T, CommonErrors> = Err(CommonErrors::NoData);
93-
let ret_as_ptr = &mut ret as *mut _;
94-
self.for_task.get_return_val(ret_as_ptr as *mut u8);
95-
96-
match ret {
97-
Ok(v) => FutureInternalReturn::ready(Ok(v)),
98-
Err(CommonErrors::NoData) => FutureInternalReturn::polled(),
99-
Err(CommonErrors::OperationAborted) => FutureInternalReturn::ready(Err(CommonErrors::OperationAborted)),
100-
Err(e) => {
101-
not_recoverable_error!(with e, "There has been an error in a task that is not recoverable ({})!");
102-
}
103-
}
104-
}
105124
FutureState::Finished => {
106125
not_recoverable_error!("Future polled after it finished!");
107126
}
@@ -130,6 +149,7 @@ mod tests {
130149
use kyron_testing::prelude::*;
131150

132151
#[test]
152+
#[cfg(not(miri))]
133153
fn test_join_handler_ready_task_get_correct_result_to_handle() {
134154
let scheduler = create_mock_scheduler();
135155

@@ -175,6 +195,7 @@ mod tests {
175195
}
176196

177197
#[test]
198+
#[cfg(not(miri))]
178199
fn test_join_handler_aborted_task_produce_handle_abort_result() {
179200
let scheduler = create_mock_scheduler();
180201

@@ -205,6 +226,7 @@ mod tests {
205226
}
206227

207228
#[test]
229+
#[cfg(not(miri))]
208230
#[should_panic]
209231
fn test_join_handler_panics_when_fetched_data_and_repolled() {
210232
let scheduler = create_mock_scheduler();
@@ -256,6 +278,40 @@ mod tests {
256278
assert_eq!(poller.poll(), ::core::task::Poll::Ready(Ok(0)));
257279
}
258280
}
281+
282+
#[test]
283+
fn test_join_handle_waker_is_set_in_polled_state_also() {
284+
let scheduler = create_mock_scheduler();
285+
286+
{
287+
// Data is present before first poll of join handle
288+
let task = ArcInternal::new(AsyncTask::new(box_future(test_function::<u32>()), 1, scheduler.clone()));
289+
290+
let handle = JoinHandle::<u32>::new(TaskRef::new(task.clone()));
291+
292+
let mut poller = TestingFuturePoller::new(handle);
293+
294+
let waker_mock1 = TrackableWaker::new();
295+
let waker1 = waker_mock1.get_waker();
296+
297+
let waker_mock2 = TrackableWaker::new();
298+
let waker2 = waker_mock2.get_waker();
299+
300+
let _ = poller.poll_with_waker(&waker1);
301+
// Now in polled state, poll again with waker2
302+
let _ = poller.poll_with_waker(&waker2);
303+
{
304+
let waker = noop_waker();
305+
let mut cx = Context::from_waker(&waker);
306+
task.poll(&mut cx); // task done
307+
}
308+
309+
assert!(!waker_mock1.was_waked());
310+
// this should be TRUE
311+
assert!(waker_mock2.was_waked());
312+
assert_eq!(poller.poll(), ::core::task::Poll::Ready(Ok(0)));
313+
}
314+
}
259315
}
260316

261317
#[cfg(test)]

src/kyron/src/scheduler/safety_waker.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,9 @@ static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, wake, wake_by_r
5757
///
5858
/// Waker will store internally a pointer to the ref counted Task.
5959
///
60-
pub(crate) unsafe fn create_safety_waker(waker: Waker) -> Waker {
61-
let raw_waker = RawWaker::new(waker.data(), &VTABLE);
62-
63-
// Forget original as we took over the ownership, so ref count
64-
::core::mem::forget(waker);
60+
pub(crate) fn create_safety_waker(ptr: TaskRef) -> Waker {
61+
let ptr = TaskRef::into_raw(ptr); // Extracts the pointer from TaskRef not decreasing it's reference count. Since we have a clone here, ref cnt was already increased
62+
let raw_waker = RawWaker::new(ptr as *const (), &VTABLE);
6563

6664
// Convert RawWaker to Waker
6765
unsafe { Waker::from_raw(raw_waker) }

0 commit comments

Comments
 (0)