Skip to content

Commit cf0abd1

Browse files
committed
fix(timer): add default channel to Timer
1 parent 0d1182c commit cf0abd1

File tree

3 files changed

+43
-19
lines changed

3 files changed

+43
-19
lines changed

src/timer/epoll.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
55
// except according to those terms.
66

7-
use crate::Result;
87
use crate::utils::check_err;
8+
use crate::Result;
99

10-
use std::sync::{mpsc::Sender, Arc, Mutex};
10+
use std::sync::{
11+
mpsc::{channel, Receiver, Sender},
12+
Arc, Mutex,
13+
};
1114
use std::{thread, thread::JoinHandle};
1215

1316
#[derive(Debug, Default)]
@@ -23,14 +26,21 @@ impl Timer {
2326
pub fn initialize(self) -> Result<Self> {
2427
let txs = Arc::clone(&self.txs);
2528

29+
// Add Default tx
30+
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
31+
txs.lock()?.push(tx);
32+
2633
let timer_fd = Timer::set_timerfd()?;
2734
let epoll_fd = Timer::create_epollfd(timer_fd)?;
2835

2936
let handle = Some(thread::spawn(move || {
3037
loop {
3138
// Exit thread if there are no listeners
3239
if txs.lock()?.len() == 0 {
33-
// TODO: should close file descriptors?
40+
// Close file descriptors
41+
unsafe { libc::close(timer_fd) };
42+
unsafe { libc::close(epoll_fd) };
43+
3444
return Ok(());
3545
}
3646

@@ -45,7 +55,10 @@ impl Timer {
4555
// Iterate through Senders
4656
txs.lock()?.iter().for_each(|tx| {
4757
// Send event to attached Sender
48-
tx.send(current).unwrap();
58+
match tx.send(current) {
59+
Ok(_) => {}
60+
Err(_) => {}
61+
}
4962
});
5063
}
5164
}));
@@ -186,25 +199,23 @@ pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
186199
}
187200

188201
/// libc::epoll_ctl wrapper
189-
pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> {
190-
check_err(unsafe {
191-
libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event)
192-
})?;
202+
pub fn epoll_ctl(
203+
epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event,
204+
) -> Result<()> {
205+
check_err(unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) })?;
193206
Ok(())
194207
}
195208

196209
/// libc::epoll_wait wrapper
197-
pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> {
198-
check_err(unsafe {
199-
libc::epoll_wait(epoll_fd, events, maxevents, timeout)
200-
})?;
210+
pub fn epoll_wait(
211+
epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int,
212+
) -> Result<()> {
213+
check_err(unsafe { libc::epoll_wait(epoll_fd, events, maxevents, timeout) })?;
201214
Ok(())
202215
}
203216

204217
/// libc::read wrapper
205218
pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
206-
check_err(unsafe {
207-
libc::read(timer_fd, bufptr, count)
208-
})?;
219+
check_err(unsafe { libc::read(timer_fd, bufptr, count) })?;
209220
Ok(())
210221
}

src/timer/kqueue.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ impl Timer {
2323
pub fn initialize(self) -> Result<Self> {
2424
let txs = Arc::clone(&self.txs);
2525

26+
// Add Default tx
27+
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
28+
txs.lock()?.push(tx);
29+
2630
let kqueue = kqueue()?;
2731

2832
let handle = Some(thread::spawn(move || {
@@ -49,7 +53,10 @@ impl Timer {
4953
// Iterate through Senders
5054
txs.lock()?.iter().for_each(|tx| {
5155
// Send event to attached Sender
52-
tx.send(current).unwrap();
56+
match tx.send(current) {
57+
Ok(_) => {},
58+
Err(_) => {},
59+
}
5360
});
5461

5562
// Wait 10s
@@ -137,10 +144,12 @@ impl Timer {
137144
}
138145
}
139146

147+
/// libc::kqueue wrapper
140148
fn kqueue() -> Result<i32> {
141149
check_err(unsafe { libc::kqueue() }).map(|kq| kq as i32)
142150
}
143151

152+
/// libc::kevent wrapper
144153
fn kevent(
145154
kqueue: i32, change: *const libc::kevent, c_count: libc::c_int, events: *mut libc::kevent,
146155
e_count: libc::c_int, timeout: *const libc::timespec,

src/timer/sleep.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ impl Timer {
3131
pub fn initialize(self) -> Result<Self> {
3232
let txs = Arc::clone(&self.txs);
3333

34-
// Add tx
35-
// txs.lock().unwrap().push(tx);
34+
// Add Default tx
35+
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
36+
txs.lock()?.push(tx);
3637

3738
// Spawn a Thread
3839
let handle = Some(thread::spawn(move || {
@@ -61,7 +62,10 @@ impl Timer {
6162
// Iterate through Senders
6263
txs.lock()?.iter().for_each(|tx| {
6364
// Send event to attached Sender
64-
tx.send(current).unwrap();
65+
match tx.send(current) {
66+
Ok(_) => {}
67+
Err(_) => {}
68+
}
6569
});
6670

6771
// Sleep for 10s

0 commit comments

Comments
 (0)