Skip to content

Commit f5c447c

Browse files
pipe: return EOF when no active writers
Signed-off-by: Andy-Python-Programmer <[email protected]>
1 parent 366dcd4 commit f5c447c

File tree

6 files changed

+72
-18
lines changed

6 files changed

+72
-18
lines changed

src/aero_kernel/src/fs/inode.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ pub trait INodeInterface: Send + Sync {
227227
Err(FileSystemError::NotSocket)
228228
}
229229

230+
fn accept(&self, _address: SocketAddr) -> Result<()> {
231+
Err(FileSystemError::NotSocket)
232+
}
233+
230234
/// Returns the inner UNIX socket inode if bound to one.
231235
fn as_unix_socket(&self) -> Result<Arc<dyn INodeInterface>> {
232236
Err(FileSystemError::NotSocket)

src/aero_kernel/src/fs/pipe.rs

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
use core::sync::atomic::{AtomicUsize, Ordering};
2+
3+
use aero_syscall::OpenFlags;
14
use alloc::{sync::Arc, vec::Vec};
25

36
use crate::utils::sync::{BlockQueue, Mutex};
@@ -38,41 +41,59 @@ impl Buffer {
3841
}
3942
}
4043

41-
struct BufferQueue {
42-
buffer: Mutex<Buffer>,
43-
}
44-
45-
impl BufferQueue {
46-
fn new() -> Self {
47-
Self {
48-
buffer: Mutex::new(Buffer::new()),
49-
}
50-
}
51-
}
52-
5344
pub struct Pipe {
54-
queue: BufferQueue,
45+
queue: Mutex<Buffer>,
5546

5647
readers: BlockQueue,
5748
writers: BlockQueue,
49+
50+
/// The number of writers currently connected to the pipe.
51+
num_writers: AtomicUsize,
5852
}
5953

6054
impl Pipe {
6155
pub fn new() -> Arc<Self> {
6256
Arc::new(Self {
63-
queue: BufferQueue::new(),
57+
queue: Mutex::new(Buffer::new()),
6458

6559
readers: BlockQueue::new(),
6660
writers: BlockQueue::new(),
61+
62+
num_writers: AtomicUsize::new(0),
6763
})
6864
}
65+
66+
/// Returns whether the pipe has active writers.
67+
pub fn has_active_writers(&self) -> usize {
68+
self.num_writers.load(Ordering::SeqCst)
69+
}
6970
}
7071

7172
impl INodeInterface for Pipe {
73+
fn open(&self, flags: OpenFlags) -> super::Result<()> {
74+
// Write end of the pipe:
75+
if flags.contains(OpenFlags::O_WRONLY) {
76+
self.num_writers.fetch_add(1, Ordering::SeqCst);
77+
}
78+
79+
Ok(())
80+
}
81+
82+
fn close(&self, flags: OpenFlags) {
83+
// Write end of the pipe:
84+
if flags.contains(OpenFlags::O_WRONLY) {
85+
let active_writers = (self.num_writers.fetch_sub(1, Ordering::SeqCst) - 1) == 0;
86+
// There are no active writers and no data to read (reached EOF).
87+
if active_writers {
88+
self.readers.notify_complete();
89+
}
90+
}
91+
}
92+
7293
fn read_at(&self, _offset: usize, buf: &mut [u8]) -> super::Result<usize> {
73-
let mut buffer = self
74-
.readers
75-
.block_on(&self.queue.buffer, |lock| lock.has_data())?;
94+
let mut buffer = self.readers.block_on(&self.queue, |lock| {
95+
lock.has_data() || !self.has_active_writers() == 0
96+
})?;
7697

7798
let read = buffer.read_data(buf);
7899

@@ -85,7 +106,7 @@ impl INodeInterface for Pipe {
85106
}
86107

87108
fn write_at(&self, offset: usize, buf: &[u8]) -> super::Result<usize> {
88-
let res = offset + self.queue.buffer.lock().write_data(buf);
109+
let res = offset + self.queue.lock().write_data(buf);
89110
self.readers.notify_complete();
90111

91112
Ok(res)

src/aero_kernel/src/socket/unix.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ impl INodeInterface for UnixSocket {
163163
Ok(())
164164
}
165165

166+
fn accept(&self, _address: SocketAddr) -> Result<()> {
167+
unimplemented!()
168+
}
169+
166170
fn poll(&self, table: Option<&mut PollTable>) -> Result<PollFlags> {
167171
table.map(|e| e.insert(&self.wq));
168172

src/aero_kernel/src/syscall/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ pub fn generic_do_syscall(
218218
SYS_BIND => net::bind(b, c, d),
219219
SYS_CONNECT => net::connect(b, c, d),
220220
SYS_LISTEN => net::listen(b, c),
221+
SYS_ACCEPT => net::accept(b, c, d),
221222

222223
SYS_GETTIME => time::gettime(b, c),
223224
SYS_SLEEP => time::sleep(b),

src/aero_kernel/src/syscall/net.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,20 @@ pub fn connect(fd: usize, address: usize, length: usize) -> Result<usize, Syscal
3333
Ok(0)
3434
}
3535

36+
/// Accept a connection on a socket.
37+
#[syscall]
38+
pub fn accept(fd: usize, address: usize, _length: &mut u32) -> Result<usize, SyscallError> {
39+
let address = socket_addr_from_addr(VirtAddr::new(address as u64))?;
40+
let file = scheduler::get_scheduler()
41+
.current_task()
42+
.file_table
43+
.get_handle(fd)
44+
.ok_or(SyscallError::EINVAL)?;
45+
46+
file.inode().accept(address)?;
47+
Ok(0)
48+
}
49+
3650
/// Marks the socket as a passive socket (i.e. as a socket that will be used to accept incoming
3751
/// connection requests).
3852
#[syscall]

src/aero_kernel/src/userland/task.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,16 @@ impl Task {
450450
}
451451

452452
pub(super) fn update_state(&self, state: TaskState) {
453+
if state != TaskState::Runnable {
454+
log::warn!(
455+
"Task::update_state() updated the task state to {state:?}! (pid={:?}, tid={:?})",
456+
self.pid,
457+
self.tid
458+
);
459+
460+
crate::unwind::unwind_stack_trace();
461+
}
462+
453463
self.state.store(state as _, Ordering::SeqCst);
454464
}
455465

0 commit comments

Comments
 (0)