|
| 1 | +use alloc::sync::{Arc, Weak}; |
| 2 | + |
| 3 | +use crate::{sync::UPSafeCell, task::suspend_current_and_run_next}; |
| 4 | + |
| 5 | +use super::File; |
| 6 | + |
| 7 | +pub struct Pipe { |
| 8 | + readable: bool, |
| 9 | + writable: bool, |
| 10 | + buffer: Arc<UPSafeCell<PipeRingBuffer>>, |
| 11 | +} |
| 12 | + |
| 13 | +const RING_BUFFER_SIZE: usize = 32; |
| 14 | + |
| 15 | +#[derive(Clone, Copy, PartialEq)] |
| 16 | +enum RingBufferStatus { |
| 17 | + FULL, |
| 18 | + EMPTY, |
| 19 | + NORMAL, |
| 20 | +} |
| 21 | + |
| 22 | +pub struct PipeRingBuffer { |
| 23 | + arr: [u8; RING_BUFFER_SIZE], |
| 24 | + head: usize, |
| 25 | + tail: usize, |
| 26 | + status: RingBufferStatus, |
| 27 | + write_end: Option<Weak<Pipe>>, // to tell if all write ends been closed |
| 28 | +} |
| 29 | + |
| 30 | +impl PipeRingBuffer { |
| 31 | + pub fn new() -> Self { |
| 32 | + Self { |
| 33 | + arr: [0; RING_BUFFER_SIZE], |
| 34 | + head: 0, |
| 35 | + tail: 0, |
| 36 | + status: RingBufferStatus::EMPTY, |
| 37 | + write_end: None, |
| 38 | + } |
| 39 | + } |
| 40 | + |
| 41 | + pub fn set_write_end(&mut self, write_end: &Arc<Pipe>) { |
| 42 | + self.write_end = Some(Arc::downgrade(write_end)); |
| 43 | + } |
| 44 | + |
| 45 | + pub fn read_byte(&mut self) -> u8 { |
| 46 | + self.status = RingBufferStatus::NORMAL; |
| 47 | + let c = self.arr[self.head]; |
| 48 | + self.head = (self.head + 1) % RING_BUFFER_SIZE; |
| 49 | + if self.head == self.tail { |
| 50 | + self.status = RingBufferStatus::EMPTY; |
| 51 | + } |
| 52 | + c |
| 53 | + } |
| 54 | + |
| 55 | + pub fn write_byte(&mut self, byte: u8) { |
| 56 | + self.status = RingBufferStatus::NORMAL; |
| 57 | + self.arr[self.tail] = byte; |
| 58 | + self.tail = (self.tail + 1) % RING_BUFFER_SIZE; |
| 59 | + if self.head == self.tail { |
| 60 | + self.status = RingBufferStatus::FULL; |
| 61 | + } |
| 62 | + } |
| 63 | + |
| 64 | + pub fn available_read(&self) -> usize { |
| 65 | + if self.status == RingBufferStatus::EMPTY { |
| 66 | + 0 |
| 67 | + } else { |
| 68 | + // | head .. tail | |
| 69 | + if self.tail > self.head { |
| 70 | + self.tail - self.head |
| 71 | + } |
| 72 | + // | .. .. head | .. tail |
| 73 | + // ___________/ |
| 74 | + // / tail .. .. | |
| 75 | + else { |
| 76 | + self.tail + RING_BUFFER_SIZE - self.head |
| 77 | + } |
| 78 | + } |
| 79 | + } |
| 80 | + |
| 81 | + pub fn available_write(&self) -> usize { |
| 82 | + if self.status == RingBufferStatus::FULL { |
| 83 | + 0 |
| 84 | + } else { |
| 85 | + RING_BUFFER_SIZE - self.available_read() |
| 86 | + } |
| 87 | + } |
| 88 | + |
| 89 | + pub fn all_write_ends_closed(&self) -> bool { |
| 90 | + if let Some(weak) = &self.write_end { |
| 91 | + weak.upgrade().is_none() |
| 92 | + } else { |
| 93 | + panic!("PipeRingBuffer write_end not set!") |
| 94 | + } |
| 95 | + } |
| 96 | +} |
| 97 | + |
| 98 | +impl Pipe { |
| 99 | + pub fn read_end_with_buffer(buffer: Arc<UPSafeCell<PipeRingBuffer>>) -> Self { |
| 100 | + Self { |
| 101 | + readable: true, |
| 102 | + writable: false, |
| 103 | + buffer, |
| 104 | + } |
| 105 | + } |
| 106 | + |
| 107 | + pub fn write_end_with_buffer(buffer: Arc<UPSafeCell<PipeRingBuffer>>) -> Self { |
| 108 | + Self { |
| 109 | + readable: false, |
| 110 | + writable: true, |
| 111 | + buffer, |
| 112 | + } |
| 113 | + } |
| 114 | +} |
| 115 | + |
| 116 | +impl File for Pipe { |
| 117 | + fn readable(&self) -> bool { |
| 118 | + self.readable |
| 119 | + } |
| 120 | + |
| 121 | + fn writable(&self) -> bool { |
| 122 | + self.writable |
| 123 | + } |
| 124 | + |
| 125 | + fn read(&self, buf: crate::mm::UserBuffer) -> usize { |
| 126 | + assert!(self.readable); |
| 127 | + let want_to_read = buf.len(); |
| 128 | + let mut buf_iter = buf.into_iter(); |
| 129 | + let mut already_read = 0; |
| 130 | + loop { |
| 131 | + let mut rb = self.buffer.exclusive_access(); |
| 132 | + // how many bytes allowed to read during this iteration |
| 133 | + let loop_read = rb.available_read(); |
| 134 | + if loop_read == 0 { |
| 135 | + // if no more available && all write end closed, that's all we get |
| 136 | + if rb.all_write_ends_closed() { |
| 137 | + return already_read; |
| 138 | + } |
| 139 | + // else if write end still alive, we wait for more coming |
| 140 | + // aka, suspend and run other task(maybe the one holds writer) |
| 141 | + // to fill ring_buffer, and before that, we must release it |
| 142 | + // to avoid deadlock (coz task switch will not auto drop it) |
| 143 | + drop(rb); |
| 144 | + suspend_current_and_run_next(); |
| 145 | + continue; |
| 146 | + } |
| 147 | + // if loop_read bytes can be read |
| 148 | + for _ in 0..loop_read { |
| 149 | + // read into buf one-by-one |
| 150 | + if let Some(byte_ref) = buf_iter.next() { |
| 151 | + unsafe { |
| 152 | + *byte_ref = rb.read_byte(); |
| 153 | + } |
| 154 | + already_read += 1; |
| 155 | + // return if reach number we need |
| 156 | + if already_read == want_to_read { |
| 157 | + return want_to_read; |
| 158 | + } |
| 159 | + } else { |
| 160 | + // I think it's exact same with above return condition .. |
| 161 | + return already_read; |
| 162 | + } |
| 163 | + } |
| 164 | + } |
| 165 | + } |
| 166 | + |
| 167 | + fn write(&self, buf: crate::mm::UserBuffer) -> usize { |
| 168 | + assert!(self.writable); |
| 169 | + let want_to_write = buf.len(); |
| 170 | + let mut buf_iter = buf.into_iter(); |
| 171 | + let mut already_write = 0; |
| 172 | + loop { |
| 173 | + let mut rb = self.buffer.exclusive_access(); |
| 174 | + let loop_write = rb.available_write(); |
| 175 | + if loop_write == 0 { |
| 176 | + drop(rb); |
| 177 | + suspend_current_and_run_next(); |
| 178 | + continue; |
| 179 | + } |
| 180 | + for _ in 0..loop_write { |
| 181 | + if let Some(byte_ref) = buf_iter.next() { |
| 182 | + rb.write_byte(unsafe { *byte_ref }); |
| 183 | + already_write += 1; |
| 184 | + if already_write == want_to_write { |
| 185 | + return want_to_write; |
| 186 | + } |
| 187 | + } else { |
| 188 | + return already_write; |
| 189 | + } |
| 190 | + } |
| 191 | + } |
| 192 | + } |
| 193 | +} |
| 194 | + |
| 195 | +/// Return (read_end, write_end) |
| 196 | +pub fn make_pipe() -> (Arc<Pipe>, Arc<Pipe>) { |
| 197 | + let buffer = Arc::new(unsafe { UPSafeCell::new(PipeRingBuffer::new()) }); |
| 198 | + let r = Arc::new(Pipe::read_end_with_buffer(buffer.clone())); |
| 199 | + let w = Arc::new(Pipe::write_end_with_buffer(buffer.clone())); |
| 200 | + buffer.exclusive_access().set_write_end(&w); |
| 201 | + (r, w) |
| 202 | +} |
0 commit comments