Skip to content

Commit 0885abd

Browse files
committed
Add ASBS_6 to asb-integration-pubsub sample
1 parent 2c1db4c commit 0885abd

File tree

13 files changed

+363
-0
lines changed

13 files changed

+363
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
Microsoft Visual Studio Solution File, Format Version 12.00
2+
# Visual Studio Version 16
3+
VisualStudioVersion = 16.0.29728.190
4+
MinimumVisualStudioVersion = 15.0.26730.12
5+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shared", "Shared\Shared.csproj", "{0D0EF536-3C89-4191-A0EF-C0742236FC46}"
6+
EndProject
7+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NativeSubscriberB", "NativeSubscriberB\NativeSubscriberB.csproj", "{D32FE422-7DA6-484C-91C2-2E48AE014040}"
8+
EndProject
9+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NativeSubscriberA", "NativeSubscriberA\NativeSubscriberA.csproj", "{9B7F1E99-B489-4432-923F-2C803480FC37}"
10+
EndProject
11+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NativeShared", "NativeShared\NativeShared.csproj", "{EF53E157-CE4B-497C-B53E-0102D4D4D7FF}"
12+
EndProject
13+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Publisher", "Publisher\Publisher.csproj", "{96A0B2AD-FEAC-4F43-9811-D99420D1CCC3}"
14+
EndProject
15+
Global
16+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
17+
Debug|Any CPU = Debug|Any CPU
18+
Release|Any CPU = Release|Any CPU
19+
EndGlobalSection
20+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
21+
{0D0EF536-3C89-4191-A0EF-C0742236FC46}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
22+
{0D0EF536-3C89-4191-A0EF-C0742236FC46}.Debug|Any CPU.Build.0 = Debug|Any CPU
23+
{0D0EF536-3C89-4191-A0EF-C0742236FC46}.Release|Any CPU.ActiveCfg = Release|Any CPU
24+
{0D0EF536-3C89-4191-A0EF-C0742236FC46}.Release|Any CPU.Build.0 = Release|Any CPU
25+
{D32FE422-7DA6-484C-91C2-2E48AE014040}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
26+
{D32FE422-7DA6-484C-91C2-2E48AE014040}.Debug|Any CPU.Build.0 = Debug|Any CPU
27+
{D32FE422-7DA6-484C-91C2-2E48AE014040}.Release|Any CPU.ActiveCfg = Release|Any CPU
28+
{D32FE422-7DA6-484C-91C2-2E48AE014040}.Release|Any CPU.Build.0 = Release|Any CPU
29+
{9B7F1E99-B489-4432-923F-2C803480FC37}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
30+
{9B7F1E99-B489-4432-923F-2C803480FC37}.Debug|Any CPU.Build.0 = Debug|Any CPU
31+
{9B7F1E99-B489-4432-923F-2C803480FC37}.Release|Any CPU.ActiveCfg = Release|Any CPU
32+
{9B7F1E99-B489-4432-923F-2C803480FC37}.Release|Any CPU.Build.0 = Release|Any CPU
33+
{EF53E157-CE4B-497C-B53E-0102D4D4D7FF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
34+
{EF53E157-CE4B-497C-B53E-0102D4D4D7FF}.Debug|Any CPU.Build.0 = Debug|Any CPU
35+
{EF53E157-CE4B-497C-B53E-0102D4D4D7FF}.Release|Any CPU.ActiveCfg = Release|Any CPU
36+
{EF53E157-CE4B-497C-B53E-0102D4D4D7FF}.Release|Any CPU.Build.0 = Release|Any CPU
37+
{96A0B2AD-FEAC-4F43-9811-D99420D1CCC3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
38+
{96A0B2AD-FEAC-4F43-9811-D99420D1CCC3}.Debug|Any CPU.Build.0 = Debug|Any CPU
39+
{96A0B2AD-FEAC-4F43-9811-D99420D1CCC3}.Release|Any CPU.ActiveCfg = Release|Any CPU
40+
{96A0B2AD-FEAC-4F43-9811-D99420D1CCC3}.Release|Any CPU.Build.0 = Release|Any CPU
41+
EndGlobalSection
42+
GlobalSection(SolutionProperties) = preSolution
43+
HideSolutionNode = FALSE
44+
EndGlobalSection
45+
GlobalSection(ExtensibilityGlobals) = postSolution
46+
SolutionGuid = {C0348CF9-2853-4649-B5A2-A29A74386A2A}
47+
EndGlobalSection
48+
EndGlobal
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.*" />
10+
</ItemGroup>
11+
</Project>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using Azure.Messaging.ServiceBus;
2+
using Azure.Messaging.ServiceBus.Administration;
3+
4+
namespace NativeSender
5+
{
6+
using System;
7+
using System.Threading.Tasks;
8+
9+
public static class TopologyManager
10+
{
11+
public static async Task CreateSubscription(string connectionString, string subscriptionName, string ruleName, SqlRuleFilter sqlFilter, string topicPath, string forwardTo)
12+
{
13+
var client = new ServiceBusAdministrationClient(connectionString);
14+
15+
try
16+
{
17+
await client.CreateQueueAsync(forwardTo);
18+
}
19+
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
20+
{
21+
}
22+
23+
try
24+
{
25+
await client.CreateTopicAsync(topicPath);
26+
}
27+
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
28+
{
29+
}
30+
31+
try
32+
{
33+
#region SubscriptionCreation
34+
35+
await client.CreateSubscriptionAsync(new CreateSubscriptionOptions(topicPath, subscriptionName)
36+
{
37+
LockDuration = TimeSpan.FromMinutes(5),
38+
EnableDeadLetteringOnFilterEvaluationExceptions = false,
39+
MaxDeliveryCount = int.MaxValue,
40+
EnableBatchedOperations = true,
41+
ForwardTo = forwardTo,
42+
}, new CreateRuleOptions(ruleName, sqlFilter));
43+
44+
#endregion
45+
}
46+
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
47+
{
48+
}
49+
}
50+
}
51+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\NativeShared\NativeShared.csproj" />
9+
<ProjectReference Include="..\Shared\Shared.csproj" />
10+
</ItemGroup>
11+
</Project>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using System;
2+
using System.Text;
3+
using System.Threading.Tasks;
4+
using Azure.Messaging.ServiceBus;
5+
using Azure.Messaging.ServiceBus.Administration;
6+
using NativeSender;
7+
8+
static class Program
9+
{
10+
const string EnclosedMessageTypesHeader = "NServiceBus.EnclosedMessageTypes";
11+
12+
static string ConnectionString
13+
{
14+
get
15+
{
16+
var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");
17+
if (string.IsNullOrWhiteSpace(connectionString))
18+
{
19+
throw new Exception("Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
20+
}
21+
22+
return connectionString;
23+
}
24+
}
25+
26+
static async Task Main()
27+
{
28+
var queueName = "Samples.ASB.NativeIntegration.NativeSubscriberA";
29+
30+
Console.Title = queueName;
31+
32+
#region EventOneSubscription
33+
await TopologyManager.CreateSubscription(
34+
ConnectionString,
35+
queueName,
36+
ruleName:"$default",
37+
sqlFilter: new TrueRuleFilter(),
38+
topicPath: "EventOne",
39+
forwardTo: queueName
40+
);
41+
#endregion
42+
43+
var serviceBusClient = new ServiceBusClient(ConnectionString);
44+
var serviceBusProcessor = serviceBusClient.CreateProcessor("Samples.ASB.NativeIntegration.NativeSubscriberA");
45+
46+
serviceBusProcessor.ProcessMessageAsync += MessageHandler;
47+
serviceBusProcessor.ProcessErrorAsync += ErrorHandler;
48+
49+
await serviceBusProcessor.StartProcessingAsync();
50+
51+
Console.WriteLine("Press any key to exit");
52+
Console.ReadKey();
53+
}
54+
55+
static async Task MessageHandler(ProcessMessageEventArgs args)
56+
{
57+
var messageType = (string) args.Message.ApplicationProperties[EnclosedMessageTypesHeader];
58+
var bodyJson = Encoding.UTF8.GetString(args.Message.Body.ToArray());
59+
60+
Console.WriteLine($"Received: {messageType}");
61+
Console.WriteLine(bodyJson);
62+
63+
// complete the message. messages is deleted from the subscription.
64+
await args.CompleteMessageAsync(args.Message);
65+
}
66+
67+
static Task ErrorHandler(ProcessErrorEventArgs args)
68+
{
69+
Console.WriteLine(args.Exception.ToString());
70+
return Task.CompletedTask;
71+
}
72+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\NativeShared\NativeShared.csproj" />
9+
<ProjectReference Include="..\Shared\Shared.csproj" />
10+
</ItemGroup>
11+
</Project>
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
using System;
2+
using System.Text;
3+
using System.Threading.Tasks;
4+
using Azure.Messaging.ServiceBus;
5+
using Azure.Messaging.ServiceBus.Administration;
6+
using NativeSender;
7+
8+
static class Program
9+
{
10+
const string EnclosedMessageTypesHeader = "NServiceBus.EnclosedMessageTypes";
11+
12+
static string ConnectionString
13+
{
14+
get
15+
{
16+
var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");
17+
if (string.IsNullOrWhiteSpace(connectionString))
18+
{
19+
throw new Exception("Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
20+
}
21+
22+
return connectionString;
23+
}
24+
}
25+
26+
static async Task Main()
27+
{
28+
var queueName = "Samples.ASB.NativeIntegration.NativeSubscriberB";
29+
30+
Console.Title = queueName;
31+
32+
await TopologyManager.CreateSubscription(
33+
ConnectionString,
34+
queueName,
35+
ruleName:"$default",
36+
sqlFilter: new TrueRuleFilter(),
37+
topicPath: "EventOne",
38+
forwardTo: queueName
39+
);
40+
await TopologyManager.CreateSubscription(
41+
ConnectionString,
42+
queueName,
43+
ruleName:"$default",
44+
sqlFilter: new TrueRuleFilter(),
45+
topicPath: "EventTwo",
46+
forwardTo: queueName
47+
);
48+
49+
var serviceBusClient = new ServiceBusClient(ConnectionString);
50+
var serviceBusProcessor = serviceBusClient.CreateProcessor(queueName);
51+
52+
serviceBusProcessor.ProcessMessageAsync += MessageHandler;
53+
serviceBusProcessor.ProcessErrorAsync += ErrorHandler;
54+
55+
await serviceBusProcessor.StartProcessingAsync();
56+
57+
Console.WriteLine("Press any key to exit");
58+
Console.ReadKey();
59+
}
60+
61+
static async Task MessageHandler(ProcessMessageEventArgs args)
62+
{
63+
var messageType = (string) args.Message.ApplicationProperties[EnclosedMessageTypesHeader];
64+
var bodyJson = Encoding.UTF8.GetString(args.Message.Body.ToArray());
65+
66+
Console.WriteLine($"Received: {messageType}");
67+
Console.WriteLine(bodyJson);
68+
69+
// complete the message. messages is deleted from the subscription.
70+
await args.CompleteMessageAsync(args.Message);
71+
}
72+
73+
static Task ErrorHandler(ProcessErrorEventArgs args)
74+
{
75+
Console.WriteLine(args.Exception.ToString());
76+
return Task.CompletedTask;
77+
}
78+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
5+
Console.Title = "Publisher";
6+
7+
var endpointConfiguration = new EndpointConfiguration("Samples.ASB.Publisher");
8+
9+
endpointConfiguration.EnableInstallers();
10+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
11+
endpointConfiguration.Conventions().DefiningEventsAs(type => type.Name == nameof(EventTwo) || type.Name == nameof(EventOne));
12+
13+
14+
var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");
15+
if (string.IsNullOrWhiteSpace(connectionString))
16+
{
17+
throw new Exception("Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
18+
}
19+
20+
var transport = new AzureServiceBusTransport(connectionString, TopicTopology.Default);
21+
endpointConfiguration.UseTransport(transport);
22+
23+
var endpointInstance = await Endpoint.Start(endpointConfiguration);
24+
Console.WriteLine("Press any key to publish events");
25+
Console.ReadKey();
26+
Console.WriteLine();
27+
28+
await endpointInstance.Publish(new EventOne
29+
{
30+
Content = $"{nameof(EventOne)} sample content",
31+
PublishedOnUtc = DateTime.UtcNow
32+
});
33+
34+
await endpointInstance.Publish(new EventTwo
35+
{
36+
Content = $"{nameof(EventTwo)} sample content",
37+
PublishedOnUtc = DateTime.UtcNow
38+
});
39+
40+
Console.WriteLine("Press any key to exit");
41+
Console.ReadKey();
42+
await endpointInstance.Stop();
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\Shared\Shared.csproj" />
9+
</ItemGroup>
10+
<ItemGroup>
11+
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="6.0.0-alpha.1" />
12+
</ItemGroup>
13+
<ItemGroup Label="Transitive dependencies">
14+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.1" />
15+
</ItemGroup>
16+
</Project>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
using System;
2+
3+
public class EventOne
4+
{
5+
public string Content { get; set; }
6+
public DateTime PublishedOnUtc { get; set; }
7+
}

0 commit comments

Comments
 (0)