Skip to content

Commit 05c14da

Browse files
committed
add stack pool
1 parent d9a05c7 commit 05c14da

File tree

3 files changed

+188
-5
lines changed

3 files changed

+188
-5
lines changed

core/src/coroutine/korosensei.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ use crate::catch;
22
use crate::common::constants::CoroutineState;
33
use crate::coroutine::listener::Listener;
44
use crate::coroutine::local::CoroutineLocal;
5+
use crate::coroutine::stack_pool::{PooledStack, StackPool};
56
use crate::coroutine::suspender::Suspender;
67
use crate::coroutine::StackInfo;
7-
use corosensei::stack::{DefaultStack, Stack};
8+
use corosensei::stack::Stack;
89
use corosensei::trap::TrapHandlerRegs;
910
use corosensei::CoroutineResult;
11+
use once_cell::sync::Lazy;
1012
use std::cell::{Cell, RefCell};
1113
use std::collections::VecDeque;
1214
use std::ffi::c_longlong;
@@ -22,11 +24,13 @@ cfg_if::cfg_if! {
2224
}
2325
}
2426

27+
static STACK_POOL: Lazy<StackPool> = Lazy::new(StackPool::default);
28+
2529
/// Use `corosensei` as the low-level coroutine.
2630
#[repr(C)]
2731
pub struct Coroutine<'c, Param, Yield, Return> {
2832
pub(crate) name: String,
29-
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, DefaultStack>,
33+
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, PooledStack>,
3034
pub(crate) state: Cell<CoroutineState<Yield, Return>>,
3135
pub(crate) stack_size: usize,
3236
pub(crate) stack_infos: RefCell<VecDeque<StackInfo>>,
@@ -314,7 +318,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
314318
if remaining_stack >= red_zone {
315319
return Ok(callback());
316320
}
317-
return DefaultStack::new(stack_size).map(|stack| {
321+
return STACK_POOL.allocate(stack_size).map(|stack| {
318322
co.stack_infos.borrow_mut().push_back(StackInfo {
319323
stack_top: stack.base().get(),
320324
stack_bottom: stack.limit().get(),
@@ -335,7 +339,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
335339
return Ok(callback());
336340
}
337341
}
338-
DefaultStack::new(stack_size).map(|stack| {
342+
STACK_POOL.allocate(stack_size).map(|stack| {
339343
STACK_INFOS.with(|s| {
340344
s.borrow_mut().push_back(StackInfo {
341345
stack_top: stack.base().get(),
@@ -378,7 +382,7 @@ where
378382
F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
379383
{
380384
let stack_size = stack_size.max(crate::common::page_size());
381-
let stack = DefaultStack::new(stack_size)?;
385+
let stack = STACK_POOL.allocate(stack_size)?;
382386
let stack_infos = RefCell::new(VecDeque::from([StackInfo {
383387
stack_top: stack.base().get(),
384388
stack_bottom: stack.limit().get(),

core/src/coroutine/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ pub struct StackInfo {
7676
/// Coroutine state abstraction and impl.
7777
mod state;
7878

79+
mod stack_pool;
80+
7981
impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
8082
/// Get the name of this coroutine.
8183
pub fn name(&self) -> &str {

core/src/coroutine/stack_pool.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use crate::common::now;
2+
use corosensei::stack::{DefaultStack, Stack, StackPointer};
3+
use std::cell::RefCell;
4+
use std::cmp::Ordering;
5+
use std::collections::BinaryHeap;
6+
use std::ops::Deref;
7+
use std::rc::Rc;
8+
use std::sync::atomic::{AtomicU64, AtomicUsize};
9+
10+
pub(crate) struct PooledStack {
11+
stack: Rc<DefaultStack>,
12+
create_time: u64,
13+
}
14+
15+
impl Deref for PooledStack {
16+
type Target = Rc<DefaultStack>;
17+
18+
fn deref(&self) -> &Rc<DefaultStack> {
19+
&self.stack
20+
}
21+
}
22+
23+
impl Clone for PooledStack {
24+
fn clone(&self) -> Self {
25+
Self {
26+
stack: self.stack.clone(),
27+
create_time: self.create_time,
28+
}
29+
}
30+
}
31+
32+
impl PartialEq<Self> for PooledStack {
33+
fn eq(&self, other: &Self) -> bool {
34+
Rc::strong_count(other).eq(&Rc::strong_count(self))
35+
}
36+
}
37+
38+
impl Eq for PooledStack {}
39+
40+
impl PartialOrd<Self> for PooledStack {
41+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
42+
Some(self.cmp(other))
43+
}
44+
}
45+
46+
impl Ord for PooledStack {
47+
fn cmp(&self, other: &Self) -> Ordering {
48+
// BinaryHeap defaults to a large top heap, but we need a small top heap
49+
Rc::strong_count(other).cmp(&Rc::strong_count(self))
50+
}
51+
}
52+
53+
unsafe impl Stack for PooledStack {
54+
#[inline]
55+
fn base(&self) -> StackPointer {
56+
self.deref().base()
57+
}
58+
59+
#[inline]
60+
fn limit(&self) -> StackPointer {
61+
self.deref().limit()
62+
}
63+
64+
#[cfg(windows)]
65+
#[inline]
66+
fn teb_fields(&self) -> corosensei::stack::StackTebFields {
67+
self.deref().teb_fields()
68+
}
69+
70+
#[cfg(windows)]
71+
#[inline]
72+
fn update_teb_fields(&mut self, stack_limit: usize, guaranteed_stack_bytes: usize) {
73+
while let Some(stack) = Rc::get_mut(self) {
74+
stack.update_teb_fields(stack_limit, guaranteed_stack_bytes);
75+
return;
76+
}
77+
}
78+
}
79+
80+
impl PooledStack {
81+
pub(crate) fn new(stack: Rc<DefaultStack>, create_time: u64) -> Self {
82+
Self { stack, create_time }
83+
}
84+
}
85+
86+
pub(crate) struct StackPool {
87+
pool: RefCell<BinaryHeap<PooledStack>>,
88+
len: AtomicUsize,
89+
//最小内存数,即核心内存数
90+
min_size: AtomicUsize,
91+
//非核心内存的最大存活时间,单位ns
92+
keep_alive_time: AtomicU64,
93+
}
94+
95+
unsafe impl Send for StackPool {}
96+
97+
unsafe impl Sync for StackPool {}
98+
99+
impl Default for StackPool {
100+
fn default() -> Self {
101+
Self::new(0, 10_000_000_000)
102+
}
103+
}
104+
105+
impl StackPool {
106+
pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
107+
Self {
108+
pool: RefCell::new(BinaryHeap::default()),
109+
len: AtomicUsize::default(),
110+
min_size: AtomicUsize::new(min_size),
111+
keep_alive_time: AtomicU64::new(keep_alive_time),
112+
}
113+
}
114+
115+
pub(crate) fn allocate(&self, stack_size: usize) -> std::io::Result<PooledStack> {
116+
loop {
117+
if let Ok(mut heap) = self.pool.try_borrow_mut() {
118+
if let Some(stack) = heap.peek() {
119+
if Rc::strong_count(stack) == 1 {
120+
// can use the stack
121+
if self.min_size() < self.len()
122+
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
123+
{
124+
drop(heap.pop());
125+
self.len.store(
126+
self.len().saturating_sub(1),
127+
std::sync::atomic::Ordering::Release,
128+
);
129+
continue;
130+
}
131+
return Ok(stack.clone());
132+
}
133+
}
134+
let stack = PooledStack::new(Rc::new(DefaultStack::new(stack_size)?), now());
135+
heap.push(stack.clone());
136+
self.len.store(
137+
self.len().saturating_add(1),
138+
std::sync::atomic::Ordering::Release,
139+
);
140+
return Ok(stack);
141+
}
142+
}
143+
}
144+
145+
pub(crate) fn keep_alive_time(&self) -> u64 {
146+
self.keep_alive_time
147+
.load(std::sync::atomic::Ordering::Acquire)
148+
}
149+
150+
pub(crate) fn min_size(&self) -> usize {
151+
self.min_size.load(std::sync::atomic::Ordering::Acquire)
152+
}
153+
154+
pub(crate) fn len(&self) -> usize {
155+
self.len.load(std::sync::atomic::Ordering::Acquire)
156+
}
157+
}
158+
159+
#[cfg(test)]
160+
mod tests {
161+
use super::*;
162+
use crate::common::constants::DEFAULT_STACK_SIZE;
163+
164+
#[test]
165+
fn test_stack_pool() -> std::io::Result<()> {
166+
let pool = StackPool::default();
167+
let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
168+
assert_eq!(Rc::strong_count(&stack), 2);
169+
drop(stack);
170+
let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
171+
assert_eq!(Rc::strong_count(&stack), 2);
172+
assert_eq!(pool.len(), 1);
173+
_ = pool.allocate(DEFAULT_STACK_SIZE)?;
174+
assert_eq!(pool.len(), 2);
175+
Ok(())
176+
}
177+
}

0 commit comments

Comments
 (0)