Skip to content

Commit f06ec81

Browse files
committed
Add generic type to IOLooper
Change-Id: I41579bd7d194dc9c577c6470c32dc3c15647b02c Signed-off-by: kexuan.yang <kexuan.yang@gmail.com>
1 parent c4e942b commit f06ec81

File tree

7 files changed

+46
-60
lines changed

7 files changed

+46
-60
lines changed

src/core/buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ include!(concat!(env!("OUT_DIR"), "/protos/mod.rs"));
1313
#[repr(transparent)]
1414
pub struct Buffer(KV);
1515

16-
pub trait Encoder: Send {
16+
pub trait Encoder: Send + Sync {
1717
fn encode_to_bytes(&self, raw_buffer: &Buffer, position: u32) -> Result<Vec<u8>>;
1818
}
1919

@@ -22,7 +22,7 @@ pub struct DecodeResult {
2222
pub len: u32,
2323
}
2424

25-
pub trait Decoder: Send {
25+
pub trait Decoder: Send + Sync {
2626
fn decode_bytes(&self, data: &[u8], position: u32) -> Result<DecodeResult>;
2727
}
2828

src/core/io_looper.rs

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::Error::{IOError, LockError};
22
use crate::Result;
3+
use std::any::Any;
34
use std::collections::VecDeque;
45
use std::sync::atomic::{AtomicBool, Ordering};
56
use std::sync::mpsc::{channel, Receiver, Sender};
@@ -9,47 +10,47 @@ use std::thread::JoinHandle;
910

1011
const LOG_TAG: &str = "MMKV:IO";
1112

12-
type Job = Box<dyn FnOnce(&mut dyn std::any::Any) + Send + 'static>;
13+
type Job = Box<dyn FnOnce(&mut dyn Any) + Send + 'static>;
1314

1415
enum Signal {
1516
Normal,
1617
Kill(Job),
1718
}
1819

19-
pub trait Callback: Send {}
20+
pub trait Callback: Send + Any {}
2021

21-
pub struct IOLooper {
22+
pub struct IOLooper<T> {
2223
sender: Option<Sender<Signal>>,
2324
executor: Executor,
25+
_marker: std::marker::PhantomData<T>,
2426
}
2527

2628
struct Executor {
2729
queue: Arc<Mutex<VecDeque<Job>>>,
2830
join_handle: Option<JoinHandle<()>>,
2931
}
3032

31-
impl IOLooper {
32-
pub fn new<T>(callback: T) -> Self
33-
where
34-
T: Callback + 'static,
35-
{
33+
impl<T: Callback + 'static> IOLooper<T> {
34+
pub fn new(callback: T) -> Self {
3635
let (sender, receiver) = channel::<Signal>();
3736
let executor = Executor::new(receiver, callback);
3837
IOLooper {
3938
sender: Some(sender),
4039
executor,
40+
_marker: std::marker::PhantomData,
4141
}
4242
}
4343

44-
pub fn post_and_kill<F>(&mut self, task: F)
45-
where
46-
F: FnOnce(&mut dyn std::any::Any) + Send + 'static,
47-
{
44+
pub fn post_and_kill<F: FnOnce(&mut T) + Send + 'static>(&mut self, task: F) {
45+
let job: Job = Box::new(|callback| {
46+
let callback = callback.downcast_mut::<T>().unwrap();
47+
task(callback)
48+
});
4849
self.executor.queue.lock().unwrap().clear();
4950
self.sender
5051
.as_ref()
5152
.unwrap()
52-
.send(Signal::Kill(Box::new(task)))
53+
.send(Signal::Kill(job))
5354
.unwrap();
5455
drop(self.sender.take());
5556
if let Some(handle) = self.executor.join_handle.take() {
@@ -58,14 +59,15 @@ impl IOLooper {
5859
}
5960
}
6061

61-
pub fn post<F>(&self, task: F) -> Result<()>
62-
where
63-
F: FnOnce(&mut dyn std::any::Any) + Send + 'static,
64-
{
62+
pub fn post<F: FnOnce(&mut T) + Send + 'static>(&self, task: F) -> Result<()> {
63+
let job: Job = Box::new(|callback| {
64+
let callback = callback.downcast_mut::<T>().unwrap();
65+
task(callback)
66+
});
6567
self.executor
6668
.queue
6769
.lock()
68-
.map(|mut queue| queue.push_back(Box::new(task)))
70+
.map(|mut queue| queue.push_back(job))
6971
.map_err(|e| LockError(e.to_string()))?;
7072

7173
self.sender
@@ -91,7 +93,7 @@ impl IOLooper {
9193
}
9294
}
9395

94-
impl Drop for IOLooper {
96+
impl<T> Drop for IOLooper<T> {
9597
fn drop(&mut self) {
9698
drop(self.sender.take());
9799

@@ -103,10 +105,7 @@ impl Drop for IOLooper {
103105
}
104106

105107
impl Executor {
106-
pub fn new<T>(receiver: Receiver<Signal>, mut callback: T) -> Self
107-
where
108-
T: Callback + std::any::Any + 'static,
109-
{
108+
pub fn new<T: Callback + 'static>(receiver: Receiver<Signal>, mut callback: T) -> Self {
110109
let queue: Arc<Mutex<VecDeque<Job>>> = Arc::new(Mutex::new(VecDeque::with_capacity(100)));
111110
let queue_clone = Arc::clone(&queue);
112111
let handle = thread::spawn(move || loop {
@@ -164,31 +163,20 @@ mod tests {
164163
io_looper
165164
.post(|callback| {
166165
thread::sleep(Duration::from_millis(100));
167-
callback
168-
.downcast_ref::<SimpleCallback>()
169-
.unwrap()
170-
.print("first job")
166+
callback.print("first job")
171167
})
172168
.expect("failed to execute job");
173169
io_looper
174170
.post(|callback| {
175171
thread::sleep(Duration::from_millis(100));
176-
callback
177-
.downcast_ref::<SimpleCallback>()
178-
.unwrap()
179-
.print("second job")
172+
callback.print("second job")
180173
})
181174
.expect("failed to execute job");
182175
assert!(io_looper.sender.is_some());
183176
assert_eq!(io_looper.executor.queue.lock().unwrap().len(), 2);
184177
assert!(io_looper.executor.join_handle.is_some());
185178
thread::sleep(Duration::from_millis(50));
186-
io_looper.post_and_kill(|callback| {
187-
callback
188-
.downcast_ref::<SimpleCallback>()
189-
.unwrap()
190-
.print("last job")
191-
});
179+
io_looper.post_and_kill(|callback| callback.print("last job"));
192180
assert!(io_looper.sender.is_none());
193181
assert!(io_looper.executor.queue.lock().unwrap().is_empty());
194182
assert!(io_looper.executor.join_handle.is_none());

src/core/memory_map.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ impl DerefMut for RawMmap {
7676

7777
unsafe impl Send for RawMmap {}
7878

79+
unsafe impl Sync for RawMmap {}
80+
7981
#[derive(Debug)]
8082
#[repr(transparent)]
8183
pub struct MemoryMap(RawMmap);

src/core/mmkv_impl.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const LOG_TAG: &str = "MMKV:Core";
1919
pub struct MmkvImpl {
2020
kv_map: HashMap<String, Buffer>,
2121
is_valid: bool,
22-
io_looper: Option<IOLooper>,
22+
io_looper: Option<IOLooper<IOWriter>>,
2323
#[cfg(feature = "encryption")]
2424
encryptor: Encryptor,
2525
}
@@ -77,12 +77,10 @@ impl MmkvImpl {
7777
}
7878
let result = self.kv_map.insert(key.to_string(), raw_buffer.clone());
7979
let duplicated = result.is_some();
80-
self.io_looper.as_ref().unwrap().post(move |callback| {
81-
callback
82-
.downcast_mut::<IOWriter>()
83-
.unwrap()
84-
.write(raw_buffer, duplicated)
85-
})
80+
self.io_looper
81+
.as_ref()
82+
.unwrap()
83+
.post(move |writer| writer.write(raw_buffer, duplicated))
8684
}
8785

8886
pub fn get(&self, key: &str) -> Result<&Buffer> {
@@ -104,12 +102,10 @@ impl MmkvImpl {
104102
return Ok(());
105103
}
106104
let buffer = Buffer::deleted_buffer(key);
107-
self.io_looper.as_ref().unwrap().post(move |callback| {
108-
callback
109-
.downcast_mut::<IOWriter>()
110-
.unwrap()
111-
.write(buffer, true)
112-
})
105+
self.io_looper
106+
.as_ref()
107+
.unwrap()
108+
.post(move |writer| writer.write(buffer, true))
113109
}
114110

115111
pub fn clear_data(&mut self) {
@@ -121,8 +117,8 @@ impl MmkvImpl {
121117
self.kv_map.clear();
122118
#[cfg(feature = "encryption")]
123119
let meta_file = self.encryptor.meta_file_path.clone();
124-
self.io_looper.as_mut().unwrap().post_and_kill(|callback| {
125-
callback.downcast_mut::<IOWriter>().unwrap().remove_file();
120+
self.io_looper.as_mut().unwrap().post_and_kill(|writer| {
121+
writer.remove_file();
126122
#[cfg(feature = "encryption")]
127123
let _ = fs::remove_file(meta_file);
128124
info!(LOG_TAG, "data cleared");

src/ffi/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ pub struct NativeLogger {
5757

5858
unsafe impl Send for NativeLogger {}
5959

60+
unsafe impl Sync for NativeLogger {}
61+
6062
impl Drop for NativeLogger {
6163
fn drop(&mut self) {
6264
verbose!(LOG_TAG, "release {:?}", self);

src/log/logger.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{process, thread};
88

99
use crate::log::{LogLevel, Logger};
1010
struct LogWrapper {
11-
io_looper: IOLooper,
11+
io_looper: IOLooper<LogWriter>,
1212
}
1313

1414
struct LogWriter {
@@ -54,8 +54,7 @@ impl LogWrapper {
5454
let log_str = format!("[{}] {}", tag, content);
5555
// write log in io thread
5656
self.io_looper
57-
.post(move |callback| {
58-
let writer = callback.downcast_ref::<LogWriter>().unwrap();
57+
.post(move |writer| {
5958
writer.write(level, time, pid, tid, log_str);
6059
})
6160
.unwrap();
@@ -89,8 +88,7 @@ pub fn set_logger(log_impl: Option<Box<dyn Logger>>) {
8988
// Here move the log_impl to io thread
9089
LOG_WRAPPER
9190
.io_looper
92-
.post(|callback| {
93-
let writer = callback.downcast_mut::<LogWriter>().unwrap();
91+
.post(|writer| {
9492
drop(writer.inner_logger.take());
9593
writer.inner_logger = log_impl;
9694
})

src/log/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ See [MMKV::set_logger](crate::MMKV::set_logger)
77
88
Logger should be [`Send`], cause it will be moved into io thread
99
*/
10-
pub trait Logger: Debug + Send {
10+
pub trait Logger: Debug + Send + Sync {
1111
fn verbose(&self, log_str: String);
1212
fn info(&self, log_str: String);
1313
fn debug(&self, log_str: String);

0 commit comments

Comments
 (0)