Multiple event types on the same Kafka topic #3674
-
|
Hi, We want to produce different events on the same Kafka topic, to ensure the order of them. On the consumer side, we would like to create their appropriate handlers.
Is there a way to implement subscriptions to this? Does Brighter support this? Our current approaches so far: A) If we create two message mappers and subscribe to the same topic, only one of them will be invoked. B) If we implement one mapper and deserialize based on a custom header value (specifying the payload's type) no eventhandler will be invoked after the Something like this: C) Our only approach that worked is to create one message mapper and one event handler. This means that we must differentiate the business logic in the handler and cannot develop separate handlers for different events. Could you please advise? |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
|
@iancooper you are working on this for V10 right? for V9 do you have any suggestion? I think you can use the mediator pattern, keep on message for subscription and send the command in message handler like // Your current code
public class UserCreatedEvent : UserEvent { ... }
public class UserUpdatedEvent : UserEvent { ... }
public class UserMapper
{
public UserEvent MapToRequest(Message message)
{
var requestType = GetHeaderValue(message, "RequestType");
var json = message.Body.Value;
return requestType switch
{
"UserCreated" => JsonSerializer.Deserialize<UserCreatedEvent>(json),
"UserUpdated" => JsonSerializer.Deserialize<UserUpdatedEvent>(json),
_ => throw new ArgumentException($"Unknown message type: {requestType}")
};
}
}
// My suggestion
public class UserCreatedEventHandler : RequestHandler<UserCreatedEvent> { ... }
public class UserUpdatedEventHandler : RequestHandler<UserUpdatedEvent> { ... }
public class UserEventHandler(IAmACommandProcessor processor) : RequestHandler<UserEvent>
{
public void Handle(UserEvent request)
{
if(request is UserCreatedEvent created)
{
processor.Send(created);
}
else if(request is UserUpdatedEvent updated)
{
processor.Send(updated);
}
}
} |
Beta Was this translation helpful? Give feedback.
-
|
So @lillo42's answer is the best option in V9, as we only support DataType Channels in that model and use snapshot events. As a broad point, in an event-driven architecture, you are listening to events about two things from an upstream:
The former is best handled as snapshots, i.e., give me the current view of a User. Typically, version stamp it, and downstream only overwrite if it's later than a version you have already seen (this allows you to cope with out-of-order updates if they occur). By using this approach, you can have compaction on your stream. The latter might be deltas, just representing the last change, but typically are homomorphic because they are about the same thing. That said, within a service boundary, for durable execution, it can be helpful to use deltas; so, let's assume you still want to. In V9, our best approach is to define a standard base class, as @lillo42 describes above. In V10, we will allow you to determine the type that you return from a channel from the message contents and call the relevant mapping and handling pipeline. I am working on this right now; the V10 final is scheduled for August. |
Beta Was this translation helpful? Give feedback.
@iancooper you are working on this for V10 right? for V9 do you have any suggestion?
I think you can use the mediator pattern, keep on message for subscription and send the command in message handler like