Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
da0dacd
First attempt for stream_listener
ElFantasma Jun 30, 2025
c15cd54
add test to stream listener
juan518munoz Jul 1, 2025
cfca334
add util fn to convert receiver to stream
juan518munoz Jul 2, 2025
4e3db7f
add bounded channel
juan518munoz Jul 2, 2025
de32666
add broadcast listener
juan518munoz Jul 2, 2025
6bfb8a1
fix `spawn_broadcast_listener`
juan518munoz Jul 2, 2025
7a6f79a
unify spawn_listener & remove oneline functions
juan518munoz Jul 2, 2025
4d49882
doc update
juan518munoz Jul 2, 2025
5138e09
add impl of sync spawn listener
juan518munoz Jul 3, 2025
04222ae
rename spawn_listener to spawn_listener_from_iter, and port spawn_lis…
juan518munoz Jul 3, 2025
51ecbb2
add bound channel to threads concurrency
juan518munoz Jul 3, 2025
0a78b64
merge duplicated code
juan518munoz Jul 3, 2025
835bf08
add cancel token with 'flaky' test
juan518munoz Jul 3, 2025
0c24840
unflaky the test
juan518munoz Jul 3, 2025
541cc1e
add cancellation to task impl of spawn_listener
juan518munoz Jul 3, 2025
2950644
docs & clippy
juan518munoz Jul 3, 2025
376deda
Merge branch 'main' into stream_listener
juan518munoz Jul 4, 2025
6f7a305
use futures select inside spawn listener
juan518munoz Jul 4, 2025
702e4df
remove bounded channels from tasks impl
juan518munoz Jul 4, 2025
2bed000
remove sync channels from threads impl
juan518munoz Jul 4, 2025
ce77a36
deprecate spawn_listener for threads impl
juan518munoz Jul 4, 2025
2dc4cc1
fix imports
juan518munoz Jul 4, 2025
2638334
reword example for clarity
juan518munoz Jul 8, 2025
bbabca9
add comment for clarity
juan518munoz Jul 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rust 1.85.1
rust 1.88.0
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "3"
members = [
"rt",
"examples/bank",
Expand All @@ -17,5 +18,3 @@ spawned-rt = { version = "0.1.0", path = "rt" }
spawned-concurrency = { version = "0.1.0", path = "concurrency" }
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }


5 changes: 4 additions & 1 deletion concurrency/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@ spawned-rt = { workspace = true }
tracing = { workspace = true }
futures = "0.3.1"

[dev-dependencies]
tokio-stream = { version = "0.1.17" }

[lib]
path = "./src/lib.rs"
path = "./src/lib.rs"
14 changes: 7 additions & 7 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::GenServerError;

#[derive(Debug)]
pub struct GenServerHandle<G: GenServer + 'static> {
pub tx: mpsc::Sender<GenServerInMsg<G>>,
pub tx: mpsc::UnboundedSender<GenServerInMsg<G>>,
}

impl<G: GenServer> Clone for GenServerHandle<G> {
Expand All @@ -21,7 +21,7 @@ impl<G: GenServer> Clone for GenServerHandle<G> {

impl<G: GenServer> GenServerHandle<G> {
pub(crate) fn new(initial_state: G::State) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let (tx, mut rx) = mpsc::unbounded_channel::<GenServerInMsg<G>>();
let handle = GenServerHandle { tx };
let mut gen_server: G = GenServer::new();
let handle_clone = handle.clone();
Expand All @@ -39,7 +39,7 @@ impl<G: GenServer> GenServerHandle<G> {
}

pub(crate) fn new_blocking(initial_state: G::State) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let (tx, mut rx) = mpsc::unbounded_channel::<GenServerInMsg<G>>();
let handle = GenServerHandle { tx };
let mut gen_server: G = GenServer::new();
let handle_clone = handle.clone();
Expand All @@ -58,7 +58,7 @@ impl<G: GenServer> GenServerHandle<G> {
handle_clone
}

pub fn sender(&self) -> mpsc::Sender<GenServerInMsg<G>> {
pub fn sender(&self) -> mpsc::UnboundedSender<GenServerInMsg<G>> {
self.tx.clone()
}

Expand Down Expand Up @@ -131,7 +131,7 @@ where
fn run(
&mut self,
handle: &GenServerHandle<Self>,
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
rx: &mut mpsc::UnboundedReceiver<GenServerInMsg<Self>>,
state: Self::State,
) -> impl Future<Output = Result<(), GenServerError>> + Send {
async {
Expand Down Expand Up @@ -162,7 +162,7 @@ where
fn main_loop(
&mut self,
handle: &GenServerHandle<Self>,
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
rx: &mut mpsc::UnboundedReceiver<GenServerInMsg<Self>>,
mut state: Self::State,
) -> impl Future<Output = Result<(), GenServerError>> + Send {
async {
Expand All @@ -181,7 +181,7 @@ where
fn receive(
&mut self,
handle: &GenServerHandle<Self>,
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
rx: &mut mpsc::UnboundedReceiver<GenServerInMsg<Self>>,
state: Self::State,
) -> impl Future<Output = Result<(Self::State, bool), GenServerError>> + Send {
async move {
Expand Down
4 changes: 4 additions & 0 deletions concurrency/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@

mod gen_server;
mod process;
mod stream;
mod time;

#[cfg(test)]
mod stream_tests;
#[cfg(test)]
mod timer_tests;

pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
pub use process::{send, Process, ProcessInfo};
pub use stream::spawn_listener;
pub use time::{send_after, send_interval};
28 changes: 16 additions & 12 deletions concurrency/src/tasks/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::future::Future;

#[derive(Debug)]
pub struct ProcessInfo<T> {
pub tx: mpsc::Sender<T>,
pub tx: mpsc::UnboundedSender<T>,
pub handle: JoinHandle<()>,
}

impl<T> ProcessInfo<T> {
pub fn sender(&self) -> mpsc::Sender<T> {
pub fn sender(&self) -> mpsc::UnboundedSender<T> {
self.tx.clone()
}

Expand All @@ -26,7 +26,7 @@ where
{
fn spawn(mut self) -> impl Future<Output = ProcessInfo<T>> + Send {
async {
let (tx, mut rx) = mpsc::channel::<T>();
let (tx, mut rx) = mpsc::unbounded_channel::<T>();
let tx_clone = tx.clone();
let handle = rt::spawn(async move {
self.run(&tx_clone, &mut rx).await;
Expand All @@ -37,8 +37,8 @@ where

fn run(
&mut self,
tx: &mpsc::Sender<T>,
rx: &mut mpsc::Receiver<T>,
tx: &mpsc::UnboundedSender<T>,
rx: &mut mpsc::UnboundedReceiver<T>,
) -> impl Future<Output = ()> + Send {
async {
self.init(tx).await;
Expand All @@ -48,8 +48,8 @@ where

fn main_loop(
&mut self,
tx: &mpsc::Sender<T>,
rx: &mut mpsc::Receiver<T>,
tx: &mpsc::UnboundedSender<T>,
rx: &mut mpsc::UnboundedReceiver<T>,
) -> impl Future<Output = ()> + Send {
async {
loop {
Expand All @@ -66,14 +66,14 @@ where
false
}

fn init(&mut self, _tx: &mpsc::Sender<T>) -> impl Future<Output = ()> + Send {
fn init(&mut self, _tx: &mpsc::UnboundedSender<T>) -> impl Future<Output = ()> + Send {
async {}
}

fn receive(
&mut self,
tx: &mpsc::Sender<T>,
rx: &mut mpsc::Receiver<T>,
tx: &mpsc::UnboundedSender<T>,
rx: &mut mpsc::UnboundedReceiver<T>,
) -> impl std::future::Future<Output = T> + Send {
async {
match rx.recv().await {
Expand All @@ -83,10 +83,14 @@ where
}
}

fn handle(&mut self, message: T, tx: &mpsc::Sender<T>) -> impl Future<Output = T> + Send;
fn handle(
&mut self,
message: T,
tx: &mpsc::UnboundedSender<T>,
) -> impl Future<Output = T> + Send;
}

pub fn send<T>(tx: &mpsc::Sender<T>, message: T)
pub fn send<T>(tx: &mpsc::UnboundedSender<T>, message: T)
where
T: Send,
{
Expand Down
50 changes: 50 additions & 0 deletions concurrency/src/tasks/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::tasks::{GenServer, GenServerHandle};
use futures::{Stream, StreamExt};
use spawned_rt::tasks::{CancellationToken, JoinHandle};

/// Spawns a listener that listens to a stream and sends messages to a GenServer.
///
/// Items sent through the stream are required to be wrapped in a Result type.
///
/// This function returns a handle to the spawned task and a cancellation token
/// to stop it.
pub fn spawn_listener<T, F, S, I, E>(
mut handle: GenServerHandle<T>,
message_builder: F,
mut stream: S,
) -> (JoinHandle<()>, CancellationToken)
where
T: GenServer + 'static,
F: Fn(I) -> T::CastMsg + Send + 'static,
I: Send,
E: std::fmt::Debug + Send,
S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
{
let cancelation_token = CancellationToken::new();
Copy link
Collaborator

@ElFantasma ElFantasma Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the idea of creating the CancellationToken in the GenServer on initialization. And every time a listener or a timer is spawned to interact with the GenServer it should request it for the CancellationToken to reuse the same one.
Then, it's the GenServer responsibility to cancel the token on termination. That way, graceful cancellation is transparent for the external process.
Anyway, we can try to implement that on this PR, or ticket it for a future improvement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it'd be a good idea to have the cancellation token inside of the gen server, considering this change would also requiere a change in the timer implementation, I believe we should make this change on a separate PR.

I'll branch from this one and start work on it ASAP.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, given GenServer is just a Trait, I guess we should put the token in the GenServerHandler, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've taken an approach of that kind, see the PR here

let cloned_token = cancelation_token.clone();
let join_handle = spawned_rt::tasks::spawn(async move {
loop {
if cloned_token.is_cancelled() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the way an async CancellationToken is supposed to be used:
stream.next() is a future and execution can be held at that point, even when the token gets cancelled. We should use futures::future::select like we do on timer module (or tokio::select! if we decide to allow macros) to await on both futures at the same time. See graceful shutdown

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

tracing::trace!("Received signal to stop listener, stopping");
break;
}

match stream.next().await {
Some(res) => match res {
Ok(i) => {
let _ = handle.cast(message_builder(i)).await;
}
Err(e) => {
tracing::trace!("Received Error in msg {e:?}");
break;
}
},
None => {
tracing::trace!("Stream finished");
break;
}
}
}
});
(join_handle, cancelation_token)
}
Loading