Skip to content

Commit 618e05a

Browse files
Added SyncChannel and split into modules
SyncChannel can be a lazily created as statics to give to signal handlers which can use them to signal events have occurred to the main program thread.
1 parent f566c77 commit 618e05a

File tree

4 files changed

+179
-79
lines changed

4 files changed

+179
-79
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,6 @@ authors = ["Michael Micucci <[email protected]>"]
55

66
[dependencies]
77
libc = "*"
8+
9+
[dev-dependencies]
10+
lazy_static = "*"

src/channel.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use std::sync::mpsc::{Sender, Receiver, channel, TryRecvError};
2+
use std::sync::{Arc, Mutex};
3+
4+
#[derive(Debug)]
5+
pub enum ChannelError {
6+
SyncError(String),
7+
SendError,
8+
RecvError,
9+
10+
}
11+
12+
pub struct SyncChannel<T> {
13+
_s: Arc<Mutex<Sender<T>>>,
14+
_r: Arc<Mutex<Receiver<T>>>
15+
}
16+
17+
impl<T> SyncChannel<T> {
18+
pub fn new() -> Self {
19+
let (s, r) = channel::<T>();
20+
SyncChannel { _s: Arc::new(Mutex::new(s)), _r: Arc::new(Mutex::new(r)) }
21+
}
22+
23+
pub fn send(&self, data: T) -> Result<(), ChannelError> {
24+
self._s.lock()
25+
.map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
26+
.send(data)
27+
.map_err(|_| { ChannelError::SendError })
28+
}
29+
30+
pub fn recv(&self) -> Result<T, ChannelError> {
31+
self._r.lock()
32+
.map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
33+
.recv()
34+
.map_err(|_| { ChannelError::RecvError })
35+
}
36+
37+
pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
38+
match self._r.lock()
39+
.map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
40+
.try_recv() {
41+
42+
Err(TryRecvError::Empty) => Ok(None),
43+
Err(_) => Err(ChannelError::RecvError),
44+
Ok(d) => Ok(Some(d))
45+
}
46+
47+
}
48+
}
49+
50+
impl<T> Clone for SyncChannel<T> {
51+
fn clone(&self) -> Self {
52+
SyncChannel {
53+
_r: self._r.clone(),
54+
_s: self._s.clone(),
55+
}
56+
}
57+
}
58+
unsafe impl<T> Send for SyncChannel<T> {}
59+
unsafe impl<T> Sync for SyncChannel<T> {}
60+
61+
#[cfg(test)]
62+
mod tests {
63+
use super::*;
64+
use std::thread;
65+
use std::sync::atomic::{AtomicBool, Ordering};
66+
67+
#[test]
68+
fn test_channels() {
69+
let ch: SyncChannel<i32> = SyncChannel::new();
70+
let ch_clone = ch.clone();
71+
thread::spawn(move || {
72+
ch_clone.send(3)
73+
});
74+
assert_eq!(ch.recv().unwrap(), 3);
75+
}
76+
77+
#[test]
78+
fn test_channels_try_recv() {
79+
let ch: SyncChannel<i32> = SyncChannel::new();
80+
let ch_clone = ch.clone();
81+
let switch = Arc::new(AtomicBool::new(false));
82+
let sw_clone = switch.clone();
83+
assert_eq!(ch.try_recv().unwrap(), None);
84+
85+
thread::spawn(move || {
86+
ch_clone.send(3).unwrap();
87+
sw_clone.store(true, Ordering::Relaxed);
88+
});
89+
loop {
90+
if switch.load(Ordering::Relaxed) { break; }
91+
}
92+
assert_eq!(ch.try_recv().unwrap(), Some(3));
93+
}
94+
}

src/lib.rs

Lines changed: 3 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,5 @@
11
extern crate libc;
2+
#[cfg(test)] #[macro_use] extern crate lazy_static;
23

3-
use std::mem::{transmute, uninitialized};
4-
5-
pub use libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
6-
pub use libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP};
7-
pub use libc::c_int as int;
8-
9-
pub fn signal(signum: int, sig_handler: fn(int), flags: Option<int>) {
10-
let int_sig_handler: libc::size_t = unsafe { transmute(sig_handler) };
11-
signal_internal(signum, int_sig_handler, flags)
12-
}
13-
14-
fn signal_internal(signum: int, sig_handler: libc::size_t, flags: Option<int>) {
15-
let mut sigset: libc::sigset_t = unsafe { uninitialized() };
16-
let _ = unsafe { libc::sigemptyset(&mut sigset as *mut libc::sigset_t) };
17-
18-
let mut siga = unsafe { uninitialized::<libc::sigaction>() };
19-
let mut oldact = unsafe { uninitialized::<libc::sigaction>() };
20-
21-
siga.sa_sigaction = sig_handler;
22-
siga.sa_mask = sigset;
23-
siga.sa_flags = flags.map(|x| { x - libc::SA_SIGINFO }).unwrap_or(libc::SA_ONSTACK | libc::SA_RESTART);
24-
siga.sa_restorer = None;
25-
26-
unsafe { libc::sigaction(signum, &siga as *const libc::sigaction, &mut oldact as *mut libc::sigaction) };
27-
}
28-
29-
pub fn default(signum: int) {
30-
let int_sig_handler: libc::size_t = unsafe { transmute(libc::SIG_DFL) };
31-
signal_internal(signum, int_sig_handler, None)
32-
}
33-
34-
pub fn ignore(signum: int) {
35-
let int_sig_handler: libc::size_t = unsafe { transmute(libc::SIG_IGN) };
36-
signal_internal(signum, int_sig_handler, None)
37-
}
38-
39-
pub fn kill(signal: int) {
40-
unsafe { libc::kill(libc::getpid(), signal) };
41-
}
42-
43-
#[cfg(test)]
44-
mod tests {
45-
use super::*;
46-
47-
static mut HANDLER_RUN:u32 = 0;
48-
49-
fn test_handler(_: int) {
50-
println!("Handler running!");
51-
unsafe { HANDLER_RUN = HANDLER_RUN + 1 };
52-
}
53-
54-
#[test]
55-
fn test_signals() {
56-
signal(SIGUSR1, test_handler, None);
57-
58-
kill(SIGUSR1);
59-
::std::thread::sleep(::std::time::Duration::from_secs(1));
60-
unsafe { assert_eq!(HANDLER_RUN, 1) } ;
61-
62-
default(SIGUSR1);
63-
64-
ignore(SIGUSR1);
65-
66-
kill(SIGUSR1);
67-
::std::thread::sleep(::std::time::Duration::from_secs(1));
68-
69-
signal(SIGUSR1, test_handler, None);
70-
signal(SIGUSR2, test_handler, None);
71-
72-
kill(SIGUSR1);
73-
::std::thread::sleep(::std::time::Duration::from_secs(1));
74-
unsafe { assert_eq!(HANDLER_RUN, 2) } ;
75-
76-
kill(SIGUSR2);
77-
::std::thread::sleep(::std::time::Duration::from_secs(2));
78-
unsafe { assert_eq!(HANDLER_RUN, 3) } ;
79-
}
80-
81-
}
4+
pub mod channel;
5+
pub mod signal;

src/signal.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use std::mem::{transmute, uninitialized};
2+
3+
use libc;
4+
5+
pub use libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
6+
pub use libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP};
7+
pub use libc::c_int as int;
8+
9+
pub fn signal(signum: int, sig_handler: fn(int), flags: Option<int>) {
10+
let int_sig_handler: libc::size_t = unsafe { transmute(sig_handler) };
11+
signal_internal(signum, int_sig_handler, flags)
12+
}
13+
14+
fn signal_internal(signum: int, sig_handler: libc::size_t, flags: Option<int>) {
15+
let mut sigset: libc::sigset_t = unsafe { uninitialized() };
16+
let _ = unsafe { libc::sigemptyset(&mut sigset as *mut libc::sigset_t) };
17+
18+
let mut siga = unsafe { uninitialized::<libc::sigaction>() };
19+
let mut oldact = unsafe { uninitialized::<libc::sigaction>() };
20+
21+
siga.sa_sigaction = sig_handler;
22+
siga.sa_mask = sigset;
23+
siga.sa_flags = flags.map(|x| { x - libc::SA_SIGINFO }).unwrap_or(libc::SA_ONSTACK | libc::SA_RESTART);
24+
siga.sa_restorer = None;
25+
26+
unsafe { libc::sigaction(signum, &siga as *const libc::sigaction, &mut oldact as *mut libc::sigaction) };
27+
}
28+
29+
pub fn default(signum: int) {
30+
let int_sig_handler: libc::size_t = unsafe { transmute(libc::SIG_DFL) };
31+
signal_internal(signum, int_sig_handler, None)
32+
}
33+
34+
pub fn ignore(signum: int) {
35+
let int_sig_handler: libc::size_t = unsafe { transmute(libc::SIG_IGN) };
36+
signal_internal(signum, int_sig_handler, None)
37+
}
38+
39+
pub fn kill(signal: int) {
40+
unsafe { libc::kill(libc::getpid(), signal) };
41+
}
42+
43+
#[cfg(test)]
44+
mod tests {
45+
use super::*;
46+
use channel::SyncChannel;
47+
48+
lazy_static! {
49+
static ref TERM_CH: SyncChannel<()> = SyncChannel::new();
50+
}
51+
52+
fn test_handler(_: int) {
53+
TERM_CH.send(()).unwrap();
54+
}
55+
56+
#[test]
57+
fn test_signals() {
58+
signal(SIGUSR1, test_handler, None);
59+
60+
kill(SIGUSR1);
61+
TERM_CH.recv().unwrap();
62+
63+
default(SIGUSR1);
64+
65+
ignore(SIGUSR1);
66+
67+
kill(SIGUSR1);
68+
69+
signal(SIGUSR1, test_handler, None);
70+
signal(SIGUSR2, test_handler, None);
71+
72+
kill(SIGUSR1);
73+
TERM_CH.recv().unwrap();
74+
75+
kill(SIGUSR2);
76+
TERM_CH.recv().unwrap();
77+
}
78+
79+
}

0 commit comments

Comments
 (0)