Skip to content

Commit 05eee25

Browse files
committed
refactor memory pool and io_uring macro
1 parent 5a39680 commit 05eee25

File tree

24 files changed

+489
-216
lines changed

24 files changed

+489
-216
lines changed

core/src/common/memory_pool.rs

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use crate::common::now;
2+
use crossbeam_utils::atomic::AtomicCell;
3+
use std::collections::BinaryHeap;
4+
use std::num::NonZeroUsize;
5+
use std::sync::atomic::{AtomicU64, AtomicUsize};
6+
7+
/// A trait for reusable memory.
8+
///
9+
/// # Safety
10+
/// The memory requires exclusive use when writing.
11+
pub unsafe trait ReusableMemory: Ord + Clone {
12+
/// Create a new instance of `ReusableMemory`.
13+
fn new(size: usize) -> std::io::Result<Self>
14+
where
15+
Self: Sized;
16+
17+
/// Get the reference count of the memory.
18+
fn reference_count(&self) -> usize;
19+
20+
/// Get the top of the memory.
21+
fn top(&self) -> NonZeroUsize;
22+
23+
/// Get the bottom of the memory.
24+
fn bottom(&self) -> NonZeroUsize;
25+
26+
/// Get the size of the memory.
27+
fn size(&self) -> usize {
28+
self.top()
29+
.get()
30+
.checked_sub(self.bottom().get())
31+
.expect("the `bottom` is bigger than `top`")
32+
}
33+
34+
/// Get the creation time of the memory.
35+
fn create_time(&self) -> u64;
36+
37+
/// Callback when the memory is reused.
38+
fn on_reuse(&mut self) -> std::io::Result<()>;
39+
}
40+
41+
/// A memory pool for reusing.
42+
#[repr(C)]
43+
#[derive(educe::Educe)]
44+
#[educe(Debug)]
45+
pub struct MemoryPool<M: ReusableMemory> {
46+
#[educe(Debug(ignore))]
47+
pool: AtomicCell<BinaryHeap<M>>,
48+
len: AtomicUsize,
49+
//最小内存数,即核心内存数
50+
min_count: AtomicUsize,
51+
//非核心内存的最大存活时间,单位ns
52+
keep_alive_time: AtomicU64,
53+
}
54+
55+
unsafe impl<M: ReusableMemory> Send for MemoryPool<M> {}
56+
57+
unsafe impl<M: ReusableMemory> Sync for MemoryPool<M> {}
58+
59+
impl<M: ReusableMemory> Default for MemoryPool<M> {
60+
fn default() -> Self {
61+
Self::new(0, 0)
62+
}
63+
}
64+
65+
#[allow(missing_docs)]
66+
impl<M: ReusableMemory> MemoryPool<M> {
67+
/// Create a new instance of `MemoryPool`.
68+
#[must_use]
69+
pub fn new(min_count: usize, keep_alive_time: u64) -> Self {
70+
Self {
71+
pool: AtomicCell::new(BinaryHeap::default()),
72+
len: AtomicUsize::default(),
73+
min_count: AtomicUsize::new(min_count),
74+
keep_alive_time: AtomicU64::new(keep_alive_time),
75+
}
76+
}
77+
78+
pub fn allocate(&self, memory_size: usize) -> std::io::Result<M> {
79+
let heap = unsafe {
80+
self.pool
81+
.as_ptr()
82+
.as_mut()
83+
.expect("MemoryPool is not unique")
84+
};
85+
// find min memory
86+
let mut not_use = Vec::new();
87+
while let Some(memory) = heap.peek() {
88+
if memory.reference_count() > 1 {
89+
// can't use the memory
90+
break;
91+
}
92+
if let Some(mut memory) = heap.pop() {
93+
self.sub_len();
94+
if memory_size <= memory.size() {
95+
for s in not_use {
96+
heap.push(s);
97+
self.add_len();
98+
}
99+
heap.push(memory.clone());
100+
self.add_len();
101+
return memory.on_reuse().map(|()| memory);
102+
}
103+
if self.min_count() < self.len()
104+
&& now() <= memory.create_time().saturating_add(self.keep_alive_time())
105+
{
106+
// clean the expired memory
107+
continue;
108+
}
109+
not_use.push(memory);
110+
}
111+
}
112+
let memory = M::new(memory_size)?;
113+
heap.push(memory.clone());
114+
self.add_len();
115+
Ok(memory)
116+
}
117+
118+
pub fn set_keep_alive_time(&self, keep_alive_time: u64) -> &Self {
119+
self.keep_alive_time
120+
.store(keep_alive_time, std::sync::atomic::Ordering::Release);
121+
self
122+
}
123+
124+
pub fn keep_alive_time(&self) -> u64 {
125+
self.keep_alive_time
126+
.load(std::sync::atomic::Ordering::Acquire)
127+
}
128+
129+
pub fn set_min_count(&self, min_count: usize) -> &Self {
130+
self.min_count
131+
.store(min_count, std::sync::atomic::Ordering::Release);
132+
self
133+
}
134+
135+
pub fn min_count(&self) -> usize {
136+
self.min_count.load(std::sync::atomic::Ordering::Acquire)
137+
}
138+
139+
pub fn is_empty(&self) -> bool {
140+
0 == self.len()
141+
}
142+
143+
pub fn len(&self) -> usize {
144+
self.len.load(std::sync::atomic::Ordering::Acquire)
145+
}
146+
147+
fn add_len(&self) {
148+
self.len.store(
149+
self.len().saturating_add(1),
150+
std::sync::atomic::Ordering::Release,
151+
);
152+
}
153+
154+
fn sub_len(&self) {
155+
self.len.store(
156+
self.len().saturating_sub(1),
157+
std::sync::atomic::Ordering::Release,
158+
);
159+
}
160+
161+
/// Clean the expired memory.
162+
#[allow(dead_code)]
163+
pub fn clean(&self) {
164+
let heap = unsafe {
165+
self.pool
166+
.as_ptr()
167+
.as_mut()
168+
.expect("MemoryPool is not unique")
169+
};
170+
let mut maybe_free = Vec::new();
171+
while let Some(memory) = heap.peek() {
172+
if memory.reference_count() > 1 {
173+
// can't free the memory
174+
break;
175+
}
176+
if let Some(memory) = heap.pop() {
177+
self.sub_len();
178+
maybe_free.push(memory);
179+
}
180+
}
181+
for memory in maybe_free {
182+
if self.min_count() < self.len()
183+
&& now() <= memory.create_time().saturating_add(self.keep_alive_time())
184+
{
185+
// free the memory
186+
continue;
187+
}
188+
heap.push(memory);
189+
self.add_len();
190+
}
191+
}
192+
}

core/src/common/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ pub mod constants;
1313
/// Check <https://www.rustwiki.org.cn/en/reference/introduction.html> for help information.
1414
pub(crate) mod macros;
1515

16+
/// Reuse memory.
17+
pub mod memory_pool;
18+
1619
/// `BeanFactory` impls.
1720
pub mod beans;
1821

core/src/coroutine/korosensei.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::catch;
22
use crate::common::constants::CoroutineState;
33
use crate::coroutine::listener::Listener;
44
use crate::coroutine::local::CoroutineLocal;
5-
use crate::coroutine::stack_pool::{MemoryPool, PooledStack};
5+
use crate::coroutine::stack_pool::{PooledStack, StackPool};
66
use crate::coroutine::suspender::Suspender;
77
use crate::coroutine::StackInfo;
88
use corosensei::stack::Stack;
@@ -326,17 +326,14 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
326326
stack_size: usize,
327327
callback: F,
328328
) -> std::io::Result<R> {
329-
let stack_pool = MemoryPool::get_instance();
329+
let stack_pool = StackPool::get_instance();
330330
if let Some(co) = Self::current() {
331331
let remaining_stack = unsafe { co.remaining_stack() };
332332
if remaining_stack >= red_zone {
333333
return Ok(callback());
334334
}
335335
return stack_pool.allocate(stack_size).map(|stack| {
336-
co.stack_infos_mut().push_back(StackInfo {
337-
stack_top: stack.base().get(),
338-
stack_bottom: stack.limit().get(),
339-
});
336+
co.stack_infos_mut().push_back(StackInfo::from(&stack));
340337
let r = corosensei::on_stack(stack, callback);
341338
_ = co.stack_infos_mut().pop_back();
342339
r
@@ -357,10 +354,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
357354
}
358355
stack_pool.allocate(stack_size).map(|stack| {
359356
STACK_INFOS.with(|s| {
360-
s.borrow_mut().push_back(StackInfo {
361-
stack_top: stack.base().get(),
362-
stack_bottom: stack.limit().get(),
363-
});
357+
s.borrow_mut().push_back(StackInfo::from(&stack));
364358
});
365359
let r = corosensei::on_stack(stack, callback);
366360
_ = STACK_INFOS.with(|s| s.borrow_mut().pop_back());
@@ -398,7 +392,7 @@ where
398392
F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
399393
{
400394
let stack_size = stack_size.max(crate::common::page_size());
401-
let stack = MemoryPool::get_instance().allocate(stack_size)?;
395+
let stack = StackPool::get_instance().allocate(stack_size)?;
402396
let stack_infos = UnsafeCell::new(VecDeque::from([StackInfo {
403397
stack_top: stack.base().get(),
404398
stack_bottom: stack.limit().get(),
@@ -469,3 +463,12 @@ where
469463
}
470464
}
471465
}
466+
467+
impl<S: Stack> From<&S> for StackInfo {
468+
fn from(stack: &S) -> Self {
469+
Self {
470+
stack_top: stack.base().get(),
471+
stack_bottom: stack.limit().get(),
472+
}
473+
}
474+
}

0 commit comments

Comments
 (0)