Skip to content

Commit ed312bd

Browse files
authored
Merge pull request eclipse-score#50 from qorix-group/prabakaran_expose_safety_queue_size_cfg
worker: Expose safety worker queue size config
2 parents 64795cc + bc645b8 commit ed312bd

File tree

5 files changed

+55
-7
lines changed

5 files changed

+55
-7
lines changed

src/kyron-macros/src/lib.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct MacroArgs {
2323
worker_thread_parameters: Option<ThreadParams>,
2424
safety_worker: Option<bool>,
2525
safety_worker_thread_parameters: Option<ThreadParams>,
26+
safety_worker_task_queue_size: Option<Expr>,
2627
dedicated_workers: Vec<DedicatedWorker>,
2728
}
2829

@@ -34,6 +35,7 @@ impl MacroArgs {
3435
worker_thread_parameters: None,
3536
safety_worker: None,
3637
safety_worker_thread_parameters: None,
38+
safety_worker_task_queue_size: None,
3739
dedicated_workers: Vec::new(),
3840
}
3941
}
@@ -103,6 +105,10 @@ impl Parse for MacroArgs {
103105
let tp: ThreadParams = parse_braced_thread_params(&input)?;
104106
args.safety_worker_thread_parameters = Some(tp);
105107
}
108+
"safety_worker_task_queue_size" => {
109+
let expr: Expr = input.parse()?;
110+
args.safety_worker_task_queue_size = Some(expr);
111+
}
106112
"dedicated_workers" => {
107113
let inner;
108114
bracketed!(inner in input);
@@ -247,6 +253,7 @@ fn expr_to_usize(expr: &Expr) -> Result<usize> {
247253
/// priority = 20,
248254
/// scheduler_type = "RoundRobin"
249255
/// },
256+
/// safety_worker_task_queue_size = 32, // Optional, must be power of two, default: 64
250257
/// dedicated_workers = [ // Optional, list of dedicated workers
251258
/// {
252259
/// id = "dedicated1", // Required, unique id
@@ -268,7 +275,7 @@ fn expr_to_usize(expr: &Expr) -> Result<usize> {
268275
///
269276
/// ## Notes:
270277
/// - All parameters are optional unless otherwise noted.
271-
/// - `task_queue_size` must be a power of two.
278+
/// - `task_queue_size` and `safety_worker_task_queue_size` must be a power of two.
272279
/// - `worker_threads` must be between 1 and 128.
273280
/// - `priority` must be between 0 and 255.
274281
/// - `scheduler_type` must be one of "Fifo", "RoundRobin", "Other".
@@ -324,6 +331,18 @@ pub fn main(attr: TokenStream, item: TokenStream) -> TokenStream {
324331
// safety worker token if enabled
325332
let sw_enabled = args.safety_worker.unwrap_or(false);
326333
let safety_worker_tokens = if sw_enabled {
334+
let safety_worker_task_queue_size_ts = match args.safety_worker_task_queue_size {
335+
Some(e) => {
336+
let queue_size = expr_to_usize(&e).unwrap();
337+
if !queue_size.is_power_of_two() {
338+
return syn::Error::new_spanned(e, "'safety_worker_task_queue_size' must be a power of two.")
339+
.to_compile_error()
340+
.into();
341+
}
342+
quote! { #e }
343+
}
344+
None => quote! { 64 }, // default
345+
};
327346
match args.safety_worker_thread_parameters {
328347
Some(tp) => {
329348
if tp.priority.is_none() ^ tp.scheduler_type.is_none() {
@@ -335,13 +354,23 @@ pub fn main(attr: TokenStream, item: TokenStream) -> TokenStream {
335354
ThreadParameters::new()
336355
#tp_tokens
337356
)
357+
.safety_worker_task_queue_size(#safety_worker_task_queue_size_ts)
338358
}
339359
}
340360
None => quote! {
341361
.enable_safety_worker(ThreadParameters::default())
362+
.safety_worker_task_queue_size(#safety_worker_task_queue_size_ts)
342363
},
343364
}
344365
} else {
366+
if args.safety_worker_task_queue_size.is_some() {
367+
return syn::Error::new_spanned(
368+
args.safety_worker_task_queue_size.unwrap(),
369+
"Safety worker is not enabled, but queue size is configured.",
370+
)
371+
.to_compile_error()
372+
.into();
373+
}
345374
quote! {/* safety worker not enabled */}
346375
};
347376

src/kyron/src/scheduler/execution_engine.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ pub struct ExecutionEngineBuilder {
227227

228228
dedicated_workers_ids: GrowableVec<(UniqueWorkerId, ThreadParameters)>,
229229
with_safe_worker: (bool, ThreadParameters), //enabled, params
230+
safety_worker_queue_size: u32,
230231
}
231232

232233
impl Default for ExecutionEngineBuilder {
@@ -242,6 +243,7 @@ impl ExecutionEngineBuilder {
242243
queue_size: 256,
243244
dedicated_workers_ids: GrowableVec::new(2),
244245
with_safe_worker: (false, ThreadParameters::default()),
246+
safety_worker_queue_size: 64,
245247
thread_params: ThreadParameters::default(),
246248
}
247249
}
@@ -311,6 +313,17 @@ impl ExecutionEngineBuilder {
311313
self
312314
}
313315

316+
///
317+
/// Configure safety worker task queue size with `size`.
318+
/// >ATTENTION: `size` has to be power of two and safety worker shall be enabled prior to queue size configuration.
319+
///
320+
pub fn safety_worker_task_queue_size(mut self, size: u32) -> Self {
321+
assert!(size.is_power_of_two(), "Safety worker task queue size ({}) must be power of two", size);
322+
assert!(self.with_safe_worker.0, "Enable safety worker prior to configuring its queue size.");
323+
self.safety_worker_queue_size = size;
324+
self
325+
}
326+
314327
///
315328
/// Adds new dedicated worker identified by `id` to the engine with given thread parameters `params`.
316329
/// If priority or scheduler type is `None`, then both attributes will be inherited from parent thread.
@@ -341,7 +354,11 @@ impl ExecutionEngineBuilder {
341354
let safety_worker_queue;
342355
let safety_worker = {
343356
if self.with_safe_worker.0 {
344-
let w = SafetyWorker::new(WorkerId::new("SafetyWorker".into(), 0, 0, WorkerType::Dedicated), self.with_safe_worker.1);
357+
let w = SafetyWorker::new(
358+
WorkerId::new("SafetyWorker".into(), 0, 0, WorkerType::Dedicated),
359+
self.with_safe_worker.1,
360+
self.safety_worker_queue_size,
361+
);
345362
safety_worker_queue = Some(w.get_queue());
346363
Some(w)
347364
} else {

src/kyron/src/scheduler/workers/safety_worker.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ use super::{spawn_thread, worker_types::WorkerId, ThreadParameters};
3838
///
3939
const LOCAL_STORAGE_SIZE_REDUCTION: usize = 8;
4040

41-
const SAFETY_QUEUE_SIZE: usize = 64; // For now hardcoded
42-
4341
pub(crate) struct SafetyWorker {
4442
thread_handle: Option<Thread>,
4543
id: WorkerId,
@@ -49,11 +47,11 @@ pub(crate) struct SafetyWorker {
4947
}
5048

5149
impl SafetyWorker {
52-
pub(crate) fn new(id: WorkerId, thread_params: ThreadParameters) -> Self {
50+
pub(crate) fn new(id: WorkerId, thread_params: ThreadParameters, safety_queue_size: u32) -> Self {
5351
SafetyWorker {
5452
id,
5553
thread_handle: None,
56-
queue: Arc::new(TriggerQueue::new(SAFETY_QUEUE_SIZE)),
54+
queue: Arc::new(TriggerQueue::new(safety_queue_size as usize)),
5755
stop_signal: Arc::new(FoundationAtomicBool::new(false)),
5856
thread_params,
5957
}

tests/test_cases/tests/runtime/worker/test_safety_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,6 @@ def test_regular_worker_execution(
310310
assert len(failing_tasks) == task_count
311311

312312

313-
@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/396")
314313
class TestSafeWorkerHeavyLoad(CitScenario):
315314
@pytest.fixture(scope="class")
316315
def scenario_name(self) -> str:
@@ -331,6 +330,7 @@ def test_config(self, successful_task_count: int, fail_task_count: int) -> dict[
331330
"task_queue_size": 2048,
332331
"workers": 4,
333332
"safety_worker": {},
333+
"safety_worker_task_queue_size": 512,
334334
},
335335
"test": {
336336
"successful_tasks": successful_task_count,

tests/test_scenarios/rust/src/internals/runtime_helper.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub struct DedicatedWorkerConfig {
4747
pub struct ExecEngineConfig {
4848
pub task_queue_size: u32,
4949
pub workers: usize,
50+
pub safety_worker_task_queue_size: Option<u32>,
5051
#[serde(flatten)]
5152
pub thread_parameters: ThreadParameters,
5253
pub dedicated_workers: Option<Vec<DedicatedWorkerConfig>>,
@@ -135,6 +136,9 @@ impl Runtime {
135136
if let Some(safety_worker) = &exec_engine.safety_worker {
136137
let safety_worker_thread_params = self.set_thread_params(safety_worker);
137138
exec_engine_builder = exec_engine_builder.enable_safety_worker(safety_worker_thread_params);
139+
if let Some(queue_size) = exec_engine.safety_worker_task_queue_size {
140+
exec_engine_builder = exec_engine_builder.safety_worker_task_queue_size(queue_size);
141+
}
138142
}
139143

140144
let (builder, _) = async_rt_builder.with_engine(exec_engine_builder);

0 commit comments

Comments
 (0)