Skip to content

Commit e742b57

Browse files
committed
Squashed commit of the following:
commit f42d7e9 Author: EnvOut <31275761+EnvOut@users.noreply.github.com> Date: Fri Nov 29 17:53:45 2024 +0200 Implement the ability to manually work with channels from an actor commit b0ee60e Author: EnvOut <31275761+EnvOut@users.noreply.github.com> Date: Fri Nov 29 13:42:31 2024 +0200 global system commit aa381d8 Author: EnvOut <31275761+EnvOut@users.noreply.github.com> Date: Sun Nov 24 23:47:28 2024 +0200 cargo fmt commit 75746fc Author: EnvOut <31275761+EnvOut@users.noreply.github.com> Date: Sun Nov 24 23:42:59 2024 +0200 timeout decorator
1 parent 11610ec commit e742b57

23 files changed

+512
-255
lines changed

Cargo.lock

Lines changed: 11 additions & 70 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/uactor/Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@ thiserror = { workspace = true }
1717
tracing = { workspace = true }
1818
paste = "1.0"
1919
derive_more = { workspace = true }
20-
derive-new = { workspace = true }
2120
bytes = { version = "1", optional = true }
22-
rand = "0.8"
23-
24-
either = "1.13.0"
21+
lazy_static = "1.5.0"
2522

2623
[dev-dependencies]
24+
more-asserts = "0.3.1"
2725
tracing-subscriber = { version = "0.3" }
2826
anyhow = "1"
2927
time = "0.3"

src/uactor/examples/dependency_injection.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ mod actor1 {
5858

5959
impl Actor for Actor1 {
6060
type Context = Context;
61+
type RouteMessage = Actor1Msg;
6162
type Inject = Services;
6263
type State = ();
6364
}
@@ -86,7 +87,7 @@ mod actor1 {
8687
Services { actor2_ref, .. }: &mut Self::Inject,
8788
msg: MessageWithoutReply,
8889
_ctx: &mut Context,
89-
state: &Self::State,
90+
_state: &Self::State,
9091
) -> HandleResult {
9192
println!("actor1: Received {msg:?} message, sending PrintMessage to the actor2");
9293
actor2_ref.send(PrintMessage::new(msg.into()))?;
@@ -124,6 +125,7 @@ mod actor2 {
124125

125126
impl Actor for Actor2 {
126127
type Context = Context;
128+
type RouteMessage = Actor2Msg;
127129
type Inject = Services;
128130
type State = ();
129131
}
@@ -196,15 +198,16 @@ async fn main() -> anyhow::Result<()> {
196198

197199
// Init system and register services
198200
let mut system = System::global()
199-
.extension(service1)
200-
.extension(service2)
201-
.build();
201+
.with(|mut system| {
202+
system.extension(service1.clone())
203+
.extension(service2.clone());
204+
}).await;
202205

203206
// Init actor (instance + spawn actor)
204-
let (actor1_ref, actor1_stream) = system.register_ref::<Actor1, _, Actor1MpscRef>("actor1");
207+
let (actor1_ref, actor1_stream) = system.register_ref::<Actor1, _, Actor1MpscRef>("actor1").await;
205208

206209
// Init actor2 (instance + spawn actor)
207-
let (actor2_ref, actor2_stream) = system.register_ref::<Actor2, _, Actor2MpscRef>("actor2");
210+
let (actor2_ref, actor2_stream) = system.register_ref::<Actor2, _, Actor2MpscRef>("actor2").await;
208211

209212
// Run actors
210213
let actor1 = Actor1;
@@ -214,17 +217,16 @@ async fn main() -> anyhow::Result<()> {
214217
system.spawn_actor(actor2_ref.name(), actor2, *actor2_ref.state(), (actor2_stream)).await?;
215218

216219
// Case #1: send messages and call injected (not from &self) services inside handlers
217-
println!(
218-
"-- Case #1: send messages and call injected (not from &self) services inside handlers"
219-
);
220+
println!("-- Case #1: send messages and call injected (not from &self) services inside handlers");
220221
let pong1 = actor1_ref.ask::<PongMsg>(PingMsg).await?;
221222
let pong2 = actor2_ref.ask::<PongMsg>(PingMsg).await?;
222-
println!("main: received {pong1:?} and {pong2:?} messages");
223+
println!("main: received {pong1:?} and {pong2:?} messages\n");
223224

224225
// Case #2: send message#1 to actor1 and reply to actor2 without actor2 reference inside message#1
225-
println!("\n-- Case #2: send message#1 to actor1 and reply to actor2 without actor2 reference inside message#1");
226+
println!("-- Case #2: send message#1 to actor1 and reply to actor2 without actor2 reference inside message#1");
226227
actor1_ref.send(MessageWithoutReply("login:password".to_owned()))?;
227228

229+
// wait for actors to finish
228230
tokio::time::sleep(1.std_milliseconds()).await;
229231
Ok(())
230232
}

src/uactor/examples/interval.rs

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,34 @@
11
use time::ext::NumericalStdDuration;
2-
use tokio::sync::mpsc::UnboundedSender;
2+
use tokio_stream::StreamExt;
33
use uactor::actor::abstract_actor::MessageSender;
44

55
use uactor::system::System;
66

7-
use crate::actor1::{Actor1, Actor1MpscRef};
87
use crate::actor1::Actor1Msg;
9-
use crate::actor1::Actor1Ref;
10-
use crate::messages::{PingMsg, PongMsg};
8+
use crate::actor1::{Actor1, Actor1MpscRef};
9+
use crate::messages::{AskTicksCountMsg, TicksCount, UpdateMetrics};
10+
use more_asserts as ma;
11+
use uactor::actor::message::IntervalMessage;
12+
use uactor::data::datasource_decorator::DataSourceMapExt;
1113

1214
mod messages {
1315
use uactor::actor::message::{Message, Reply};
1416

15-
pub struct PingMsg(pub Reply<PongMsg>);
17+
pub struct AskTicksCountMsg(pub Reply<TicksCount>);
18+
pub struct UpdateMetrics;
1619

1720
#[derive(Debug)]
18-
pub struct PongMsg;
21+
pub struct TicksCount(pub usize);
1922

20-
uactor::message_impl!(PingMsg, PongMsg);
23+
uactor::message_impl!(AskTicksCountMsg, TicksCount, UpdateMetrics);
2124
}
2225

2326
mod actor1 {
2427
use uactor::actor::abstract_actor::{Actor, HandleResult, Handler};
25-
use uactor::actor::context::Context;
28+
use uactor::actor::context::{ActorContext, Context};
2629
use uactor::actor::message::IntervalMessage;
2730

28-
use crate::messages::{PingMsg, PongMsg};
31+
use crate::messages::{AskTicksCountMsg, TicksCount, UpdateMetrics};
2932

3033
#[derive(Default)]
3134
pub struct Actor1 {
@@ -34,21 +37,39 @@ mod actor1 {
3437

3538
impl Actor for Actor1 {
3639
type Context = Context;
40+
type RouteMessage = Actor1Msg;
3741
type Inject = ();
3842
type State = ();
3943
}
4044

41-
impl Handler<PingMsg> for Actor1 {
45+
impl Handler<AskTicksCountMsg> for Actor1 {
4246
async fn handle(
4347
&mut self,
4448
_: &mut Self::Inject,
45-
ping: PingMsg,
46-
_ctx: &mut Context,
47-
state: &Self::State,
49+
AskTicksCountMsg(reply): AskTicksCountMsg,
50+
ctx: &mut Context,
51+
_state: &Self::State,
4852
) -> HandleResult {
4953
println!("actor1: Received ping message");
50-
let PingMsg(reply) = ping;
51-
let _ = reply.send(PongMsg);
54+
let _ = reply.send(TicksCount(self.interval_count as usize));
55+
ctx.kill();
56+
Ok(())
57+
}
58+
}
59+
60+
impl Handler<UpdateMetrics> for Actor1 {
61+
async fn handle(
62+
&mut self,
63+
_: &mut Self::Inject,
64+
_: UpdateMetrics,
65+
_ctx: &mut Context,
66+
_state: &Self::State,
67+
) -> HandleResult {
68+
self.interval_count += 1;
69+
println!(
70+
"actor1: received {}nd UpdateMetrics message",
71+
self.interval_count
72+
);
5273
Ok(())
5374
}
5475
}
@@ -62,36 +83,41 @@ mod actor1 {
6283
duration: _,
6384
}: IntervalMessage,
6485
_ctx: &mut Context,
65-
state: &Self::State,
86+
_state: &Self::State,
6687
) -> HandleResult {
67-
self.interval_count += 1;
68-
println!(
69-
"actor1: received {}nd interval message",
70-
self.interval_count
71-
);
88+
// some important logic
7289
Ok(())
7390
}
7491
}
7592

76-
uactor::generate_actor_ref!(Actor1, { PingMsg });
93+
uactor::generate_actor_ref!(Actor1, { AskTicksCountMsg, UpdateMetrics, IntervalMessage });
7794
}
7895

7996
#[tokio::main]
8097
async fn main() -> anyhow::Result<()> {
81-
let mut system = System::global().build();
98+
use uactor::data::datasource_decorator::DataSourceMapExt;
99+
let mut system = System::global();
82100

83-
// 1 second interval
84-
let interval = tokio::time::interval(1.std_seconds());
101+
// 100 milliseconds interval for updating metrics
102+
let metrics_update_interval = tokio::time::interval(100.std_milliseconds())
103+
.map(|_: IntervalMessage| Actor1Msg::UpdateMetrics(UpdateMetrics));
104+
// 1 second interval for other tasks
105+
let other_interval = tokio::time::interval(1.std_seconds())
106+
.map(Actor1Msg::IntervalMessage);
85107

86-
let (actor1_ref, actor1_stream) = system.register_ref::<Actor1, _, Actor1MpscRef>("actor1");
108+
// Initialize actor's reference
109+
let (actor1_ref, actor1_stream) = system.register_ref::<Actor1, _, Actor1MpscRef>("actor1").await;
87110

111+
// Spawn actor
88112
let actor1 = Actor1::default();
89-
system.spawn_actor(actor1_ref.name(), actor1, *actor1_ref.state(), (actor1_stream, interval)).await?;
113+
let (_state, actor_handle) = system.spawn_actor(actor1_ref.name(), actor1, *actor1_ref.state(), (actor1_stream, metrics_update_interval, other_interval)).await?;
90114

91-
let pong = actor1_ref.ask::<PongMsg>(PingMsg).await?;
92-
println!("main: received {pong:?} message");
115+
// Wait for 5 ticks
116+
tokio::time::sleep(500.std_milliseconds()).await;
117+
let TicksCount(ticks_count) = actor1_ref.ask::<TicksCount>(AskTicksCountMsg).await?;
118+
ma::assert_ge!(ticks_count, 5, "waiting 5 ticks and expecting at least 5 messages received");
93119

94-
// waiting 10 seconds and expecting new message each 1 second
95-
tokio::time::sleep(10.std_seconds()).await;
120+
tokio::time::sleep(1.std_microseconds()).await;
121+
assert!(actor_handle.is_finished(), "actor should be finished after receiving AskTicksCountMsg");
96122
Ok(())
97123
}

0 commit comments

Comments
 (0)