Skip to content

Commit 41ea136

Browse files
authored
Merge pull request #27 from gents83/job-system-optimization-13438576981680291281
Refactor Job System for Efficiency and Parallelization
2 parents 566e1cf + 595486c commit 41ea136

File tree

19 files changed

+476
-111
lines changed

19 files changed: +476 additions, -111 deletions

.cargo/config.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ wasm-opt = ['-Oz']
4040
# `brew install michaeleisel/zld/zld`
4141
[target.x86_64-apple-darwin]
4242
rustflags = ["-C", "link-arg=-fuse-ld=lld"]
43-
[target.aarch64-apple-darwin]
44-
rustflags = ["-C", "link-arg=-fuse-ld=/opt/homebrew/opt/llvm/bin/ld64.lld"]
43+
#[target.aarch64-apple-darwin]
44+
#rustflags = ["-C", "link-arg=-fuse-ld=/opt/homebrew/opt/llvm/bin/ld64.lld"]
4545

4646
[target.wasm32-unknown-unknown]
4747
rustflags = [

.github/workflows/rust-android.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ jobs:
4444
cd crates
4545
cargo ndk -t arm64-v8a clippy --release --no-deps -- -D warnings
4646
47+
- name: "Run cargo test workspace"
48+
run: |
49+
cd crates
50+
cargo test --workspace
51+
4752
- name: "Run cargo build release"
4853
run: |
4954
cd crates

.github/workflows/rust-ios.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ jobs:
4141
export SDKROOT=$(xcrun -sdk iphoneos --show-sdk-path)
4242
cargo clippy --manifest-path ./crates/Cargo.toml --target aarch64-apple-ios --release --no-deps -- -D warnings
4343
44+
- name: "Run cargo test workspace"
45+
run: |
46+
cargo test --workspace --manifest-path ./crates/Cargo.toml
47+
4448
- name: "Run cargo build release"
4549
run: |
4650
export SDKROOT=$(xcrun -sdk iphoneos --show-sdk-path)

.github/workflows/rust-unix.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ jobs:
7878
- name: "Run cargo clippy check"
7979
run: cargo clippy --manifest-path ./crates/Cargo.toml --no-deps -- -D warnings
8080

81+
- name: "Run cargo test workspace"
82+
run: cargo test --workspace --manifest-path ./crates/Cargo.toml
83+
8184
- name: "Run cargo build workspace release"
8285
run: cargo build --manifest-path ./crates/Cargo.toml --release
8386

.github/workflows/rust-web.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ jobs:
6262
cargo update
6363
cd ..
6464
65+
- name: "Run cargo test workspace"
66+
run: cargo test --workspace --manifest-path ./crates/Cargo.toml
67+
6568
- name: "Run cargo build workspace release wasm32"
6669
run: cargo build --manifest-path ./crates/Cargo.toml --release --target wasm32-unknown-unknown
6770

.github/workflows/rust-windows.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ jobs:
8181
- name: "Run cargo clippy check"
8282
run: cargo clippy --manifest-path ./crates/Cargo.toml --no-deps -- -D warnings
8383

84+
- name: "Run cargo test workspace"
85+
run: cargo test --workspace --manifest-path ./crates/Cargo.toml
86+
8487
- name: "Run cargo build workspace release"
8588
run: cargo build --manifest-path ./crates/Cargo.toml --release
8689

crates/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ pyo3 = { git = "https://github.com/PyO3/pyo3", features = [
7171
"abi3-py310",
7272
"generate-import-lib",
7373
] }
74-
rand = { git = "https://github.com/rust-random/rand", features = ["std"] }
7574
serde_json = { git = "https://github.com/serde-rs/json" }
7675
flexbuffers = { git = "https://github.com/google/flatbuffers" }
7776
ttf-parser = { git = "https://github.com/RazrFalcon/ttf-parser" }
@@ -137,7 +136,6 @@ rspirv = { version = "0.11" }
137136
js-sys = { version = "0.3.77" }
138137
wasm-bindgen = { version = "0.2.100" }
139138
wasm-bindgen-futures = { version = "0.4.43" }
140-
wasm-bindgen-test = { version = "0.3" }
141139
web-sys = { version = "0.3.77", features = [
142140
"Request",
143141
"Response",

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ inox_time = { path = "../time" }
2525
inox_uid = { path = "../uid" }
2626

2727
[target.'cfg(target_arch = "wasm32")'.dependencies]
28-
wasm-bindgen = { workspace = true }
28+
wasm-bindgen = { workspace = true }

crates/core/src/schedule/job.rs

Lines changed: 57 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::Worker;
1515
const NUM_WORKER_THREADS: usize = 0;
1616
#[cfg(not(target_arch = "wasm32"))]
1717
const NUM_WORKER_THREADS: usize = 5;
18+
const LOW_PRIORITY_THREAD_RATIO: f32 = 0.5;
1819

1920
pub type JobId = Uid;
2021
pub const INDEPENDENT_JOB_ID: JobId = inox_uid::generate_static_uid_from_string("IndependentJob");
@@ -34,12 +35,6 @@ impl Job {
3435
F: FnOnce() + Send + Sync + 'static,
3536
{
3637
pending_jobs.fetch_add(1, Ordering::SeqCst);
37-
/*
38-
debug_log(
39-
"Adding job {:?} - remaining {:?}",
40-
name,
41-
pending_jobs.load(Ordering::SeqCst)
42-
);*/
4338
Self {
4439
func: Some(Box::new(func)),
4540
pending_jobs,
@@ -53,32 +48,18 @@ impl Job {
5348

5449
pub fn execute(mut self) {
5550
inox_profiler::scoped_profile!("Job {}", self.name);
56-
/*
57-
debug_log(
58-
"Starting {:?} - remaining {:?}",
59-
self.name.as_str(),
60-
self.pending_jobs.load(Ordering::SeqCst)
61-
);
62-
*/
6351
let f = self.func.take().unwrap();
6452
(f)();
6553

6654
self.pending_jobs.fetch_sub(1, Ordering::SeqCst);
6755
self.name.clear();
68-
/*
69-
debug_log(
70-
"Ending {:?} - remaining {:?}",
71-
self.name.as_str(),
72-
self.pending_jobs.load(Ordering::SeqCst)
73-
);
74-
*/
7556
}
7657
}
7758

7859
pub type JobHandlerRw = Arc<RwLock<JobHandler>>;
7960
pub type JobReceiverRw = Arc<Mutex<Receiver<Job>>>;
8061

81-
#[derive(Debug)]
62+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8263
pub enum JobPriority {
8364
High = 0,
8465
Medium = 1,
@@ -113,7 +94,7 @@ impl Default for PrioChannel {
11394
#[derive(Default)]
11495
pub struct JobHandler {
11596
channel: [PrioChannel; JobPriority::Count as usize],
116-
pending_jobs: HashMap<JobId, Arc<AtomicUsize>>,
97+
pending_jobs: RwLock<HashMap<JobId, Arc<AtomicUsize>>>,
11798
workers: HashMap<String, Worker>,
11899
}
119100

@@ -124,11 +105,12 @@ impl JobHandler {
124105
#[inline]
125106
fn get_pending_jobs_count(&self, job_category: &JobId) -> usize {
126107
inox_profiler::scoped_profile!("JobHandler::get_pending_jobs_count");
127-
if let Some(pending_jobs) = self.pending_jobs.get(job_category) {
128-
pending_jobs.load(Ordering::SeqCst)
129-
} else {
130-
0
108+
if let Ok(pending_jobs) = self.pending_jobs.read() {
109+
if let Some(pending_jobs) = pending_jobs.get(job_category) {
110+
return pending_jobs.load(Ordering::SeqCst);
111+
}
131112
}
113+
0
132114
}
133115
#[inline]
134116
fn get_job_with_priority(&self, job_priority: JobPriority) -> Option<Job> {
@@ -145,29 +127,43 @@ impl JobHandler {
145127
}
146128
}
147129

148-
fn add_worker(&mut self, name: &str, can_continue: &Arc<AtomicBool>) -> &mut Worker {
130+
fn add_worker(
131+
&mut self,
132+
name: &str,
133+
can_continue: &Arc<AtomicBool>,
134+
receivers: Vec<JobReceiverRw>,
135+
) -> &mut Worker {
149136
let key = String::from(name);
150137
let w = self.workers.entry(key).or_default();
151138
if !w.is_started() {
152-
w.start(
153-
name,
154-
can_continue,
155-
self.channel
156-
.iter()
157-
.map(|channel| channel.receiver.clone())
158-
.collect(),
159-
);
139+
w.start(name, can_continue, receivers);
160140
}
161141
w
162142
}
163143

164144
#[inline]
165145
fn setup_worker_threads(&mut self, can_continue: &Arc<AtomicBool>) {
166-
#[allow(clippy::absurd_extreme_comparisons)]
167146
if NUM_WORKER_THREADS > 0 {
168-
#[allow(clippy::reversed_empty_ranges)]
169-
for i in 1..NUM_WORKER_THREADS + 1 {
170-
self.add_worker(format!("Worker{i}").as_str(), can_continue);
147+
// High priority jobs are mandatory and should be executed as fast as possible
148+
// Low priority jobs are non-mandatory and should not block the frame
149+
// We can set a ratio of threads that can execute Low priority jobs
150+
let num_low_priority_workers = (NUM_WORKER_THREADS as f32 * LOW_PRIORITY_THREAD_RATIO).ceil() as usize;
151+
let num_low_priority_workers = num_low_priority_workers.max(1);
152+
153+
for i in 0..NUM_WORKER_THREADS {
154+
let mut receivers = vec![
155+
self.channel[JobPriority::High as usize].receiver.clone(),
156+
self.channel[JobPriority::Medium as usize].receiver.clone(),
157+
];
158+
if i < num_low_priority_workers {
159+
receivers.push(self.channel[JobPriority::Low as usize].receiver.clone());
160+
}
161+
162+
self.add_worker(
163+
format!("Worker{i}").as_str(),
164+
can_continue,
165+
receivers,
166+
);
171167
}
172168
}
173169
}
@@ -177,10 +173,12 @@ impl JobHandler {
177173
for (_name, w) in self.workers.iter_mut() {
178174
w.stop();
179175
}
180-
self.pending_jobs.iter().for_each(|(_, pending_jobs)| {
181-
pending_jobs.store(0, Ordering::SeqCst);
182-
});
183-
self.pending_jobs.clear();
176+
if let Ok(mut pending_jobs) = self.pending_jobs.write() {
177+
pending_jobs.iter().for_each(|entry| {
178+
entry.1.store(0, Ordering::SeqCst);
179+
});
180+
pending_jobs.clear();
181+
}
184182

185183
self.channel.iter_mut().for_each(|c| {
186184
while let Some(j) = c.receiver.get_job() {
@@ -190,7 +188,7 @@ impl JobHandler {
190188
}
191189

192190
fn add_job<F>(
193-
&mut self,
191+
&self,
194192
job_category: &JobId,
195193
job_name: &str,
196194
job_priority: JobPriority,
@@ -199,13 +197,22 @@ impl JobHandler {
199197
F: FnOnce() + Send + Sync + 'static,
200198
{
201199
inox_profiler::scoped_profile!("JobHandler::add_job[{}]", job_name);
202-
let pending_jobs = self
203-
.pending_jobs
204-
.entry(*job_category)
205-
.or_insert_with(|| Arc::new(AtomicUsize::new(0)))
206-
.clone();
200+
let pending_jobs = {
201+
let read_lock = self.pending_jobs.read().unwrap();
202+
read_lock.get(job_category).cloned()
203+
};
204+
let pending_jobs = pending_jobs.unwrap_or_else(|| {
205+
let mut write_lock = self.pending_jobs.write().unwrap();
206+
write_lock
207+
.entry(*job_category)
208+
.or_insert_with(|| Arc::new(AtomicUsize::new(0)))
209+
.clone()
210+
});
211+
207212
let job = Job::new(job_name, func, pending_jobs);
208213
self.channel[job_priority as usize].sender.send(job).ok();
214+
// Wake up all workers as we don't know which one is sleeping on this priority
215+
// Using unpark is cheap
209216
self.workers.iter().for_each(|(_n, w)| {
210217
w.wakeup();
211218
});
@@ -229,7 +236,7 @@ impl JobHandlerTrait for JobHandlerRw {
229236
where
230237
F: FnOnce() + Send + Sync + 'static,
231238
{
232-
self.write()
239+
self.read()
233240
.unwrap()
234241
.add_job(job_category, job_name, job_priority, func);
235242
}

crates/core/src/schedule/worker.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
Arc,
55
},
66
thread::{self, JoinHandle},
7+
time::Duration,
78
};
89

910
use crate::{JobReceiverRw, JobReceiverTrait};
@@ -30,24 +31,20 @@ impl Worker {
3031
let t = builder
3132
.spawn(move || {
3233
inox_profiler::register_profiler_thread!();
33-
let receivers_count = receivers.len() as i32;
34-
let mut i;
3534
loop {
36-
i = 0i32;
37-
while i >= 0 && i < receivers_count {
38-
if let Some(job) = receivers[i as usize].get_job() {
35+
let mut executed = false;
36+
for r in &receivers {
37+
if let Some(job) = r.get_job() {
3938
job.execute();
40-
//force exit from loop
41-
i = -1;
42-
} else {
43-
i += 1;
39+
executed = true;
40+
break;
4441
}
4542
}
46-
if !can_continue.load(Ordering::SeqCst) {
47-
return false;
48-
}
49-
if i >= 0 {
50-
std::thread::park();
43+
if !executed {
44+
if !can_continue.load(Ordering::SeqCst) {
45+
return false;
46+
}
47+
thread::park_timeout(Duration::from_millis(10));
5148
}
5249
}
5350
})

0 commit comments

Comments (0)