Skip to content

Commit 9241bcf

Browse files
committed
Using double buffer to reduse mutex.lock
Signed-off-by: kexuan.yang <kexuan.yang@gmail.com>
1 parent f06ec81 commit 9241bcf

File tree

1 file changed

+20
-19
lines changed

1 file changed

+20
-19
lines changed

src/core/io_looper.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use crate::Error::{IOError, LockError};
2-
use crate::Result;
31
use std::any::Any;
42
use std::collections::VecDeque;
53
use std::sync::atomic::{AtomicBool, Ordering};
@@ -8,6 +6,9 @@ use std::sync::{Arc, Mutex};
86
use std::thread;
97
use std::thread::JoinHandle;
108

9+
use crate::Error::{IOError, LockError};
10+
use crate::Result;
11+
1112
const LOG_TAG: &str = "MMKV:IO";
1213

1314
type Job = Box<dyn FnOnce(&mut dyn Any) + Send + 'static>;
@@ -72,10 +73,12 @@ impl<T: Callback + 'static> IOLooper<T> {
7273

7374
self.sender
7475
.as_ref()
75-
.unwrap()
76-
.send(Signal::Normal)
77-
.map_err(|e| IOError(e.to_string()))?;
78-
Ok(())
76+
.map(|sender| {
77+
sender
78+
.send(Signal::Normal)
79+
.map_err(|e| IOError(e.to_string()))
80+
})
81+
.ok_or(IOError("channel closed".to_string()))?
7982
}
8083

8184
pub fn sync(&self) {
@@ -106,28 +109,25 @@ impl<T> Drop for IOLooper<T> {
106109

107110
impl Executor {
108111
pub fn new<T: Callback + 'static>(receiver: Receiver<Signal>, mut callback: T) -> Self {
112+
let mut buffer: VecDeque<Job> = VecDeque::with_capacity(100);
109113
let queue: Arc<Mutex<VecDeque<Job>>> = Arc::new(Mutex::new(VecDeque::with_capacity(100)));
110114
let queue_clone = Arc::clone(&queue);
111115
let handle = thread::spawn(move || loop {
112-
let callback = &mut callback;
113116
let signal = receiver.recv();
114117

115118
match signal {
116119
Ok(Signal::Kill(job)) => {
117-
job(callback);
120+
job(&mut callback);
118121
break;
119122
}
120-
Ok(Signal::Normal) => loop {
121-
let mut locked_queue = queue.lock().unwrap();
122-
let job = locked_queue.pop_front();
123-
drop(locked_queue);
124-
match job {
125-
Some(job) => {
126-
job(callback);
127-
}
128-
None => break,
123+
Ok(Signal::Normal) => {
124+
let mut current_queue = queue.lock().unwrap();
125+
std::mem::swap(&mut buffer, &mut *current_queue);
126+
drop(current_queue);
127+
while let Some(job) = buffer.pop_front() {
128+
job(&mut callback);
129129
}
130-
},
130+
}
131131
Err(_) => {
132132
break;
133133
}
@@ -143,11 +143,12 @@ impl Executor {
143143

144144
#[cfg(test)]
145145
mod tests {
146-
use crate::core::io_looper::{Callback, IOLooper};
147146
use std::sync::{Arc, Mutex};
148147
use std::thread;
149148
use std::time::Duration;
150149

150+
use crate::core::io_looper::{Callback, IOLooper};
151+
151152
struct SimpleCallback;
152153

153154
impl Callback for SimpleCallback {}

0 commit comments

Comments
 (0)