Skip to content

Commit 6e17dd2

Browse files
committed
implement proper error handling
1 parent 807ddbb commit 6e17dd2

36 files changed

+420
-226
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
# 0.6.0
2+
3+
- add error handling
4+
- `ActorBuilder.spawn()` now returns a Result, containing either the `ActorWrapper<A>` or an `ActorError`
5+
- added `ActorResult` that is returned by all relevant `Actor` functions
6+
- configures how the Actor should proceed
7+
- Proper panic handling now in all parts of the `Actor` and `Handler<M>` and `ActorFactory<A>`
8+
- panic now triggers `Actor.on_panic` providing the source of panic and allows User to determine how to proceed
9+
- `Actor.on_panic` is allowed to panic once and will be re-triggered in that case. If another panic happens in the retry, the Actor will be stopped
10+
- handling a panic within `ActorFactory<A>.new_actor()` by returning `ActorResult::Restart()` in `Actor.on_panic` can trigger a restart loop that will block the thread until `ActorFactory<A>.new_actor()` was successful
11+
- replaced `RestartPolicy` with `ActorResult`
12+
113
# 0.5.0
214

315
- added `send_after` to `ActorWrapper<A>` to allow sending of delayed messages

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "tyra"
3-
version = "0.5.0"
3+
version = "0.6.0"
44
authors = ["sers.dev <[email protected]>"]
55
edition = "2018"
66
license = "MIT OR Apache-2.0"
@@ -26,6 +26,7 @@ crossbeam-channel = "^0.5"
2626
crossbeam-utils = "^0.8"
2727
dashmap = "^4.0"
2828
serde = { version = "^1.0", features = ["derive"] }
29+
thiserror = "1.0.31"
2930

3031
[dev-dependencies]
3132
bincode = "1.3.3"

examples/actor.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::process::exit;
22
use std::time::Duration;
3-
use tyra::prelude::{
4-
Actor, ActorContext, ActorFactory, ActorMessage, ActorSystem, Handler, TyraConfig,
5-
};
3+
use tyra::prelude::{Actor, ActorContext, ActorFactory, ActorMessage, ActorResult, ActorSystem, Handler, TyraConfig};
64

75
struct MessageA {
86
text: String,
@@ -41,18 +39,20 @@ impl ActorFactory<HelloWorld> for HelloWorldFactory {
4139
}
4240
}
4341
impl Handler<MessageA> for HelloWorld {
44-
fn handle(&mut self, msg: MessageA, _context: &ActorContext<Self>) {
42+
fn handle(&mut self, msg: MessageA, _context: &ActorContext<Self>) -> ActorResult {
4543
let text: String = [self.text.clone(), String::from(msg.text)].join(" -> ");
4644
self.count += 1;
47-
println!("AAAA: {} Count: {}", text, self.count)
45+
println!("AAAA: {} Count: {}", text, self.count);
46+
ActorResult::Ok
4847
}
4948
}
5049

5150
impl Handler<MessageB> for HelloWorld {
52-
fn handle(&mut self, msg: MessageB, _context: &ActorContext<Self>) {
51+
fn handle(&mut self, msg: MessageB, _context: &ActorContext<Self>) -> ActorResult {
5352
let text: String = [self.text.clone(), String::from(msg.text)].join(" -> ");
5453
self.count -= 1;
55-
println!("BBBB: {} Count: {}", text, self.count)
54+
println!("BBBB: {} Count: {}", text, self.count);
55+
ActorResult::Ok
5656
}
5757
}
5858

examples/benchmark.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use std::process::exit;
22
use std::thread::sleep;
33
use std::time::{Duration, Instant};
4-
use tyra::prelude::{
5-
Actor, ActorContext, ActorFactory, ActorMessage, ActorSystem, Handler, TyraConfig,
6-
};
4+
use tyra::prelude::{Actor, ActorContext, ActorFactory, ActorMessage, ActorResult, ActorSystem, Handler, TyraConfig};
75

86
struct MessageA {}
97

@@ -41,7 +39,7 @@ impl Benchmark {
4139
impl Actor for Benchmark {}
4240

4341
impl Handler<MessageA> for Benchmark {
44-
fn handle(&mut self, _msg: MessageA, context: &ActorContext<Self>) {
42+
fn handle(&mut self, _msg: MessageA, context: &ActorContext<Self>) -> ActorResult {
4543
if self.count == 0 {
4644
println!("Sleep 3 now");
4745
sleep(Duration::from_secs((3) as u64));
@@ -63,6 +61,7 @@ impl Handler<MessageA> for Benchmark {
6361
if self.count == self.total_msgs {
6462
context.system.stop(Duration::from_secs(60));
6563
}
64+
ActorResult::Ok
6665
}
6766
}
6867

examples/bulk_router_benchmark.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use std::process::exit;
22
use std::thread::sleep;
33
use std::time::{Duration, Instant};
4-
use tyra::prelude::{
5-
Actor, ActorContext, ActorFactory, ActorMessage, ActorSystem, ActorWrapper, Handler, TyraConfig,
6-
};
4+
use tyra::prelude::{Actor, ActorContext, ActorFactory, ActorMessage, ActorResult, ActorSystem, ActorWrapper, Handler, TyraConfig};
75
use tyra::router::{AddActorMessage, BulkRouterMessage, RoundRobinRouterFactory};
86

97
struct MessageA {}
@@ -63,7 +61,7 @@ impl Benchmark {
6361
impl Actor for Benchmark {}
6462

6563
impl Handler<MessageA> for Benchmark {
66-
fn handle(&mut self, _msg: MessageA, _context: &ActorContext<Self>) {
64+
fn handle(&mut self, _msg: MessageA, _context: &ActorContext<Self>) -> ActorResult {
6765
if self.count == 0 {
6866
sleep(Duration::from_secs((3) as u64));
6967
self.start = Instant::now();
@@ -79,6 +77,7 @@ impl Handler<MessageA> for Benchmark {
7977
if self.count == self.total_msgs {
8078
self.aggregator.send(Finish {});
8179
}
80+
ActorResult::Ok
8281
}
8382
}
8483

@@ -116,7 +115,7 @@ impl ActorFactory<Aggregator> for AggregatorFactory {
116115
}
117116

118117
impl Handler<Finish> for Aggregator {
119-
fn handle(&mut self, _msg: Finish, _context: &ActorContext<Self>) {
118+
fn handle(&mut self, _msg: Finish, _context: &ActorContext<Self>) -> ActorResult {
120119
self.actors_finished += 1;
121120
if self.actors_finished == self.total_actors {
122121
let duration = self.start.elapsed();
@@ -126,13 +125,15 @@ impl Handler<Finish> for Aggregator {
126125
);
127126
self.ctx.system.stop(Duration::from_secs(60));
128127
}
128+
ActorResult::Ok
129129
}
130130
}
131131

132132
impl Handler<Start> for Aggregator {
133-
fn handle(&mut self, _msg: Start, _context: &ActorContext<Self>) {
133+
fn handle(&mut self, _msg: Start, _context: &ActorContext<Self>) -> ActorResult {
134134
sleep(Duration::from_secs((3) as u64));
135135
self.start = Instant::now();
136+
ActorResult::Ok
136137
}
137138
}
138139

examples/delayed.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ impl ActorFactory<TemplateActor> for TemplateActorFactory {
5454
//Handler//
5555
///////////
5656
impl Handler<TemplateMessage> for TemplateActor {
57-
fn handle(&mut self, _msg: TemplateMessage, _context: &ActorContext<Self>) {
58-
println!("{}", _msg.id)
57+
fn handle(&mut self, _msg: TemplateMessage, _context: &ActorContext<Self>) -> ActorResult {
58+
println!("{}", _msg.id);
59+
ActorResult::Ok
5960
}
6061
}
6162

examples/error.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::process::exit;
22
use std::time::Duration;
3-
use tyra::prelude::{
4-
Actor, ActorContext, ActorFactory, ActorMessage, ActorSystem, Handler, TyraConfig,
5-
};
3+
use tyra::prelude::{Actor, ActorContext, ActorFactory, ActorMessage, ActorResult, ActorSystem, Handler, TyraConfig};
64

75
#[derive(Clone)]
86
struct ErrMsg {
@@ -19,12 +17,13 @@ struct ErrActor {
1917
impl Actor for ErrActor {}
2018

2119
impl Handler<ErrMsg> for ErrActor {
22-
fn handle(&mut self, msg: ErrMsg, _context: &ActorContext<Self>) {
20+
fn handle(&mut self, msg: ErrMsg, _context: &ActorContext<Self>) -> ActorResult {
2321
self.counter += 1;
2422
if msg.text == "sers+1" {
2523
panic!("ficl");
2624
}
2725
println!("Received SERS: {}", self.counter);
26+
ActorResult::Ok
2827
}
2928
}
3029

examples/getting_started.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ impl ActorFactory<TemplateActor> for TemplateActorFactory {
5353
//Handler//
5454
///////////
5555
impl Handler<TemplateMessage> for TemplateActor {
56-
fn handle(&mut self, _msg: TemplateMessage, _context: &ActorContext<Self>) {
57-
println!("SERS: {}", _msg.id)
56+
fn handle(&mut self, _msg: TemplateMessage, _context: &ActorContext<Self>) -> ActorResult {
57+
println!("SERS: {}", _msg.id);
58+
ActorResult::Ok
5859
}
5960
}
6061

examples/router.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use std::process::exit;
22
use std::thread::sleep;
33
use std::time::Duration;
4-
use tyra::prelude::{
5-
Actor, ActorContext, ActorFactory, ActorMessage, ActorSystem, Handler, TyraConfig,
6-
};
4+
use tyra::prelude::{Actor, ActorContext, ActorFactory, ActorMessage, ActorResult, ActorSystem, Handler, TyraConfig};
75
use tyra::router::{AddActorMessage, RemoveActorMessage, RoundRobinRouterFactory, RouterMessage};
86

97
struct MessageA {}
@@ -23,9 +21,10 @@ impl ActorFactory<HelloWorld> for HelloWorldFactory {
2321
}
2422
}
2523
impl Handler<MessageA> for HelloWorld {
26-
fn handle(&mut self, _msg: MessageA, _context: &ActorContext<Self>) {
24+
fn handle(&mut self, _msg: MessageA, _context: &ActorContext<Self>) -> ActorResult {
2725
self.counter += 1;
2826
println!("Received MSG {}", self.counter);
27+
ActorResult::Ok
2928
}
3029
}
3130

examples/router_benchmark.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use std::process::exit;
2-
use std::thread::sleep;
32
use std::time::{Duration, Instant};
4-
use tyra::prelude::{
5-
Actor, ActorContext, ActorFactory, ActorMessage, ActorSystem, ActorWrapper, Handler, TyraConfig,
6-
};
3+
use tyra::prelude::{Actor, ActorContext, ActorFactory, ActorMessage, ActorResult, ActorSystem, ActorWrapper, Handler, TyraConfig};
74
use tyra::router::{AddActorMessage, RoundRobinRouterFactory, RouterMessage};
85

96
struct MessageA {}
@@ -53,9 +50,9 @@ impl Benchmark {
5350
impl Actor for Benchmark {}
5451

5552
impl Handler<MessageA> for Benchmark {
56-
fn handle(&mut self, _msg: MessageA, _context: &ActorContext<Self>) {
53+
fn handle(&mut self, _msg: MessageA, _context: &ActorContext<Self>) -> ActorResult {
5754
if self.count == 0 {
58-
sleep(Duration::from_secs((3) as u64));
55+
//sleep(Duration::from_secs((3) as u64));
5956
self.start = Instant::now();
6057
}
6158
self.count += 1;
@@ -69,6 +66,7 @@ impl Handler<MessageA> for Benchmark {
6966
if self.count == self.total_msgs {
7067
self.aggregator.send(Finish {});
7168
}
69+
ActorResult::Ok
7270
}
7371
}
7472

@@ -106,7 +104,7 @@ impl ActorFactory<Aggregator> for AggregatorFactory {
106104
}
107105

108106
impl Handler<Finish> for Aggregator {
109-
fn handle(&mut self, _msg: Finish, _context: &ActorContext<Self>) {
107+
fn handle(&mut self, _msg: Finish, _context: &ActorContext<Self>) -> ActorResult {
110108
self.actors_finished += 1;
111109
if self.actors_finished == self.total_actors {
112110
let duration = self.start.elapsed();
@@ -116,13 +114,15 @@ impl Handler<Finish> for Aggregator {
116114
);
117115
self.ctx.system.stop(Duration::from_secs(60));
118116
}
117+
ActorResult::Ok
119118
}
120119
}
121120

122121
impl Handler<Start> for Aggregator {
123-
fn handle(&mut self, _msg: Start, _context: &ActorContext<Self>) {
124-
sleep(Duration::from_secs((3) as u64));
122+
fn handle(&mut self, _msg: Start, _context: &ActorContext<Self>) -> ActorResult {
123+
//sleep(Duration::from_secs((3) as u64));
125124
self.start = Instant::now();
125+
ActorResult::Ok
126126
}
127127
}
128128

@@ -132,7 +132,7 @@ fn main() {
132132

133133
let message_count = 10000000;
134134
// ideal number is "amount of threads - 3"
135-
let actor_count = 7;
135+
let actor_count = 10;
136136

137137
let router_factory = RoundRobinRouterFactory::new();
138138
let router = actor_system

0 commit comments

Comments
 (0)