Receive and Send Messages
For class-based actors, we don't explicitly ask the actor to receive messages, because it starts listening for messages once it is running. We only need to define the behavior of message processing, i.e., by overriding ActorBehavior::behavior().
For lambda-based actors, or when we want to customize how an actor receives messages, we can use ActorBehavior::receive() to receive multiple messages or ActorBehavior::receive_once() to receive a single message. Both receive() and receive_once() require a MessageHandlers. ActorBehavior::receive() keeps receiving messages until ActorBehavior::deactivate() is called. Here are some examples:
actor_system.spawn([&](ActorBehavior& self) {
  // 1. Receive multiple messages.
  self.receive({
    Code{0} - [&](...) { ... },
    Code{1} - [&](...) { ... },
    ...
    Code{99} - [&]() { self.deactivate(); } // this will stop `receive()`
  });
  // 2. Wait until one message is received.
  self.receive_once({
    Code{0} - [&](...) { ... }
  });
  // 3. Receive one message with a timeout.
  self.receive_once({
    Code{0} - [&](...) { ... }
  }, std::chrono::milliseconds{1000});
  // 4. Try to receive a message. Return immediately if no message is currently available.
  self.receive_once({
    Code{0} - [&](...) { ... }
  }, true);
  // 5. Process raw messages.
  self.receive([&](Message* m) {
    ...
    delete m; // the message must be deleted after use.
  });
  // 6. Process one raw message.
  self.receive_once([&](Message* m) {
    ...
    delete m; // the message must be deleted after use.
  });
});

There are different ways to deliver a message to an actor, including send, reply, request and delayed_send.
Suppose we hold the Actor handle of the receiver we want to send a message to:
// `this` is an ActorBehavior
this->send(
  receiver,    // (Actor) the actor that receives the message.
  code,        // (size_t or Code) the code of the message, used to locate the MessageHandler on the receiver side.
  type,        // (Message::Type) (Optional. Defaults to Normal) the type of the message.
  content ...  // (Any type) the elements of the message.
);

The sender does not wait for a reply to the message, if there is one. send only guarantees that the message is sent; it does not guarantee that the receiver has received or processed it. Processing is asynchronous.
The order of the messages sent from one actor to another actor is FIFO.
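To make the asynchronous, FIFO behaviour concrete, here is a minimal sketch that is independent of the library. `FifoMessage`, `FifoMailbox`, `enqueue`, `drain` and `fifo_demo` are hypothetical names invented for this illustration; they are not part of the framework's API, and the real mailbox may well be implemented differently.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a message: a code plus a payload.
struct FifoMessage {
  std::size_t code;
  std::string content;
};

// Hypothetical stand-in for a receiver's mailbox (not the library's type).
class FifoMailbox {
public:
  // Models "send": append the message and return immediately.
  // Nothing is processed here, which models the asynchronous hand-off.
  void enqueue(FifoMessage m) { queue_.push_back(std::move(m)); }

  // Models the receiver running later: handle everything queued so far,
  // oldest first. Returns the contents in the order they were handled.
  std::vector<std::string> drain() {
    std::vector<std::string> handled;
    while (!queue_.empty()) {
      handled.push_back(queue_.front().content);
      queue_.pop_front();
    }
    return handled;
  }

  std::size_t pending() const { return queue_.size(); }

private:
  std::deque<FifoMessage> queue_;
};

// One sender enqueues three messages, then the receiver gets to run.
inline std::vector<std::string> fifo_demo() {
  FifoMailbox box;
  box.enqueue({0, "first"});
  box.enqueue({0, "second"});
  box.enqueue({1, "third"});
  assert(box.pending() == 3);  // sent, but not yet processed
  return box.drain();
}
```

In this sketch the sender has returned from all three `enqueue` calls before any message is handled, and the receiver then sees the messages in the order they were sent.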
ActorBehavior::reply is a shortcut for sending a message back to the sender of the message that is currently being processed.
// `this` is an ActorBehavior
this->reply(code, type, content ...);
// which is equivalent to
// this->send(this->get_current_sender_actor(), code, type, content ...);

In some cases, we want to send a message and process its reply before processing any other message. ActorBehavior::request does this: it sends a Message::Type::Request message, waits for the reply, and processes the reply with the MessageHandler given to on_reply:
this->request(receiver, code, type, content ...).on_reply({
  // What reply we should receive and how we process it.
  code - [&](...) { ... }
});
// which is equivalent to
// this->send(receiver, code, Message::Type::Request, content ...).on_reply(...);

Once ActorBehavior::request() has been called and the request has been sent, the actor actively waits for a Message::Type::Response message. Until that response arrives, messages of other types are stored and their processing is delayed. In this sense, ActorBehavior::request() is synchronous.
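The deferral described above can be sketched with a small stand-alone model. This is only an illustration under assumptions: `ToyType`, `TypedMessage`, `WaitingActor`, `await_response`, `process_pending` and `request_demo` are hypothetical names, and the choice to re-queue stored messages in their original order is an assumption of the sketch, not a documented property of the library.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical mirror of the three message types mentioned in the text.
enum class ToyType { Normal, Request, Response };

struct TypedMessage {
  ToyType type;
  std::string content;
};

// Hypothetical actor model (not the library's ActorBehavior).
class WaitingActor {
public:
  void deliver(TypedMessage m) { inbox_.push_back(std::move(m)); }

  // Models the wait inside a request: consume the inbox until a Response
  // is found, stashing every other message. Returns true once a response
  // has been handled; false means the actor is still waiting.
  bool await_response() {
    while (!inbox_.empty()) {
      TypedMessage m = std::move(inbox_.front());
      inbox_.pop_front();
      if (m.type == ToyType::Response) {
        log_.push_back("reply:" + m.content);
        // Assumption of this sketch: stashed messages go back to the
        // front of the inbox in their original order.
        inbox_.insert(inbox_.begin(), stash_.begin(), stash_.end());
        stash_.clear();
        return true;
      }
      stash_.push_back(std::move(m));
    }
    return false;
  }

  // Normal processing after the request has completed.
  void process_pending() {
    while (!inbox_.empty()) {
      log_.push_back("msg:" + inbox_.front().content);
      inbox_.pop_front();
    }
  }

  const std::vector<std::string>& log() const { return log_; }

private:
  std::deque<TypedMessage> inbox_;
  std::deque<TypedMessage> stash_;
  std::vector<std::string> log_;
};

inline std::vector<std::string> request_demo() {
  WaitingActor actor;
  actor.deliver({ToyType::Normal, "a"});
  actor.deliver({ToyType::Normal, "b"});
  bool got = actor.await_response();  // no response yet: a and b are stashed
  assert(!got);
  actor.deliver({ToyType::Response, "r"});
  actor.deliver({ToyType::Normal, "c"});
  got = actor.await_response();       // the response is handled first
  assert(got);
  (void)got;
  actor.process_pending();            // then a, b, c in arrival order
  return actor.log();
}
```

In the sketch, the response is logged before the two normal messages that arrived ahead of it, which is the effect the text describes.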
To reply to a Message::Type::Request message, the actor should send a Message::Type::Response message:
// By ActorBehavior::send, set the type to Message::Type::Response
this->send(receiver, code, Message::Type::Response, content ...);
// By ActorBehavior::reply, which sets the type of the message to Message::Type::Response
// if the message being processed is a Message::Type::Request.
this->reply(code, content ...);

delayed_send lets us specify a delay before a message is sent. For example, an actor can repeatedly send itself a message with a fixed delay in order to perform some action periodically.
// `this` is an `ActorBehavior`
this->delayed_send(
  delay, // std::chrono::duration
  receiver, code, Message::Type::Response, content ...);

The example below shows an actor that executes a periodic task every 500 ms.
struct NewActor : public ActorBehavior {
  constexpr static Code PeriodicalTask{0};
  // Kick off the cycle by sending the first message to ourselves.
  void start() override {
    this->send(*this, PeriodicalTask);
  }
  ActorBehavior behavior() override {
    return {
      PeriodicalTask - [&]() {
        this->periodical_task();
        // Schedule the next run.
        this->delayed_send(std::chrono::milliseconds{500}, *this, PeriodicalTask);
      }
    };
  }
};

Note that the delay passed to delayed_send is a minimum: the message is sent at some point after the delay has elapsed, not necessarily at the exact moment it elapses.
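The "sent after the delay, not exactly at it" behaviour can be illustrated with a simulated clock. The sketch below is not the library's timer; `Scheduled`, `DueLater`, `ToyTimer`, `schedule`, `poll` and `delayed_send_demo` are hypothetical names, and the idea that messages are released when a scheduler next checks the clock is an assumption made only for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using Millis = std::chrono::milliseconds;

// A message together with the earliest time at which it may be sent.
struct Scheduled {
  Millis due;
  std::string content;
};

// Orders the priority queue so that the earliest due time is on top.
struct DueLater {
  bool operator()(const Scheduled& a, const Scheduled& b) const {
    return a.due > b.due;
  }
};

// Hypothetical timer driven by an explicit, simulated clock value.
class ToyTimer {
public:
  // Register a message to be sent no earlier than now + delay.
  void schedule(Millis now, Millis delay, std::string content) {
    pending_.push(Scheduled{now + delay, std::move(content)});
  }

  // Called whenever the (hypothetical) scheduler checks the clock.
  // Releases every message whose due time has been reached or passed.
  std::vector<std::string> poll(Millis now) {
    std::vector<std::string> sent;
    while (!pending_.empty() && pending_.top().due <= now) {
      sent.push_back(pending_.top().content);
      pending_.pop();
    }
    return sent;
  }

private:
  std::priority_queue<Scheduled, std::vector<Scheduled>, DueLater> pending_;
};

// Schedules one message with a 500 ms delay; the scheduler happens to
// check the clock at 499 ms and again at 520 ms.
inline std::vector<std::string> delayed_send_demo() {
  ToyTimer timer;
  timer.schedule(Millis{0}, Millis{500}, "tick");
  std::vector<std::string> early = timer.poll(Millis{499});
  assert(early.empty());          // never sent before the delay elapses
  return timer.poll(Millis{520}); // sent 20 ms after the nominal time
}
```

For a periodic task like the one above, this means the actual interval between runs can drift above 500 ms, since each delay starts only after the previous run has been handled.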