Skip to content

Commit f462c3b

Browse files
emidiocrucianiLorenzoMassarini
authored andcommitted
MDI240 | handle aws send email throttling requeing the email in status READY
1 parent 78c4e50 commit f462c3b

File tree

6 files changed

+141
-2
lines changed

6 files changed

+141
-2
lines changed

docs/error-handling.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ Quando l'invio SMTP fallisce:
4141
- Stato: `FAILED`
4242
- Reason: Messaggio di errore originale dall'SMTP client
4343

44+
### SMTP Throttling (454)
45+
Quando l'invio SMTP fallisce con codice `454` (throttling):
46+
- Stato: ritorna a `READY`
47+
- Record `PROCESSING` rimosso da `email_statuses`
48+
- Reason: vuota
49+
4450
### Callback Failed
4551
Quando il callback HTTP fallisce dopo tutti i retry:
4652
- Stato rimane: `CALLING-SENT-CALLBACK` o `CALLING-FAILED-CALLBACK`

internal/outbox/outbox.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,60 @@ func (o *Outbox) Update(ctx context.Context, id string, status string, errorReas
237237
return err
238238
}
239239

240+
// Requeue updates the email from PROCESSING back to READY and removes the PROCESSING history row.
241+
// The operation is executed within a transaction with retry logic for transient errors.
242+
// Note: ttl parameter is ignored for MySQL.
243+
func (o *Outbox) Requeue(ctx context.Context, id string, _ *int64) error {
244+
updateQuery := `
245+
UPDATE emails
246+
SET status = ?, reason = ?, version = version + 1
247+
WHERE id = ? AND status = ?
248+
`
249+
deleteProcessingQuery := `
250+
DELETE FROM email_statuses
251+
WHERE email_id = ? AND status = ?
252+
ORDER BY id DESC
253+
LIMIT 1
254+
`
255+
256+
var err error
257+
for attempt := range maxAttempts {
258+
err = o.executeInTransaction(ctx, func(tx *sql.Tx) error {
259+
result, execErr := tx.ExecContext(ctx, updateQuery, StatusReady, "", id, StatusProcessing)
260+
if execErr != nil {
261+
return execErr
262+
}
263+
264+
affected, affErr := result.RowsAffected()
265+
if affErr != nil {
266+
return affErr
267+
}
268+
269+
if affected == 0 {
270+
return ErrLockNotAcquired
271+
}
272+
273+
_, delErr := tx.ExecContext(ctx, deleteProcessingQuery, id, StatusProcessing)
274+
return delErr
275+
})
276+
277+
if err == nil || !o.shouldRetryMySQL(err) {
278+
return err
279+
}
280+
281+
sleep := o.backoffDuration(attempt)
282+
timer := time.NewTimer(sleep)
283+
select {
284+
case <-ctx.Done():
285+
timer.Stop()
286+
return ctx.Err()
287+
case <-timer.C:
288+
}
289+
}
290+
291+
return err
292+
}
293+
240294
// Ready updates the email to READY status with the eml file path.
241295
// Expected from status is INTAKING.
242296
// The operation is executed within a transaction with retry logic for transient errors.

internal/pipeline/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ type outboxService interface {
99
Query(ctx context.Context, status string, limit int) ([]outbox.Email, error)
1010
Update(ctx context.Context, id string, status string, errorReason string, ttl *int64) error
1111
Ready(ctx context.Context, id string, emlFilePath string, ttl *int64) error
12+
Requeue(ctx context.Context, id string, ttl *int64) error
1213
}

internal/pipeline/sender.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package pipeline
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"log/slog"
8+
"net/textproto"
79
"sync"
810

911
"mailculator-processor/internal/email"
@@ -59,8 +61,15 @@ func (p *MainSenderPipeline) Process(ctx context.Context) {
5961
}
6062

6163
if err = p.client.Send(payload, p.attachmentsBasePath); err != nil {
62-
logger.Error(fmt.Sprintf("failed to send, error: %v", err))
63-
p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusFailed, err.Error(), outboxEmail.TTL)
64+
if isSMTPThrottling(err) {
65+
logger.Warn(fmt.Sprintf("smtp throttling, requeueing: %v", err))
66+
if requeueErr := p.outbox.Requeue(context.Background(), outboxEmail.Id, outboxEmail.TTL); requeueErr != nil {
67+
logger.Error(fmt.Sprintf("error requeueing email, error: %v", requeueErr))
68+
}
69+
} else {
70+
logger.Error(fmt.Sprintf("failed to send, error: %v", err))
71+
p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusFailed, err.Error(), outboxEmail.TTL)
72+
}
6473
} else {
6574
logger.Info("successfully sent")
6675
p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusSent, "", outboxEmail.TTL)
@@ -77,3 +86,16 @@ func (p *MainSenderPipeline) handle(ctx context.Context, logger *slog.Logger, em
7786
logger.Error(msg)
7887
}
7988
}
89+
90+
func isSMTPThrottling(err error) bool {
91+
if err == nil {
92+
return false
93+
}
94+
95+
var smtpErr *textproto.Error
96+
if errors.As(err, &smtpErr) {
97+
return smtpErr.Code == 454
98+
}
99+
100+
return false
101+
}

internal/pipeline/sender_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"encoding/json"
88
"errors"
9+
"net/textproto"
910
"os"
1011
"strings"
1112
"testing"
@@ -123,6 +124,25 @@ func TestSendEmailError(t *testing.T) {
123124
)
124125
}
125126

127+
func TestSendEmailThrottlingRequeue(t *testing.T) {
128+
payloadFile := createPayloadFile(t)
129+
buf, logger := mocks.NewLoggerMock()
130+
outboxServiceMock := mocks.NewOutboxMock(
131+
mocks.Email(outbox.Email{Id: "1", Status: "", PayloadFilePath: payloadFile}),
132+
)
133+
senderServiceMock := newSenderMock(&textproto.Error{Code: 454, Msg: "Throttling failure"})
134+
sender := MainSenderPipeline{outbox: outboxServiceMock, client: senderServiceMock, attachmentsBasePath: "/base/path/", logger: logger}
135+
136+
sender.Process(context.TODO())
137+
138+
assert.Equal(t, 0, senderServiceMock.sendMethodCounter)
139+
assert.Equal(t, "requeue", outboxServiceMock.LastMethod())
140+
assert.Equal(t,
141+
"level=INFO msg=\"processing outbox 1\"\nlevel=WARN msg=\"smtp throttling, requeueing: 454 Throttling failure\" outbox=1",
142+
strings.TrimSpace(buf.String()),
143+
)
144+
}
145+
126146
func TestHandleUpdateError(t *testing.T) {
127147
payloadFile := createPayloadFile(t)
128148
buf, logger := mocks.NewLoggerMock()

internal/testutils/mocks/outbox.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ type OutboxMock struct {
1010
updateMethodError error
1111
updateMethodCall int
1212
updateMethodFailsCall int
13+
requeueMethodError error
14+
requeueMethodCall int
15+
requeueMethodFailsCall int
1316
email outbox.Email
17+
lastMethod string
1418
}
1519

1620
type OutboxMockOptions func(*OutboxMock)
@@ -33,6 +37,18 @@ func UpdateMethodFailsCall(updateMethodFailsCall int) OutboxMockOptions {
3337
}
3438
}
3539

40+
func RequeueMethodError(requeueMethodError error) OutboxMockOptions {
41+
return func(o *OutboxMock) {
42+
o.requeueMethodError = requeueMethodError
43+
}
44+
}
45+
46+
func RequeueMethodFailsCall(requeueMethodFailsCall int) OutboxMockOptions {
47+
return func(o *OutboxMock) {
48+
o.requeueMethodFailsCall = requeueMethodFailsCall
49+
}
50+
}
51+
3652
func Email(email outbox.Email) OutboxMockOptions {
3753
return func(o *OutboxMock) {
3854
o.email = email
@@ -45,7 +61,11 @@ func NewOutboxMock(opts ...OutboxMockOptions) *OutboxMock {
4561
updateMethodError: nil,
4662
updateMethodCall: 0,
4763
updateMethodFailsCall: 1,
64+
requeueMethodError: nil,
65+
requeueMethodCall: 0,
66+
requeueMethodFailsCall: 1,
4867
email: outbox.Email{},
68+
lastMethod: "",
4969
}
5070
for _, opt := range opts {
5171
opt(o)
@@ -54,10 +74,12 @@ func NewOutboxMock(opts ...OutboxMockOptions) *OutboxMock {
5474
}
5575

5676
func (m *OutboxMock) Query(ctx context.Context, status string, limit int) ([]outbox.Email, error) {
77+
m.lastMethod = "query"
5778
return []outbox.Email{m.email}, m.queryMethodError
5879
}
5980

6081
func (m *OutboxMock) Update(ctx context.Context, id string, status string, errorReason string, ttl *int64) error {
82+
m.lastMethod = "update"
6183
m.updateMethodCall++
6284
if m.updateMethodCall == m.updateMethodFailsCall {
6385
return m.updateMethodError
@@ -66,9 +88,23 @@ func (m *OutboxMock) Update(ctx context.Context, id string, status string, error
6688
}
6789

6890
func (m *OutboxMock) Ready(ctx context.Context, id string, emlFilePath string, ttl *int64) error {
91+
m.lastMethod = "ready"
6992
m.updateMethodCall++
7093
if m.updateMethodCall == m.updateMethodFailsCall {
7194
return m.updateMethodError
7295
}
7396
return nil
7497
}
98+
99+
func (m *OutboxMock) Requeue(ctx context.Context, id string, ttl *int64) error {
100+
m.lastMethod = "requeue"
101+
m.requeueMethodCall++
102+
if m.requeueMethodCall == m.requeueMethodFailsCall {
103+
return m.requeueMethodError
104+
}
105+
return nil
106+
}
107+
108+
func (m *OutboxMock) LastMethod() string {
109+
return m.lastMethod
110+
}

0 commit comments

Comments
 (0)