Skip to content

Commit 345bd98

Browse files
Allow the endpoint to be overridden so the library can be used with TCP proxies. (Azure#23843)
Fixes Azure#23668
1 parent f312915 commit 345bd98

File tree

12 files changed

+139
-14
lines changed

12 files changed

+139
-14
lines changed

sdk/messaging/azeventhubs/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
# Release History
22

3-
## 1.2.4 (Unreleased)
3+
## 1.3.0 (Unreleased)
44

55
### Features Added
66

7+
- ProducerClient and ConsumerClient allow the endpoint to be overridden with CustomEndpoint, allowing the use of TCP proxies with AMQP.
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/messaging/azeventhubs/consumer_client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ type ConsumerClientOptions struct {
2020
// ApplicationID is used as the identifier when setting the User-Agent property.
2121
ApplicationID string
2222

23+
// A custom endpoint address that can be used when establishing the connection to the service.
24+
CustomEndpoint string
25+
2326
// InstanceID is a unique name used to identify the consumer. This can help with
2427
// diagnostics as this name will be returned in error messages. By default,
2528
// an identifier will be automatically generated.
@@ -232,6 +235,10 @@ func newConsumerClient(args consumerClientArgs, options *ConsumerClientOptions)
232235
nsOptions = append(nsOptions, internal.NamespaceWithUserAgent(options.ApplicationID))
233236
}
234237

238+
if options.CustomEndpoint != "" {
239+
nsOptions = append(nsOptions, internal.NamespaceWithCustomEndpoint(options.CustomEndpoint))
240+
}
241+
235242
nsOptions = append(nsOptions, internal.NamespaceWithRetryOptions(options.RetryOptions))
236243

237244
tempNS, err := internal.NewNamespace(nsOptions...)

sdk/messaging/azeventhubs/example_consumerclient_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,22 @@ func ExampleConsumerClient_NewPartitionClient_configuringPrefetch() {
151151
fmt.Printf("Body: %s\n", string(evt.Body))
152152
}
153153
}
154+
155+
func ExampleNewConsumerClient_usingCustomEndpoint() {
156+
// `DefaultAzureCredential` tries several common credential types. For more credential types
157+
// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
158+
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
159+
160+
if err != nil {
161+
panic(err)
162+
}
163+
164+
consumerClient, err = azeventhubs.NewConsumerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", azeventhubs.DefaultConsumerGroup, defaultAzureCred, &azeventhubs.ConsumerClientOptions{
165+
// A custom endpoint can be used when you need to connect to a TCP proxy.
166+
CustomEndpoint: "<address/hostname of TCP proxy>",
167+
})
168+
169+
if err != nil {
170+
panic(err)
171+
}
172+
}

sdk/messaging/azeventhubs/example_producerclient_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,22 @@ func ExampleProducerClient_GetPartitionProperties() {
180180
fmt.Printf("First sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.BeginningSequenceNumber)
181181
fmt.Printf("Last sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.LastEnqueuedSequenceNumber)
182182
}
183+
184+
func ExampleNewProducerClient_usingCustomEndpoint() {
185+
// `DefaultAzureCredential` tries several common credential types. For more credential types
186+
// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
187+
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
188+
189+
if err != nil {
190+
panic(err)
191+
}
192+
193+
producerClient, err = azeventhubs.NewProducerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", defaultAzureCred, &azeventhubs.ProducerClientOptions{
194+
// A custom endpoint can be used when you need to connect to a TCP proxy.
195+
CustomEndpoint: "<address/hostname of TCP proxy>",
196+
})
197+
198+
if err != nil {
199+
panic(err)
200+
}
201+
}

sdk/messaging/azeventhubs/internal/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
package internal
55

66
// Version is the semantic version number
7-
const Version = "v1.2.4"
7+
const Version = "v1.3.0"

sdk/messaging/azeventhubs/internal/namespace.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type (
5555

5656
// newClientFn exists so we can stub out newClient for unit tests.
5757
newClientFn func(ctx context.Context, connID uint64) (amqpwrap.AMQPClient, error)
58+
59+
customEndpoint string
5860
}
5961

6062
// NamespaceOption provides structure for configuring a new Event Hub namespace
@@ -102,6 +104,17 @@ func NamespaceWithConnectionString(connStr string) NamespaceOption {
102104
}
103105
}
104106

107+
// NamespaceWithCustomEndpoint sets a custom endpoint, useful for when you're connecting through a TCP proxy.
108+
// When establishing a TCP connection we connect to this address. The audience is extracted from the
109+
// fullyQualifiedNamespace given to NamespaceWithTokenCredential or the endpoint in the connection string passed
110+
// to NamespaceWithConnectionString.
111+
func NamespaceWithCustomEndpoint(customEndpoint string) NamespaceOption {
112+
return func(ns *Namespace) error {
113+
ns.customEndpoint = customEndpoint
114+
return nil
115+
}
116+
}
117+
105118
// NamespaceWithTLSConfig appends to the TLS config.
106119
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption {
107120
return func(ns *Namespace) error {
@@ -170,6 +183,7 @@ func (ns *Namespace) newClientImpl(ctx context.Context, connID uint64) (amqpwrap
170183
"framework": runtime.Version(),
171184
"user-agent": ns.getUserAgent(),
172185
},
186+
HostName: ns.FQDN,
173187
}
174188

175189
if ns.tlsConfig != nil {
@@ -190,7 +204,7 @@ func (ns *Namespace) newClientImpl(ctx context.Context, connID uint64) (amqpwrap
190204
return &amqpwrap.AMQPClientWrapper{Inner: client, ConnID: connID}, err
191205
}
192206

193-
client, err := amqp.Dial(ctx, ns.getAMQPHostURI(), &connOptions)
207+
client, err := amqp.Dial(ctx, ns.getAMQPHostURI(true), &connOptions)
194208
return &amqpwrap.AMQPClientWrapper{Inner: client, ConnID: connID}, err
195209
}
196210

@@ -461,11 +475,17 @@ func (ns *Namespace) getWSSHostURI() string {
461475
return fmt.Sprintf("wss://%s/", ns.FQDN)
462476
}
463477

464-
func (ns *Namespace) getAMQPHostURI() string {
478+
func (ns *Namespace) getAMQPHostURI(useCustomEndpoint bool) string {
479+
fqdn := ns.FQDN
480+
481+
if useCustomEndpoint && ns.customEndpoint != "" {
482+
fqdn = ns.customEndpoint
483+
}
484+
465485
if ns.TokenProvider.InsecureDisableTLS {
466-
return fmt.Sprintf("amqp://%s/", ns.FQDN)
486+
return fmt.Sprintf("amqp://%s/", fqdn)
467487
} else {
468-
return fmt.Sprintf("amqps://%s/", ns.FQDN)
488+
return fmt.Sprintf("amqps://%s/", fqdn)
469489
}
470490
}
471491

@@ -474,7 +494,7 @@ func (ns *Namespace) GetHTTPSHostURI() string {
474494
}
475495

476496
func (ns *Namespace) GetEntityAudience(entityPath string) string {
477-
return ns.getAMQPHostURI() + entityPath
497+
return ns.getAMQPHostURI(false) + entityPath
478498
}
479499

480500
func (ns *Namespace) getUserAgent() string {

sdk/messaging/azeventhubs/producer_client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type ProducerClientOptions struct {
3131
// Application ID that will be passed to the namespace.
3232
ApplicationID string
3333

34+
// A custom endpoint address that can be used when establishing the connection to the service.
35+
CustomEndpoint string
36+
3437
// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
3538
// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
3639
NewWebSocketConn func(ctx context.Context, params WebSocketConnParams) (net.Conn, error)
@@ -261,6 +264,10 @@ func newProducerClientImpl(creds producerClientCreds, options *ProducerClientOpt
261264
nsOptions = append(nsOptions, internal.NamespaceWithUserAgent(options.ApplicationID))
262265
}
263266

267+
if options.CustomEndpoint != "" {
268+
nsOptions = append(nsOptions, internal.NamespaceWithCustomEndpoint(options.CustomEndpoint))
269+
}
270+
264271
nsOptions = append(nsOptions, internal.NamespaceWithRetryOptions(options.RetryOptions))
265272
}
266273

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
# Release History
22

3-
## 1.7.4 (Unreleased)
3+
## 1.8.0 (Unreleased)
44

55
### Features Added
66

7+
- Client allows the endpoint to be overridden with CustomEndpoint, allowing the use of TCP proxies with AMQP.
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/messaging/azservicebus/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ type ClientOptions struct {
4949
// Application ID that will be passed to the namespace.
5050
ApplicationID string
5151

52+
// A custom endpoint address that can be used when establishing the connection to the service.
53+
CustomEndpoint string
54+
5255
// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
5356
// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
5457
NewWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error)
@@ -160,6 +163,10 @@ func newClientImpl(creds clientCreds, args clientImplArgs) (*Client, error) {
160163
nsOptions = append(nsOptions, internal.NamespaceWithUserAgent(args.ClientOptions.ApplicationID))
161164
}
162165

166+
if args.ClientOptions.CustomEndpoint != "" {
167+
nsOptions = append(nsOptions, internal.NamespaceWithCustomEndpoint(args.ClientOptions.CustomEndpoint))
168+
}
169+
163170
nsOptions = append(nsOptions, internal.NamespaceWithRetryOptions(args.ClientOptions.RetryOptions))
164171
}
165172

sdk/messaging/azservicebus/example_client_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,28 @@ func Example_enablingLogging() {
9090
)
9191
}
9292

93+
func ExampleNewClient_usingCustomEndpoint() {
94+
// NOTE: If you'd like to authenticate using a Service Bus connection string
95+
// look at `NewClientFromConnectionString` instead.
96+
97+
// `DefaultAzureCredential` tries several common credential types. For more credential types
98+
// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
99+
credential, err := azidentity.NewDefaultAzureCredential(nil)
100+
101+
if err != nil {
102+
panic(err)
103+
}
104+
105+
client, err = azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, &azservicebus.ClientOptions{
106+
// A custom endpoint can be used when you need to connect to a TCP proxy.
107+
CustomEndpoint: "<address/hostname of TCP proxy>",
108+
})
109+
110+
if err != nil {
111+
panic(err)
112+
}
113+
}
114+
93115
// fakes for examples
94116
var endpoint string
95117
var tokenCredential azcore.TokenCredential

0 commit comments

Comments
 (0)