Skip to content

Commit 9acec1c

Browse files
authored
Added support for AmqpWebSockets (#153)
* Added support for AmqpWebSockets * EventHub + ServiceBus ver. 4.4
1 parent 7a4424a commit 9acec1c

File tree

10 files changed

+108
-24
lines changed

10 files changed

+108
-24
lines changed

src/NLog.Extensions.AzureEventHub/EventHubTarget.cs

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,21 @@ public class EventHubTarget : AsyncTaskTarget
120120
/// </summary>
121121
public Layout AccessKey { get; set; }
122122

123+
/// <summary>
124+
/// The connection uses the AMQP protocol over web sockets. See also <see cref="EventHubsTransportType.AmqpWebSockets"/>
125+
/// </summary>
126+
public Layout UseWebSockets { get; set; }
127+
128+
/// <summary>
129+
/// The proxy to use for communication over web sockets.
130+
/// </summary>
131+
public Layout WebSocketProxyAddress { get; set; }
132+
133+
/// <summary>
134+
/// Custom endpoint address that can be used when establishing the connection.
135+
/// </summary>
136+
public Layout CustomEndpointAddress { get; set; }
137+
123138
/// <summary>
124139
/// Gets a list of user properties (aka custom properties) to add to the AMQP message
125140
/// </summary>
@@ -164,6 +179,9 @@ protected override void InitializeTarget()
164179
string storageAccountName = string.Empty;
165180
string storageAccountAccessKey = string.Empty;
166181
string eventHubName = string.Empty;
182+
string useWebSockets = string.Empty;
183+
string webSocketProxyAddress = string.Empty;
184+
string customEndPointAddress = string.Empty;
167185

168186
var defaultLogEvent = LogEventInfo.CreateNullEvent();
169187

@@ -182,7 +200,15 @@ protected override void InitializeTarget()
182200
storageAccountAccessKey = AccessKey?.Render(defaultLogEvent);
183201
}
184202

185-
_eventHubService.Connect(connectionString, eventHubName, serviceUri, tenantIdentity, resourceIdentifier, clientIdentity, sharedAccessSignature, storageAccountName, storageAccountAccessKey);
203+
useWebSockets = UseWebSockets?.Render(defaultLogEvent) ?? string.Empty;
204+
if (!string.IsNullOrEmpty(useWebSockets) && (string.Equals(useWebSockets.Trim(), bool.TrueString, StringComparison.OrdinalIgnoreCase) || string.Equals(useWebSockets.Trim(), "1", StringComparison.OrdinalIgnoreCase)))
205+
{
206+
useWebSockets = bool.TrueString;
207+
}
208+
customEndPointAddress = CustomEndpointAddress?.Render(defaultLogEvent) ?? string.Empty;
209+
webSocketProxyAddress = WebSocketProxyAddress?.Render(defaultLogEvent) ?? string.Empty;
210+
211+
_eventHubService.Connect(connectionString, eventHubName, serviceUri, tenantIdentity, resourceIdentifier, clientIdentity, sharedAccessSignature, storageAccountName, storageAccountAccessKey, bool.TrueString == useWebSockets, webSocketProxyAddress, customEndPointAddress);
186212
InternalLogger.Debug("AzureEventHubTarget(Name={0}): Initialized", Name);
187213
}
188214
catch (Exception ex)
@@ -470,33 +496,42 @@ private sealed class EventHubService : IEventHubService
470496

471497
public string EventHubName { get; private set; }
472498

473-
public void Connect(string connectionString, string eventHubName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey)
499+
public void Connect(string connectionString, string eventHubName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, bool useWebSockets, string webSocketsProxyAddress, string endPointAddress)
474500
{
475501
EventHubName = eventHubName;
476502

503+
Azure.Messaging.EventHubs.Producer.EventHubProducerClientOptions options = default;
504+
if (useWebSockets || !string.IsNullOrEmpty(webSocketsProxyAddress) || !string.IsNullOrEmpty(endPointAddress))
505+
{
506+
options = new Azure.Messaging.EventHubs.Producer.EventHubProducerClientOptions();
507+
options.ConnectionOptions.TransportType = useWebSockets ? EventHubsTransportType.AmqpWebSockets : options.ConnectionOptions.TransportType;
508+
options.ConnectionOptions.Proxy = !string.IsNullOrEmpty(webSocketsProxyAddress) ? new System.Net.WebProxy(webSocketsProxyAddress, true) : options.ConnectionOptions.Proxy;
509+
options.ConnectionOptions.CustomEndpointAddress = !string.IsNullOrEmpty(endPointAddress) ? new Uri(endPointAddress) : options.ConnectionOptions.CustomEndpointAddress;
510+
}
511+
477512
if (string.IsNullOrWhiteSpace(serviceUri))
478513
{
479514
if (string.IsNullOrWhiteSpace(eventHubName))
480515
{
481-
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(connectionString);
516+
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(connectionString, options);
482517
}
483518
else
484519
{
485-
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(connectionString, eventHubName);
520+
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(connectionString, eventHubName, options);
486521
}
487522
}
488523
else if (!string.IsNullOrWhiteSpace(sharedAccessSignature))
489524
{
490-
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(serviceUri, eventHubName, new Azure.AzureSasCredential(sharedAccessSignature));
525+
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(serviceUri, eventHubName, new Azure.AzureSasCredential(sharedAccessSignature), options);
491526
}
492527
else if (!string.IsNullOrWhiteSpace(storageAccountName))
493528
{
494-
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(serviceUri, eventHubName, new Azure.AzureNamedKeyCredential(storageAccountName, storageAccountAccessKey));
529+
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(serviceUri, eventHubName, new Azure.AzureNamedKeyCredential(storageAccountName, storageAccountAccessKey), options);
495530
}
496531
else
497532
{
498533
var tokenCredentials = AzureCredentialHelpers.CreateTokenCredentials(clientIdentity, tenantIdentity, resourceIdentifier);
499-
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(serviceUri, eventHubName, tokenCredentials);
534+
_client = new Azure.Messaging.EventHubs.Producer.EventHubProducerClient(serviceUri, eventHubName, tokenCredentials, options);
500535
}
501536
}
502537

src/NLog.Extensions.AzureEventHub/IEventHubService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace NLog.Extensions.AzureStorage
88
internal interface IEventHubService
99
{
1010
string EventHubName { get; }
11-
void Connect(string connectionString, string eventHubName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey);
11+
void Connect(string connectionString, string eventHubName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, bool useWebSockets, string webSocketsProxyAddress, string endPointAddress);
1212
Task CloseAsync();
1313
Task SendAsync(IEnumerable<EventData> eventDataBatch, string partitionKey, CancellationToken cancellationToken);
1414
}

src/NLog.Extensions.AzureEventHub/NLog.Extensions.AzureEventHub.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
<PropertyGroup>
44
<TargetFrameworks>netstandard2.0</TargetFrameworks>
55
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
6-
<DefineConstants>$(DefineConstants);NETCORE</DefineConstants>
76

8-
<Version>4.3.1</Version>
7+
<Version>4.4.0</Version>
98

109
<Description>NLog EventHubTarget for writing to Azure Event Hubs datastreams</Description>
1110
<Authors>jdetmar</Authors>
@@ -19,8 +18,9 @@
1918
<RepositoryUrl>https://github.com/JDetmar/NLog.Extensions.AzureStorage.git</RepositoryUrl>
2019
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2120
<PackageReleaseNotes>
22-
- Updated dependency Azure.Messaging.EventHubs v5.9.3
23-
- Updated dependency Azure.Identity v1.10.3
21+
- Added option UseWebSockets (true / false)
22+
- Added option WebSocketProxyAddress (Uri)
23+
- Added option CustomEndpointAddress (Uri)
2424

2525
Docs: https://github.com/JDetmar/NLog.Extensions.AzureStorage/blob/master/src/NLog.Extensions.AzureEventHub/README.md
2626
</PackageReleaseNotes>

src/NLog.Extensions.AzureEventHub/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ _messageId_ - EventData MessageId. [Layout](https://github.com/NLog/NLog/wiki/La
5252

5353
_correlationId_ - EventData Correlationid. [Layout](https://github.com/NLog/NLog/wiki/Layouts)
5454

55+
_useWebSockets_ - Enable AmqpWebSockets. Ex. true/false (optional)
56+
57+
_webSocketProxyAddress_ - Custom WebProxy address for WebSockets (optional)
58+
59+
_customEndpointAddress_ - Custom endpoint address that can be used when establishing the connection (optional)
60+
5561
_serviceUri_ - Alternative to ConnectionString, where Managed Identiy is applied from DefaultAzureCredential for User delegation SAS.
5662

5763
_tenantIdentity_ - Alternative to ConnectionString. Used together with ServiceUri. Input for DefaultAzureCredential.

src/NLog.Extensions.AzureServiceBus/ICloudServiceBus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ internal interface ICloudServiceBus
1010
{
1111
string EntityPath { get; }
1212
TimeSpan? DefaultTimeToLive { get; }
13-
void Connect(string connectionString, string queueOrTopicName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, TimeSpan? timeToLive);
13+
void Connect(string connectionString, string queueOrTopicName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, bool useWebSockets, string webSocketProxyAddress, string endPointAddress, TimeSpan? timeToLive);
1414
Task SendAsync(IEnumerable<ServiceBusMessage> messages, CancellationToken cancellationToken);
1515
Task CloseAsync();
1616
}

src/NLog.Extensions.AzureServiceBus/NLog.Extensions.AzureServiceBus.csproj

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<TargetFramework>netstandard2.0</TargetFramework>
55
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
66

7-
<Version>4.3.1</Version>
7+
<Version>4.4.0</Version>
88

99
<Description>NLog ServiceBusTarget for writing to Azure Service Bus Topic or Queue</Description>
1010
<Authors>jdetmar</Authors>
@@ -18,8 +18,10 @@
1818
<RepositoryUrl>https://github.com/JDetmar/NLog.Extensions.AzureStorage.git</RepositoryUrl>
1919
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2020
<PackageReleaseNotes>
21-
- Updated dependency Azure.Messaging.ServiceBus v7.16.1
22-
- Updated dependency Azure.Identity v1.10.3
21+
- Added option UseWebSockets (true / false)
22+
- Added option WebSocketProxyAddress (Uri)
23+
- Added option CustomEndpointAddress (Uri)
24+
- Updated dependency Azure.Messaging.ServiceBus v7.16.2
2325

2426
Docs: https://github.com/JDetmar/NLog.Extensions.AzureStorage/blob/master/src/NLog.Extensions.AzureServiceBus/README.md
2527
</PackageReleaseNotes>

src/NLog.Extensions.AzureServiceBus/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ _timeToLiveSeconds_ - Default Time-To-Live (TTL) for ServiceBus messages in seco
6565

6666
_timeToLiveDays_ - Default Time-To-Live (TTL) for ServiceBus messages in days (Optional)
6767

68+
_useWebSockets_ - Enable AmqpWebSockets. Ex. true/false (optional)
69+
70+
_webSocketProxyAddress_ - Custom WebProxy address for WebSockets (optional)
71+
72+
_customEndpointAddress_ - Custom endpoint address that can be used when establishing the connection (optional)
73+
6874
_serviceUri_ - Alternative to ConnectionString, where Managed Identiy is applied from DefaultAzureCredential for User delegation SAS.
6975

7076
_tenantIdentity_ - Alternative to ConnectionString. Used together with ServiceUri. Input for DefaultAzureCredential.

src/NLog.Extensions.AzureServiceBus/ServiceBusTarget.cs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ public sealed class ServiceBusTarget : AsyncTaskTarget
141141
/// </summary>
142142
public Layout AccessKey { get; set; }
143143

144+
/// <summary>
145+
/// The connection uses the AMQP protocol over web sockets. See also <see cref="ServiceBusTransportType.AmqpWebSockets"/>
146+
/// </summary>
147+
public Layout UseWebSockets { get; set; }
148+
149+
/// <summary>
150+
/// The proxy to use for communication over web sockets.
151+
/// </summary>
152+
public Layout WebSocketProxyAddress { get; set; }
153+
154+
/// <summary>
155+
/// Custom endpoint address that can be used when establishing the connection.
156+
/// </summary>
157+
public Layout CustomEndpointAddress { get; set; }
158+
144159
/// <summary>
145160
/// Gets a list of user properties (aka custom application properties) to add to the AMQP message
146161
/// </summary>
@@ -187,6 +202,9 @@ protected override void InitializeTarget()
187202
string storageAccountName = string.Empty;
188203
string storageAccountAccessKey = string.Empty;
189204
string queueOrTopicName = string.Empty;
205+
string useWebSockets = string.Empty;
206+
string webSocketProxyAddress = string.Empty;
207+
string customEndPointAddress = string.Empty;
190208

191209
var defaultLogEvent = LogEventInfo.CreateNullEvent();
192210

@@ -212,13 +230,21 @@ protected override void InitializeTarget()
212230
storageAccountAccessKey = AccessKey?.Render(defaultLogEvent);
213231
}
214232

233+
useWebSockets = UseWebSockets?.Render(defaultLogEvent) ?? string.Empty;
234+
if (!string.IsNullOrEmpty(useWebSockets) && (string.Equals(useWebSockets.Trim(), bool.TrueString, StringComparison.OrdinalIgnoreCase) || string.Equals(useWebSockets.Trim(), "1", StringComparison.OrdinalIgnoreCase)))
235+
{
236+
useWebSockets = bool.TrueString;
237+
}
238+
webSocketProxyAddress = WebSocketProxyAddress?.Render(defaultLogEvent) ?? string.Empty;
239+
customEndPointAddress = CustomEndpointAddress?.Render(defaultLogEvent) ?? string.Empty;
240+
215241
var timeToLive = RenderDefaultTimeToLive();
216242
if (timeToLive <= TimeSpan.Zero)
217243
{
218244
timeToLive = default(TimeSpan?);
219245
}
220246

221-
_cloudServiceBus.Connect(connectionString, queueOrTopicName, serviceUri, tenantIdentity, resourceIdentifier, clientIdentity, sharedAccessSignature, storageAccountName, storageAccountAccessKey, timeToLive);
247+
_cloudServiceBus.Connect(connectionString, queueOrTopicName, serviceUri, tenantIdentity, resourceIdentifier, clientIdentity, sharedAccessSignature, storageAccountName, storageAccountAccessKey, bool.TrueString == useWebSockets, webSocketProxyAddress, customEndPointAddress, timeToLive);
222248
InternalLogger.Debug("AzureServiceBusTarget(Name={0}): Initialized", Name);
223249
}
224250
catch (Exception ex)
@@ -571,27 +597,36 @@ private sealed class CloudServiceBus : ICloudServiceBus
571597

572598
public string EntityPath { get; private set; }
573599

574-
public void Connect(string connectionString, string queueOrTopicName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, TimeSpan? timeToLive)
600+
public void Connect(string connectionString, string queueOrTopicName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, bool useWebSockets, string webSocketProxyAddress, string endPointAddress, TimeSpan? timeToLive)
575601
{
576602
EntityPath = queueOrTopicName;
577603
DefaultTimeToLive = timeToLive;
578604

605+
Azure.Messaging.ServiceBus.ServiceBusClientOptions options = default;
606+
if (useWebSockets || !string.IsNullOrEmpty(webSocketProxyAddress) || !string.IsNullOrEmpty(endPointAddress))
607+
{
608+
options = new Azure.Messaging.ServiceBus.ServiceBusClientOptions();
609+
options.TransportType = useWebSockets ? ServiceBusTransportType.AmqpWebSockets : options.TransportType;
610+
options.WebProxy = !string.IsNullOrEmpty(webSocketProxyAddress) ? new System.Net.WebProxy(webSocketProxyAddress, true) : options.WebProxy;
611+
options.CustomEndpointAddress = !string.IsNullOrEmpty(endPointAddress) ? new Uri(endPointAddress) : options.CustomEndpointAddress;
612+
}
613+
579614
if (string.IsNullOrEmpty(serviceUri))
580615
{
581-
_client = new ServiceBusClient(connectionString);
616+
_client = new ServiceBusClient(connectionString, options);
582617
}
583618
else if (!string.IsNullOrWhiteSpace(sharedAccessSignature))
584619
{
585-
_client = new ServiceBusClient(serviceUri, new Azure.AzureSasCredential(sharedAccessSignature));
620+
_client = new ServiceBusClient(serviceUri, new Azure.AzureSasCredential(sharedAccessSignature), options);
586621
}
587622
else if (!string.IsNullOrWhiteSpace(storageAccountName))
588623
{
589-
_client = new ServiceBusClient(serviceUri, new Azure.AzureNamedKeyCredential(storageAccountName, storageAccountAccessKey));
624+
_client = new ServiceBusClient(serviceUri, new Azure.AzureNamedKeyCredential(storageAccountName, storageAccountAccessKey), options);
590625
}
591626
else
592627
{
593628
var tokenCredentials = AzureCredentialHelpers.CreateTokenCredentials(clientIdentity, tenantIdentity, resourceIdentifier);
594-
_client = new ServiceBusClient(serviceUri, tokenCredentials);
629+
_client = new ServiceBusClient(serviceUri, tokenCredentials, options);
595630
}
596631

597632
_sender = _client.CreateSender(queueOrTopicName);

test/NLog.Extensions.AzureEventHub.Tests/EventHubServiceMock.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public async Task CloseAsync()
2323
EventDataSent.Clear();
2424
}
2525

26-
public void Connect(string connectionString, string eventHubName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey)
26+
public void Connect(string connectionString, string eventHubName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, bool useWebSockets, string webSocketsProxyAddress, string endPointAddress)
2727
{
2828
ConnectionString = connectionString;
2929
EventHubName = eventHubName;

test/NLog.Extensions.AzureServiceBus.Tests/ServiceBusMock.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public string PeekLastMessageBody()
3131
return null;
3232
}
3333

34-
public void Connect(string connectionString, string queueOrTopicName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, TimeSpan? timeToLive)
34+
public void Connect(string connectionString, string queueOrTopicName, string serviceUri, string tenantIdentity, string resourceIdentifier, string clientIdentity, string sharedAccessSignature, string storageAccountName, string storageAccountAccessKey, bool useWebSockets, string webProxy, string endPointAddress, TimeSpan? timeToLive)
3535
{
3636
ConnectionString = connectionString;
3737
EntityPath = queueOrTopicName;

0 commit comments

Comments
 (0)