Replies: 5 comments 3 replies
---
Thanks for sharing your considerations on this topic. Let me add my thoughts, which are still at a very early stage; they need more reflection and discussion. First of all, we should distinguish between a poison message and a poison pill, which refer to two different concepts. The literature does not seem to agree on the terminology, and this is partly my fault, as I wasn't aware of the ambiguity when I first mentioned it. Poison messages are those that cause a consumer to fail every time they are received. This is what you're referring to in the first link. I believe this concept is mostly relevant in truly distributed systems, where senders expect acknowledgments ("ack") when a message is processed. For example, in RabbitMQ:
The main issue here is that failing messages cause requeueing, which can lead to infinite processing loops or similar problems. In contrast, SObjectizer uses a fire-and-forget messaging model, so we do not face the requeue problem. However, any effort toward issue detection could be beneficial, and your reflections on runtime monitoring and "urgent deregistration" are quite interesting. Now let me introduce the other pattern: the poison pill. Again, terminology on the web is mixed, but I interpret a poison pill as a special message that signals no more messages will be sent, allowing consumers to terminate. A similar concept is explained here. A poison pill could be seen as a special message that signals to a subscriber: please, unsubscribe. Then the subscriber can take appropriate action. This raises new questions:
Technically, we can already handle a poison pill signal. What we could do is formalize and simplify its usage. I see two main opportunities:
```cpp
// Using a standard poison pill type
agent_t(agent_context_t ctx)
  : agent_t(ctx + deactivate_on_poison_pill())

// Specifying a custom poison pill type
agent_t(agent_context_t ctx)
  : agent_t(ctx + deactivate_on<poison_pill_type>())

// Or specifying the reaction with an enum
agent_t(agent_context_t ctx)
  : agent_t(ctx + on_poison_pill(deactivate))

agent_t(agent_context_t ctx)
  : agent_t(ctx + on_poison_pill(deregister))

agent_t(agent_context_t ctx)
  : agent_t(ctx + on_poison_pill(unsubscribe))
```

And the send might just be:

```cpp
send<so_5::poison_pill>(...);
// or
send_poison_pill(...);
```

I think your "urgent deregistration" idea overlaps a bit with this. That's all for now. I won't add more to these ramblings. Please let me know your thoughts.
---
Sure, I recognize this as a common pattern. However, the key point is that RabbitMQ inherently supports an "ack" request-response mechanism, whereas in SObjectizer we would need to implement such behavior ourselves. My thought is that the concept of a poison message is more naturally rooted in systems where the acknowledgment cycle is built into the infrastructure. That said, having support for detecting and reacting to such issues in SObjectizer would still be valuable.
---
A few words about the "urgent message delivery" idea. It is not really an idea yet, just a fantasy, but I have nothing better at the moment. Let me show what I mean. Now we have:

```cpp
void
agent_t::demand_handler_on_message(
    current_thread_id_t working_thread_id,
    execution_demand_t & d )
{
    message_limit::control_block_t::decrement( d.m_limit );

    auto handler = d.m_receiver->m_handler_finder(
        d, "demand_handler_on_message" );
    if( handler )
        process_message(
            working_thread_id,
            d,
            handler->m_thread_safety,
            handler->m_method );
}
```

This logic could be changed this way:

```cpp
void
agent_t::demand_handler_on_message(
    current_thread_id_t working_thread_id,
    execution_demand_t & d )
{
    message_limit::control_block_t::decrement( d.m_limit );

    std::unique_ptr<execution_demand_t> urgent_demand{
        m_urgent_demand.exchange(nullptr)
    };
    if( urgent_demand )
    {
        // Handle urgent_demand first, and only then we'll return to the
        // current incoming demand.
        auto handler = d.m_receiver->m_handler_finder(
            *urgent_demand, "demand_handler_on_message" );
        if( handler )
            process_message(
                working_thread_id,
                *urgent_demand,
                handler->m_thread_safety,
                handler->m_method );
    }

    auto handler = d.m_receiver->m_handler_finder(
        d, "demand_handler_on_message" );
    if( handler )
        process_message(
            working_thread_id,
            d,
            handler->m_thread_safety,
            handler->m_method );
}
```

It means that an additional field (something like `m_urgent_demand`) has to be added to the `agent_t` class.

It's an open question how an "urgent message" will be sent to an agent. Maybe it will be a special function, something like:

```cpp
so_5::send_urgently<msg_type>(dest, ...);
```

Maybe it will be another form of the existing `send`:

```cpp
so_5::send<msg_type>(so_5::urgent_delivery_to(agent_ptr), ...);
```

or:

```cpp
so_5::send<so_5::urgent_message<msg_type>>(dest, ...);
```

But it seems that such an urgent message can't be delivered the usual way, and I don't know yet how a message instance passed to such a call should be stored.

It also seems that a new urgent message will replace the existing one. For example:

```cpp
so_5::send_urgently<msg_type1>(dest, ...); // (1)
so_5::send_urgently<msg_type2>(dest, ...); // (2)
```

It's possible that the first message will be handled before completion of point (2), but it's also possible that it will be replaced by the second one before being handled.

It also seems that urgent messages won't be counted by message limits.

There is also an open question with exception safety for an agent. This area has to be investigated.
---
I agree with your point that handling the poison pill should ideally be as "automatic" as deregistration. However, backward compatibility could be a concern, as older agents might be unintentionally deactivated upon receiving a poison pill. That's why I opted for a more conservative approach by introducing a context option. As I imagine it, poisoning could be triggered either via a standard `so_5::poison_pill` signal or via a custom user-specified type.

This brings us to the next question: should a poisoned agent transition to the deactivated state, or should we introduce a new state like "poisoned"? And if all agents within a cooperation are poisoned, should that trigger deregistration of the entire cooperation? Personally, I lean toward using the deactivated state followed by deregistration, as it keeps the model simpler, but I might be overlooking something. Maybe cooperation deregistration could be triggered automatically by the last poisoned agent?

Regarding internal changes: I agree this is a relatively niche feature. If the implementation cost is significant, it might be worth reconsidering.

Lastly, how does this relate to issue #96? When a poison pill is received, it's expected to take precedence over other messages in the queue. This is similar to #96, though it's not part of a shutdown process. Perhaps there are shortcuts or ideas from #96 that could be reused here?
---
I have a couple of comments:
---
There is an interesting topic in message-handling systems: "poison pill" messages. A very good introduction to this topic can be found here: Kafka Poison Pill.
An application built on SObjectizer can also be seen as a "message handling system", so it can also suffer from "poison pill" messages. It's therefore interesting to investigate this topic with respect to SObjectizer. This post contains the very first thoughts on the subject. Maybe it will start a journey toward inventing and implementing some useful tools to cope with the "poison pill" message problem.
Disclaimer: everything described below is my personal opinion so it may be totally wrong. And if you find something that contradicts your point of view then feel free to tell me about it.
First of all, I think that the "poison pill" effect is a consequence of an error in application code. An agent receives a message it can't handle because of an error in the agent's implementation (or in the code that the agent uses; for example, the agent tries to deserialize message data and encounters a bug in an external deserialization library). There could be different consequences of such errors:
SObjectizer can't protect from errors in the code of user-written agents or in libraries used by user-written agents. But we can discuss how SObjectizer could help with the consequences of errors in the user's agents.
There is nothing to do if an error leads to an application crash. I think the only good way to fight such an error is to use a multi-process configuration where agents work in different processes with separate address spaces and communicate via some form of interprocess communication (IPC) like sockets, pipes, message brokers, database tables or even files. In such a configuration a problematic agent will crash just one process but won't affect other processes. The crashed process will be restarted by some supervisor.
If a problematic agent throws exceptions then this case can be handled by exception reactions already provided by SObjectizer. Please see the corresponding section in the project Wiki for more details.
The most interesting case is when an agent stays in the SObjectizer Environment, but doesn't work properly.
SObjectizer can't control the correctness of the agent's behaviour. For example, SObjectizer doesn't know that when an agent receives message A it has to reply by message B. This is a part of application logic and SObjectizer doesn't know anything about this logic.
But maybe SObjectizer can help in the following two areas.
The first. An agent hangs and doesn't return control back to the dispatcher. Think of it as an infinite loop in the agent's event handler. The agent's work can't be interrupted, but if such a case is diagnosed then the application will have a chance to fail and restart as early as possible.
I think it could be done by using SObjectizer's run-time monitoring facilities (information from dispatchers in particular). At the moment it's possible to monitor the number of demands in dispatchers' queues.
But the count of demands in a queue is not a precise indicator, especially if an agent receives messages from time to time (once in several seconds or even in several minutes).
Maybe it is worth adding a data source like the time of the last event handler execution. Now SObjectizer provides the `work_thread_activity` message that contains only the number of event handler executions and total+average times. It's possible to analyze such information (for example, if the total time grows but the number of handled demands does not, then it's a sign that the last event handler works too long). But I think it's not as convenient as it should be.

So there is an idea to add another message to the `so_5::stats::messages` namespace. Something like `last_handler_execution_time`. This message may be sent only if the execution time exceeds some threshold (1s, for example).

An application can have a monitor agent that receives `last_handler_execution_time` and checks a number inside the message. If the number exceeds some maximum value then the application may decide that some of its agents hang and initiate an immediate restart.

There is an open question: what should be included into `last_handler_execution_time` in addition to a time duration? The name of the agent? The direct mbox of the agent? The `coop_handle` of the agent's cooperation?

The second. Deregistration of the problematic agent. It's not hard to deregister an agent if you have a `coop_handle_t` related to the agent's coop. We won't discuss how such a handle can be obtained.

The main problem, I think, is that the normal deregistration procedure could be inappropriate if we decided that an agent doesn't work properly. The normal procedure means that all pending demands will be delivered to the agent. And this may not be what we wanted.

So I wonder if there is a way to do something like "a distant call of the agent's `so_deactivate_agent`" from outside the agent. Maybe in the form of "urgent deregistration". Maybe it's possible to add some hypothetical `deregister_coop_urgently` method to the `environment_t` class. This method will do the typical deregistration-related actions with one additional step: all agents in the cooperation will be deactivated (`so_deactivate_agent` will be called for every agent somehow). Because of that additional step, agents from the cooperation being deregistered won't handle pending messages.

This "urgent deregistration" won't remove the coop from the SOEnv immediately; this operation will still be asynchronous with unpredictable delays. But if there are 100500 pending messages waiting for processing, they will be skipped because all agents from the deregistered cooperation will be deactivated first.
Disclaimer №1. I don't know yet how this "distant deactivation" can be implemented. Just believe that this is possible somehow.
Disclaimer №2. The described approach with "urgent deregistration" won't help with agents that are hung in infinite loops.