Skip to content

Commit d66d7b9

Browse files
authored
fix: Send system metadata to destrabbitmq & destawssqs (#297)
1 parent 8b16155 commit d66d7b9

File tree

5 files changed

+73
-21
lines changed

5 files changed

+73
-21
lines changed

internal/destregistry/basepublisher.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package destregistry
22

33
import (
4+
"fmt"
45
"sync"
56
"sync/atomic"
7+
"time"
8+
9+
"github.com/hookdeck/outpost/internal/models"
610
)
711

812
// BasePublisher provides common publisher functionality
@@ -30,3 +34,19 @@ func (p *BasePublisher) StartClose() {
3034
p.closed.Store(true)
3135
p.active.Wait()
3236
}
37+
38+
func (p *BasePublisher) MakeMetadata(event *models.Event, timestamp time.Time) map[string]string {
39+
systemMetadata := map[string]string{
40+
"timestamp": fmt.Sprintf("%d", timestamp.Unix()),
41+
"event-id": event.ID,
42+
"topic": event.Topic,
43+
}
44+
metadata := make(map[string]string)
45+
for k, v := range systemMetadata {
46+
metadata[k] = v
47+
}
48+
for k, v := range event.Metadata {
49+
metadata[k] = v
50+
}
51+
return metadata
52+
}

internal/destregistry/providers/destawssqs/destawssqs.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"net/url"
88
"strings"
9+
"time"
910

1011
awssdk "github.com/aws/aws-sdk-go-v2/aws"
1112
"github.com/aws/aws-sdk-go-v2/config"
@@ -139,18 +140,21 @@ func (p *AWSSQSPublisher) Format(ctx context.Context, event *models.Event) (*sqs
139140
return nil, err
140141
}
141142

142-
attrs := make(map[string]types.MessageAttributeValue)
143-
for k, v := range event.Metadata {
144-
attrs[k] = types.MessageAttributeValue{
145-
DataType: aws.String("String"),
146-
StringValue: aws.String(v),
147-
}
143+
metadata := p.BasePublisher.MakeMetadata(event, time.Now())
144+
metadataBytes, err := json.Marshal(metadata)
145+
if err != nil {
146+
return nil, err
148147
}
149148

150149
return &sqs.SendMessageInput{
151-
QueueUrl: awssdk.String(p.queueURL),
152-
MessageBody: awssdk.String(string(dataBytes)),
153-
MessageAttributes: attrs,
150+
QueueUrl: awssdk.String(p.queueURL),
151+
MessageBody: awssdk.String(string(dataBytes)),
152+
MessageAttributes: map[string]types.MessageAttributeValue{
153+
"metadata": {
154+
DataType: aws.String("String"),
155+
StringValue: aws.String(string(metadataBytes)),
156+
},
157+
},
154158
}, nil
155159
}
156160

internal/destregistry/providers/destawssqs/destawssqs_publish_test.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ package destawssqs_test
22

33
import (
44
"context"
5+
"encoding/json"
56
"testing"
67

78
"github.com/aws/aws-sdk-go-v2/aws"
89
"github.com/aws/aws-sdk-go-v2/service/sqs"
910
"github.com/hookdeck/outpost/internal/destregistry/providers/destawssqs"
1011
testsuite "github.com/hookdeck/outpost/internal/destregistry/testing"
12+
"github.com/hookdeck/outpost/internal/models"
1113
"github.com/hookdeck/outpost/internal/util/awsutil"
1214
"github.com/hookdeck/outpost/internal/util/testinfra"
1315
"github.com/hookdeck/outpost/internal/util/testutil"
16+
"github.com/stretchr/testify/assert"
1417
"github.com/stretchr/testify/require"
1518
"github.com/stretchr/testify/suite"
1619
)
@@ -52,8 +55,10 @@ func (c *SQSConsumer) consume() {
5255

5356
for _, msg := range result.Messages {
5457
metadata := make(map[string]string)
55-
for k, v := range msg.MessageAttributes {
56-
metadata[k] = *v.StringValue
58+
if metaAttr, ok := msg.MessageAttributes["metadata"]; ok {
59+
if err := json.Unmarshal([]byte(*metaAttr.StringValue), &metadata); err != nil {
60+
continue
61+
}
5762
}
5863

5964
c.msgChan <- testsuite.Message{
@@ -81,6 +86,23 @@ func (c *SQSConsumer) Close() error {
8186
return nil
8287
}
8388

89+
type SQSAsserter struct{}
90+
91+
func (a *SQSAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) {
92+
// Metadata is already parsed in the consumer
93+
metadata := msg.Metadata
94+
95+
// Verify system metadata
96+
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
97+
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
98+
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")
99+
100+
// Verify custom metadata
101+
for k, v := range event.Metadata {
102+
assert.Equal(t, v, metadata[k], "metadata key %s should match expected value", k)
103+
}
104+
}
105+
84106
type AWSSQSSuite struct {
85107
testsuite.PublisherSuite
86108
consumer *SQSConsumer
@@ -122,11 +144,12 @@ func (s *AWSSQSSuite) SetupSuite() {
122144
}),
123145
)
124146

125-
// Initialize publisher suite
147+
// Initialize publisher suite with custom asserter
126148
cfg := testsuite.Config{
127149
Provider: provider,
128150
Dest: &destination,
129151
Consumer: s.consumer,
152+
Asserter: &SQSAsserter{},
130153
}
131154
s.InitSuite(cfg)
132155
}

internal/destregistry/providers/destrabbitmq/destrabbitmq.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"strings"
88
"sync"
9+
"time"
910

1011
"github.com/hookdeck/outpost/internal/destregistry"
1112
"github.com/hookdeck/outpost/internal/destregistry/metadata"
@@ -150,7 +151,8 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (*
150151
}
151152

152153
headers := make(amqp091.Table)
153-
for k, v := range event.Metadata {
154+
metadata := p.BasePublisher.MakeMetadata(event, time.Now())
155+
for k, v := range metadata {
154156
headers[k] = v
155157
}
156158

internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,17 @@ func (a *RabbitMQAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mes
143143
// Assert RabbitMQ-specific properties
144144
assert.Equal(t, "application/json", delivery.ContentType)
145145
assert.Equal(t, event.Topic, delivery.RoutingKey, "routing key should match event topic")
146-
// assert.NotEmpty(t, delivery.MessageId)
147-
// assert.NotEmpty(t, delivery.Timestamp)
148-
149-
// Could add more RabbitMQ-specific assertions:
150-
// - Exchange routing
151-
// - Message persistence
152-
// - Priority
153-
// - etc.
146+
147+
// Verify system metadata
148+
metadata := msg.Metadata
149+
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
150+
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
151+
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")
152+
153+
// Verify custom metadata
154+
for k, v := range event.Metadata {
155+
assert.Equal(t, v, metadata[k], "metadata key %s should match expected value", k)
156+
}
154157
}
155158

156159
// RabbitMQPublishSuite reimplements the publish tests using the shared test suite

0 commit comments

Comments
 (0)