Skip to content

Commit 62042b0

Browse files
authored
refactor: Implement destination registry (#123)
* refactor: Inverse dependency between models & destinationadapter * refactor: Rename & update interface of destinationadapter to destregistry * refactor: Implement destination registry interface
1 parent b5e1b6e commit 62042b0

File tree

23 files changed

+392
-343
lines changed

23 files changed

+392
-343
lines changed

internal/deliverymq/messagehandler.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/google/uuid"
99
"github.com/hookdeck/outpost/internal/backoff"
1010
"github.com/hookdeck/outpost/internal/consumer"
11+
"github.com/hookdeck/outpost/internal/destregistry"
1112
"github.com/hookdeck/outpost/internal/eventtracer"
1213
"github.com/hookdeck/outpost/internal/idempotence"
1314
"github.com/hookdeck/outpost/internal/logmq"
@@ -29,6 +30,7 @@ type messageHandler struct {
2930
retryBackoff backoff.Backoff
3031
retryMaxCount int
3132
idempotence idempotence.Idempotence
33+
registry destregistry.Registry
3234
}
3335

3436
var _ consumer.MessageHandler = (*messageHandler)(nil)
@@ -39,6 +41,7 @@ func NewMessageHandler(
3941
logMQ *logmq.LogMQ,
4042
entityStore models.EntityStore,
4143
logStore models.LogStore,
44+
registry destregistry.Registry,
4245
eventTracer eventtracer.EventTracer,
4346
retryScheduler scheduler.Scheduler,
4447
retryBackoff backoff.Backoff,
@@ -50,6 +53,7 @@ func NewMessageHandler(
5053
logMQ: logMQ,
5154
entityStore: entityStore,
5255
logStore: logStore,
56+
registry: registry,
5357
retryScheduler: retryScheduler,
5458
retryBackoff: retryBackoff,
5559
retryMaxCount: retryMaxCount,
@@ -103,10 +107,16 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
103107
span.RecordError(errors.New("destination not found"))
104108
return err
105109
}
106-
err = destination.Publish(ctx, &deliveryEvent.Event)
110+
provider, err := h.registry.GetProvider(destination.Type)
107111
if err != nil {
108-
logger.Error("failed to publish event", zap.Error(err))
112+
logger.Error("failed to get destination provider", zap.Error(err))
109113
span.RecordError(err)
114+
return err
115+
}
116+
var finalErr error
117+
if err := provider.Publish(ctx, destination, &deliveryEvent.Event); err != nil {
118+
logger.Error("failed to publish event", zap.Error(err))
119+
finalErr = err
110120
deliveryEvent.Delivery = &models.Delivery{
111121
ID: uuid.New().String(),
112122
DeliveryEventID: deliveryEvent.ID,
@@ -127,11 +137,17 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
127137
}
128138
logErr := h.logMQ.Publish(ctx, deliveryEvent)
129139
if logErr != nil {
130-
logger.Error("failed to publish log event", zap.Error(err))
131-
span.RecordError(err)
132-
err = errors.Join(err, logErr)
140+
logger.Error("failed to publish log event", zap.Error(logErr))
141+
if finalErr == nil {
142+
finalErr = logErr
143+
} else {
144+
finalErr = errors.Join(finalErr, logErr)
145+
}
133146
}
134-
return err
147+
if finalErr != nil {
148+
span.RecordError(finalErr)
149+
}
150+
return finalErr
135151
}
136152

137153
// shouldRetry checks if the event should be retried.
@@ -146,7 +162,7 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
146162
// say logmq.Publish fails. Should that count as an attempt? What about an error BEFORE deliverying the message?
147163
// Should we write code to differentiate between these two types of errors (predeliveryErr and postdeliveryErr, for example)?
148164
func (h *messageHandler) shouldRetry(err error, deliveryEvent models.DeliveryEvent) bool {
149-
_, isPublishErr := err.(*models.DestinationPublishError)
165+
_, isPublishErr := err.(*destregistry.ErrDestinationPublish)
150166
if !isPublishErr {
151167
return true
152168
}

internal/deliverymq/retry_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) {
7979
logMQ,
8080
s.entityStore,
8181
s.logStore,
82+
testutil.Registry,
8283
testutil.NewMockEventTracer(s.exporter),
8384
retryScheduler,
8485
&backoff.ConstantBackoff{Interval: 1 * time.Second},

internal/destinationadapter/adapters/adapter.go

Lines changed: 0 additions & 29 deletions
This file was deleted.

internal/destinationadapter/destinationadapter.go

Lines changed: 0 additions & 25 deletions
This file was deleted.

internal/destregistry/error.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package destregistry
2+
3+
import "fmt"
4+
5+
func NewErrDestinationValidation(err error) error {
6+
return fmt.Errorf("validation failed: %w", err)
7+
}
8+
9+
type ErrDestinationPublish struct {
10+
Err error
11+
}
12+
13+
var _ error = &ErrDestinationPublish{}
14+
15+
func (e *ErrDestinationPublish) Error() string {
16+
return e.Err.Error()
17+
}
18+
19+
func NewErrDestinationPublish(err error) error {
20+
return &ErrDestinationPublish{Err: err}
21+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package destregistrydefault
2+
3+
import (
4+
"github.com/hookdeck/outpost/internal/destregistry"
5+
"github.com/hookdeck/outpost/internal/destregistry/providers/destaws"
6+
"github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq"
7+
"github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook"
8+
)
9+
10+
func RegisterDefault(registry destregistry.Registry) {
11+
registry.RegisterProvider("aws", destaws.New())
12+
registry.RegisterProvider("rabbitmq", destrabbitmq.New())
13+
registry.RegisterProvider("webhook", destwebhook.New())
14+
}

internal/destinationadapter/adapters/aws/aws.go renamed to internal/destregistry/providers/destaws/destaws.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package aws
1+
package destaws
22

33
import (
44
"context"
@@ -11,7 +11,8 @@ import (
1111
"github.com/aws/aws-sdk-go-v2/service/sqs"
1212
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
1313
"github.com/aws/aws-sdk-go/aws"
14-
"github.com/hookdeck/outpost/internal/destinationadapter/adapters"
14+
"github.com/hookdeck/outpost/internal/destregistry"
15+
"github.com/hookdeck/outpost/internal/models"
1516
)
1617

1718
type AWSDestination struct {
@@ -28,34 +29,39 @@ type AWSDestinationCredentials struct {
2829
Session string // optional
2930
}
3031

31-
var _ adapters.DestinationAdapter = (*AWSDestination)(nil)
32+
var _ destregistry.Provider = (*AWSDestination)(nil)
3233

3334
func New() *AWSDestination {
3435
return &AWSDestination{}
3536
}
3637

37-
func (d *AWSDestination) Validate(ctx context.Context, destination adapters.DestinationAdapterValue) error {
38+
func (d *AWSDestination) Validate(ctx context.Context, destination *models.Destination) error {
3839
_, err := parseConfig(destination)
3940
if err != nil {
40-
return err
41+
return destregistry.NewErrDestinationValidation(err)
4142
}
42-
_, err = parseCredentials(destination)
43-
return err
43+
if _, err = parseCredentials(destination); err != nil {
44+
return destregistry.NewErrDestinationValidation(err)
45+
}
46+
return nil
4447
}
4548

46-
func (d *AWSDestination) Publish(ctx context.Context, destination adapters.DestinationAdapterValue, event *adapters.Event) error {
49+
func (d *AWSDestination) Publish(ctx context.Context, destination *models.Destination, event *models.Event) error {
4750
config, err := parseConfig(destination)
4851
if err != nil {
49-
return err
52+
return destregistry.NewErrDestinationPublish(err)
5053
}
5154
credentials, err := parseCredentials(destination)
5255
if err != nil {
53-
return err
56+
return destregistry.NewErrDestinationPublish(err)
57+
}
58+
if err := publishEvent(ctx, config, credentials, event); err != nil {
59+
return destregistry.NewErrDestinationPublish(err)
5460
}
55-
return publishEvent(ctx, config, credentials, event)
61+
return nil
5662
}
5763

58-
func parseConfig(destination adapters.DestinationAdapterValue) (*AWSDestinationConfig, error) {
64+
func parseConfig(destination *models.Destination) (*AWSDestinationConfig, error) {
5965
if destination.Type != "aws" {
6066
return nil, errors.New("invalid destination type")
6167
}
@@ -72,7 +78,7 @@ func parseConfig(destination adapters.DestinationAdapterValue) (*AWSDestinationC
7278
return destinationConfig, nil
7379
}
7480

75-
func parseCredentials(destination adapters.DestinationAdapterValue) (*AWSDestinationCredentials, error) {
81+
func parseCredentials(destination *models.Destination) (*AWSDestinationCredentials, error) {
7682
if destination.Type != "aws" {
7783
return nil, errors.New("invalid destination type")
7884
}
@@ -94,7 +100,7 @@ func parseCredentials(destination adapters.DestinationAdapterValue) (*AWSDestina
94100
return destinationCredentials, nil
95101
}
96102

97-
func publishEvent(ctx context.Context, cfg *AWSDestinationConfig, creds *AWSDestinationCredentials, event *adapters.Event) error {
103+
func publishEvent(ctx context.Context, cfg *AWSDestinationConfig, creds *AWSDestinationCredentials, event *models.Event) error {
98104
dataBytes, err := json.Marshal(event.Data)
99105
if err != nil {
100106
return err

0 commit comments

Comments
 (0)