Skip to content

Commit fcc971a

Browse files
authored
feat: support configurable idempotency TTL (#544)
* feat: support configurable idempotency key ttl * refactor: idempotence params # Conflicts: # internal/services/api/router_test.go # Conflicts: # internal/services/delivery/delivery.go * chore: remove unused func
1 parent f958d21 commit fcc971a

File tree

10 files changed

+60
-45
lines changed

10 files changed

+60
-45
lines changed

docs/pages/references/configuration.mdx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
4141
| `AZURE_SERVICEBUS_RESOURCE_GROUP` | Azure resource group name | `nil` | Yes |
4242
| `AZURE_SERVICEBUS_SUBSCRIPTION_ID` | Azure subscription ID | `nil` | Yes |
4343
| `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes |
44+
| `DELIVERY_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours). | `86400` | No |
4445
| `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No |
4546
| `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No |
4647
| `DEPLOYMENT_ID` | Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. | `nil` | No |
@@ -105,6 +106,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
105106
| `PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS` | JSON string or path to a file containing GCP service account credentials for the Pub/Sub publish topic. Required if GCP Pub/Sub is chosen and not using implicit credentials. | `nil` | Conditional |
106107
| `PUBLISH_GCP_PUBSUB_SUBSCRIPTION` | Name of the GCP Pub/Sub subscription to read published events from. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional |
107108
| `PUBLISH_GCP_PUBSUB_TOPIC` | Name of the GCP Pub/Sub topic for publishing events. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional |
109+
| `PUBLISH_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours). | `86400` | No |
108110
| `PUBLISH_MAX_CONCURRENCY` | Maximum number of messages to process concurrently from the publish queue. | `1` | No |
109111
| `PUBLISH_RABBITMQ_EXCHANGE` | Name of the RabbitMQ exchange for the publish queue. | `nil` | No |
110112
| `PUBLISH_RABBITMQ_QUEUE` | Name of the RabbitMQ queue for publishing events. Required if RabbitMQ is the chosen publish MQ provider. | `nil` | Conditional |
@@ -166,6 +168,9 @@ alert:
166168
# Enables or disables audit logging for significant events.
167169
audit_log: true
168170

171+
# Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours).
172+
delivery_idempotency_key_ttl: 86400
173+
169174
# Maximum number of delivery attempts to process concurrently.
170175
delivery_max_concurrency: 1
171176

@@ -451,6 +456,9 @@ portal:
451456
# Required: Y
452457
postgres: ""
453458

459+
# Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours).
460+
publish_idempotency_key_ttl: 86400
461+
454462
publishmq:
455463
# Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured.
456464
aws_sqs:

internal/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ type Config struct {
8383
MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"`
8484
DeliveryTimeoutSeconds int `yaml:"delivery_timeout_seconds" env:"DELIVERY_TIMEOUT_SECONDS" desc:"Timeout in seconds for HTTP requests made during event delivery to webhook destinations." required:"N"`
8585

86+
// Idempotency
87+
PublishIdempotencyKeyTTL int `yaml:"publish_idempotency_key_ttl" env:"PUBLISH_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours)." required:"N"`
88+
DeliveryIdempotencyKeyTTL int `yaml:"delivery_idempotency_key_ttl" env:"DELIVERY_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours)." required:"N"`
89+
8690
// Destination Registry
8791
DestinationMetadataPath string `yaml:"destination_metadata_path" env:"DESTINATION_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. Overrides 'destinations.metadata_path' if set." required:"N"`
8892

@@ -160,6 +164,8 @@ func (c *Config) InitDefaults() {
160164
c.RetryMaxLimit = 10
161165
c.MaxDestinationsPerTenant = 20
162166
c.DeliveryTimeoutSeconds = 5
167+
c.PublishIdempotencyKeyTTL = 86400 // 24 hours
168+
c.DeliveryIdempotencyKeyTTL = 86400 // 24 hours
163169
c.LogBatchThresholdSeconds = 10
164170
c.LogBatchSize = 1000
165171

internal/deliverymq/messagehandler.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/hookdeck/outpost/internal/logging"
1515
"github.com/hookdeck/outpost/internal/models"
1616
"github.com/hookdeck/outpost/internal/mqs"
17-
"github.com/hookdeck/outpost/internal/redis"
1817
"github.com/hookdeck/outpost/internal/scheduler"
1918
"go.opentelemetry.io/otel/trace"
2019
"go.uber.org/zap"
@@ -110,7 +109,6 @@ type AlertMonitor interface {
110109

111110
func NewMessageHandler(
112111
logger *logging.Logger,
113-
redisClient redis.Cmdable,
114112
logMQ LogPublisher,
115113
entityStore DestinationGetter,
116114
logStore EventGetter,
@@ -120,6 +118,7 @@ func NewMessageHandler(
120118
retryBackoff backoff.Backoff,
121119
retryMaxLimit int,
122120
alertMonitor AlertMonitor,
121+
idempotence idempotence.Idempotence,
123122
) consumer.MessageHandler {
124123
return &messageHandler{
125124
eventTracer: eventTracer,
@@ -131,11 +130,8 @@ func NewMessageHandler(
131130
retryScheduler: retryScheduler,
132131
retryBackoff: retryBackoff,
133132
retryMaxLimit: retryMaxLimit,
134-
idempotence: idempotence.New(redisClient,
135-
idempotence.WithTimeout(5*time.Second),
136-
idempotence.WithSuccessfulTTL(24*time.Hour),
137-
),
138-
alertMonitor: alertMonitor,
133+
idempotence: idempotence,
134+
alertMonitor: alertMonitor,
139135
}
140136
}
141137

internal/deliverymq/messagehandler_test.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/hookdeck/outpost/internal/backoff"
1111
"github.com/hookdeck/outpost/internal/deliverymq"
1212
"github.com/hookdeck/outpost/internal/destregistry"
13+
"github.com/hookdeck/outpost/internal/idempotence"
1314
"github.com/hookdeck/outpost/internal/idgen"
1415
"github.com/hookdeck/outpost/internal/models"
1516
"github.com/hookdeck/outpost/internal/util/testutil"
@@ -48,7 +49,6 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) {
4849
// Setup message handler
4950
handler := deliverymq.NewMessageHandler(
5051
testutil.CreateTestLogger(t),
51-
testutil.CreateTestRedisClient(t),
5252
newMockLogPublisher(nil),
5353
destGetter,
5454
eventGetter,
@@ -58,6 +58,7 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) {
5858
&backoff.ConstantBackoff{Interval: 1 * time.Second},
5959
10,
6060
alertMonitor,
61+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
6162
)
6263

6364
// Create and handle message
@@ -112,7 +113,6 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) {
112113
// Setup message handler
113114
handler := deliverymq.NewMessageHandler(
114115
testutil.CreateTestLogger(t),
115-
testutil.CreateTestRedisClient(t),
116116
logPublisher,
117117
destGetter,
118118
eventGetter,
@@ -122,6 +122,7 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) {
122122
&backoff.ConstantBackoff{Interval: 1 * time.Second},
123123
10,
124124
alertMonitor,
125+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
125126
)
126127

127128
// Create and handle message
@@ -173,7 +174,6 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) {
173174
// Setup message handler
174175
handler := deliverymq.NewMessageHandler(
175176
testutil.CreateTestLogger(t),
176-
testutil.CreateTestRedisClient(t),
177177
logPublisher,
178178
destGetter,
179179
eventGetter,
@@ -183,6 +183,7 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) {
183183
&backoff.ConstantBackoff{Interval: 1 * time.Second},
184184
10,
185185
alertMonitor,
186+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
186187
)
187188

188189
// Create and handle message
@@ -244,7 +245,6 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) {
244245
// Setup message handler
245246
handler := deliverymq.NewMessageHandler(
246247
testutil.CreateTestLogger(t),
247-
testutil.CreateTestRedisClient(t),
248248
logPublisher,
249249
destGetter,
250250
eventGetter,
@@ -254,6 +254,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) {
254254
&backoff.ConstantBackoff{Interval: 1 * time.Second},
255255
10,
256256
alertMonitor,
257+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
257258
)
258259

259260
// Create and handle message
@@ -318,7 +319,6 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) {
318319
// Setup message handler
319320
handler := deliverymq.NewMessageHandler(
320321
testutil.CreateTestLogger(t),
321-
testutil.CreateTestRedisClient(t),
322322
logPublisher,
323323
destGetter,
324324
eventGetter,
@@ -328,6 +328,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) {
328328
&backoff.ConstantBackoff{Interval: 1 * time.Second},
329329
10,
330330
alertMonitor,
331+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
331332
)
332333

333334
// Create and handle message
@@ -382,7 +383,6 @@ func TestMessageHandler_EventGetterError(t *testing.T) {
382383
// Setup message handler
383384
handler := deliverymq.NewMessageHandler(
384385
testutil.CreateTestLogger(t),
385-
testutil.CreateTestRedisClient(t),
386386
logPublisher,
387387
destGetter,
388388
eventGetter,
@@ -392,6 +392,7 @@ func TestMessageHandler_EventGetterError(t *testing.T) {
392392
&backoff.ConstantBackoff{Interval: 1 * time.Second},
393393
10,
394394
alertMonitor,
395+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
395396
)
396397

397398
// Create and handle message simulating a retry
@@ -448,7 +449,6 @@ func TestMessageHandler_RetryFlow(t *testing.T) {
448449
// Setup message handler
449450
handler := deliverymq.NewMessageHandler(
450451
testutil.CreateTestLogger(t),
451-
testutil.CreateTestRedisClient(t),
452452
logPublisher,
453453
destGetter,
454454
eventGetter,
@@ -458,6 +458,7 @@ func TestMessageHandler_RetryFlow(t *testing.T) {
458458
&backoff.ConstantBackoff{Interval: 1 * time.Second},
459459
10,
460460
newMockAlertMonitor(),
461+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
461462
)
462463

463464
// Create and handle message simulating a retry
@@ -517,7 +518,6 @@ func TestMessageHandler_Idempotency(t *testing.T) {
517518
redis := testutil.CreateTestRedisClient(t)
518519
handler := deliverymq.NewMessageHandler(
519520
testutil.CreateTestLogger(t),
520-
redis,
521521
logPublisher,
522522
destGetter,
523523
eventGetter,
@@ -527,6 +527,7 @@ func TestMessageHandler_Idempotency(t *testing.T) {
527527
&backoff.ConstantBackoff{Interval: 1 * time.Second},
528528
10,
529529
newMockAlertMonitor(),
530+
idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)),
530531
)
531532

532533
// Create message with fixed ID for idempotency check
@@ -585,7 +586,6 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) {
585586
redis := testutil.CreateTestRedisClient(t)
586587
handler := deliverymq.NewMessageHandler(
587588
testutil.CreateTestLogger(t),
588-
redis,
589589
logPublisher,
590590
destGetter,
591591
eventGetter,
@@ -595,6 +595,7 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) {
595595
&backoff.ConstantBackoff{Interval: 1 * time.Second},
596596
10,
597597
newMockAlertMonitor(),
598+
idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)),
598599
)
599600

600601
// Create retry message
@@ -662,7 +663,6 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) {
662663
// Setup message handler
663664
handler := deliverymq.NewMessageHandler(
664665
testutil.CreateTestLogger(t),
665-
testutil.CreateTestRedisClient(t),
666666
logPublisher,
667667
destGetter,
668668
eventGetter,
@@ -672,6 +672,7 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) {
672672
&backoff.ConstantBackoff{Interval: 1 * time.Second},
673673
10,
674674
alertMonitor,
675+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
675676
)
676677

677678
// Create and handle message
@@ -725,7 +726,6 @@ func TestMessageHandler_LogPublisherError(t *testing.T) {
725726
// Setup message handler
726727
handler := deliverymq.NewMessageHandler(
727728
testutil.CreateTestLogger(t),
728-
testutil.CreateTestRedisClient(t),
729729
logPublisher,
730730
destGetter,
731731
eventGetter,
@@ -735,6 +735,7 @@ func TestMessageHandler_LogPublisherError(t *testing.T) {
735735
&backoff.ConstantBackoff{Interval: 1 * time.Second},
736736
10,
737737
newMockAlertMonitor(),
738+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
738739
)
739740

740741
// Create and handle message
@@ -787,7 +788,6 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) {
787788
// Setup message handler
788789
handler := deliverymq.NewMessageHandler(
789790
testutil.CreateTestLogger(t),
790-
testutil.CreateTestRedisClient(t),
791791
logPublisher,
792792
destGetter,
793793
eventGetter,
@@ -797,6 +797,7 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) {
797797
&backoff.ConstantBackoff{Interval: 1 * time.Second},
798798
10,
799799
newMockAlertMonitor(),
800+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
800801
)
801802

802803
// Create and handle message
@@ -850,7 +851,6 @@ func TestManualDelivery_Success(t *testing.T) {
850851
// Setup message handler
851852
handler := deliverymq.NewMessageHandler(
852853
testutil.CreateTestLogger(t),
853-
testutil.CreateTestRedisClient(t),
854854
logPublisher,
855855
destGetter,
856856
eventGetter,
@@ -860,6 +860,7 @@ func TestManualDelivery_Success(t *testing.T) {
860860
&backoff.ConstantBackoff{Interval: 1 * time.Second},
861861
10,
862862
alertMonitor,
863+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
863864
)
864865

865866
// Create and handle message
@@ -924,7 +925,6 @@ func TestManualDelivery_PublishError(t *testing.T) {
924925
// Setup message handler
925926
handler := deliverymq.NewMessageHandler(
926927
testutil.CreateTestLogger(t),
927-
testutil.CreateTestRedisClient(t),
928928
logPublisher,
929929
destGetter,
930930
eventGetter,
@@ -934,6 +934,7 @@ func TestManualDelivery_PublishError(t *testing.T) {
934934
&backoff.ConstantBackoff{Interval: 1 * time.Second},
935935
10,
936936
alertMonitor,
937+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
937938
)
938939

939940
// Create and handle message
@@ -990,7 +991,6 @@ func TestManualDelivery_CancelError(t *testing.T) {
990991
// Setup message handler
991992
handler := deliverymq.NewMessageHandler(
992993
testutil.CreateTestLogger(t),
993-
testutil.CreateTestRedisClient(t),
994994
logPublisher,
995995
destGetter,
996996
eventGetter,
@@ -1000,6 +1000,7 @@ func TestManualDelivery_CancelError(t *testing.T) {
10001000
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10011001
10,
10021002
alertMonitor,
1003+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
10031004
)
10041005

10051006
// Create and handle message
@@ -1058,7 +1059,6 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) {
10581059
// Setup message handler
10591060
handler := deliverymq.NewMessageHandler(
10601061
testutil.CreateTestLogger(t),
1061-
testutil.CreateTestRedisClient(t),
10621062
logPublisher,
10631063
destGetter,
10641064
eventGetter,
@@ -1068,6 +1068,7 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) {
10681068
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10691069
10,
10701070
alertMonitor,
1071+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
10711072
)
10721073

10731074
// Create and handle message
@@ -1131,7 +1132,6 @@ func TestMessageHandler_PublishSuccess(t *testing.T) {
11311132
// Setup message handler
11321133
handler := deliverymq.NewMessageHandler(
11331134
testutil.CreateTestLogger(t),
1134-
testutil.CreateTestRedisClient(t),
11351135
logPublisher,
11361136
destGetter,
11371137
eventGetter,
@@ -1141,6 +1141,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) {
11411141
&backoff.ConstantBackoff{Interval: 1 * time.Second},
11421142
10,
11431143
alertMonitor,
1144+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
11441145
)
11451146

11461147
// Create and handle message
@@ -1192,7 +1193,6 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) {
11921193
// Setup message handler
11931194
handler := deliverymq.NewMessageHandler(
11941195
testutil.CreateTestLogger(t),
1195-
testutil.CreateTestRedisClient(t),
11961196
logPublisher,
11971197
destGetter,
11981198
eventGetter,
@@ -1202,6 +1202,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) {
12021202
&backoff.ConstantBackoff{Interval: 1 * time.Second},
12031203
10,
12041204
alertMonitor,
1205+
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
12051206
)
12061207

12071208
// Create and handle message

0 commit comments

Comments
 (0)