Skip to content

Commit 358dbe7

Browse files
committed
Fix outstanding messages
1 parent deeb68d commit 358dbe7

File tree

12 files changed

+78
-164
lines changed

12 files changed

+78
-164
lines changed

api/pkg/entities/heartbeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ type Heartbeat struct {
1212
Owner string `json:"owner" gorm:"index:idx_heartbeats_owner_timestamp" example:"+18005550199"`
1313
UserID UserID `json:"userID" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"`
1414
Timestamp time.Time `json:"timestamp" gorm:"index:idx_heartbeats_owner_timestamp" example:"2022-06-05T14:26:01.520828+03:00"`
15-
Quantity int `json:"quantity" example:"2"`
15+
MessageID uuid.UUID `json:"message_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
1616
}
Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package events
22

33
import (
4+
"time"
5+
46
"github.com/NdoleStudio/http-sms-manager/pkg/entities"
57
"github.com/google/uuid"
68
)
@@ -10,9 +12,10 @@ const EventTypeMessagePhoneSending = "message.phone.sending"
1012

1113
// MessagePhoneSendingPayload is the payload of the EventTypeMessageSent event
1214
type MessagePhoneSendingPayload struct {
13-
ID uuid.UUID `json:"id"`
14-
UserID entities.UserID `json:"user_id"`
15-
Owner string `json:"owner"`
16-
Contact string `json:"contact"`
17-
Content string `json:"content"`
15+
ID uuid.UUID `json:"id"`
16+
UserID entities.UserID `json:"user_id"`
17+
Timestamp time.Time `json:"timestamp"`
18+
Owner string `json:"owner"`
19+
Contact string `json:"contact"`
20+
Content string `json:"content"`
1821
}

api/pkg/handlers/message_handler.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -93,20 +93,19 @@ func (h *MessageHandler) PostSend(c *fiber.Ctx) error {
9393
return h.responseOK(c, "message added to queue", message)
9494
}
9595

96-
// GetOutstanding returns entities.Message which are still to be sent by the mobile phone
97-
// @Summary Get outstanding messages
98-
// @Description Get list of messages which are outstanding to be sent by the phone
96+
// GetOutstanding returns an entities.Message which is still to be sent by the mobile phone
97+
// @Summary Get an outstanding message
98+
// @Description Get an outstanding message to be sent by an android phone
9999
// @Security ApiKeyAuth
100100
// @Tags Messages
101101
// @Accept json
102102
// @Produce json
103-
// @Param owner query string true "The owner's phone number" default(+18005550199)
104-
// @Param limit query int false "Number of outstanding messages to fetch" minimum(1) maximum(10)
105-
// @Success 200 {object} responses.MessagesResponse
106-
// @Failure 400 {object} responses.BadRequest
103+
// @Param message_id query string true "The ID of the message" default(32343a19-da5e-4b1b-a767-3298a73703cb)
104+
// @Success 200 {object} responses.MessageResponse
105+
// @Failure 400 {object} responses.BadRequest
107106
// @Failure 401 {object} responses.Unauthorized
108-
// @Failure 422 {object} responses.UnprocessableEntity
109-
// @Failure 500 {object} responses.InternalServerError
107+
// @Failure 422 {object} responses.UnprocessableEntity
108+
// @Failure 500 {object} responses.InternalServerError
110109
// @Router /messages/outstanding [get]
111110
func (h *MessageHandler) GetOutstanding(c *fiber.Ctx) error {
112111
ctx, span := h.tracer.StartFromFiberCtx(c)
@@ -128,14 +127,14 @@ func (h *MessageHandler) GetOutstanding(c *fiber.Ctx) error {
128127
return h.responseUnprocessableEntity(c, errors, "validation errors while fetching outstanding messages")
129128
}
130129

131-
messages, err := h.service.GetOutstanding(ctx, request.ToGetOutstandingParams(c.OriginalURL(), h.userIDFomContext(c), timestamp))
130+
message, err := h.service.GetOutstanding(ctx, request.ToGetOutstandingParams(c.OriginalURL(), h.userIDFomContext(c), timestamp))
132131
if err != nil {
133-
msg := fmt.Sprintf("cannot get messgaes with URL [%s]", c.OriginalURL())
132+
msg := fmt.Sprintf("cannot get outstnading messgage with ID [%s]", request.MessageID)
134133
ctxLogger.Error(stacktrace.Propagate(err, msg))
135134
return h.responseInternalServerError(c)
136135
}
137136

138-
return h.responseOK(c, fmt.Sprintf("fetched %d %s", len(*messages), h.pluralize("message", len(*messages))), messages)
137+
return h.responseOK(c, "outstanding message fetched successfully", message)
139138
}
140139

141140
// Index returns messages sent between 2 phone numbers

api/pkg/listeners/heartbeat_listener.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ func NewHeartbeatListener(
3131
}
3232

3333
return l, map[string]events.EventListener{
34-
events.EventTypeHeartbeatPhoneOutstanding: l.onHeartbeatPhoneOutstanding,
34+
events.EventTypeMessagePhoneSending: l.onMessagePhoneSending,
3535
}
3636
}
3737

38-
// onHeartbeatPhoneOutstanding handles the events.EventTypeHeartbeatPhoneOutstanding event
39-
func (listener *HeartbeatListener) onHeartbeatPhoneOutstanding(ctx context.Context, event cloudevents.Event) error {
38+
// onMessagePhoneSending handles the events.EventTypeMessagePhoneSending event
39+
func (listener *HeartbeatListener) onMessagePhoneSending(ctx context.Context, event cloudevents.Event) error {
4040
ctx, span := listener.tracer.Start(ctx)
4141
defer span.End()
4242

43-
var payload events.HeartbeatPhoneOutstandingPayload
43+
var payload events.MessagePhoneSendingPayload
4444
if err := event.DataAs(&payload); err != nil {
4545
msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
4646
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -49,7 +49,8 @@ func (listener *HeartbeatListener) onHeartbeatPhoneOutstanding(ctx context.Conte
4949
storeParams := services.HeartbeatStoreParams{
5050
Owner: payload.Owner,
5151
Timestamp: payload.Timestamp,
52-
Quantity: payload.Quantity,
52+
UserID: payload.UserID,
53+
MessageID: payload.ID,
5354
}
5455

5556
if _, err := listener.service.Store(ctx, storeParams); err != nil {

api/pkg/repositories/gorm_heartbeat_repository.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ func NewGormHeartbeatRepository(
3131
}
3232

3333
// Index entities.Message between 2 parties
34-
func (repository *gormHeartbeatRepository) Index(ctx context.Context, owner string, params IndexParams) (*[]entities.Heartbeat, error) {
34+
func (repository *gormHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) {
3535
ctx, span := repository.tracer.Start(ctx)
3636
defer span.End()
3737

38-
query := repository.db.WithContext(ctx).Where("owner = ?", owner)
38+
query := repository.db.WithContext(ctx).Where("user_id = ?", userID).Where("owner = ?", owner)
3939
if len(params.Query) > 0 {
4040
queryPattern := "%" + params.Query + "%"
4141
query.Where("quantity ILIKE ?", queryPattern)

api/pkg/repositories/gorm_message_repository.go

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -106,36 +106,25 @@ func (repository *gormMessageRepository) Update(ctx context.Context, message *en
106106
}
107107

108108
// GetOutstanding fetches messages that still to be sent to the phone
109-
func (repository *gormMessageRepository) GetOutstanding(ctx context.Context, userID entities.UserID, owner string, take int) (*[]entities.Message, error) {
109+
func (repository *gormMessageRepository) GetOutstanding(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
110110
ctx, span := repository.tracer.Start(ctx)
111111
defer span.End()
112112

113-
messages := new([]entities.Message)
113+
message := new(entities.Message)
114114
err := crdbgorm.ExecuteTx(ctx, repository.db, nil,
115115
func(tx *gorm.DB) error {
116-
return tx.WithContext(ctx).Model(messages).
116+
return tx.WithContext(ctx).Model(message).
117117
Clauses(clause.Returning{}).
118118
Where("user_id = ?", userID).
119-
Where(
120-
"id IN (?)",
121-
tx.Model(&entities.Message{}).
122-
Where("owner = ?", owner).
123-
Where(
124-
repository.db.
125-
Where("status = ?", entities.MessageStatusPending).
126-
Where("age(now(), created_at) < ?", "5 minutes"),
127-
).
128-
Order("request_received_at ASC").
129-
Select("id").
130-
Limit(take),
131-
).
132-
Update("status", "sending").Error
119+
Where("id = ?", messageID).
120+
Where("status = ?", entities.MessageStatusPending).
121+
Update("status", entities.MessageStatusSending).Error
133122
},
134123
)
135124
if err != nil {
136-
msg := fmt.Sprintf("cannot fetch [%d] outstanding messages", take)
125+
msg := fmt.Sprintf("cannot fetch outstanding message with userID [%s] and messageID [%s]", userID, messageID)
137126
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
138127
}
139128

140-
return messages, nil
129+
return message, nil
141130
}

api/pkg/repositories/heartbeat_repository.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ type HeartbeatRepository interface {
1212
Store(ctx context.Context, heartbeat *entities.Heartbeat) error
1313

1414
// Index entities.Heartbeat of an owner
15-
Index(ctx context.Context, owner string, params IndexParams) (*[]entities.Heartbeat, error)
15+
Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error)
1616
}

api/pkg/repositories/message_repository.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ type MessageRepository interface {
2121
// Index entities.Message between 2 phone numbers
2222
Index(ctx context.Context, userID entities.UserID, owner string, contact string, params IndexParams) (*[]entities.Message, error)
2323

24-
// GetOutstanding fetches list of outstanding []entities.Message
25-
GetOutstanding(ctx context.Context, userID entities.UserID, owner string, limit int) (*[]entities.Message, error)
24+
// GetOutstanding fetches an entities.Message which is outstanding
25+
GetOutstanding(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error)
2626
}
Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,23 @@
11
package requests
22

33
import (
4-
"strconv"
54
"strings"
65
"time"
76

87
"github.com/NdoleStudio/http-sms-manager/pkg/entities"
9-
108
"github.com/NdoleStudio/http-sms-manager/pkg/services"
9+
"github.com/google/uuid"
1110
)
1211

1312
// MessageOutstanding is the payload fetching outstanding entities.Message
1413
type MessageOutstanding struct {
1514
request
16-
Owner string `json:"owner" query:"owner"`
17-
Limit string `json:"limit" query:"limit"`
15+
MessageID string `json:"message_id" query:"message_id"`
1816
}
1917

2018
// Sanitize sets defaults to MessageOutstanding
2119
func (input *MessageOutstanding) Sanitize() MessageOutstanding {
22-
if strings.TrimSpace(input.Limit) == "" {
23-
input.Limit = "1"
24-
}
25-
26-
input.Owner = input.sanitizeAddress(input.Owner)
27-
if input.Owner == "" {
28-
input.Owner = "+37259139660"
29-
}
30-
20+
input.MessageID = strings.TrimSpace(input.MessageID)
3121
return *input
3222
}
3323

@@ -36,14 +26,7 @@ func (input *MessageOutstanding) ToGetOutstandingParams(source string, userID en
3626
return services.MessageGetOutstandingParams{
3727
Source: source,
3828
UserID: userID,
39-
Owner: input.Owner,
29+
MessageID: uuid.MustParse(input.MessageID),
4030
Timestamp: timestamp,
41-
Limit: input.getLimit(),
4231
}
4332
}
44-
45-
// getLimit gets the take as a string
46-
func (input *MessageOutstanding) getLimit() int {
47-
val, _ := strconv.Atoi(input.Limit)
48-
return val
49-
}

api/pkg/services/heartbeat_service.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ func NewHeartbeatService(
3535
}
3636

3737
// Index fetches the heartbeats for a phone number
38-
func (service *HeartbeatService) Index(ctx context.Context, owner string, params repositories.IndexParams) (*[]entities.Heartbeat, error) {
38+
func (service *HeartbeatService) Index(ctx context.Context, userID entities.UserID, owner string, params repositories.IndexParams) (*[]entities.Heartbeat, error) {
3939
ctx, span := service.tracer.Start(ctx)
4040
defer span.End()
4141

4242
ctxLogger := service.tracer.CtxLogger(service.logger, span)
4343

44-
heartbeats, err := service.repository.Index(ctx, owner, params)
44+
heartbeats, err := service.repository.Index(ctx, userID, owner, params)
4545
if err != nil {
4646
msg := fmt.Sprintf("could not fetch heartbeats with parms [%+#v]", params)
4747
return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -55,7 +55,8 @@ func (service *HeartbeatService) Index(ctx context.Context, owner string, params
5555
type HeartbeatStoreParams struct {
5656
Owner string
5757
Timestamp time.Time
58-
Quantity int
58+
UserID entities.UserID
59+
MessageID uuid.UUID
5960
}
6061

6162
// Store a new entities.Heartbeat
@@ -69,7 +70,8 @@ func (service *HeartbeatService) Store(ctx context.Context, params HeartbeatStor
6970
ID: uuid.New(),
7071
Owner: params.Owner,
7172
Timestamp: params.Timestamp,
72-
Quantity: params.Quantity,
73+
MessageID: params.MessageID,
74+
UserID: params.UserID,
7375
}
7476

7577
if err := service.repository.Store(ctx, heartbeat); err != nil {

0 commit comments

Comments
 (0)