Skip to content

Commit a38e329

Browse files
author
Armin
committed
add circuit breaker implementation and refactor SMS sending logic
1 parent b02645b commit a38e329

File tree

6 files changed

+235
-13
lines changed

6 files changed

+235
-13
lines changed

internal/operator/operatorA/service.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,23 @@ package operatorA
22

33
import (
44
"context"
5+
"sms-gateway/app"
56
"sms-gateway/internal/model"
67
)
78

89
type OA struct{}
910

10-
func (o OA) Send(ctx context.Context, s model.SMS) {}
11+
func (o OA) Send(ctx context.Context, s model.SMS) error {
12+
13+
//return errors.New("fall down") for test circuit breaker
14+
15+
for _, v := range s.Recipients {
16+
app.Logger.Info("your sms has sent ",
17+
"user id : ", s.CustomerID,
18+
"msg : ", s.Text,
19+
"number : ", v,
20+
"operator:", "A")
21+
}
22+
23+
return nil
24+
}

internal/operator/operatorB/service.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,21 @@ package operatorB
22

33
import (
44
"context"
5+
"sms-gateway/app"
56
"sms-gateway/internal/model"
67
)
78

89
type OB struct{}
910

10-
func (o OB) Send(ctx context.Context, s model.SMS) {}
11+
func (o OB) Send(ctx context.Context, s model.SMS) error {
12+
13+
for _, v := range s.Recipients {
14+
app.Logger.Info("your sms has sent ",
15+
"user id : ", s.CustomerID,
16+
"msg : ", s.Text,
17+
"number : ", v,
18+
"operator:", "B")
19+
}
20+
21+
return nil
22+
}

internal/operator/service.go

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,87 @@ package operator
22

33
import (
44
"context"
5+
"fmt"
6+
"log/slog"
7+
"time"
8+
59
"sms-gateway/internal/model"
10+
operatorA "sms-gateway/internal/operator/operatorA"
11+
operatorB "sms-gateway/internal/operator/operatorB"
12+
"sms-gateway/pkg/circuitbreaker"
613
)
714

815
type Operator interface {
9-
Send(ctx context.Context, s model.SMS)
16+
Send(ctx context.Context, s model.SMS) error
17+
}
18+
19+
var (
20+
primaryOperator Operator = operatorA.OA{}
21+
fallbackOperator Operator = operatorB.OB{}
22+
23+
operatorBreaker = circuitbreaker.New(circuitbreaker.Config{
24+
FailureThreshold: 3,
25+
SuccessThreshold: 2,
26+
OpenTimeout: 5 * time.Second,
27+
})
28+
29+
retryBackoff = 200 * time.Millisecond
30+
maxSendRetries = 2
31+
operatorTimeout = 2 * time.Second
32+
)
33+
34+
func Send(ctx context.Context, s model.SMS) (string, error) {
35+
if provider, err := dispatch(ctx, "operatorA", primaryOperator, operatorBreaker, s); err == nil {
36+
return provider, nil
37+
}
38+
39+
slog.Warn("primary operator failed, falling back")
40+
41+
return dispatch(ctx, "operatorB", fallbackOperator, nil, s)
42+
}
43+
44+
func dispatch(ctx context.Context, name string, op Operator, breaker *circuitbreaker.Breaker, s model.SMS) (string, error) {
45+
if breaker != nil {
46+
if err := breaker.Allow(); err != nil {
47+
return "", err
48+
}
49+
}
50+
51+
var lastErr error
52+
backoff := retryBackoff
53+
54+
for attempt := 0; attempt <= maxSendRetries; attempt++ {
55+
sendCtx, cancel := context.WithTimeout(ctx, operatorTimeout)
56+
err := op.Send(sendCtx, s)
57+
cancel()
58+
59+
if err == nil {
60+
if breaker != nil {
61+
breaker.MarkSuccess()
62+
}
63+
return name, nil
64+
}
65+
66+
lastErr = err
67+
if breaker != nil {
68+
breaker.MarkFailure()
69+
}
70+
71+
if attempt == maxSendRetries {
72+
break
73+
}
74+
75+
select {
76+
case <-time.After(backoff):
77+
backoff *= 2
78+
case <-ctx.Done():
79+
return "", ctx.Err()
80+
}
81+
}
82+
83+
if lastErr == nil {
84+
return "", fmt.Errorf("%s failed without an explicit error", name)
85+
}
86+
87+
return "", lastErr
1088
}

internal/sms/consumer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func StartConsumers(ctx context.Context) error {
2929

3030
app.Logger.Info("got message", "message", message)
3131

32-
if err := sendSmsToProvider(ctx, message); err != nil {
32+
if err := sendSms(ctx, message); err != nil {
3333
return err
3434
}
3535

@@ -48,13 +48,15 @@ func StartConsumers(ctx context.Context) error {
4848
func(ctx context.Context, evt amqp091.Delivery) error {
4949
var message model.SMS
5050
if err := json.Unmarshal(evt.Body, &message); err != nil {
51+
evt.Ack(false)
5152
app.Logger.Error("cannot unmarshal sms", "error", err)
5253
return fmt.Errorf("cannot unmarshal sms : %w", err)
5354
}
5455

5556
app.Logger.Info("got message", "message", message)
5657

57-
if err := sendSmsToProvider(ctx, message); err != nil {
58+
if err := sendSms(ctx, message); err != nil {
59+
evt.Ack(false)
5860
return err
5961
}
6062

internal/sms/service.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"sms-gateway/app"
77
"sms-gateway/internal/model"
8+
"sms-gateway/internal/operator"
89
"strings"
910
)
1011

@@ -16,30 +17,41 @@ const (
1617
Done State = "done"
1718
)
1819

19-
func sendSmsToProvider(ctx context.Context, s model.SMS) error {
20+
func sendSms(ctx context.Context, s model.SMS) error {
2021
if err := UpdateSMS(ctx, s, Init); err != nil {
2122
return err
2223
}
2324

24-
if err := UpdateSMS(ctx, s, Done); err != nil {
25+
provider, err := operator.Send(ctx, s)
26+
if err != nil {
2527
return err
2628
}
29+
30+
if err := UpdateSMS(ctx, s, Done, provider); err != nil {
31+
return err
32+
}
33+
2734
return nil
2835
}
2936

30-
func UpdateSMS(ctx context.Context, s model.SMS, state State) error {
37+
func UpdateSMS(ctx context.Context, s model.SMS, state State, provider ...string) error {
3138
if len(s.Recipients) == 0 {
3239
return errors.New("no Recipients")
3340
}
3441

42+
var providerName string
43+
if len(provider) > 0 {
44+
providerName = provider[0]
45+
}
46+
3547
valueStrings := make([]string, 0, len(s.Recipients))
36-
valueArgs := make([]any, 0, len(s.Recipients)*4)
48+
valueArgs := make([]any, 0, len(s.Recipients)*5)
3749
for _, recipient := range s.Recipients {
38-
valueStrings = append(valueStrings, "(?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
39-
valueArgs = append(valueArgs, s.CustomerID, s.Type, state, recipient)
50+
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
51+
valueArgs = append(valueArgs, s.CustomerID, s.Type, state, recipient, providerName)
4052
}
41-
query := ` INSERT INTO sms_status
42-
(user_id,type, status, recipient, created_at, updated_at)
53+
query := ` INSERT INTO sms_status
54+
(user_id,type, status, recipient, provider, created_at, updated_at)
4355
VALUES ` + strings.Join(valueStrings, ",")
4456
if _, err := app.DB.ExecContext(ctx, query, valueArgs...); err != nil {
4557
return err
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package circuitbreaker
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
)
8+
9+
type State int
10+
11+
const (
12+
StateClosed State = iota
13+
StateOpen
14+
StateHalfOpen
15+
)
16+
17+
type Config struct {
18+
FailureThreshold int
19+
SuccessThreshold int
20+
OpenTimeout time.Duration
21+
}
22+
23+
type Breaker struct {
24+
mu sync.Mutex
25+
state State
26+
failureCount int
27+
successCount int
28+
reopenDeadline time.Time
29+
cfg Config
30+
}
31+
32+
var ErrOpen = errors.New("circuit open")
33+
34+
func New(cfg Config) *Breaker {
35+
if cfg.FailureThreshold <= 0 {
36+
cfg.FailureThreshold = 3
37+
}
38+
if cfg.SuccessThreshold <= 0 {
39+
cfg.SuccessThreshold = 1
40+
}
41+
if cfg.OpenTimeout <= 0 {
42+
cfg.OpenTimeout = 5 * time.Second
43+
}
44+
45+
return &Breaker{cfg: cfg}
46+
}
47+
48+
func (cb *Breaker) Allow() error {
49+
cb.mu.Lock()
50+
defer cb.mu.Unlock()
51+
52+
now := time.Now()
53+
switch cb.state {
54+
case StateOpen:
55+
if now.Before(cb.reopenDeadline) {
56+
return ErrOpen
57+
}
58+
cb.transitionTo(StateHalfOpen)
59+
}
60+
61+
return nil
62+
}
63+
64+
func (cb *Breaker) MarkSuccess() {
65+
cb.mu.Lock()
66+
defer cb.mu.Unlock()
67+
68+
if cb.state == StateHalfOpen {
69+
cb.successCount++
70+
if cb.successCount >= cb.cfg.SuccessThreshold {
71+
cb.reset()
72+
}
73+
return
74+
}
75+
76+
cb.reset()
77+
}
78+
79+
func (cb *Breaker) MarkFailure() {
80+
cb.mu.Lock()
81+
defer cb.mu.Unlock()
82+
83+
cb.failureCount++
84+
if cb.failureCount >= cb.cfg.FailureThreshold {
85+
cb.trip()
86+
}
87+
}
88+
89+
func (cb *Breaker) transitionTo(state State) {
90+
cb.state = state
91+
cb.failureCount = 0
92+
cb.successCount = 0
93+
if state == StateOpen {
94+
cb.reopenDeadline = time.Now().Add(cb.cfg.OpenTimeout)
95+
}
96+
}
97+
98+
func (cb *Breaker) reset() {
99+
cb.transitionTo(StateClosed)
100+
}
101+
102+
func (cb *Breaker) trip() {
103+
cb.transitionTo(StateOpen)
104+
}

0 commit comments

Comments
 (0)