Skip to content

Commit cc5b304

Browse files
authored
use OrderedLocalQueue (#338)
2 parents bb89532 + 32c366b commit cc5b304

File tree

16 files changed

+204
-107
lines changed

16 files changed

+204
-107
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ jobs:
5353
PROJECT_DIR: ${{ github.workspace }}
5454
run: sh .github/workflows/ci.sh
5555
- name: Run preemptive tests
56-
if: always()
56+
if: ${{ contains(fromJSON('["x86_64-unknown-linux-gnu", "i686-unknown-linux-gnu", "x86_64-apple-darwin", "aarch64-apple-darwin"]'), matrix.target) }}
5757
env:
5858
CHANNEL: ${{ matrix.channel }}
5959
CROSS: ${{ !startsWith(matrix.target, 'x86_64') && contains(matrix.target, 'linux') && '1' || '0' }}

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ nanosleep hooked
201201

202202
### 0.6.x
203203

204+
- [x] support custom task and coroutine priority.
204205
- [x] support scalable stack
205206

206207
### 0.5.x

core/src/co_pool/mod.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ use crate::co_pool::creator::CoroutineCreator;
22
use crate::co_pool::task::Task;
33
use crate::common::beans::BeanFactory;
44
use crate::common::constants::PoolState;
5-
use crate::common::work_steal::{LocalQueue, WorkStealQueue};
5+
use crate::common::ordered_work_steal::{OrderedLocalQueue, OrderedWorkStealQueue};
66
use crate::common::{get_timeout_time, now, CondvarBlocker};
77
use crate::coroutine::suspender::Suspender;
88
use crate::scheduler::{SchedulableCoroutine, Scheduler};
99
use crate::{impl_current_for, impl_display_by_debug, impl_for_named, trace};
1010
use dashmap::DashMap;
1111
use std::cell::Cell;
12+
use std::ffi::c_longlong;
1213
use std::io::{Error, ErrorKind};
1314
use std::ops::{Deref, DerefMut};
1415
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
@@ -31,7 +32,7 @@ pub struct CoroutinePool<'p> {
3132
//协程池状态
3233
state: Cell<PoolState>,
3334
//任务队列
34-
task_queue: LocalQueue<'p, Task<'p>>,
35+
task_queue: OrderedLocalQueue<'p, Task<'p>>,
3536
//工作协程组
3637
workers: Scheduler<'p>,
3738
//当前协程数
@@ -128,7 +129,7 @@ impl<'p> CoroutinePool<'p> {
128129
pop_fail_times: AtomicUsize::new(0),
129130
min_size: AtomicUsize::new(min_size),
130131
max_size: AtomicUsize::new(max_size),
131-
task_queue: BeanFactory::get_or_default::<WorkStealQueue<Task<'p>>>(
132+
task_queue: BeanFactory::get_or_default::<OrderedWorkStealQueue<Task<'p>>>(
132133
crate::common::constants::TASK_GLOBAL_QUEUE_BEAN,
133134
)
134135
.local_queue(),
@@ -210,6 +211,7 @@ impl<'p> CoroutinePool<'p> {
210211
name: Option<String>,
211212
func: impl FnOnce(Option<usize>) -> Option<usize> + 'p,
212213
param: Option<usize>,
214+
priority: Option<c_longlong>,
213215
) -> std::io::Result<String> {
214216
match self.state() {
215217
PoolState::Running => {}
@@ -221,7 +223,7 @@ impl<'p> CoroutinePool<'p> {
221223
}
222224
}
223225
let name = name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4()));
224-
self.submit_raw_task(Task::new(name.clone(), func, param));
226+
self.submit_raw_task(Task::new(name.clone(), func, param, priority));
225227
Ok(name)
226228
}
227229

@@ -230,7 +232,7 @@ impl<'p> CoroutinePool<'p> {
230232
/// Allow multiple threads to concurrently submit task to the pool,
231233
/// but only allow one thread to execute scheduling.
232234
pub(crate) fn submit_raw_task(&self, task: Task<'p>) {
233-
self.task_queue.push_back(task);
235+
self.task_queue.push(task);
234236
self.blocker.notify();
235237
}
236238

@@ -338,6 +340,7 @@ impl<'p> CoroutinePool<'p> {
338340
}
339341
},
340342
None,
343+
None,
341344
)
342345
}
343346

@@ -349,6 +352,7 @@ impl<'p> CoroutinePool<'p> {
349352
&self,
350353
f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
351354
stack_size: Option<usize>,
355+
priority: Option<c_longlong>,
352356
) -> std::io::Result<()> {
353357
if self.get_running_size() >= self.get_max_size() {
354358
trace!(
@@ -360,7 +364,7 @@ impl<'p> CoroutinePool<'p> {
360364
"The coroutine pool has reached its maximum size !",
361365
));
362366
}
363-
self.deref().submit_co(f, stack_size).map(|()| {
367+
self.deref().submit_co(f, stack_size, priority).map(|()| {
364368
_ = self.running.fetch_add(1, Ordering::Release);
365369
})
366370
}
@@ -370,7 +374,7 @@ impl<'p> CoroutinePool<'p> {
370374
}
371375

372376
fn try_run(&self) -> Option<()> {
373-
self.task_queue.pop_front().map(|task| {
377+
self.task_queue.pop().map(|task| {
374378
let (task_name, result) = task.run();
375379
assert!(
376380
self.results.insert(task_name.clone(), result).is_none(),

core/src/co_pool/task.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use crate::catch;
2+
use crate::common::ordered_work_steal::Ordered;
3+
use std::ffi::c_longlong;
24

35
/// 做C兼容时会用到
46
pub type UserTaskFunc = extern "C" fn(usize) -> usize;
@@ -12,6 +14,7 @@ pub struct Task<'t> {
1214
#[educe(Debug(ignore))]
1315
func: Box<dyn FnOnce(Option<usize>) -> Option<usize> + 't>,
1416
param: Option<usize>,
17+
priority: Option<c_longlong>,
1518
}
1619

1720
impl<'t> Task<'t> {
@@ -20,11 +23,13 @@ impl<'t> Task<'t> {
2023
name: String,
2124
func: impl FnOnce(Option<usize>) -> Option<usize> + 't,
2225
param: Option<usize>,
26+
priority: Option<c_longlong>,
2327
) -> Self {
2428
Task {
2529
name,
2630
func: Box::new(func),
2731
param,
32+
priority,
2833
}
2934
}
3035

@@ -44,6 +49,12 @@ impl<'t> Task<'t> {
4449
}
4550
}
4651

52+
impl Ordered for Task<'_> {
53+
fn priority(&self) -> Option<c_longlong> {
54+
self.priority
55+
}
56+
}
57+
4758
#[cfg(test)]
4859
mod tests {
4960
use crate::co_pool::task::Task;
@@ -57,6 +68,7 @@ mod tests {
5768
p
5869
},
5970
None,
71+
None,
6072
);
6173
assert_eq!((String::from("test"), Ok(None)), task.run());
6274
}
@@ -69,6 +81,7 @@ mod tests {
6981
panic!("test panic, just ignore it");
7082
},
7183
None,
84+
None,
7285
);
7386
assert_eq!(
7487
(String::from("test"), Err("test panic, just ignore it")),

core/src/common/mod.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,19 @@ pub mod timer;
3838
/// queue.push(7);
3939
///
4040
/// let local0 = queue.local_queue();
41-
/// local0.push_back(2);
42-
/// local0.push_back(3);
43-
/// local0.push_back(4);
44-
/// local0.push_back(5);
41+
/// local0.push(2);
42+
/// local0.push(3);
43+
/// local0.push(4);
44+
/// local0.push(5);
4545
///
4646
/// let local1 = queue.local_queue();
47-
/// local1.push_back(0);
48-
/// local1.push_back(1);
47+
/// local1.push(0);
48+
/// local1.push(1);
4949
/// for i in 0..8 {
50-
/// assert_eq!(local1.pop_front(), Some(i));
50+
/// assert_eq!(local1.pop(), Some(i));
5151
/// }
52-
/// assert_eq!(local0.pop_front(), None);
53-
/// assert_eq!(local1.pop_front(), None);
52+
/// assert_eq!(local0.pop(), None);
53+
/// assert_eq!(local1.pop(), None);
5454
/// assert_eq!(queue.pop(), None);
5555
/// ```
5656
///
@@ -83,16 +83,16 @@ pub mod work_steal;
8383
/// local1.push_with_priority(i, i);
8484
/// }
8585
/// for i in 0..2 {
86-
/// assert_eq!(local1.pop_front(), Some(i));
86+
/// assert_eq!(local1.pop(), Some(i));
8787
/// }
8888
/// for i in (2..6).rev() {
89-
/// assert_eq!(local1.pop_front(), Some(i));
89+
/// assert_eq!(local1.pop(), Some(i));
9090
/// }
9191
/// for i in 6..8 {
92-
/// assert_eq!(local1.pop_front(), Some(i));
92+
/// assert_eq!(local1.pop(), Some(i));
9393
/// }
94-
/// assert_eq!(local0.pop_front(), None);
95-
/// assert_eq!(local1.pop_front(), None);
94+
/// assert_eq!(local0.pop(), None);
95+
/// assert_eq!(local1.pop(), None);
9696
/// assert_eq!(queue.pop(), None);
9797
/// ```
9898
///

core/src/common/ordered_work_steal.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,19 @@ use std::ffi::c_longlong;
77
use std::fmt::Debug;
88
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
99

10+
/// The highest precedence.
11+
pub const HIGHEST_PRECEDENCE: c_longlong = c_longlong::MIN;
12+
13+
/// The lowest precedence.
14+
pub const LOWEST_PRECEDENCE: c_longlong = c_longlong::MAX;
15+
16+
/// The default precedence.
17+
pub const DEFAULT_PRECEDENCE: c_longlong = 0;
18+
1019
/// Ordered trait for user's datastructures.
1120
pub trait Ordered {
12-
/// The highest precedence.
13-
const HIGHEST_PRECEDENCE: c_longlong = c_longlong::MIN;
14-
/// The lowest precedence.
15-
const LOWEST_PRECEDENCE: c_longlong = c_longlong::MAX;
16-
/// The default precedence.
17-
const DEFAULT_PRECEDENCE: c_longlong = 0;
1821
/// Get the priority of the element.
19-
fn priority(&self) -> c_longlong;
22+
fn priority(&self) -> Option<c_longlong>;
2023
}
2124

2225
/// Work stealing global queue, shared by multiple threads.
@@ -48,7 +51,7 @@ impl<T: Debug> Drop for OrderedWorkStealQueue<T> {
4851
impl<T: Debug + Ordered> OrderedWorkStealQueue<T> {
4952
/// Push an element to the global queue.
5053
pub fn push(&self, item: T) {
51-
self.push_with_priority(item.priority(), item);
54+
self.push_with_priority(item.priority().unwrap_or(DEFAULT_PRECEDENCE), item);
5255
}
5356
}
5457

@@ -159,7 +162,7 @@ impl<T: Debug + Ordered> OrderedLocalQueue<'_, T> {
159162
/// If the queue is full, first push half to global,
160163
/// then push the item to global.
161164
pub fn push(&self, item: T) {
162-
self.push_with_priority(item.priority(), item);
165+
self.push_with_priority(item.priority().unwrap_or(DEFAULT_PRECEDENCE), item);
163166
}
164167
}
165168

@@ -196,10 +199,10 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
196199
/// local.push_with_priority(i, i);
197200
/// }
198201
/// assert!(local.is_full());
199-
/// assert_eq!(local.pop_front(), Some(0));
202+
/// assert_eq!(local.pop(), Some(0));
200203
/// assert_eq!(local.len(), 1);
201-
/// assert_eq!(local.pop_front(), Some(1));
202-
/// assert_eq!(local.pop_front(), None);
204+
/// assert_eq!(local.pop(), Some(1));
205+
/// assert_eq!(local.pop(), None);
203206
/// assert!(local.is_empty());
204207
/// ```
205208
pub fn is_full(&self) -> bool {
@@ -253,9 +256,9 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
253256
/// local.push_with_priority(i, i);
254257
/// }
255258
/// for i in 0..4 {
256-
/// assert_eq!(local.pop_front(), Some(i));
259+
/// assert_eq!(local.pop(), Some(i));
257260
/// }
258-
/// assert_eq!(local.pop_front(), None);
261+
/// assert_eq!(local.pop(), None);
259262
/// ```
260263
pub fn push_with_priority(&self, priority: c_longlong, item: T) {
261264
if self.is_full() {
@@ -314,9 +317,9 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
314317
/// }
315318
/// let local = queue.local_queue();
316319
/// for i in 0..4 {
317-
/// assert_eq!(local.pop_front(), Some(i));
320+
/// assert_eq!(local.pop(), Some(i));
318321
/// }
319-
/// assert_eq!(local.pop_front(), None);
322+
/// assert_eq!(local.pop(), None);
320323
/// assert_eq!(queue.pop(), None);
321324
/// ```
322325
///
@@ -336,23 +339,23 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
336339
/// }
337340
/// assert_eq!(local1.len(), 2);
338341
/// for i in 0..2 {
339-
/// assert_eq!(local1.pop_front(), Some(i));
342+
/// assert_eq!(local1.pop(), Some(i));
340343
/// }
341344
/// for i in (2..6).rev() {
342-
/// assert_eq!(local1.pop_front(), Some(i));
345+
/// assert_eq!(local1.pop(), Some(i));
343346
/// }
344-
/// assert_eq!(local0.pop_front(), None);
345-
/// assert_eq!(local1.pop_front(), None);
347+
/// assert_eq!(local0.pop(), None);
348+
/// assert_eq!(local1.pop(), None);
346349
/// assert_eq!(queue.pop(), None);
347350
/// ```
348-
pub fn pop_front(&self) -> Option<T> {
351+
pub fn pop(&self) -> Option<T> {
349352
//每从本地弹出61次,就从全局队列弹出
350353
if self.tick() % 61 == 0 {
351354
if let Some(val) = self.shared.pop() {
352355
return Some(val);
353356
}
354357
}
355-
if let Some(val) = self.pop() {
358+
if let Some(val) = self.pop_local() {
356359
return Some(val);
357360
}
358361
if self.try_lock() {
@@ -398,7 +401,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
398401
.is_ok()
399402
{
400403
self.release_lock();
401-
return self.pop();
404+
return self.pop_local();
402405
}
403406
}
404407
}
@@ -409,7 +412,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
409412
self.shared.pop()
410413
}
411414

412-
fn pop(&self) -> Option<T> {
415+
fn pop_local(&self) -> Option<T> {
413416
//从本地队列弹出元素
414417
for entry in self.queue {
415418
if let Some(val) = entry.value().pop() {

0 commit comments

Comments
 (0)