Skip to content

Commit 6a45e8e

Browse files
abossardfamartingclaudedapr-botyaron2
authored
Supporting more properties for rabbitmq (#3806)
Signed-off-by: Andre Bossard <[email protected]> Signed-off-by: Fabian Martinez <[email protected]> Co-authored-by: Andre Bossard <[email protected]> Co-authored-by: Fabian Martinez <[email protected]> Co-authored-by: Claude <[email protected]> Co-authored-by: Dapr Bot <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent b01c4b4 commit 6a45e8e

File tree

16 files changed

+826
-104
lines changed

16 files changed

+826
-104
lines changed

bindings/rabbitmq/rabbitmq.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
amqp "github.com/rabbitmq/amqp091-go"
3232

3333
"github.com/dapr/components-contrib/bindings"
34+
common "github.com/dapr/components-contrib/common/component/rabbitmq"
3435
"github.com/dapr/components-contrib/metadata"
3536
"github.com/dapr/kit/logger"
3637
kitmd "github.com/dapr/kit/metadata"
@@ -228,11 +229,6 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi
228229
pub.Headers[k] = v
229230
}
230231

231-
contentType, ok := metadata.TryGetContentType(req.Metadata)
232-
if ok {
233-
pub.ContentType = contentType
234-
}
235-
236232
// The default time to live has been set in the queue
237233
// We allow overriding on each call, by setting a value in request metadata
238234
ttl, ok, err := metadata.TryGetTTL(req.Metadata)
@@ -252,6 +248,8 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi
252248
pub.Priority = priority
253249
}
254250

251+
common.ApplyMetadataToPublishing(req.Metadata, &pub)
252+
255253
err = ch.PublishWithContext(ctx, "", r.metadata.QueueName, false, false, pub)
256254
if err != nil {
257255
return nil, fmt.Errorf("failed to publish message: %w", err)
@@ -473,9 +471,9 @@ func (r *RabbitMQ) handleMessage(ctx context.Context, handler bindings.Handler,
473471
// Passthrough any custom metadata to the handler.
474472
for k, v := range d.Headers {
475473
if s, ok := v.(string); ok {
476-
// Escape the key and value to ensure they are valid URL query parameters.
474+
// Escape the key to ensure they are valid URL query parameters.
477475
// This is necessary for them to be sent as HTTP Metadata.
478-
metadata[url.QueryEscape(k)] = url.QueryEscape(s)
476+
metadata[url.QueryEscape(k)] = s
479477
}
480478
}
481479

bindings/rabbitmq/rabbitmq_integration_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,3 +447,67 @@ func TestPublishWithHeaders(t *testing.T) {
447447
// assert.Contains(t, msg.Header, "custom_header1")
448448
// assert.Contains(t, msg.Header, "custom_header2")
449449
}
450+
451+
func TestPublishMetadataProperties(t *testing.T) {
452+
rabbitmqHost := getTestRabbitMQHost()
453+
require.NotEmpty(t, rabbitmqHost, fmt.Sprintf("RabbitMQ host configuration must be set in environment variable '%s'", testRabbitMQHostEnvKey))
454+
455+
queueName := uuid.New().String()
456+
durable := true
457+
exclusive := false
458+
459+
metadata := bindings.Metadata{
460+
Base: contribMetadata.Base{
461+
Name: "testQueue",
462+
Properties: map[string]string{
463+
"queueName": queueName,
464+
"host": rabbitmqHost,
465+
"deleteWhenUnused": strconv.FormatBool(exclusive),
466+
"durable": strconv.FormatBool(durable),
467+
},
468+
},
469+
}
470+
471+
logger := logger.NewLogger("test")
472+
r := NewRabbitMQ(logger).(*RabbitMQ)
473+
err := r.Init(t.Context(), metadata)
474+
require.NoError(t, err)
475+
476+
conn, err := amqp.Dial(rabbitmqHost)
477+
require.NoError(t, err)
478+
defer conn.Close()
479+
480+
ch, err := conn.Channel()
481+
require.NoError(t, err)
482+
defer ch.Close()
483+
484+
const messageData = "test message"
485+
const msgID = "msg-123"
486+
const corrID = "corr-456"
487+
const msgType = "testType"
488+
const contentType = "application/json"
489+
490+
writeRequest := bindings.InvokeRequest{
491+
Data: []byte(messageData),
492+
Metadata: map[string]string{
493+
"messageID": msgID,
494+
"correlationID": corrID,
495+
"type": msgType,
496+
"contentType": contentType,
497+
},
498+
}
499+
_, err = r.Invoke(t.Context(), &writeRequest)
500+
require.NoError(t, err)
501+
502+
// Retrieve the message.
503+
msg, ok, err := getMessageWithRetries(ch, queueName, 2*time.Second)
504+
require.NoError(t, err)
505+
assert.True(t, ok)
506+
assert.Equal(t, messageData, string(msg.Body))
507+
assert.Equal(t, msgID, msg.MessageId)
508+
assert.Equal(t, corrID, msg.CorrelationId)
509+
assert.Equal(t, msgType, msg.Type)
510+
assert.Equal(t, contentType, msg.ContentType)
511+
512+
require.NoError(t, r.Close())
513+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package rabbitmq
2+
3+
import (
4+
"strings"
5+
6+
amqp "github.com/rabbitmq/amqp091-go"
7+
)
8+
9+
const (
10+
MetadataKeyMessageID = "messageID"
11+
MetadataKeyCorrelationID = "correlationID"
12+
MetadataKeyContentType = "contentType"
13+
MetadataKeyType = "type"
14+
MetadataKeyPriority = "priority"
15+
MetadataKeyTTL = "ttl"
16+
)
17+
18+
// TryGetProperty finds a property value using case-insensitive matching
19+
func TryGetProperty(props map[string]string, key string) (string, bool) {
20+
// First try exact match
21+
if val, ok := props[key]; ok && val != "" {
22+
return val, true
23+
}
24+
25+
// Then try case-insensitive match
26+
for k, v := range props {
27+
if v != "" && strings.EqualFold(key, k) {
28+
return v, true
29+
}
30+
}
31+
32+
return "", false
33+
}
34+
35+
// ApplyMetadataToPublishing applies common metadata fields to an AMQP publishing
36+
func ApplyMetadataToPublishing(metadata map[string]string, publishing *amqp.Publishing) {
37+
if contentType, ok := TryGetProperty(metadata, MetadataKeyContentType); ok {
38+
publishing.ContentType = contentType
39+
}
40+
41+
if messageID, ok := TryGetProperty(metadata, MetadataKeyMessageID); ok {
42+
publishing.MessageId = messageID
43+
}
44+
45+
if correlationID, ok := TryGetProperty(metadata, MetadataKeyCorrelationID); ok {
46+
publishing.CorrelationId = correlationID
47+
}
48+
49+
if aType, ok := TryGetProperty(metadata, MetadataKeyType); ok {
50+
publishing.Type = aType
51+
}
52+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package rabbitmq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestTryGetProperty(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
props map[string]string
13+
key string
14+
expected string
15+
found bool
16+
}{
17+
{
18+
name: "exact match",
19+
props: map[string]string{"messageID": "test-id"},
20+
key: "messageID",
21+
expected: "test-id",
22+
found: true,
23+
},
24+
{
25+
name: "case insensitive match",
26+
props: map[string]string{"messageid": "test-id"},
27+
key: "messageID",
28+
expected: "test-id",
29+
found: true,
30+
},
31+
{
32+
name: "uppercase match",
33+
props: map[string]string{"MESSAGEID": "test-id"},
34+
key: "messageID",
35+
expected: "test-id",
36+
found: true,
37+
},
38+
{
39+
name: "not found",
40+
props: map[string]string{"otherKey": "value"},
41+
key: "messageID",
42+
expected: "",
43+
found: false,
44+
},
45+
{
46+
name: "empty value",
47+
props: map[string]string{"messageID": ""},
48+
key: "messageID",
49+
expected: "",
50+
found: false,
51+
},
52+
{
53+
name: "whitespace value",
54+
props: map[string]string{"messageID": " "},
55+
key: "messageID",
56+
expected: " ",
57+
found: true,
58+
},
59+
}
60+
61+
for _, tt := range tests {
62+
t.Run(tt.name, func(t *testing.T) {
63+
value, found := TryGetProperty(tt.props, tt.key)
64+
assert.Equal(t, tt.expected, value)
65+
assert.Equal(t, tt.found, found)
66+
})
67+
}
68+
}

conversation/deepseek/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ status: alpha
77
title: "Deepseek"
88
urls:
99
- title: Reference
10-
url: https://docs.dapr.io/reference/components-reference/supported-conversation/deepseek/
10+
url: https://docs.dapr.io/reference/components-reference/supported-conversation/setup-deepseek/
1111
authenticationProfiles:
1212
- title: "API Key"
1313
description: "Authenticate using an API key"

metadata/utils.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,6 @@ func IsRawPayload(props map[string]string) (bool, error) {
114114
return false, nil
115115
}
116116

117-
func TryGetContentType(props map[string]string) (string, bool) {
118-
if val, ok := props[ContentType]; ok && val != "" {
119-
return val, true
120-
}
121-
122-
return "", false
123-
}
124-
125117
func TryGetQueryIndexName(props map[string]string) (string, bool) {
126118
if val, ok := props[QueryIndexName]; ok && val != "" {
127119
return val, true

metadata/utils_test.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -175,34 +175,6 @@ func TestIsRawPayload(t *testing.T) {
175175
})
176176
}
177177

178-
func TestTryGetContentType(t *testing.T) {
179-
t.Run("Metadata without content type", func(t *testing.T) {
180-
val, ok := TryGetContentType(map[string]string{})
181-
182-
assert.Equal(t, "", val)
183-
assert.False(t, ok)
184-
})
185-
186-
t.Run("Metadata with empty content type", func(t *testing.T) {
187-
val, ok := TryGetContentType(map[string]string{
188-
"contentType": "",
189-
})
190-
191-
assert.Equal(t, "", val)
192-
assert.False(t, ok)
193-
})
194-
195-
t.Run("Metadata with corrent content type", func(t *testing.T) {
196-
const contentType = "application/cloudevent+json"
197-
val, ok := TryGetContentType(map[string]string{
198-
"contentType": contentType,
199-
})
200-
201-
assert.Equal(t, contentType, val)
202-
assert.True(t, ok)
203-
})
204-
}
205-
206178
func TestMetadataStructToStringMap(t *testing.T) {
207179
t.Run("Test metadata struct to metadata info conversion", func(t *testing.T) {
208180
type NestedStruct struct {

0 commit comments

Comments
 (0)