Skip to content

Commit 6422b50

Browse files
committed
Add ability to consume events
1 parent 6fe74ae commit 6422b50

File tree

6 files changed

+177
-0
lines changed

6 files changed

+177
-0
lines changed

api/pkg/entities/message.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ const (
3737
MessageStatusFailed = "failed"
3838
)
3939

40+
// MessageEventName is the type of event generated by the mobile phone for a message
41+
type MessageEventName string
42+
43+
const (
44+
// MessageEventNameSent is emitted when a message is sent by the mobile phone
45+
MessageEventNameSent = "SENT"
46+
)
47+
4048
// Message represents a message sent between 2 phone numbers
4149
type Message struct {
4250
ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
11
package events
22

3+
import "github.com/google/uuid"
4+
35
// EventTypeMessagePhoneSent is emitted when the phone sends a message
46
const EventTypeMessagePhoneSent = "message.phone.sent"
7+
8+
// MessagePhoneSentPayload is the payload of the EventTypeMessagePhoneSent event
9+
type MessagePhoneSentPayload struct {
10+
ID uuid.UUID `json:"id"`
11+
From string `json:"from"`
12+
To string `json:"to"`
13+
Content string `json:"content"`
14+
}

api/pkg/handlers/message_handler.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func (h *MessageHandler) RegisterRoutes(router fiber.Router) {
4141
router.Post("/messages/send", h.PostSend)
4242
router.Get("/messages/outstanding", h.GetOutstanding)
4343
router.Get("/messages", h.Index)
44+
router.Post("/messages/:messageID/event", h.PostEvent)
4445
}
4546

4647
// PostSend a new entities.Message
@@ -169,3 +170,46 @@ func (h *MessageHandler) Index(c *fiber.Ctx) error {
169170

170171
return h.responseOK(c, fmt.Sprintf("fetched %d %s", len(*messages), h.pluralize("message", len(*messages))), messages)
171172
}
173+
174+
// PostEvent registers an event on a message
175+
// @Summary Store an event for a message on the mobile phone
176+
// @Description Use this endpoint to send events for a message when it is failed, sent or delivered by the mobile phone.
177+
// @Tags Messages
178+
// @Accept json
179+
// @Produce json
180+
// @Param messageID path string true "ID of the message" default(32343a19-da5e-4b1b-a767-3298a73703ca)
181+
// @Param payload body requests.MessageEvent true "Payload of the event emitted."
182+
// @Success 200 {object} responses.MessageResponse
183+
// @Success 400 {object} responses.BadRequest
184+
// @Success 422 {object} responses.UnprocessableEntity
185+
// @Success 500 {object} responses.InternalServerError
186+
// @Router /messages/:messageID/event [post]
187+
func (h *MessageHandler) PostEvent(c *fiber.Ctx) error {
188+
ctx, span := h.tracer.StartFromFiberCtx(c)
189+
defer span.End()
190+
191+
ctxLogger := h.tracer.CtxLogger(h.logger, span)
192+
193+
var request requests.MessageEvent
194+
if err := c.BodyParser(&request); err != nil {
195+
msg := fmt.Sprintf("cannot marshall [%s] into %T", c.Body(), request)
196+
ctxLogger.Warn(stacktrace.Propagate(err, msg))
197+
return h.responseBadRequest(c, err)
198+
}
199+
200+
request.MessageID = c.Params("messageID")
201+
if errors := h.validator.ValidateMessageEvent(ctx, request); len(errors) != 0 {
202+
msg := fmt.Sprintf("validation errors [%s], while storing event [%s] for message [%s]", spew.Sdump(errors), c.Body(), request.MessageID)
203+
ctxLogger.Warn(stacktrace.NewError(msg))
204+
return h.responseUnprocessableEntity(c, errors, "validation errors while storing event")
205+
}
206+
207+
message, err := h.service.StoreEvent(ctx, request.ToMessageStoreEventParams(c.OriginalURL()))
208+
if err != nil {
209+
msg := fmt.Sprintf("cannot store event for message [%s] with paylod [%s]", request.MessageID, c.Body())
210+
ctxLogger.Error(stacktrace.Propagate(err, msg))
211+
return h.responseInternalServerError(c)
212+
}
213+
214+
return h.responseOK(c, "message event stored successfully", message)
215+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package requests
2+
3+
import (
4+
"time"
5+
6+
"github.com/NdoleStudio/http-sms-manager/pkg/entities"
7+
"github.com/google/uuid"
8+
9+
"github.com/NdoleStudio/http-sms-manager/pkg/services"
10+
)
11+
12+
// MessageEvent is the payload for sending and SMS message
13+
type MessageEvent struct {
14+
// Timestamp is the time when the event was emitted, Please send the timestamp in UTC with as much precision as possible
15+
Timestamp time.Time `json:"sent_at" example:"2022-06-05T14:26:09.527976+03:00"`
16+
17+
// EventName is the type of event
18+
// * SENT: is emitted when a message is sent by the mobile phone (only SENT is implemented)
19+
// * FAILED: is event is emitted when the message could not be sent by the mobile phone
20+
// * DELIVERED: is event is emitted when a delivery report has been received by the mobile phone
21+
EventName string `json:"event_name" example:"SENT"`
22+
23+
MessageID string `json:"messageID" swaggerignore:"true"` // used internally for validation
24+
}
25+
26+
// ToMessageStoreEventParams converts MessageEvent to services.MessageStorePhoneEventParams
27+
func (input MessageEvent) ToMessageStoreEventParams(source string) services.MessageStorePhoneEventParams {
28+
return services.MessageStorePhoneEventParams{
29+
MessageID: uuid.MustParse(input.MessageID),
30+
Source: source,
31+
EventName: entities.MessageEventName(input.EventName),
32+
Timestamp: input.Timestamp,
33+
}
34+
}

api/pkg/services/message_service.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,62 @@ func (service *MessageService) GetMessages(ctx context.Context, params MessageGe
8686
return messages, nil
8787
}
8888

89+
// MessageStorePhoneEventParams parameters registering a message event
90+
type MessageStorePhoneEventParams struct {
91+
MessageID uuid.UUID
92+
EventName entities.MessageEventName
93+
Timestamp time.Time
94+
Source string
95+
}
96+
97+
// StoreEvent handles event generated by a mobile phone
98+
func (service *MessageService) StoreEvent(ctx context.Context, params MessageStorePhoneEventParams) (*entities.Message, error) {
99+
ctx, span := service.tracer.Start(ctx)
100+
defer span.End()
101+
102+
message, err := service.repository.Load(ctx, params.MessageID)
103+
if err != nil {
104+
msg := fmt.Sprintf("could not fetch messages with parms [%+#v]", params)
105+
return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
106+
}
107+
108+
switch params.EventName {
109+
case entities.MessageEventNameSent:
110+
err = service.handleMessageSentEvent(ctx, params, message)
111+
default:
112+
return nil, service.tracer.WrapErrorSpan(span, stacktrace.NewError(fmt.Sprintf("cannot handle message event [%s]", params.EventName)))
113+
}
114+
115+
if err != nil {
116+
msg := fmt.Sprintf("could not handle phone event [%s] for message with id [%s]", params.EventName, message.ID)
117+
return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
118+
}
119+
120+
return service.repository.Load(ctx, params.MessageID)
121+
}
122+
123+
func (service *MessageService) handleMessageSentEvent(ctx context.Context, params MessageStorePhoneEventParams, message *entities.Message) error {
124+
ctx, span := service.tracer.Start(ctx)
125+
defer span.End()
126+
127+
event, err := service.createMessagePhoneSentEvent(params.Source, events.MessagePhoneSentPayload{
128+
ID: message.ID,
129+
From: message.From,
130+
To: message.To,
131+
Content: message.Content,
132+
})
133+
if err != nil {
134+
msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
135+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
136+
}
137+
138+
if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
139+
msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
140+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
141+
}
142+
return nil
143+
}
144+
89145
func (service *MessageService) handleOutstandingMessages(ctx context.Context, source string, messages *[]entities.Message) *[]entities.Message {
90146
ctx, span := service.tracer.Start(ctx)
91147
defer span.End()
@@ -278,6 +334,10 @@ func (service *MessageService) createMessagePhoneSendingEvent(source string, pay
278334
return service.createEvent(events.EventTypeMessagePhoneSending, source, payload)
279335
}
280336

337+
func (service *MessageService) createMessagePhoneSentEvent(source string, payload events.MessagePhoneSentPayload) (cloudevents.Event, error) {
338+
return service.createEvent(events.EventTypeMessagePhoneSent, source, payload)
339+
}
340+
281341
func (service *MessageService) createEvent(eventType string, source string, payload any) (cloudevents.Event, error) {
282342
event := cloudevents.NewEvent()
283343

api/pkg/validators/message_handler_validator.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,24 @@ func (validator MessageHandlerValidator) ValidateMessageIndex(_ context.Context,
115115
})
116116
return v.ValidateStruct()
117117
}
118+
119+
// ValidateMessageEvent validates the requests.MessageEvent request
120+
func (validator MessageHandlerValidator) ValidateMessageEvent(_ context.Context, request requests.MessageEvent) url.Values {
121+
v := govalidator.New(govalidator.Options{
122+
Data: &request,
123+
Rules: govalidator.MapData{
124+
"sent_at": []string{
125+
"required",
126+
},
127+
"event_name": []string{
128+
"required",
129+
"in:SENT",
130+
},
131+
"messageID": []string{
132+
"required",
133+
"uuid",
134+
},
135+
},
136+
})
137+
return v.ValidateStruct()
138+
}

0 commit comments

Comments
 (0)