Skip to content

Commit 6b6811d

Browse files
Adding in raw AMQP message support (Azure#19156)
Enabling raw AMQP message support with two pieces: * ReceivedEventData now has a RawAMQPMessage, of type AMQPAnnotatedMessage. The AMQPAnnotatedMessage is the same as what we have in azservicebus except it doesn't need some unexported fields since we don't do message settlement. * EventDataBatch has a new function (AddAMQPAnnotatedMessage) which takes an AMQPAnnotatedMessage as a parameter. It functions exactly the same as it's EventData counterpart (AddEventData). There are a few new .go files for AMQP support, but they are largely duplicates of what we have in azservicebus.
1 parent 7e8768a commit 6b6811d

File tree

10 files changed

+593
-46
lines changed

10 files changed

+593
-46
lines changed

sdk/messaging/azeventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Raw AMQP message support, including full support for encoding Body (Value, Sequence and also multiple byte slices for Data). See ExampleEventDataBatch_AddEventData_rawAMQPMessages for some concrete examples.
8+
79
### Breaking Changes
810

911
- EventDataBatch.NumMessages() renamed to EventDataBatch.NumEvents()

sdk/messaging/azeventhubs/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ Examples for various scenarios can be found on [pkg.go.dev](https://pkg.go.dev/g
6767

6868
This module uses the classification-based logging implementation in `azcore`. To enable console logging for all SDK modules, set the environment variable `AZURE_SDK_GO_LOGGING` to `all`.
6969

70-
Use the `azcore/log` package to control log event output or to enable logs for `azservicebus` only. For example:
70+
Use the `azcore/log` package to control log event output or to enable logs for `azeventhubs` only. For example:
7171

7272
```go
7373
import (
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azeventhubs
5+
6+
import (
7+
"time"
8+
9+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
10+
)
11+
12+
// AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs.
13+
// For details about these properties, refer to the AMQP specification:
14+
//
15+
// https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
16+
//
17+
// Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some
18+
// cases slices and maps.
19+
//
20+
// AMQP simple types include:
21+
// - int (any size), uint (any size)
22+
// - float (any size)
23+
// - string
24+
// - bool
25+
// - time.Time
26+
type AMQPAnnotatedMessage struct {
27+
// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
28+
//
29+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
30+
ApplicationProperties map[string]any
31+
32+
// Body represents the body of an AMQP message.
33+
Body AMQPAnnotatedMessageBody
34+
35+
// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
36+
//
37+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
38+
DeliveryAnnotations map[any]any
39+
40+
// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
41+
// for this message.
42+
DeliveryTag []byte
43+
44+
// Footer is the transport footers for this AMQP message.
45+
//
46+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
47+
Footer map[any]any
48+
49+
// Header is the transport headers for this AMQP message.
50+
Header *AMQPAnnotatedMessageHeader
51+
52+
// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
53+
//
54+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
55+
MessageAnnotations map[any]any
56+
57+
// Properties corresponds to the properties section of an AMQP message.
58+
Properties *AMQPAnnotatedMessageProperties
59+
}
60+
61+
// AMQPAnnotatedMessageProperties represents the properties of an AMQP message.
62+
// See here for more details:
63+
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
64+
type AMQPAnnotatedMessageProperties struct {
65+
// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
66+
AbsoluteExpiryTime *time.Time
67+
68+
// ContentEncoding corresponds to the 'content-encoding' property.
69+
ContentEncoding *string
70+
71+
// ContentType corresponds to the 'content-type' property
72+
ContentType *string
73+
74+
// CorrelationID corresponds to the 'correlation-id' property.
75+
// The type of CorrelationID can be a uint64, UUID, []byte, or a string
76+
CorrelationID any
77+
78+
// CreationTime corresponds to the 'creation-time' property.
79+
CreationTime *time.Time
80+
81+
// GroupID corresponds to the 'group-id' property.
82+
GroupID *string
83+
84+
// GroupSequence corresponds to the 'group-sequence' property.
85+
GroupSequence *uint32
86+
87+
// MessageID corresponds to the 'message-id' property.
88+
// The type of MessageID can be a uint64, UUID, []byte, or string
89+
MessageID any
90+
91+
// ReplyTo corresponds to the 'reply-to' property.
92+
ReplyTo *string
93+
94+
// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
95+
ReplyToGroupID *string
96+
97+
// Subject corresponds to the 'subject' property.
98+
Subject *string
99+
100+
// To corresponds to the 'to' property.
101+
To *string
102+
103+
// UserID corresponds to the 'user-id' property.
104+
UserID []byte
105+
}
106+
107+
// AMQPAnnotatedMessageBody represents the body of an AMQP message.
108+
// Only one of these fields can be used a a time. They are mutually exclusive.
109+
type AMQPAnnotatedMessageBody struct {
110+
// Data is encoded/decoded as multiple data sections in the body.
111+
Data [][]byte
112+
113+
// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
114+
//
115+
// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
116+
Sequence [][]any
117+
118+
// Value is encoded/decoded as the amqp-value section in the body.
119+
//
120+
// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
121+
// as well as slices or maps of AMQP simple types.
122+
Value any
123+
}
124+
125+
// AMQPAnnotatedMessageHeader carries standard delivery details about the transfer
126+
// of a message.
127+
// See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header
128+
// for more details.
129+
type AMQPAnnotatedMessageHeader struct {
130+
// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
131+
// It corresponds to the 'delivery-count' property.
132+
DeliveryCount uint32
133+
134+
// Durable corresponds to the 'durable' property.
135+
Durable bool
136+
137+
// FirstAcquirer corresponds to the 'first-acquirer' property.
138+
FirstAcquirer bool
139+
140+
// Priority corresponds to the 'priority' property.
141+
Priority uint8
142+
143+
// TTL corresponds to the 'ttl' property.
144+
TTL time.Duration
145+
}
146+
147+
// toAMQPMessage converts between our (azeventhubs) AMQP message
148+
// to the underlying message used by go-amqp.
149+
func (am *AMQPAnnotatedMessage) toAMQPMessage() *amqp.Message {
150+
var header *amqp.MessageHeader
151+
152+
if am.Header != nil {
153+
header = &amqp.MessageHeader{
154+
DeliveryCount: am.Header.DeliveryCount,
155+
Durable: am.Header.Durable,
156+
FirstAcquirer: am.Header.FirstAcquirer,
157+
Priority: am.Header.Priority,
158+
TTL: am.Header.TTL,
159+
}
160+
}
161+
162+
var properties *amqp.MessageProperties
163+
164+
if am.Properties != nil {
165+
properties = &amqp.MessageProperties{
166+
AbsoluteExpiryTime: am.Properties.AbsoluteExpiryTime,
167+
ContentEncoding: am.Properties.ContentEncoding,
168+
ContentType: am.Properties.ContentType,
169+
CorrelationID: am.Properties.CorrelationID,
170+
CreationTime: am.Properties.CreationTime,
171+
GroupID: am.Properties.GroupID,
172+
GroupSequence: am.Properties.GroupSequence,
173+
MessageID: am.Properties.MessageID,
174+
ReplyTo: am.Properties.ReplyTo,
175+
ReplyToGroupID: am.Properties.ReplyToGroupID,
176+
Subject: am.Properties.Subject,
177+
To: am.Properties.To,
178+
UserID: am.Properties.UserID,
179+
}
180+
} else {
181+
properties = &amqp.MessageProperties{}
182+
}
183+
184+
var footer amqp.Annotations
185+
186+
if am.Footer != nil {
187+
footer = (amqp.Annotations)(am.Footer)
188+
}
189+
190+
return &amqp.Message{
191+
Annotations: copyAnnotations(am.MessageAnnotations),
192+
ApplicationProperties: am.ApplicationProperties,
193+
Data: am.Body.Data,
194+
DeliveryAnnotations: amqp.Annotations(am.DeliveryAnnotations),
195+
DeliveryTag: am.DeliveryTag,
196+
Footer: footer,
197+
Header: header,
198+
Properties: properties,
199+
Sequence: am.Body.Sequence,
200+
Value: am.Body.Value,
201+
}
202+
}
203+
204+
func copyAnnotations(src map[any]any) amqp.Annotations {
205+
if src == nil {
206+
return amqp.Annotations{}
207+
}
208+
209+
dest := amqp.Annotations{}
210+
211+
for k, v := range src {
212+
dest[k] = v
213+
}
214+
215+
return dest
216+
}
217+
218+
func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage {
219+
var header *AMQPAnnotatedMessageHeader
220+
221+
if goAMQPMessage.Header != nil {
222+
header = &AMQPAnnotatedMessageHeader{
223+
DeliveryCount: goAMQPMessage.Header.DeliveryCount,
224+
Durable: goAMQPMessage.Header.Durable,
225+
FirstAcquirer: goAMQPMessage.Header.FirstAcquirer,
226+
Priority: goAMQPMessage.Header.Priority,
227+
TTL: goAMQPMessage.Header.TTL,
228+
}
229+
}
230+
231+
var properties *AMQPAnnotatedMessageProperties
232+
233+
if goAMQPMessage.Properties != nil {
234+
properties = &AMQPAnnotatedMessageProperties{
235+
AbsoluteExpiryTime: goAMQPMessage.Properties.AbsoluteExpiryTime,
236+
ContentEncoding: goAMQPMessage.Properties.ContentEncoding,
237+
ContentType: goAMQPMessage.Properties.ContentType,
238+
CorrelationID: goAMQPMessage.Properties.CorrelationID,
239+
CreationTime: goAMQPMessage.Properties.CreationTime,
240+
GroupID: goAMQPMessage.Properties.GroupID,
241+
GroupSequence: goAMQPMessage.Properties.GroupSequence,
242+
MessageID: goAMQPMessage.Properties.MessageID,
243+
ReplyTo: goAMQPMessage.Properties.ReplyTo,
244+
ReplyToGroupID: goAMQPMessage.Properties.ReplyToGroupID,
245+
Subject: goAMQPMessage.Properties.Subject,
246+
To: goAMQPMessage.Properties.To,
247+
UserID: goAMQPMessage.Properties.UserID,
248+
}
249+
}
250+
251+
var footer map[any]any
252+
253+
if goAMQPMessage.Footer != nil {
254+
footer = (map[any]any)(goAMQPMessage.Footer)
255+
}
256+
257+
return &AMQPAnnotatedMessage{
258+
MessageAnnotations: map[any]any(goAMQPMessage.Annotations),
259+
ApplicationProperties: goAMQPMessage.ApplicationProperties,
260+
Body: AMQPAnnotatedMessageBody{
261+
Data: goAMQPMessage.Data,
262+
Sequence: goAMQPMessage.Sequence,
263+
Value: goAMQPMessage.Value,
264+
},
265+
DeliveryAnnotations: map[any]any(goAMQPMessage.DeliveryAnnotations),
266+
DeliveryTag: goAMQPMessage.DeliveryTag,
267+
Footer: footer,
268+
Header: header,
269+
Properties: properties,
270+
}
271+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azeventhubs
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestAMQPAnnotatedMessageUnitTest(t *testing.T) {
13+
t.Run("Default", func(t *testing.T) {
14+
msg := &AMQPAnnotatedMessage{}
15+
amqpMessage := msg.toAMQPMessage()
16+
17+
// we duplicate/inflate these since we modify them
18+
// in various parts of the API.
19+
require.NotNil(t, amqpMessage.Properties)
20+
require.NotNil(t, amqpMessage.Annotations)
21+
})
22+
}

sdk/messaging/azeventhubs/event_data.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ type ReceivedEventData struct {
5454
// Offset is the offset of the event.
5555
Offset *int64
5656

57+
// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
58+
// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
59+
// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
60+
// and Header fields.
61+
RawAMQPMessage *AMQPAnnotatedMessage
62+
5763
// SequenceNumber is a unique number assigned to a message by Event Hubs.
5864
SequenceNumber int64
5965

@@ -100,7 +106,9 @@ func (e *EventData) toAMQPMessage() *amqp.Message {
100106
// NOTE: this converter assumes that the Body of this message will be the first
101107
// serialized byte array in the Data section of the messsage.
102108
func newReceivedEventData(amqpMsg *amqp.Message) (*ReceivedEventData, error) {
103-
re := &ReceivedEventData{}
109+
re := &ReceivedEventData{
110+
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg),
111+
}
104112

105113
if len(amqpMsg.Data) == 1 {
106114
re.Body = amqpMsg.Data[0]

0 commit comments

Comments
 (0)