Skip to content

Commit c348284

Browse files
authored
Merge pull request #56 from qorix-group/prabakaran_add_task_context
runtime: Add TaskContext to get ids of task, engine and worker
2 parents ed312bd + 00c6c96 commit c348284

File tree

15 files changed

+432
-159
lines changed

15 files changed

+432
-159
lines changed

src/kyron/examples/safety_task.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
//
2+
// Copyright (c) 2025 Contributors to the Eclipse Foundation
3+
//
4+
// See the NOTICE file(s) distributed with this work for additional
5+
// information regarding copyright ownership.
6+
//
7+
// This program and the accompanying materials are made available under the
8+
// terms of the Apache License Version 2.0 which is available at
9+
// <https://www.apache.org/licenses/LICENSE-2.0>
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
14+
use kyron::prelude::*;
15+
use kyron::safety;
16+
use kyron::scheduler::task::task_context::TaskContext;
17+
use kyron::spawn_on_dedicated;
18+
use kyron_foundation::prelude::*;
19+
20+
async fn failing_safety_task() -> Result<(), String> {
21+
info!(
22+
"Worker-N: failing_safety_task. Worker ID: {:?}, Task ID: {:?}",
23+
TaskContext::worker_id(),
24+
TaskContext::task_id().unwrap()
25+
);
26+
Err("Intentional failure".to_string())
27+
}
28+
29+
async fn passing_safety_task() -> Result<(), String> {
30+
info!(
31+
"Worker-N: passing_safety_task. Worker ID: {:?}, Task ID: {:?}",
32+
TaskContext::worker_id(),
33+
TaskContext::task_id().unwrap()
34+
);
35+
Ok(())
36+
}
37+
38+
async fn passing_non_safety_task() -> Result<(), String> {
39+
info!(
40+
"Dedicated worker (dw1): passing_non_safety_task. Worker ID: {:?}, Task ID: {:?}",
41+
TaskContext::worker_id(),
42+
TaskContext::task_id().unwrap()
43+
);
44+
Ok(())
45+
}
46+
47+
fn main() {
48+
tracing_subscriber::fmt()
49+
.with_target(false) // Optional: Remove module path
50+
.with_max_level(Level::DEBUG)
51+
.with_thread_ids(true)
52+
.with_thread_names(true)
53+
.init();
54+
55+
// Create runtime
56+
let (builder, _engine_id) = kyron::runtime::RuntimeBuilder::new().with_engine(
57+
ExecutionEngineBuilder::new()
58+
.task_queue_size(256)
59+
.enable_safety_worker(ThreadParameters::default())
60+
.with_dedicated_worker("dw1".into(), ThreadParameters::default())
61+
.workers(2),
62+
);
63+
64+
let mut runtime = builder.build().unwrap();
65+
// Put programs into runtime and run them
66+
runtime.block_on(async move {
67+
info!(
68+
"Parent task. Worker ID: {:?}, Task Id: {:?}",
69+
TaskContext::worker_id(),
70+
TaskContext::task_id().unwrap()
71+
);
72+
let handle1 = safety::spawn(failing_safety_task());
73+
let handle2 = safety::spawn(passing_safety_task());
74+
let handle3 = spawn_on_dedicated(passing_non_safety_task(), "dw1".into());
75+
76+
info!("=============================== Spawned all tasks ===============================");
77+
78+
let _ = handle1.await;
79+
info!("Since safety task fails, safety worker may execute parent task from this statement onwards.");
80+
81+
info!(
82+
"Parent task. Worker ID: {:?}, Task Id: {:?}",
83+
TaskContext::worker_id(),
84+
TaskContext::task_id().unwrap()
85+
);
86+
let _ = handle2.await;
87+
let _ = handle3.await;
88+
89+
info!("Program finished running.");
90+
});
91+
92+
info!("Exit.");
93+
}

src/kyron/src/core/types.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
// SPDX-License-Identifier: Apache-2.0
1212
//
1313

14-
use ::core::cell::Cell;
1514
use ::core::future::Future;
1615
use ::core::pin::Pin;
16+
use ::core::sync::atomic::AtomicU64;
1717
use std::sync::Arc;
1818

19+
use crate::scheduler::workers::worker_types::WorkerId;
20+
1921
// Used to Box Futures
2022
pub(crate) type BoxCustom<T> = Box<T>; // TODO: We shall replace Global allocator with our own. Since Allocator API is not stable, we shall provide own Box impl (only for internal purpose handling)
2123

@@ -30,27 +32,30 @@ pub(crate) type BoxInternal<T> = Box<T>; // TODO: Use mempool allocator, for now
3032
pub(crate) type ArcInternal<T> = Arc<T>; // TODO: Use mempool allocator, for now we keep default impl
3133

3234
///
33-
/// TaskId encodes the worker on which it was created and it's number local to the worker.
35+
/// TaskId encodes the worker on which it was created and it is global to the process.
3436
/// This id cannot be used to infer task order creation or anything like that, it's only for identification purpose.
3537
///
3638
#[derive(Copy, Clone, Debug)]
37-
pub(crate) struct TaskId(pub(crate) u32);
38-
39-
thread_local! {
40-
static TASK_COUNTER: Cell<u32> = const {Cell::new(0)};
41-
}
39+
pub struct TaskId(pub(crate) u64);
4240

43-
#[allow(dead_code)]
4441
impl TaskId {
45-
pub(crate) fn new(worker_id: u8) -> Self {
46-
let val = (TASK_COUNTER.get()) % 0x00FFFFFF; //TODO: Fix it later or change algo
47-
TASK_COUNTER.set(val + 1);
42+
pub(crate) fn new(worker_id: &WorkerId) -> Self {
43+
let engine_id = worker_id.engine_id();
44+
let worker_id = worker_id.worker_id();
45+
static TASK_COUNTER: AtomicU64 = const { AtomicU64::new(0) };
46+
// Just increment the global counter, it wraps around on overflow. Only lower 48 bits are used for the TaskId.
47+
let val = TASK_COUNTER.fetch_add(1, ::core::sync::atomic::Ordering::Relaxed);
48+
Self((val << 16) | ((engine_id as u64) << 8) | worker_id as u64)
49+
}
4850

49-
Self((val << 8) | worker_id as u32)
51+
/// Get the worker id that created this task
52+
pub fn worker(&self) -> u8 {
53+
(self.0 & 0xFF_u64) as u8
5054
}
5155

52-
pub(crate) fn worker(&self) -> u8 {
53-
(self.0 & 0xFF_u32) as u8
56+
/// Get the engine id that created this task
57+
pub fn engine(&self) -> u8 {
58+
((self.0 >> 8) & 0xFF_u64) as u8
5459
}
5560
}
5661

src/kyron/src/runtime/runtime_impl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ impl RuntimeBuilder {
4949
///
5050
pub fn with_engine(mut self, builder: ExecutionEngineBuilder) -> (Self, usize) {
5151
let id = self.next_id;
52+
let builder = builder.set_engine_id(id as u8);
5253
self.engine_builders.push(builder);
5354
self.next_id += 1;
5455
(self, id)

src/kyron/src/scheduler/context.rs

Lines changed: 64 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -165,26 +165,26 @@ impl Handler {
165165
fn internal<T>(
166166
&self,
167167
boxed: FutureBox<T>,
168-
c: impl Fn(FutureBox<T>, u8, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, Arc<AsyncScheduler>>>,
168+
c: impl Fn(FutureBox<T>, &WorkerId, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, Arc<AsyncScheduler>>>,
169169
) -> JoinHandle<T>
170170
where
171171
T: Send + 'static,
172172
{
173173
let task_ref;
174174
let handle;
175175

176-
let worker_id = ctx_get_worker_id().worker_id();
176+
let worker_id = ctx_get_worker_id();
177177

178178
match self.inner {
179179
HandlerImpl::Async(ref async_inner) => {
180-
let task = c(boxed, worker_id, async_inner.scheduler.clone());
180+
let task = c(boxed, &worker_id, async_inner.scheduler.clone());
181181
task_ref = TaskRef::new(task.clone());
182182
handle = JoinHandle::new(task_ref.clone());
183183

184184
async_inner.scheduler.spawn_from_runtime(task_ref, &async_inner.prod_con);
185185
}
186186
HandlerImpl::Dedicated(ref dedicated_inner) => {
187-
let task = c(boxed, worker_id, dedicated_inner.scheduler.clone());
187+
let task = c(boxed, &worker_id, dedicated_inner.scheduler.clone());
188188
task_ref = TaskRef::new(task.clone());
189189
handle = JoinHandle::new(task_ref.clone());
190190

@@ -198,26 +198,26 @@ impl Handler {
198198
fn reusable_safety_internal<T>(
199199
&self,
200200
reusable: ReusableBoxFuture<T>,
201-
c: impl Fn(Pin<ReusableBoxFuture<T>>, u8, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, Arc<AsyncScheduler>>>,
201+
c: impl Fn(Pin<ReusableBoxFuture<T>>, &WorkerId, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, Arc<AsyncScheduler>>>,
202202
) -> JoinHandle<T>
203203
where
204204
T: Send + 'static,
205205
{
206206
let task_ref;
207207
let handle;
208208

209-
let worker_id = ctx_get_worker_id().worker_id();
209+
let worker_id = ctx_get_worker_id();
210210

211211
match self.inner {
212212
HandlerImpl::Async(ref async_inner) => {
213-
let task = c(reusable.into_pin(), worker_id, async_inner.scheduler.clone());
213+
let task = c(reusable.into_pin(), &worker_id, async_inner.scheduler.clone());
214214
task_ref = TaskRef::new(task.clone());
215215
handle = JoinHandle::new(task_ref.clone());
216216

217217
async_inner.scheduler.spawn_from_runtime(task_ref, &async_inner.prod_con);
218218
}
219219
HandlerImpl::Dedicated(ref dedicated_inner) => {
220-
let task = c(reusable.into_pin(), worker_id, dedicated_inner.scheduler.clone());
220+
let task = c(reusable.into_pin(), &worker_id, dedicated_inner.scheduler.clone());
221221
task_ref = TaskRef::new(task.clone());
222222
handle = JoinHandle::new(task_ref.clone());
223223

@@ -232,7 +232,11 @@ impl Handler {
232232
&self,
233233
boxed: FutureBox<T>,
234234
worker_id: UniqueWorkerId,
235-
c: impl Fn(FutureBox<T>, u8, DedicatedSchedulerLocal) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, DedicatedSchedulerLocal>>,
235+
c: impl Fn(
236+
FutureBox<T>,
237+
&WorkerId,
238+
DedicatedSchedulerLocal,
239+
) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, DedicatedSchedulerLocal>>,
236240
) -> JoinHandle<T>
237241
where
238242
T: Send + 'static,
@@ -242,11 +246,7 @@ impl Handler {
242246
HandlerImpl::Dedicated(ref dedicated_inner) => &dedicated_inner.dedicated_scheduler,
243247
};
244248

245-
let task = c(
246-
boxed,
247-
ctx_get_worker_id().worker_id(),
248-
DedicatedSchedulerLocal::new(worker_id, scheduler.clone()),
249-
);
249+
let task = c(boxed, &ctx_get_worker_id(), DedicatedSchedulerLocal::new(worker_id, scheduler.clone()));
250250

251251
let task_ref = TaskRef::new(task.clone());
252252
let handle = JoinHandle::new(task_ref.clone());
@@ -261,7 +261,7 @@ impl Handler {
261261
&self,
262262
reusable: ReusableBoxFuture<T>,
263263
worker_id: UniqueWorkerId,
264-
c: impl Fn(Pin<ReusableBoxFuture<T>>, u8, DedicatedSchedulerLocal) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, DedicatedSchedulerLocal>>,
264+
c: impl Fn(Pin<ReusableBoxFuture<T>>, &WorkerId, DedicatedSchedulerLocal) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, DedicatedSchedulerLocal>>,
265265
) -> JoinHandle<T>
266266
where
267267
T: Send + 'static,
@@ -273,7 +273,7 @@ impl Handler {
273273

274274
let task = c(
275275
reusable.into_pin(),
276-
ctx_get_worker_id().worker_id(),
276+
&ctx_get_worker_id(),
277277
DedicatedSchedulerLocal::new(worker_id, scheduler.clone()),
278278
);
279279

@@ -303,9 +303,8 @@ impl Handler {
303303
/// This is an entry point for public API that is filled by each worker once it's created
304304
///
305305
pub(crate) struct WorkerContext {
306-
#[allow(dead_code)] // used in the tests
307-
/// The ID of task that is currently run by worker
308-
running_task_id: Cell<Option<TaskId>>,
306+
/// Task that is currently run by worker
307+
running_task: RefCell<Option<TaskRef>>,
309308

310309
/// WorkerID and EngineID
311310
worker_id: Cell<WorkerId>,
@@ -395,7 +394,7 @@ impl ContextBuilder {
395394

396395
pub(crate) fn build(self) -> WorkerContext {
397396
WorkerContext {
398-
running_task_id: Cell::new(None),
397+
running_task: RefCell::new(None),
399398
worker_id: Cell::new(self.worker_id.expect("Worker type must be set in context builder!")),
400399
handler: RefCell::new(Some(Rc::new(self.handle.expect("Handler type must be set in context builder!")))),
401400
is_safety_enabled: self.is_with_safety,
@@ -482,43 +481,51 @@ pub(crate) fn ctx_get_drivers() -> Drivers {
482481
.unwrap()
483482
}
484483

485-
#[cfg(test)]
486-
mod tests {
487-
use super::*;
484+
///
485+
/// Sets currently running `task`
486+
///
487+
pub(super) fn ctx_set_running_task(task: TaskRef) {
488+
let _ = CTX
489+
.try_with(|ctx| {
490+
ctx.borrow().as_ref().expect("Called before CTX init?").running_task.replace(Some(task));
491+
})
492+
.map_err(|e| {
493+
panic!("Something is really bad here, error {}!", e);
494+
});
495+
}
488496

489-
///
490-
/// Sets currently running `task id`
491-
///
492-
pub(crate) fn ctx_set_running_task_id(id: TaskId) {
493-
let _ = CTX
494-
.try_with(|ctx| {
495-
ctx.borrow().as_ref().expect("Called before CTX init?").running_task_id.replace(Some(id));
496-
})
497-
.map_err(|e| {
498-
panic!("Something is really bad here, error {}!", e);
499-
});
500-
}
497+
///
498+
/// Clears currently running `task`
499+
///
500+
pub(super) fn ctx_unset_running_task() {
501+
let _ = CTX
502+
.try_with(|ctx| {
503+
ctx.borrow().as_ref().expect("Called before CTX init?").running_task.replace(None);
504+
})
505+
.map_err(|_| {});
506+
}
501507

502-
///
503-
/// Clears currently running `task id`
504-
///
505-
pub(crate) fn ctx_unset_running_task_id() {
506-
let _ = CTX
507-
.try_with(|ctx| {
508-
ctx.borrow().as_ref().expect("Called before CTX init?").running_task_id.replace(None);
509-
})
510-
.map_err(|_| {});
511-
}
508+
///
509+
/// Gets currently running `task id`
510+
///
511+
pub(crate) fn ctx_get_running_task_id() -> Option<TaskId> {
512+
CTX.try_with(|ctx| {
513+
ctx.borrow()
514+
.as_ref()
515+
.expect("Called before CTX init?")
516+
.running_task
517+
.borrow()
518+
.as_ref()
519+
.map(|task| task.id())
520+
})
521+
.unwrap_or_else(|e| {
522+
panic!("Something is really bad here, error {}!", e);
523+
})
524+
}
512525

513-
///
514-
/// Gets currently running `task id`
515-
///
516-
pub(crate) fn ctx_get_running_task_id() -> Option<TaskId> {
517-
CTX.try_with(|ctx| ctx.borrow().as_ref().expect("Called before CTX init?").running_task_id.get())
518-
.unwrap_or_else(|e| {
519-
panic!("Something is really bad here, error {}!", e);
520-
})
521-
}
526+
#[cfg(test)]
527+
mod tests {
528+
use super::*;
522529

523530
#[test]
524531
fn test_context_no_init_panic_handler() {
@@ -540,12 +547,13 @@ mod tests {
540547
#[test]
541548
#[should_panic]
542549
fn test_context_no_init_panic_set_task_id() {
543-
ctx_set_running_task_id(TaskId::new(1));
550+
let (_, task) = crate::testing::get_dummy_task_waker();
551+
ctx_set_running_task(TaskRef::new(task));
544552
}
545553

546554
#[test]
547555
#[should_panic]
548556
fn test_context_no_init_panic_unset_task_id() {
549-
ctx_unset_running_task_id();
557+
ctx_unset_running_task();
550558
}
551559
}

0 commit comments

Comments
 (0)