Open
Labels
enhancement: New feature or request
Feature request
Description
I would like to be able to send events to a different topic during processing.
When a business error occurs while processing an event, I'd like to split the event stream by outcome.
Example use case
For example, take the signup of a new customer: it either succeeds or fails. In the handler function I'd like to send the successfully processed signups to the publishTopic as usual, but when processing fails, send them to a dead letter topic where I can review them manually or just let them sit there to rot.
How it could look in code
```go
func signupHandlerFunc() func(msg *message.Message) ([]*message.Message, error) {
	return func(msg *message.Message) ([]*message.Message, error) {
		slog.Info("Processing signup message", "message_uuid", msg.UUID, "payload", string(msg.Payload))

		consumedPayload := signupEvent{}
		err := json.Unmarshal(msg.Payload, &consumedPayload)
		if err != nil {
			return nil, err
		}

		// Create processed event
		newEvent := processedSignupEvent{
			ProcessedID:    consumedPayload.ID,
			Time:           time.Now(),
			SuccessMessage: "Signup successful",
			ErrorMessage:   "",
		}

		// make 10% of the events fail
		if consumedPayload.ID%10 == 0 {
			newEvent.SuccessMessage = ""
			newEvent.ErrorMessage = "Signup failed due to some error"
		}

		newPayload, err := json.Marshal(newEvent)
		if err != nil {
			return nil, err
		}

		newMessage := message.NewMessage(watermill.NewUUID(), newPayload)

		// if there was an error, send the message to a different topic
		if newEvent.ErrorMessage != "" {
			// Send to error topic
			return []*message.Message{newMessage}, nil
		}

		return []*message.Message{newMessage}, nil
	}
}
```

If there is another way to do this, please tell me :) Thanks a lot for this project. Awesome work.