Skip to content

Commit 3c26e2d

Browse files
committed
feat: Implement redis-based distributed lock for infra declaration
1 parent b93afc5 commit 3c26e2d

File tree

7 files changed

+494
-46
lines changed

7 files changed

+494
-46
lines changed

cmd/e2e/configs/basic.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/google/uuid"
1111
"github.com/hookdeck/outpost/internal/config"
1212
"github.com/hookdeck/outpost/internal/infra"
13+
"github.com/hookdeck/outpost/internal/redis"
1314
"github.com/hookdeck/outpost/internal/util/testinfra"
1415
"github.com/hookdeck/outpost/internal/util/testutil"
1516
"github.com/stretchr/testify/require"
@@ -74,10 +75,15 @@ func Basic(t *testing.T, opts BasicOpts) config.Config {
7475

7576
// Setup cleanup
7677
t.Cleanup(func() {
77-
if err := infra.Teardown(context.Background(), infra.Config{
78+
redisClient, err := redis.New(context.Background(), c.Redis.ToConfig())
79+
if err != nil {
80+
log.Println("Failed to create redis client:", err)
81+
}
82+
outpostInfra := infra.NewInfra(infra.Config{
7883
DeliveryMQ: c.MQs.ToInfraConfig("deliverymq"),
7984
LogMQ: c.MQs.ToInfraConfig("logmq"),
80-
}); err != nil {
85+
}, redisClient)
86+
if err := outpostInfra.Teardown(context.Background()); err != nil {
8187
log.Println("Teardown failed:", err)
8288
}
8389
})

internal/app/app.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/hookdeck/outpost/internal/logging"
1414
"github.com/hookdeck/outpost/internal/migrator"
1515
"github.com/hookdeck/outpost/internal/otel"
16+
"github.com/hookdeck/outpost/internal/redis"
1617
"github.com/hookdeck/outpost/internal/services/api"
1718
"github.com/hookdeck/outpost/internal/services/delivery"
1819
"github.com/hookdeck/outpost/internal/services/log"
@@ -53,14 +54,20 @@ func run(mainContext context.Context, cfg *config.Config) error {
5354
return err
5455
}
5556

56-
if err := infra.Declare(mainContext, infra.Config{
57+
redisClient, err := redis.New(mainContext, cfg.Redis.ToConfig())
58+
if err != nil {
59+
return err
60+
}
61+
62+
outpostInfra := infra.NewInfra(infra.Config{
5763
DeliveryMQ: cfg.MQs.ToInfraConfig("deliverymq"),
5864
LogMQ: cfg.MQs.ToInfraConfig("logmq"),
59-
}); err != nil {
65+
}, redisClient)
66+
if err := outpostInfra.Declare(mainContext); err != nil {
6067
return err
6168
}
6269

63-
installationID, err := getInstallation(mainContext, cfg.Redis.ToConfig(), cfg.Telemetry.ToTelemetryConfig())
70+
installationID, err := getInstallation(mainContext, redisClient, cfg.Telemetry.ToTelemetryConfig())
6471
if err != nil {
6572
return err
6673
}

internal/app/installation.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,11 @@ const (
1313
installationKey = "installation"
1414
)
1515

16-
func getInstallation(ctx context.Context, redisConfig *redis.RedisConfig, telemetryConfig telemetry.TelemetryConfig) (string, error) {
16+
func getInstallation(ctx context.Context, redisClient *redis.Client, telemetryConfig telemetry.TelemetryConfig) (string, error) {
1717
if telemetryConfig.Disabled {
1818
return "", nil
1919
}
2020

21-
redisClient, err := redis.New(ctx, redisConfig)
22-
if err != nil {
23-
return "", err
24-
}
25-
2621
// TODO: consider using WATCH to avoid race condition
2722
// There's a potential race condition when multiple Outpost instances are started at the same time.
2823
// However, given this is for telemetry purposes, and it will be a temporary issue, we can ignore it for now.

internal/infra/infra.go

Lines changed: 143 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,32 @@ package infra
22

33
import (
44
"context"
5+
"fmt"
6+
"time"
57

68
"github.com/hookdeck/outpost/internal/mqinfra"
9+
"github.com/hookdeck/outpost/internal/redis"
710
)
811

12+
const (
13+
lockKey = "outpost:lock"
14+
lockAttempts = 5
15+
lockDelay = 5 * time.Second
16+
lockTTL = 10 * time.Second
17+
)
18+
19+
type Infra struct {
20+
lock Lock
21+
provider InfraProvider
22+
}
23+
24+
// InfraProvider handles the actual infrastructure operations
25+
type InfraProvider interface {
26+
Exist(ctx context.Context) (bool, error)
27+
Declare(ctx context.Context) error
28+
Teardown(ctx context.Context) error
29+
}
30+
931
type Config struct {
1032
DeliveryMQ *mqinfra.MQInfraConfig
1133
LogMQ *mqinfra.MQInfraConfig
@@ -16,59 +38,145 @@ func (cfg *Config) SetSensiblePolicyDefaults() {
1638
cfg.LogMQ.Policy.RetryLimit = 5
1739
}
1840

19-
func Declare(ctx context.Context, cfg Config) error {
41+
type Lock interface {
42+
AttemptLock(ctx context.Context) (bool, error)
43+
Unlock(ctx context.Context) (bool, error)
44+
}
45+
46+
// infraProvider implements InfraProvider using real MQ infrastructure
47+
type infraProvider struct {
48+
deliveryMQ mqinfra.MQInfra
49+
logMQ mqinfra.MQInfra
50+
}
51+
52+
func (p *infraProvider) Exist(ctx context.Context) (bool, error) {
53+
if exists, err := p.deliveryMQ.Exist(ctx); err != nil {
54+
return false, err
55+
} else if !exists {
56+
return false, nil
57+
}
58+
59+
if exists, err := p.logMQ.Exist(ctx); err != nil {
60+
return false, err
61+
} else if !exists {
62+
return false, nil
63+
}
64+
65+
return true, nil
66+
}
67+
68+
func (p *infraProvider) Declare(ctx context.Context) error {
69+
if err := p.deliveryMQ.Declare(ctx); err != nil {
70+
return err
71+
}
72+
73+
if err := p.logMQ.Declare(ctx); err != nil {
74+
return err
75+
}
76+
77+
return nil
78+
}
79+
80+
func (p *infraProvider) Teardown(ctx context.Context) error {
81+
if err := p.deliveryMQ.TearDown(ctx); err != nil {
82+
return err
83+
}
84+
85+
if err := p.logMQ.TearDown(ctx); err != nil {
86+
return err
87+
}
88+
89+
return nil
90+
}
91+
92+
func NewInfra(cfg Config, redisClient *redis.Client) Infra {
2093
cfg.SetSensiblePolicyDefaults()
2194

22-
// Check existence first
23-
var deliveryMQExists, logMQExists bool
24-
var deliveryMQ, logMQ mqinfra.MQInfra
95+
provider := &infraProvider{
96+
deliveryMQ: mqinfra.New(cfg.DeliveryMQ),
97+
logMQ: mqinfra.New(cfg.LogMQ),
98+
}
2599

26-
if cfg.DeliveryMQ != nil {
27-
deliveryMQ = mqinfra.New(cfg.DeliveryMQ)
28-
exists, err := deliveryMQ.Exist(ctx)
29-
if err != nil {
30-
return err
31-
}
32-
deliveryMQExists = exists
100+
return Infra{
101+
lock: NewRedisLock(redisClient),
102+
provider: provider,
103+
}
104+
}
105+
106+
// NewInfraWithProvider creates an Infra instance with custom lock and provider (for testing)
107+
func NewInfraWithProvider(lock Lock, provider InfraProvider) *Infra {
108+
return &Infra{
109+
lock: lock,
110+
provider: provider,
33111
}
112+
}
34113

35-
if cfg.LogMQ != nil {
36-
logMQ = mqinfra.New(cfg.LogMQ)
37-
exists, err := logMQ.Exist(ctx)
114+
func (infra *Infra) Declare(ctx context.Context) error {
115+
for attempt := 0; attempt < lockAttempts; attempt++ {
116+
shouldDeclare, hasLocked, err := infra.shouldDeclareAndAcquireLock(ctx)
38117
if err != nil {
39118
return err
40119
}
41-
logMQExists = exists
42-
}
120+
if !shouldDeclare {
121+
return nil
122+
}
43123

44-
// Declare if necessary
45-
if cfg.DeliveryMQ != nil && !deliveryMQExists {
46-
if err := deliveryMQ.Declare(ctx); err != nil {
47-
return err
124+
if hasLocked {
125+
// We got the lock, declare infrastructure
126+
defer func() {
127+
// TODO: improve error handling
128+
unlocked, err := infra.lock.Unlock(ctx)
129+
if err != nil {
130+
panic(err)
131+
}
132+
if !unlocked {
133+
panic("failed to unlock lock")
134+
}
135+
}()
136+
137+
if err := infra.provider.Declare(ctx); err != nil {
138+
return err
139+
}
140+
141+
return nil
48142
}
49-
}
50143

51-
if cfg.LogMQ != nil && !logMQExists {
52-
if err := logMQ.Declare(ctx); err != nil {
53-
return err
144+
// We didn't get the lock, wait before retry
145+
if attempt < lockAttempts-1 {
146+
time.Sleep(lockDelay)
54147
}
55148
}
56149

57-
return nil
150+
return fmt.Errorf("failed to acquire lock after %d attempts", lockAttempts)
58151
}
59152

60-
func Teardown(ctx context.Context, cfg Config) error {
61-
if cfg.DeliveryMQ != nil {
62-
if err := mqinfra.New(cfg.DeliveryMQ).TearDown(ctx); err != nil {
63-
return err
64-
}
153+
func (infra *Infra) Teardown(ctx context.Context) error {
154+
return infra.provider.Teardown(ctx)
155+
}
156+
157+
// shouldDeclareAndAcquireLock checks if
158+
func (infra *Infra) shouldDeclareAndAcquireLock(ctx context.Context) (shouldDeclare bool, hasLocked bool, err error) {
159+
shouldDeclare = false
160+
hasLocked = false
161+
err = nil
162+
163+
exists, err := infra.provider.Exist(ctx)
164+
if err != nil {
165+
err = fmt.Errorf("failed to check if infra exists: %w", err)
166+
return
167+
}
168+
if exists {
169+
// if infra exists, return early, no need to acquire lock
170+
shouldDeclare = false
171+
return
65172
}
173+
shouldDeclare = true
66174

67-
if cfg.LogMQ != nil {
68-
if err := mqinfra.New(cfg.LogMQ).TearDown(ctx); err != nil {
69-
return err
70-
}
175+
hasLocked, err = infra.lock.AttemptLock(ctx)
176+
if err != nil {
177+
err = fmt.Errorf("failed to acquire lock: %w", err)
178+
return
71179
}
72180

73-
return nil
181+
return
74182
}

0 commit comments

Comments
 (0)