Skip to content

Commit f9cf376

Browse files
committed
feat: enhance process management, IPC and synchronization primitives
- Integrated new thread states: Sleeping, BlockedWait, BlockedJoin, and BlockedSync - Implemented core synchronization primitives: Semaphore, RwLock, and Condvar - Added Service Registry to IPC for name-based communication - Improved Priority and RoundRobin schedulers to handle new thread states and wake-up logic - Added public APIs for thread_sleep, wait_child, and thread_join - Enhanced Mutex with priority inheritance support and dedicated blocking state - Refactored PCB and ProcessManager to handle process-level waiting and signals
1 parent c1d37b7 commit f9cf376

File tree

13 files changed

+1472
-8
lines changed

13 files changed

+1472
-8
lines changed

kernel/src/ipc/mod.rs

Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
//! Inter-Process Communication (IPC) for Proka Kernel
2+
//!
3+
//! This module provides message passing between threads.
4+
//! Design principles:
5+
//! - Simple synchronous message passing
6+
//! - Support for timeout and asynchronous send
7+
//! - Efficient for kernel-level communication
8+
9+
use crate::process::scheduler;
10+
use crate::process::thread::Tid;
11+
use alloc::collections::VecDeque;
12+
use alloc::vec::Vec;
13+
use spin::Mutex;
14+
15+
/// Maximum message size in bytes
16+
pub const MAX_MESSAGE_SIZE: usize = 1024;
17+
18+
/// Maximum number of pending messages per queue
19+
pub const MAX_QUEUE_SIZE: usize = 64;
20+
21+
/// IPC message structure
22+
#[derive(Debug, Clone)]
23+
pub struct Message {
24+
/// Sender thread ID
25+
pub sender: Tid,
26+
/// Message type (user-defined)
27+
pub msg_type: u64,
28+
/// Message payload
29+
pub payload: Vec<u8>,
30+
}
31+
32+
impl Message {
33+
/// Create a new message
34+
pub fn new(sender: Tid, msg_type: u64, payload: Vec<u8>) -> Result<Self, IpcError> {
35+
if payload.len() > MAX_MESSAGE_SIZE {
36+
return Err(IpcError::MessageTooLarge);
37+
}
38+
Ok(Self {
39+
sender,
40+
msg_type,
41+
payload,
42+
})
43+
}
44+
45+
/// Create a simple message with no payload
46+
pub fn simple(sender: Tid, msg_type: u64) -> Self {
47+
Self {
48+
sender,
49+
msg_type,
50+
payload: Vec::new(),
51+
}
52+
}
53+
}
54+
55+
/// Message queue for a thread
56+
pub struct MessageQueue {
57+
/// Owner thread ID
58+
owner: Tid,
59+
/// Pending messages
60+
messages: VecDeque<Message>,
61+
/// Threads waiting to receive from this queue
62+
waiters: Vec<Tid>,
63+
}
64+
65+
impl MessageQueue {
66+
/// Create a new message queue
67+
pub fn new(owner: Tid) -> Self {
68+
Self {
69+
owner,
70+
messages: VecDeque::new(),
71+
waiters: Vec::new(),
72+
}
73+
}
74+
75+
/// Send a message to this queue
76+
pub fn send(&mut self, msg: Message) -> Result<(), IpcError> {
77+
if self.messages.len() >= MAX_QUEUE_SIZE {
78+
return Err(IpcError::QueueFull);
79+
}
80+
self.messages.push_back(msg);
81+
82+
// Wake up a waiter if any
83+
if let Some(waiter) = self.waiters.pop() {
84+
let _ = scheduler::unblock(waiter);
85+
}
86+
87+
Ok(())
88+
}
89+
90+
/// Try to receive a message
91+
pub fn try_recv(&mut self) -> Option<Message> {
92+
self.messages.pop_front()
93+
}
94+
95+
/// Add a waiter to this queue
96+
pub fn add_waiter(&mut self, tid: Tid) {
97+
if !self.waiters.contains(&tid) {
98+
self.waiters.push(tid);
99+
}
100+
}
101+
102+
/// Remove a waiter from this queue
103+
pub fn remove_waiter(&mut self, tid: Tid) {
104+
self.waiters.retain(|&t| t != tid);
105+
}
106+
107+
/// Check if queue is empty
108+
pub fn is_empty(&self) -> bool {
109+
self.messages.is_empty()
110+
}
111+
112+
/// Get the number of pending messages
113+
pub fn len(&self) -> usize {
114+
self.messages.len()
115+
}
116+
}
117+
118+
/// Global message queue registry
119+
static MESSAGE_QUEUES: Mutex<Option<alloc::vec::Vec<Option<Mutex<MessageQueue>>>>> =
120+
Mutex::new(None);
121+
122+
/// Global service registry (name -> tid)
123+
static SERVICE_REGISTRY: Mutex<Option<alloc::collections::BTreeMap<alloc::string::String, Tid>>> =
124+
Mutex::new(None);
125+
126+
/// Initialize IPC subsystem
127+
pub fn init() {
128+
let mut queues = MESSAGE_QUEUES.lock();
129+
*queues = Some(alloc::vec![]);
130+
let mut services = SERVICE_REGISTRY.lock();
131+
*services = Some(alloc::collections::BTreeMap::new());
132+
log::info!("IPC subsystem initialized");
133+
}
134+
135+
/// Create a message queue for a thread
136+
pub fn create_queue(tid: Tid) -> Result<(), IpcError> {
137+
let mut queues_opt = MESSAGE_QUEUES.lock();
138+
let queues = queues_opt.as_mut().ok_or(IpcError::NotInitialized)?;
139+
140+
let idx = tid as usize;
141+
if idx >= queues.len() {
142+
queues.resize_with(idx + 1, || None);
143+
}
144+
145+
if queues[idx].is_some() {
146+
return Err(IpcError::QueueExists);
147+
}
148+
149+
queues[idx] = Some(Mutex::new(MessageQueue::new(tid)));
150+
Ok(())
151+
}
152+
153+
/// Destroy a message queue
154+
pub fn destroy_queue(tid: Tid) {
155+
let mut queues_opt = MESSAGE_QUEUES.lock();
156+
if let Some(queues) = queues_opt.as_mut() {
157+
let idx = tid as usize;
158+
if idx < queues.len() {
159+
// Wake up all waiters before destroying
160+
if let Some(queue) = queues[idx].take() {
161+
let q = queue.lock();
162+
for waiter in &q.waiters {
163+
let _ = scheduler::unblock(*waiter);
164+
}
165+
}
166+
}
167+
}
168+
}
169+
170+
/// Send a message to a target thread
171+
///
172+
/// # Arguments
173+
/// * `target` - Target thread ID
174+
/// * `msg` - Message to send
175+
/// * `block` - If true, block until message can be sent
176+
///
177+
/// # Returns
178+
/// * `Ok(())` - Message sent successfully
179+
/// * `Err(IpcError)` - Send failed
180+
pub fn send(target: Tid, msg: Message, block: bool) -> Result<(), IpcError> {
181+
loop {
182+
{
183+
let mut queues_opt = MESSAGE_QUEUES.lock();
184+
let queues = queues_opt.as_mut().ok_or(IpcError::NotInitialized)?;
185+
186+
let idx = target as usize;
187+
if idx >= queues.len() || queues[idx].is_none() {
188+
return Err(IpcError::ThreadNotFound);
189+
}
190+
191+
let mut queue = queues[idx].as_ref().unwrap().lock();
192+
match queue.send(msg.clone()) {
193+
Ok(()) => return Ok(()),
194+
Err(IpcError::QueueFull) if !block => return Err(IpcError::QueueFull),
195+
Err(e) => return Err(e),
196+
#[allow(unreachable_patterns)]
197+
_ => {}
198+
}
199+
}
200+
201+
// Queue is full and we're blocking - yield and retry
202+
if block {
203+
scheduler::yield_thread();
204+
}
205+
}
206+
}
207+
208+
/// Register a service name for the current thread
209+
pub fn register_service(name: &str) -> Result<(), IpcError> {
210+
let tid = scheduler::current_tid().ok_or(IpcError::NotInitialized)?;
211+
let mut registry_opt = SERVICE_REGISTRY.lock();
212+
let registry = registry_opt.as_mut().ok_or(IpcError::NotInitialized)?;
213+
214+
if registry.contains_key(name) {
215+
return Err(IpcError::ServiceExists);
216+
}
217+
218+
registry.insert(alloc::string::String::from(name), tid);
219+
Ok(())
220+
}
221+
222+
/// Look up a thread ID by service name
223+
pub fn lookup_service(name: &str) -> Option<Tid> {
224+
let registry_opt = SERVICE_REGISTRY.lock();
225+
let registry = registry_opt.as_ref()?;
226+
registry.get(name).cloned()
227+
}
228+
229+
/// Receive a message
230+
///
231+
/// # Arguments
232+
/// * `sender` - If Some, only receive from this sender; if None, receive from any
233+
/// * `timeout_ms` - If Some, block for at most this many milliseconds
234+
///
235+
/// # Returns
236+
/// * `Ok(Message)` - Message received
237+
/// * `Err(IpcError)` - Receive failed or timed out
238+
pub fn recv(sender: Option<Tid>, timeout_ms: Option<u64>) -> Result<Message, IpcError> {
239+
let current_tid = scheduler::current_tid().ok_or(IpcError::NotInitialized)?;
240+
241+
// Ensure we have a queue
242+
if let Err(e) = create_queue(current_tid) {
243+
if !matches!(e, IpcError::QueueExists) {
244+
return Err(e);
245+
}
246+
}
247+
248+
// Try to receive without blocking first
249+
{
250+
let mut queues_opt = MESSAGE_QUEUES.lock();
251+
let queues = queues_opt.as_mut().ok_or(IpcError::NotInitialized)?;
252+
253+
let idx = current_tid as usize;
254+
if let Some(queue) = queues[idx].as_ref() {
255+
let mut q = queue.lock();
256+
257+
// Check for matching message
258+
if let Some(pos) = q
259+
.messages
260+
.iter()
261+
.position(|m| sender.map_or(true, |s| m.sender == s))
262+
{
263+
return Ok(q.messages.remove(pos).unwrap());
264+
}
265+
}
266+
}
267+
268+
// No message available, block if requested
269+
if timeout_ms.is_some() || sender.is_some() {
270+
// Add ourselves to waiters
271+
{
272+
let mut queues_opt = MESSAGE_QUEUES.lock();
273+
let queues = queues_opt.as_mut().ok_or(IpcError::NotInitialized)?;
274+
let idx = current_tid as usize;
275+
if let Some(queue) = queues[idx].as_ref() {
276+
queue.lock().add_waiter(current_tid);
277+
}
278+
}
279+
280+
// Block waiting for IPC
281+
scheduler::block_ipc(sender, timeout_ms);
282+
283+
// We've been unblocked, try to receive again
284+
// Note: In a real implementation, we'd need to handle timeout
285+
// by checking the current time against the deadline
286+
287+
{
288+
let mut queues_opt = MESSAGE_QUEUES.lock();
289+
let queues = queues_opt.as_mut().ok_or(IpcError::NotInitialized)?;
290+
let idx = current_tid as usize;
291+
if let Some(queue) = queues[idx].as_ref() {
292+
let mut q = queue.lock();
293+
q.remove_waiter(current_tid);
294+
295+
// Check for matching message again
296+
if let Some(pos) = q
297+
.messages
298+
.iter()
299+
.position(|m| sender.map_or(true, |s| m.sender == s))
300+
{
301+
return Ok(q.messages.remove(pos).unwrap());
302+
}
303+
}
304+
}
305+
306+
// No message received (likely timeout)
307+
Err(IpcError::Timeout)
308+
} else {
309+
Err(IpcError::WouldBlock)
310+
}
311+
}
312+
313+
/// Try to receive a message without blocking
314+
pub fn try_recv(sender: Option<Tid>) -> Result<Message, IpcError> {
315+
let current_tid = scheduler::current_tid().ok_or(IpcError::NotInitialized)?;
316+
317+
let mut queues_opt = MESSAGE_QUEUES.lock();
318+
let queues = queues_opt.as_mut().ok_or(IpcError::NotInitialized)?;
319+
320+
let idx = current_tid as usize;
321+
if idx >= queues.len() || queues[idx].is_none() {
322+
return Err(IpcError::NoQueue);
323+
}
324+
325+
let mut q = queues[idx].as_ref().unwrap().lock();
326+
327+
// Check for matching message
328+
if let Some(pos) = q
329+
.messages
330+
.iter()
331+
.position(|m| sender.map_or(true, |s| m.sender == s))
332+
{
333+
Ok(q.messages.remove(pos).unwrap())
334+
} else {
335+
Err(IpcError::WouldBlock)
336+
}
337+
}
338+
339+
/// IPC errors
340+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
341+
pub enum IpcError {
342+
NotInitialized,
343+
ThreadNotFound,
344+
QueueFull,
345+
QueueExists,
346+
NoQueue,
347+
MessageTooLarge,
348+
Timeout,
349+
WouldBlock,
350+
ServiceExists,
351+
}
352+
353+
#[cfg(test)]
354+
mod tests {
355+
use super::*;
356+
357+
#[test_case]
358+
fn test_message_creation() {
359+
let msg = Message::simple(1, 42);
360+
assert_eq!(msg.sender, 1);
361+
assert_eq!(msg.msg_type, 42);
362+
assert!(msg.payload.is_empty());
363+
}
364+
365+
#[test_case]
366+
fn test_message_queue() {
367+
let mut queue = MessageQueue::new(1);
368+
assert!(queue.is_empty());
369+
370+
let msg = Message::simple(2, 1);
371+
queue.send(msg.clone()).unwrap();
372+
assert_eq!(queue.len(), 1);
373+
374+
let received = queue.try_recv().unwrap();
375+
assert_eq!(received.sender, msg.sender);
376+
assert!(queue.is_empty());
377+
}
378+
379+
#[test_case]
380+
fn test_queue_full() {
381+
let mut queue = MessageQueue::new(1);
382+
383+
// Fill the queue
384+
for i in 0..MAX_QUEUE_SIZE {
385+
let msg = Message::simple(i as Tid, i as u64);
386+
queue.send(msg).unwrap();
387+
}
388+
389+
// Next send should fail
390+
let msg = Message::simple(999, 999);
391+
assert!(matches!(queue.send(msg), Err(IpcError::QueueFull)));
392+
}
393+
}

0 commit comments

Comments
 (0)