11use time:: ext:: NumericalStdDuration ;
2- use tokio :: sync :: mpsc :: UnboundedSender ;
2+ use tokio_stream :: StreamExt ;
33use uactor:: actor:: abstract_actor:: MessageSender ;
44
55use uactor:: system:: System ;
66
7- use crate :: actor1:: { Actor1 , Actor1MpscRef } ;
87use crate :: actor1:: Actor1Msg ;
9- use crate :: actor1:: Actor1Ref ;
10- use crate :: messages:: { PingMsg , PongMsg } ;
8+ use crate :: actor1:: { Actor1 , Actor1MpscRef } ;
9+ use crate :: messages:: { AskTicksCountMsg , TicksCount , UpdateMetrics } ;
10+ use more_asserts as ma;
11+ use uactor:: actor:: message:: IntervalMessage ;
12+ use uactor:: data:: datasource_decorator:: DataSourceMapExt ;
1113
1214mod messages {
1315 use uactor:: actor:: message:: { Message , Reply } ;
1416
15- pub struct PingMsg ( pub Reply < PongMsg > ) ;
17+ pub struct AskTicksCountMsg ( pub Reply < TicksCount > ) ;
18+ pub struct UpdateMetrics ;
1619
1720 #[ derive( Debug ) ]
18- pub struct PongMsg ;
21+ pub struct TicksCount ( pub usize ) ;
1922
20- uactor:: message_impl!( PingMsg , PongMsg ) ;
23+ uactor:: message_impl!( AskTicksCountMsg , TicksCount , UpdateMetrics ) ;
2124}
2225
2326mod actor1 {
2427 use uactor:: actor:: abstract_actor:: { Actor , HandleResult , Handler } ;
25- use uactor:: actor:: context:: Context ;
28+ use uactor:: actor:: context:: { ActorContext , Context } ;
2629 use uactor:: actor:: message:: IntervalMessage ;
2730
28- use crate :: messages:: { PingMsg , PongMsg } ;
31+ use crate :: messages:: { AskTicksCountMsg , TicksCount , UpdateMetrics } ;
2932
3033 #[ derive( Default ) ]
3134 pub struct Actor1 {
@@ -34,21 +37,39 @@ mod actor1 {
3437
3538 impl Actor for Actor1 {
3639 type Context = Context ;
40+ type RouteMessage = Actor1Msg ;
3741 type Inject = ( ) ;
3842 type State = ( ) ;
3943 }
4044
41- impl Handler < PingMsg > for Actor1 {
45+ impl Handler < AskTicksCountMsg > for Actor1 {
4246 async fn handle (
4347 & mut self ,
4448 _: & mut Self :: Inject ,
45- ping : PingMsg ,
46- _ctx : & mut Context ,
47- state : & Self :: State ,
49+ AskTicksCountMsg ( reply ) : AskTicksCountMsg ,
50+ ctx : & mut Context ,
51+ _state : & Self :: State ,
4852 ) -> HandleResult {
4953 println ! ( "actor1: Received ping message" ) ;
50- let PingMsg ( reply) = ping;
51- let _ = reply. send ( PongMsg ) ;
54+ let _ = reply. send ( TicksCount ( self . interval_count as usize ) ) ;
55+ ctx. kill ( ) ;
56+ Ok ( ( ) )
57+ }
58+ }
59+
60+ impl Handler < UpdateMetrics > for Actor1 {
61+ async fn handle (
62+ & mut self ,
63+ _: & mut Self :: Inject ,
64+ _: UpdateMetrics ,
65+ _ctx : & mut Context ,
66+ _state : & Self :: State ,
67+ ) -> HandleResult {
68+ self . interval_count += 1 ;
69+ println ! (
70+ "actor1: received {}nd UpdateMetrics message" ,
71+ self . interval_count
72+ ) ;
5273 Ok ( ( ) )
5374 }
5475 }
@@ -62,36 +83,41 @@ mod actor1 {
6283 duration : _,
6384 } : IntervalMessage ,
6485 _ctx : & mut Context ,
65- state : & Self :: State ,
86+ _state : & Self :: State ,
6687 ) -> HandleResult {
67- self . interval_count += 1 ;
68- println ! (
69- "actor1: received {}nd interval message" ,
70- self . interval_count
71- ) ;
88+ // some important logic
7289 Ok ( ( ) )
7390 }
7491 }
7592
76- uactor:: generate_actor_ref!( Actor1 , { PingMsg } ) ;
93+ uactor:: generate_actor_ref!( Actor1 , { AskTicksCountMsg , UpdateMetrics , IntervalMessage } ) ;
7794}
7895
7996#[ tokio:: main]
8097async fn main ( ) -> anyhow:: Result < ( ) > {
81- let mut system = System :: global ( ) . build ( ) ;
98+ use uactor:: data:: datasource_decorator:: DataSourceMapExt ;
99+ let mut system = System :: global ( ) ;
82100
83- // 1 second interval
84- let interval = tokio:: time:: interval ( 1 . std_seconds ( ) ) ;
101+ // 100 milliseconds interval for updating metrics
102+ let metrics_update_interval = tokio:: time:: interval ( 100 . std_milliseconds ( ) )
103+ . map ( |_: IntervalMessage | Actor1Msg :: UpdateMetrics ( UpdateMetrics ) ) ;
104+ // 1 second interval for other tasks
105+ let other_interval = tokio:: time:: interval ( 1 . std_seconds ( ) )
106+ . map ( Actor1Msg :: IntervalMessage ) ;
85107
86- let ( actor1_ref, actor1_stream) = system. register_ref :: < Actor1 , _ , Actor1MpscRef > ( "actor1" ) ;
108+ // Initialize actor's reference
109+ let ( actor1_ref, actor1_stream) = system. register_ref :: < Actor1 , _ , Actor1MpscRef > ( "actor1" ) . await ;
87110
111+ // Spawn actor
88112 let actor1 = Actor1 :: default ( ) ;
89- system. spawn_actor ( actor1_ref. name ( ) , actor1, * actor1_ref. state ( ) , ( actor1_stream, interval ) ) . await ?;
113+ let ( _state , actor_handle ) = system. spawn_actor ( actor1_ref. name ( ) , actor1, * actor1_ref. state ( ) , ( actor1_stream, metrics_update_interval , other_interval ) ) . await ?;
90114
91- let pong = actor1_ref. ask :: < PongMsg > ( PingMsg ) . await ?;
92- println ! ( "main: received {pong:?} message" ) ;
115+ // Wait for 5 ticks
116+ tokio:: time:: sleep ( 500 . std_milliseconds ( ) ) . await ;
117+ let TicksCount ( ticks_count) = actor1_ref. ask :: < TicksCount > ( AskTicksCountMsg ) . await ?;
118+ ma:: assert_ge!( ticks_count, 5 , "waiting 5 ticks and expecting at least 5 messages received" ) ;
93119
94- // waiting 10 seconds and expecting new message each 1 second
95- tokio :: time :: sleep ( 10 . std_seconds ( ) ) . await ;
120+ tokio :: time :: sleep ( 1 . std_microseconds ( ) ) . await ;
121+ assert ! ( actor_handle . is_finished ( ) , "actor should be finished after receiving AskTicksCountMsg" ) ;
96122 Ok ( ( ) )
97123}
0 commit comments