Commit 62e8375

add stack pool

1 parent 32b3bd3 commit 62e8375

5 files changed: +307 -5 lines changed

core/src/common/constants.rs

Lines changed: 3 additions & 0 deletions

@@ -15,6 +15,9 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean";
 /// Task global queue bean name.
 pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean";
 
+/// Stack pool bean name.
+pub const STACK_POOL_BEAN: &str = "stackPoolBean";
+
 /// Monitor bean name.
 pub const MONITOR_BEAN: &str = "monitorBean";

core/src/coroutine/korosensei.rs

Lines changed: 7 additions & 5 deletions

@@ -2,9 +2,10 @@ use crate::catch;
 use crate::common::constants::CoroutineState;
 use crate::coroutine::listener::Listener;
 use crate::coroutine::local::CoroutineLocal;
+use crate::coroutine::stack_pool::{PooledStack, StackPool};
 use crate::coroutine::suspender::Suspender;
 use crate::coroutine::StackInfo;
-use corosensei::stack::{DefaultStack, Stack};
+use corosensei::stack::Stack;
 use corosensei::trap::TrapHandlerRegs;
 use corosensei::CoroutineResult;
 use std::cell::{Cell, RefCell};
@@ -26,7 +27,7 @@ cfg_if::cfg_if! {
 #[repr(C)]
 pub struct Coroutine<'c, Param, Yield, Return> {
     pub(crate) name: String,
-    inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, DefaultStack>,
+    inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, PooledStack>,
     pub(crate) state: Cell<CoroutineState<Yield, Return>>,
     pub(crate) stack_size: usize,
     pub(crate) stack_infos: RefCell<VecDeque<StackInfo>>,
@@ -307,12 +308,13 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
         stack_size: usize,
         callback: F,
     ) -> std::io::Result<R> {
+        let stack_pool = StackPool::get_instance();
         if let Some(co) = Self::current() {
             let remaining_stack = unsafe { co.remaining_stack() };
             if remaining_stack >= red_zone {
                 return Ok(callback());
             }
-            return DefaultStack::new(stack_size).map(|stack| {
+            return stack_pool.allocate(stack_size).map(|stack| {
                 co.stack_infos.borrow_mut().push_back(StackInfo {
                     stack_top: stack.base().get(),
                     stack_bottom: stack.limit().get(),
@@ -335,7 +337,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
                 return Ok(callback());
             }
         }
-        DefaultStack::new(stack_size).map(|stack| {
+        stack_pool.allocate(stack_size).map(|stack| {
            STACK_INFOS.with(|s| {
                s.borrow_mut().push_back(StackInfo {
                    stack_top: stack.base().get(),
@@ -378,7 +380,7 @@ where
     F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
 {
     let stack_size = stack_size.max(crate::common::page_size());
-    let stack = DefaultStack::new(stack_size)?;
+    let stack = StackPool::get_instance().allocate(stack_size)?;
     let stack_infos = RefCell::new(VecDeque::from([StackInfo {
         stack_top: stack.base().get(),
         stack_bottom: stack.limit().get(),
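
The red-zone grow path above now leases stacks from a process-wide pool instead of creating a fresh `DefaultStack` on every overflow. Reuse detection rests on `Rc` reference counts: the pool keeps one clone of each `PooledStack` in its heap, so a strong count above 1 means some coroutine still holds the stack, while a count of exactly 1 means it can be handed out again. The following is a minimal std-only sketch of that leasing idea; the `Pool` type and `Vec<u8>` slots are illustrative stand-ins, not part of the crate.

```rust
use std::rc::Rc;

// Stand-in for a pooled resource: the pool keeps one Rc clone, and the
// caller holds another while the resource is "leased out".
struct Pool {
    slots: Vec<Rc<Vec<u8>>>, // stand-in for Rc<UnsafeCell<DefaultStack>>
}

impl Pool {
    fn allocate(&mut self, size: usize) -> Rc<Vec<u8>> {
        // A slot with strong_count == 1 is referenced only by the pool
        // itself, so it is free to be handed out again.
        if let Some(free) = self
            .slots
            .iter()
            .find(|s| Rc::strong_count(s) == 1 && s.len() >= size)
        {
            return Rc::clone(free);
        }
        let fresh = Rc::new(vec![0u8; size]);
        self.slots.push(Rc::clone(&fresh));
        fresh
    }
}

fn main() {
    let mut pool = Pool { slots: Vec::new() };
    let a = pool.allocate(4096);
    assert_eq!(Rc::strong_count(&a), 2); // pool + caller, mirrors the unit test below
    drop(a); // the lease ends, the count falls back to 1
    let _b = pool.allocate(4096); // the same slot is reused
    assert_eq!(pool.slots.len(), 1);
}
```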

core/src/coroutine/mod.rs

Lines changed: 2 additions & 0 deletions

@@ -76,6 +76,8 @@ pub struct StackInfo {
 /// Coroutine state abstraction and impl.
 mod state;
 
+pub(crate) mod stack_pool;
+
 impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
     /// Get the name of this coroutine.
     pub fn name(&self) -> &str {

core/src/coroutine/stack_pool.rs

Lines changed: 293 additions & 0 deletions

@@ -0,0 +1,293 @@
+use crate::common::beans::BeanFactory;
+use crate::common::constants::STACK_POOL_BEAN;
+use crate::common::now;
+use corosensei::stack::{DefaultStack, Stack, StackPointer};
+use std::cell::UnsafeCell;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::ops::{Deref, DerefMut};
+use std::rc::Rc;
+use std::sync::atomic::{AtomicU64, AtomicUsize};
+
+pub(crate) struct PooledStack {
+    stack_size: usize,
+    stack: Rc<UnsafeCell<DefaultStack>>,
+    create_time: u64,
+}
+
+impl Deref for PooledStack {
+    type Target = DefaultStack;
+
+    fn deref(&self) -> &DefaultStack {
+        unsafe {
+            self.stack
+                .deref()
+                .get()
+                .as_ref()
+                .expect("PooledStack is not unique")
+        }
+    }
+}
+
+impl DerefMut for PooledStack {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe {
+            self.stack
+                .deref()
+                .get()
+                .as_mut()
+                .expect("PooledStack is not unique")
+        }
+    }
+}
+
+impl Clone for PooledStack {
+    fn clone(&self) -> Self {
+        Self {
+            stack_size: self.stack_size,
+            stack: self.stack.clone(),
+            create_time: self.create_time,
+        }
+    }
+}
+
+impl PartialEq<Self> for PooledStack {
+    fn eq(&self, other: &Self) -> bool {
+        Rc::strong_count(&other.stack).eq(&Rc::strong_count(&self.stack))
+    }
+}
+
+impl Eq for PooledStack {}
+
+impl PartialOrd<Self> for PooledStack {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for PooledStack {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // BinaryHeap is a max-heap by default, but we need a min-heap
+        match Rc::strong_count(&other.stack).cmp(&Rc::strong_count(&self.stack)) {
+            Ordering::Less => Ordering::Less,
+            Ordering::Equal => other.stack_size.cmp(&self.stack_size),
+            Ordering::Greater => Ordering::Greater,
+        }
+    }
+}
+
+unsafe impl Stack for PooledStack {
+    #[inline]
+    fn base(&self) -> StackPointer {
+        self.deref().base()
+    }
+
+    #[inline]
+    fn limit(&self) -> StackPointer {
+        self.deref().limit()
+    }
+
+    #[cfg(windows)]
+    #[inline]
+    fn teb_fields(&self) -> corosensei::stack::StackTebFields {
+        self.deref().teb_fields()
+    }
+
+    #[cfg(windows)]
+    #[inline]
+    fn update_teb_fields(&mut self, stack_limit: usize, guaranteed_stack_bytes: usize) {
+        self.deref_mut()
+            .update_teb_fields(stack_limit, guaranteed_stack_bytes);
+    }
+}
+
+impl PooledStack {
+    pub(crate) fn new(stack_size: usize, create_time: u64) -> std::io::Result<Self> {
+        Ok(Self {
+            stack_size,
+            stack: Rc::new(UnsafeCell::new(DefaultStack::new(stack_size)?)),
+            create_time,
+        })
+    }
+
+    /// This function must be called after a stack has finished running a coroutine
+    /// so that the `StackLimit` and `GuaranteedStackBytes` fields from the TEB can
+    /// be updated in the stack. This is necessary if the stack is reused for
+    /// another coroutine.
+    #[inline]
+    #[cfg(windows)]
+    pub(crate) fn update_stack_teb_fields(&mut self) {
+        cfg_if::cfg_if! {
+            if #[cfg(target_arch = "x86_64")] {
+                type StackWord = u64;
+            } else if #[cfg(target_arch = "x86")] {
+                type StackWord = u32;
+            }
+        }
+        let base = self.base().get() as *const StackWord;
+        unsafe {
+            let stack_limit = usize::try_from(*base.sub(1)).expect("stack limit overflow");
+            let guaranteed_stack_bytes =
+                usize::try_from(*base.sub(2)).expect("guaranteed stack bytes overflow");
+            self.update_teb_fields(stack_limit, guaranteed_stack_bytes);
+        }
+    }
+}
+
+pub(crate) struct StackPool {
+    pool: UnsafeCell<BinaryHeap<PooledStack>>,
+    len: AtomicUsize,
+    // minimum number of cached stacks, i.e. the core pool size
+    min_size: AtomicUsize,
+    // maximum keep-alive time of non-core stacks, in nanoseconds
+    keep_alive_time: AtomicU64,
+}
+
+unsafe impl Send for StackPool {}
+
+unsafe impl Sync for StackPool {}
+
+impl Default for StackPool {
+    fn default() -> Self {
+        Self::new(0, 10_000_000_000)
+    }
+}
+
+impl StackPool {
+    pub(crate) fn get_instance<'m>() -> &'m Self {
+        BeanFactory::get_or_default(STACK_POOL_BEAN)
+    }
+
+    pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
+        Self {
+            pool: UnsafeCell::new(BinaryHeap::default()),
+            len: AtomicUsize::default(),
+            min_size: AtomicUsize::new(min_size),
+            keep_alive_time: AtomicU64::new(keep_alive_time),
+        }
+    }
+
+    pub(crate) fn allocate(&self, stack_size: usize) -> std::io::Result<PooledStack> {
+        let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
+        // find the smallest usable stack
+        let mut not_use = Vec::new();
+        while let Some(stack) = heap.peek() {
+            if Rc::strong_count(&stack.stack) > 1 {
+                // can't use the stack
+                break;
+            }
+            #[allow(unused_mut)]
+            if let Some(mut stack) = heap.pop() {
+                self.sub_len();
+                if stack_size <= stack.stack_size {
+                    for s in not_use {
+                        heap.push(s);
+                        self.add_len();
+                    }
+                    heap.push(stack.clone());
+                    self.add_len();
+                    #[cfg(windows)]
+                    stack.update_stack_teb_fields();
+                    return Ok(stack);
+                }
+                if self.min_size() < self.len()
+                    && now() <= stack.create_time.saturating_add(self.keep_alive_time())
+                {
+                    // clean the expired stack
+                    continue;
+                }
+                not_use.push(stack);
+            }
+        }
+        let stack = PooledStack::new(stack_size, now())?;
+        heap.push(stack.clone());
+        self.add_len();
+        Ok(stack)
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn set_keep_alive_time(&self, keep_alive_time: u64) {
+        self.keep_alive_time
+            .store(keep_alive_time, std::sync::atomic::Ordering::Release);
+    }
+
+    pub(crate) fn keep_alive_time(&self) -> u64 {
+        self.keep_alive_time
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn set_min_size(&self, min_size: usize) {
+        self.min_size
+            .store(min_size, std::sync::atomic::Ordering::Release);
+    }
+
+    pub(crate) fn min_size(&self) -> usize {
+        self.min_size.load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.len.load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    fn add_len(&self) {
+        self.len.store(
+            self.len().saturating_add(1),
+            std::sync::atomic::Ordering::Release,
+        );
+    }
+
+    fn sub_len(&self) {
+        self.len.store(
+            self.len().saturating_sub(1),
+            std::sync::atomic::Ordering::Release,
+        );
+    }
+
+    /// Clean the expired stacks.
+    #[allow(dead_code)]
+    pub(crate) fn clean(&self) {
+        let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
+        let mut maybe_free = Vec::new();
+        while let Some(stack) = heap.peek() {
+            if Rc::strong_count(&stack.stack) > 1 {
+                // can't free the stack
+                break;
+            }
+            if let Some(stack) = heap.pop() {
+                self.sub_len();
+                maybe_free.push(stack);
+            }
+        }
+        for stack in maybe_free {
+            if self.min_size() < self.len()
+                && now() <= stack.create_time.saturating_add(self.keep_alive_time())
+            {
+                // free the stack
+                continue;
+            }
+            heap.push(stack);
+            self.add_len();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::common::constants::DEFAULT_STACK_SIZE;
+
+    #[test]
+    fn test_stack_pool() -> std::io::Result<()> {
+        let pool = StackPool::default();
+        let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(Rc::strong_count(&stack.stack), 2);
+        drop(stack);
+        let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(Rc::strong_count(&stack.stack), 2);
+        assert_eq!(pool.len(), 1);
+        _ = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(pool.len(), 2);
+        Ok(())
+    }
+}
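
`StackPool` stores its cached stacks in a `std::collections::BinaryHeap`, which is a max-heap, so the hand-written `Ord` impl above reverses the comparison to get min-heap behaviour: `peek()`/`pop()` yield the least-referenced stack first, with the smaller `stack_size` winning ties. Below is a small standalone sketch of the same reversed-comparison trick, using a plain `MinSize` wrapper (illustrative only) instead of `PooledStack`.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// BinaryHeap is a max-heap; reversing `cmp` turns it into a min-heap.
#[derive(Eq, PartialEq)]
struct MinSize(usize);

impl Ord for MinSize {
    fn cmp(&self, other: &Self) -> Ordering {
        // compare `other` against `self` so the smallest value bubbles to the top
        other.0.cmp(&self.0)
    }
}

impl PartialOrd for MinSize {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    for size in [64 * 1024, 16 * 1024, 128 * 1024] {
        heap.push(MinSize(size));
    }
    // allocate() walks the heap the same way: start from the smallest cached
    // stack and keep going until one is large enough for the request
    assert_eq!(heap.pop().map(|s| s.0), Some(16 * 1024));
    assert_eq!(heap.pop().map(|s| s.0), Some(64 * 1024));
}
```

In the commit itself the primary sort key is `Rc::strong_count`, so stacks that are not currently leased out sort ahead of busy ones, and `stack_size` only breaks ties between equally-referenced stacks.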

core/src/monitor.rs

Lines changed: 2 additions & 0 deletions

@@ -3,6 +3,7 @@ use crate::common::constants::{CoroutineState, MONITOR_BEAN};
 use crate::common::{get_timeout_time, now, CondvarBlocker};
 use crate::coroutine::listener::Listener;
 use crate::coroutine::local::CoroutineLocal;
+use crate::coroutine::stack_pool::StackPool;
 use crate::scheduler::SchedulableSuspender;
 use crate::{catch, error, impl_current_for, impl_display_by_debug, info};
 use nix::sys::pthread::{pthread_kill, pthread_self, Pthread};
@@ -136,6 +137,7 @@ impl Monitor {
                 );
             }
         }
+        StackPool::get_instance().clean();
         // the monitor thread does not run coroutine tasks, so each loop waits at least 1ms
         monitor.blocker.clone().block(Duration::from_millis(1));
     }
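
With the `clean()` call added to the monitor loop, stacks that are no longer referenced and fall outside the pool's core size are reclaimed on every monitor iteration (the loop waits at least 1 ms per pass), off the coroutine hot path. The knobs it honours are the `min_size` and `keep_alive_time` fields added in stack_pool.rs, with the keep-alive window measured in nanoseconds from each stack's creation time. A crate-internal sketch of tuning them follows; the call site and values are illustrative, not part of this commit.

```rust
use crate::coroutine::stack_pool::StackPool;

// Hypothetical crate-internal initialization hook.
fn tune_stack_pool() {
    let pool = StackPool::get_instance();
    // keep at least 4 stacks cached even when idle
    pool.set_min_size(4);
    // give non-core stacks a 5 s keep-alive window (value is in nanoseconds)
    pool.set_keep_alive_time(5_000_000_000);
}
```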
