Skip to content

Commit 062867a

Browse files
author
unbalancedparentheses
committed
fix tests and add spawn link
1 parent a5ab9b4 commit 062867a

File tree

3 files changed

+103
-37
lines changed

3 files changed

+103
-37
lines changed

concurrency/src/registry.rs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,17 @@ pub fn clear() {
193193
#[cfg(test)]
194194
mod tests {
195195
use super::*;
196+
use std::sync::Mutex;
196197

197-
// Helper to ensure test isolation
198+
// Mutex to serialize tests that need an isolated registry
199+
static TEST_MUTEX: Mutex<()> = Mutex::new(());
200+
201+
// Helper to ensure test isolation - clears registry and holds lock
198202
fn with_clean_registry<F, R>(f: F) -> R
199203
where
200204
F: FnOnce() -> R,
201205
{
206+
let _guard = TEST_MUTEX.lock().unwrap();
202207
clear();
203208
let result = f();
204209
clear();
@@ -325,33 +330,28 @@ mod tests {
325330

326331
#[test]
327332
fn test_count() {
328-
// Note: Due to parallel test execution, we can't rely on absolute counts.
329-
// Instead, we verify that registration increases count and unregistration decreases it.
330-
let pid1 = Pid::new();
331-
let pid2 = Pid::new();
332-
333-
// Use unique names to avoid conflicts with parallel tests
334-
let name1 = format!("count_test_{}", pid1.id());
335-
let name2 = format!("count_test_{}", pid2.id());
336-
337-
let count_before = count();
338-
register(&name1, pid1).unwrap();
339-
let count_after_first = count();
340-
assert!(
341-
count_after_first > count_before,
342-
"Count should increase after registration"
343-
);
344-
345-
register(&name2, pid2).unwrap();
346-
let count_after_second = count();
347-
assert!(
348-
count_after_second > count_after_first,
349-
"Count should increase after second registration"
350-
);
351-
352-
// Clean up our registrations
353-
unregister(&name1);
354-
unregister(&name2);
333+
// Use with_clean_registry for test isolation
334+
with_clean_registry(|| {
335+
let pid1 = Pid::new();
336+
let pid2 = Pid::new();
337+
338+
let name1 = format!("count_test_{}", pid1.id());
339+
let name2 = format!("count_test_{}", pid2.id());
340+
341+
assert_eq!(count(), 0, "Registry should be empty");
342+
343+
register(&name1, pid1).unwrap();
344+
assert_eq!(count(), 1, "Count should be 1 after first registration");
345+
346+
register(&name2, pid2).unwrap();
347+
assert_eq!(count(), 2, "Count should be 2 after second registration");
348+
349+
unregister(&name1);
350+
assert_eq!(count(), 1, "Count should be 1 after unregistration");
351+
352+
unregister(&name2);
353+
assert_eq!(count(), 0, "Count should be 0 after all unregistrations");
354+
});
355355
}
356356

357357
#[test]

concurrency/src/tasks/gen_server.rs

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -412,24 +412,65 @@ pub trait GenServer: Send + Sized {
412412
GenServerHandle::new(self)
413413
}
414414

415-
/// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't
416-
/// happen if the task is blocking the thread. As such, for sync compute task
415+
/// Tokio tasks depend on a cooperative multitasking model. "Work stealing" can't
416+
/// happen if the task is blocking the thread. As such, sync compute tasks
417417
/// or other blocking tasks need to be in their own separate thread, and the OS
418418
/// will manage them through hardware interrupts.
419-
/// Start blocking provides such thread.
419+
/// `start_blocking` provides such a thread.
420420
fn start_blocking(self) -> GenServerHandle<Self> {
421421
GenServerHandle::new_blocking(self)
422422
}
423423

424-
/// For some "singleton" GenServers that run througout the whole execution of the
424+
/// For some "singleton" GenServers that run throughout the whole execution of the
425425
/// program, it makes sense to run in their own dedicated thread to avoid interference
426426
/// with the rest of the tasks' runtime.
427-
/// The use of tokio::task::spawm_blocking is not recommended for these scenarios
428-
/// as it is a limited thread pool better suited for blocking IO tasks that eventually end
427+
/// The use of `tokio::task::spawn_blocking` is not recommended for these scenarios
428+
/// as it is a limited thread pool better suited for blocking IO tasks that eventually end.
429429
fn start_on_thread(self) -> GenServerHandle<Self> {
430430
GenServerHandle::new_on_thread(self)
431431
}
432432

433+
/// Start the GenServer and create a bidirectional link with another process.
434+
///
435+
/// This is a convenience that calls `start()` followed by `link()`. If
436+
/// linking fails, the error is returned to the caller.
437+
///
438+
/// # Example
439+
///
440+
/// ```ignore
441+
/// let parent = ParentServer::new().start();
442+
/// let child = ChildServer::new().start_linked(&parent)?;
443+
/// // Now if either crashes, the other will be notified
444+
/// ```
445+
fn start_linked(self, other: &impl HasPid) -> Result<GenServerHandle<Self>, LinkError> {
446+
let handle = self.start();
447+
handle.link(other)?;
448+
Ok(handle)
449+
}
450+
451+
/// Start the GenServer and set up monitoring from another process.
452+
///
453+
/// This is a convenience that calls `start()` followed by `monitor()`.
454+
/// The monitoring process will receive a DOWN message when
455+
/// this GenServer exits.
456+
///
457+
/// # Example
458+
///
459+
/// ```ignore
460+
/// let supervisor = SupervisorServer::new().start();
461+
/// let (worker, monitor_ref) = WorkerServer::new().start_monitored(&supervisor)?;
462+
/// // supervisor will receive DOWN message when worker exits
463+
/// ```
464+
fn start_monitored(
465+
self,
466+
monitor_from: &impl HasPid,
467+
) -> Result<(GenServerHandle<Self>, MonitorRef), LinkError> {
468+
let handle = self.start();
469+
let monitor_ref = monitor_from.pid();
470+
let actual_ref = process_table::monitor(monitor_ref, handle.pid())?;
471+
Ok((handle, actual_ref))
472+
}
473+
433474
fn run(
434475
self,
435476
handle: &GenServerHandle<Self>,

concurrency/src/threads/gen_server.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,10 +290,10 @@ pub enum InitResult<G: GenServer> {
290290
}
291291

292292
pub trait GenServer: Send + Sized {
293-
type CallMsg: Clone + Send + Sized;
294-
type CastMsg: Clone + Send + Sized;
293+
type CallMsg: Clone + Send + Sized + Sync;
294+
type CastMsg: Clone + Send + Sized + Sync;
295295
type OutMsg: Send + Sized;
296-
type Error: Debug;
296+
type Error: Debug + Send;
297297

298298
fn start(self) -> GenServerHandle<Self> {
299299
GenServerHandle::new(self)
@@ -305,6 +305,31 @@ pub trait GenServer: Send + Sized {
305305
GenServerHandle::new(self)
306306
}
307307

308+
/// Start the GenServer and create a bidirectional link with another process.
309+
///
310+
/// This is a convenience that calls `start()` followed by `link()`. If
311+
/// linking fails, the error is returned to the caller.
312+
fn start_linked(self, other: &impl HasPid) -> Result<GenServerHandle<Self>, LinkError> {
313+
let handle = self.start();
314+
handle.link(other)?;
315+
Ok(handle)
316+
}
317+
318+
/// Start the GenServer and set up monitoring from another process.
319+
///
320+
/// This is a convenience that calls `start()` followed by `monitor()`.
321+
/// The monitoring process will receive a DOWN message when
322+
/// this GenServer exits.
323+
fn start_monitored(
324+
self,
325+
monitor_from: &impl HasPid,
326+
) -> Result<(GenServerHandle<Self>, MonitorRef), LinkError> {
327+
let handle = self.start();
328+
let monitor_ref = monitor_from.pid();
329+
let actual_ref = process_table::monitor(monitor_ref, handle.pid())?;
330+
Ok((handle, actual_ref))
331+
}
332+
308333
fn run(
309334
self,
310335
handle: &GenServerHandle<Self>,

0 commit comments

Comments
 (0)