Skip to content

Commit d6400b8

Browse files
committed
Add Nonblock option for Sender and Receiver.
And, Make Sender and Receiver can not be constructed out of this crate.
1 parent 2b23af6 commit d6400b8

File tree

2 files changed

+45
-6
lines changed

2 files changed

+45
-6
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fifo"
3-
version = "0.1.3"
3+
version = "0.1.4"
44
license = "MIT"
55
authors = ["qupeng <onlyqupeng@gmail.com>"]
66
description = "First-in-first-out lock-free ring-buffer like kfifo in Linux"

src/lib.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,29 @@ impl Drop for Inner {
5757

5858
unsafe impl Sync for Inner {}
5959

60+
/// What can we do when operations on `Sender` or `Receiver` would block.
61+
///
62+
/// 1) Just return `Err(ErrorKind::WouldBlock)` immediately; or
63+
/// 2) sleep for some milliseconds.
64+
///
65+
/// The default on `Sender` and `Receiver` is Sleep(10).
66+
pub enum WouldBlock {
67+
Nonblock,
68+
Sleep(u64),
69+
}
70+
6071
/// The fifo sender. It's `Send` but `!Send`.
6172
pub struct Sender {
73+
_private: (),
6274
inner: Arc<Inner>,
75+
would_block: WouldBlock,
6376
}
6477

6578
/// The fifo receiver. It's `Send` but `!Send`.
6679
pub struct Receiver {
80+
_private: (),
6781
inner: Arc<Inner>,
82+
would_block: WouldBlock,
6883
}
6984

7085
impl Drop for Sender {
@@ -107,12 +122,24 @@ pub fn align_up_for_fifo_size(size: usize) -> usize {
107122
pub fn fifo(size: usize) -> (Sender, Receiver) {
108123
let size = align_up_for_fifo_size(size);
109124
let inner = Arc::new(Inner::new(size));
110-
let sender = Sender { inner: inner.clone() };
111-
let receiver = Receiver { inner: inner };
125+
let sender = Sender {
126+
_private: (),
127+
inner: inner.clone(),
128+
would_block: WouldBlock::Sleep(10),
129+
};
130+
let receiver = Receiver {
131+
_private: (),
132+
inner: inner,
133+
would_block: WouldBlock::Sleep(10),
134+
};
112135
(sender, receiver)
113136
}
114137

115138
impl Sender {
139+
pub fn set_would_block(&mut self, would_block: WouldBlock) {
140+
self.would_block = would_block;
141+
}
142+
116143
fn do_write<T>(&mut self, bytes: usize, mut cp_data_to: T) -> io::Result<usize>
117144
where T: FnMut(&mut [u8], usize, usize) -> io::Result<usize>
118145
{
@@ -128,9 +155,13 @@ impl Sender {
128155
break;
129156
} else {
130157
if inner.shutdown.shuted(SHUT_READ) {
131-
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"));
158+
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "closed on read end"));
159+
}
160+
if let WouldBlock::Sleep(sleep) = self.would_block {
161+
thread::sleep(time::Duration::from_millis(sleep));
162+
} else {
163+
return Err(io::Error::new(io::ErrorKind::WouldBlock, "buffer is full"));
132164
}
133-
thread::sleep(time::Duration::from_millis(10));
134165
};
135166
}
136167
let start_pos = pin & (inner.size - 1);
@@ -161,6 +192,10 @@ impl io::Write for Sender {
161192
}
162193

163194
impl Receiver {
195+
pub fn set_would_block(&mut self, would_block: WouldBlock) {
196+
self.would_block = would_block;
197+
}
198+
164199
fn do_write<T>(&mut self, bytes: usize, mut cp_data_from: T) -> io::Result<usize>
165200
where T: FnMut(&[u8], usize, usize) -> io::Result<usize>
166201
{
@@ -178,7 +213,11 @@ impl Receiver {
178213
if inner.shutdown.shuted(SHUT_WRITE) {
179214
return Ok(0);
180215
}
181-
thread::sleep(time::Duration::from_millis(10));
216+
if let WouldBlock::Sleep(sleep) = self.would_block {
217+
thread::sleep(time::Duration::from_millis(sleep));
218+
} else {
219+
return Err(io::Error::new(io::ErrorKind::WouldBlock, "buffer is empty"));
220+
}
182221
}
183222
}
184223
let start_pos = pout & (inner.size - 1);

0 commit comments

Comments
 (0)