Skip to content

Commit db7127e

Browse files
committed
Add update for threads
1 parent 6422b50 commit db7127e

File tree

6 files changed

+111
-8
lines changed

6 files changed

+111
-8
lines changed

api/pkg/entities/message.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ func (message Message) IsSending() bool {
7272
return message.Status == MessageStatusSending
7373
}
7474

75+
// Sent registers a message as sent
76+
func (message *Message) Sent(timestamp time.Time) *Message {
77+
sendDuration := timestamp.UnixNano() - message.RequestReceivedAt.UnixNano()
78+
message.SentAt = &timestamp
79+
message.Status = MessageStatusSent
80+
message.OrderTimestamp = timestamp
81+
message.SendDuration = &sendDuration
82+
return message
83+
}
84+
7585
// AddSendAttempt configures a Message for sending
7686
func (message *Message) AddSendAttempt(timestamp time.Time) *Message {
7787
message.Status = MessageStatusSending

api/pkg/handlers/message_handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (h *MessageHandler) RegisterRoutes(router fiber.Router) {
4141
router.Post("/messages/send", h.PostSend)
4242
router.Get("/messages/outstanding", h.GetOutstanding)
4343
router.Get("/messages", h.Index)
44-
router.Post("/messages/:messageID/event", h.PostEvent)
44+
router.Post("/messages/:messageID/events", h.PostEvent)
4545
}
4646

4747
// PostSend a new entities.Message
@@ -183,7 +183,7 @@ func (h *MessageHandler) Index(c *fiber.Ctx) error {
183183
// @Success 400 {object} responses.BadRequest
184184
// @Success 422 {object} responses.UnprocessableEntity
185185
// @Success 500 {object} responses.InternalServerError
186-
// @Router /messages/:messageID/event [post]
186+
// @Router /messages/:messageID/events [post]
187187
func (h *MessageHandler) PostEvent(c *fiber.Ctx) error {
188188
ctx, span := h.tracer.StartFromFiberCtx(c)
189189
defer span.End()

api/pkg/handlers/message_thread_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (h *MessageThreadHandler) RegisterRoutes(router fiber.Router) {
4444
// Index returns message threads for a phone number
4545
// @Summary Get message threads for a phone number
4646
// @Description Get list of contacts which a phone number has communicated with (threads). It will be sorted by timestamp in descending order.
47-
// @Tags Messages
47+
// @Tags MessageThreads
4848
// @Accept json
4949
// @Produce json
5050
// @Param owner query string true "owner phone number" default(+18005550199)

api/pkg/listeners/message_listener.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func NewMessageListener(
3939
return l, map[string]events.EventListener{
4040
events.EventTypeMessageAPISent: l.OnMessageAPISent,
4141
events.EventTypeMessagePhoneSending: l.OnMessagePhoneSending,
42+
events.EventTypeMessagePhoneSent: l.OnMessagePhoneSent,
4243
}
4344
}
4445

@@ -107,7 +108,7 @@ func (listener *MessageListener) OnMessagePhoneSending(ctx context.Context, even
107108
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
108109
}
109110

110-
handleParams := services.HandleMessageSendingParams{
111+
handleParams := services.HandleMessageParams{
111112
ID: payload.ID,
112113
Timestamp: event.Time(),
113114
}
@@ -120,6 +121,43 @@ func (listener *MessageListener) OnMessagePhoneSending(ctx context.Context, even
120121
return listener.storeEventListenerLog(ctx, listener.signature(event), event)
121122
}
122123

124+
// OnMessagePhoneSent handles the events.EventTypeMessagePhoneSent event
125+
func (listener *MessageListener) OnMessagePhoneSent(ctx context.Context, event cloudevents.Event) error {
126+
ctx, span := listener.tracer.Start(ctx)
127+
defer span.End()
128+
129+
handled, err := listener.repository.Has(ctx, event.ID(), listener.signature(event))
130+
if err != nil {
131+
msg := fmt.Sprintf("cannot verify if event [%s] has been handled by [%T]", event.ID(), listener.signature(event))
132+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
133+
}
134+
135+
ctxLogger := listener.tracer.CtxLogger(listener.logger, span)
136+
137+
if handled {
138+
ctxLogger.Info(fmt.Sprintf("event [%s] has already been handled by [%s]", event.ID(), listener.signature(event)))
139+
return nil
140+
}
141+
142+
var payload events.MessagePhoneSentPayload
143+
if err = event.DataAs(&payload); err != nil {
144+
msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
145+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
146+
}
147+
148+
handleParams := services.HandleMessageParams{
149+
ID: payload.ID,
150+
Timestamp: event.Time(),
151+
}
152+
153+
if err = listener.service.HandleMessageSent(ctx, handleParams); err != nil {
154+
msg := fmt.Sprintf("cannot handle [%s] for message with ID [%s] for event with ID [%s]", event.Type(), handleParams.ID, event.ID())
155+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
156+
}
157+
158+
return listener.storeEventListenerLog(ctx, listener.signature(event), event)
159+
}
160+
123161
func (listener *MessageListener) signature(event cloudevents.Event) string {
124162
return listener.handlerSignature(listener, event)
125163
}

api/pkg/listeners/message_thread_listener.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func NewMessageThreadListener(
3939
return l, map[string]events.EventListener{
4040
events.EventTypeMessageAPISent: l.OnMessageAPISent,
4141
events.EventTypeMessagePhoneSending: l.OnMessagePhoneSending,
42+
events.EventTypeMessagePhoneSent: l.OnMessagePhoneSent,
4243
}
4344
}
4445

@@ -96,6 +97,33 @@ func (listener *MessageThreadListener) OnMessagePhoneSending(ctx context.Context
9697
return nil
9798
}
9899

100+
// OnMessagePhoneSent handles the events.EventTypeMessagePhoneSent event
101+
func (listener *MessageThreadListener) OnMessagePhoneSent(ctx context.Context, event cloudevents.Event) error {
102+
ctx, span := listener.tracer.Start(ctx)
103+
defer span.End()
104+
105+
var payload events.MessagePhoneSentPayload
106+
if err := event.DataAs(&payload); err != nil {
107+
msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
108+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
109+
}
110+
111+
updateParams := services.MessageThreadUpdateParams{
112+
Owner: payload.From,
113+
Contact: payload.To,
114+
Timestamp: event.Time(),
115+
Content: payload.Content,
116+
MessageID: payload.ID,
117+
}
118+
119+
if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
120+
msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
121+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
122+
}
123+
124+
return nil
125+
}
126+
99127
func (listener *MessageThreadListener) updateThread(ctx context.Context, params services.MessageThreadUpdateParams) error {
100128
return listener.service.UpdateThread(ctx, params)
101129
}

api/pkg/services/message_service.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (service *MessageService) SendMessage(ctx context.Context, params MessageSe
226226

227227
event, err := service.createMessageAPISentEvent(params.Source, eventPayload)
228228
if err != nil {
229-
msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event)
229+
msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.ID)
230230
return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
231231
}
232232

@@ -293,14 +293,14 @@ func (service *MessageService) StoreMessage(ctx context.Context, params MessageS
293293
return message, nil
294294
}
295295

296-
// HandleMessageSendingParams are parameters for registering a new message being sent
297-
type HandleMessageSendingParams struct {
296+
// HandleMessageParams are parameters for handling a message event
297+
type HandleMessageParams struct {
298298
ID uuid.UUID
299299
Timestamp time.Time
300300
}
301301

302302
// HandleMessageSending handles when a message is being sent
303-
func (service *MessageService) HandleMessageSending(ctx context.Context, params HandleMessageSendingParams) error {
303+
func (service *MessageService) HandleMessageSending(ctx context.Context, params HandleMessageParams) error {
304304
ctx, span := service.tracer.Start(ctx)
305305
defer span.End()
306306

@@ -326,6 +326,33 @@ func (service *MessageService) HandleMessageSending(ctx context.Context, params
326326
return nil
327327
}
328328

329+
// HandleMessageSent handles when a message is has been sent by a mobile phone
330+
func (service *MessageService) HandleMessageSent(ctx context.Context, params HandleMessageParams) error {
331+
ctx, span := service.tracer.Start(ctx)
332+
defer span.End()
333+
334+
ctxLogger := service.tracer.CtxLogger(service.logger, span)
335+
336+
message, err := service.repository.Load(ctx, params.ID)
337+
if err != nil {
338+
msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
339+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
340+
}
341+
342+
if !message.IsSending() {
343+
msg := fmt.Sprintf("message has wrong status [%s]. expected %s", message.Status, entities.MessageStatusSending)
344+
return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
345+
}
346+
347+
if err = service.repository.Update(ctx, message.Sent(params.Timestamp)); err != nil {
348+
msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
349+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
350+
}
351+
352+
ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
353+
return nil
354+
}
355+
329356
func (service *MessageService) createMessageAPISentEvent(source string, payload events.MessageAPISentPayload) (cloudevents.Event, error) {
330357
return service.createEvent(events.EventTypeMessageAPISent, source, payload)
331358
}

0 commit comments

Comments
 (0)