Skip to content

Commit 40b4740

Browse files
committed
Add service-bus-kafka NSB 10 sample
1 parent 04aa50d commit 40b4740

File tree

16 files changed

+357
-0
lines changed

16 files changed

+357
-0
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<component name="ProjectRunConfigurationManager">
2+
<configuration default="false" name="Everything" type="CompoundRunConfigurationType">
3+
<toRun name="ConsoleEndpoint" type="DotNetProject" />
4+
<toRun name="AzureFunctions.KafkaTrigger.FunctionsHostBuilder" type="AzureFunctionsHost" />
5+
<method v="2" />
6+
</configuration>
7+
</component>
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
AzureFunctions.KafkaTrigger.FunctionsHostBuilder\AzureFunctions.KafkaTrigger.FunctionsHostBuilder.csproj
2+
ConsoleEndpoint\ConsoleEndpoint.csproj
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
Microsoft Visual Studio Solution File, Format Version 12.00
2+
# Visual Studio Version 16
3+
VisualStudioVersion = 16.0.29728.190
4+
MinimumVisualStudioVersion = 15.0.26730.12
5+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureFunctions.Messages", "AzureFunctions.Messages\AzureFunctions.Messages.csproj", "{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}"
6+
EndProject
7+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AzureFunctions.KafkaTrigger.FunctionsHostBuilder", "AzureFunctions.KafkaTrigger.FunctionsHostBuilder\AzureFunctions.KafkaTrigger.FunctionsHostBuilder.csproj", "{D3C0429A-08D8-41B0-B51A-F31A047959BD}"
8+
EndProject
9+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleEndpoint", "ConsoleEndpoint\ConsoleEndpoint.csproj", "{A2800EC9-AB77-4CDB-A620-224295A10951}"
10+
EndProject
11+
Global
12+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
13+
Debug|Any CPU = Debug|Any CPU
14+
Debug|x64 = Debug|x64
15+
Debug|x86 = Debug|x86
16+
Release|Any CPU = Release|Any CPU
17+
Release|x64 = Release|x64
18+
Release|x86 = Release|x86
19+
EndGlobalSection
20+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
21+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
22+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
23+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Debug|x64.ActiveCfg = Debug|Any CPU
24+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Debug|x64.Build.0 = Debug|Any CPU
25+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Debug|x86.ActiveCfg = Debug|Any CPU
26+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Debug|x86.Build.0 = Debug|Any CPU
27+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
28+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Release|Any CPU.Build.0 = Release|Any CPU
29+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Release|x64.ActiveCfg = Release|Any CPU
30+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Release|x64.Build.0 = Release|Any CPU
31+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Release|x86.ActiveCfg = Release|Any CPU
32+
{8EB0DD38-EF11-4C6E-A8F6-9128463D22B9}.Release|x86.Build.0 = Release|Any CPU
33+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
34+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
35+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Debug|x64.ActiveCfg = Debug|Any CPU
36+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Debug|x64.Build.0 = Debug|Any CPU
37+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Debug|x86.ActiveCfg = Debug|Any CPU
38+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Debug|x86.Build.0 = Debug|Any CPU
39+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
40+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Release|Any CPU.Build.0 = Release|Any CPU
41+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Release|x64.ActiveCfg = Release|Any CPU
42+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Release|x64.Build.0 = Release|Any CPU
43+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Release|x86.ActiveCfg = Release|Any CPU
44+
{D3C0429A-08D8-41B0-B51A-F31A047959BD}.Release|x86.Build.0 = Release|Any CPU
45+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
46+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Debug|Any CPU.Build.0 = Debug|Any CPU
47+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Debug|x64.ActiveCfg = Debug|Any CPU
48+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Debug|x64.Build.0 = Debug|Any CPU
49+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Debug|x86.ActiveCfg = Debug|Any CPU
50+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Debug|x86.Build.0 = Debug|Any CPU
51+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Release|Any CPU.ActiveCfg = Release|Any CPU
52+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Release|Any CPU.Build.0 = Release|Any CPU
53+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Release|x64.ActiveCfg = Release|Any CPU
54+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Release|x64.Build.0 = Release|Any CPU
55+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Release|x86.ActiveCfg = Release|Any CPU
56+
{A2800EC9-AB77-4CDB-A620-224295A10951}.Release|x86.Build.0 = Release|Any CPU
57+
EndGlobalSection
58+
GlobalSection(SolutionProperties) = preSolution
59+
HideSolutionNode = FALSE
60+
EndGlobalSection
61+
GlobalSection(ExtensibilityGlobals) = postSolution
62+
SolutionGuid = {6C66C18C-5075-4811-AF40-2997E8863F3B}
63+
EndGlobalSection
64+
EndGlobal
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<OutputType>Exe</OutputType>
6+
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
<LangVersion>14.0</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\AzureFunctions.Messages\AzureFunctions.Messages.csproj" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.50.0-preview1" />
17+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.*" />
18+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Kafka" Version="4.*" />
19+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.*" />
20+
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="6.0.0-alpha.2" />
21+
</ItemGroup>
22+
23+
<ItemGroup>
24+
<Content Include="..\local.settings.json" CopyToOutputDirectory="PreserveNewest" />
25+
<None Update="host.json" CopyToOutputDirectory="PreserveNewest" />
26+
</ItemGroup>
27+
28+
</Project>
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System.Text.Json.Nodes;
2+
using AzureFunctions.Messages.NServiceBusMessages;
3+
using Microsoft.Azure.Functions.Worker;
4+
using Microsoft.Extensions.Logging;
5+
6+
namespace AzureFunctions.KafkaTrigger.FunctionsHostBuilder;
7+
8+
public class KafkaTrigger(IMessageSession messageSession, ILogger<KafkaTrigger> logger)
9+
{
10+
#region KafkaTrigger
11+
12+
[Function(nameof(ElectricityUsage))]
13+
public async Task ElectricityUsage([KafkaTrigger("LocalKafkaBroker", "topic", ConsumerGroup = "$Default")] string eventData,
14+
FunctionContext context)
15+
{
16+
var eventValue = JsonNode.Parse(eventData)["Value"]?.ToString();
17+
var electricityUsage = Messages.KafkaMessages.ElectricityUsage.Deserialize(eventValue);
18+
19+
logger.LogInformation("Received Kafka event with usage: {CurrentUsage}", electricityUsage.CurrentUsage);
20+
21+
if (IsUsageAboveAverage(electricityUsage.CurrentUsage))
22+
{
23+
var message = new FollowUp
24+
{
25+
CustomerId = electricityUsage.CustomerId,
26+
UnitId = electricityUsage.UnitId,
27+
Description = $"Usage over monthly average at [{electricityUsage.CurrentUsage}] units"
28+
};
29+
30+
await messageSession.Send(message);
31+
}
32+
}
33+
34+
#endregion
35+
36+
// Because Kafka is an event stream, more messages arrive there than we might be able to handle with
37+
// Azure ServiceBus. For demo purposes an alert is raised at the exact usage of 42.
38+
static bool IsUsageAboveAverage(int currentUsage)
39+
{
40+
return currentUsage == 42;
41+
}
42+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using AzureFunctions.Messages.NServiceBusMessages;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.Hosting;
4+
5+
#region SetupNServiceBusSendOnly
6+
7+
var host = new HostBuilder()
8+
.ConfigureServices(async services =>
9+
{
10+
var cfg = new EndpointConfiguration("SendOnly");
11+
cfg.SendOnly();
12+
cfg.UseSerialization<SystemJsonSerializer>();
13+
14+
var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsServiceBus");
15+
var transport = new AzureServiceBusTransport(connectionString, TopicTopology.Default);
16+
var routing = cfg.UseTransport(transport);
17+
18+
routing.RouteToEndpoint(typeof(FollowUp), "Samples.KafkaTrigger.ConsoleEndpoint");
19+
20+
var endpoint = await Endpoint.Start(cfg);
21+
22+
// Inject the endpoint in the DI container
23+
services.AddSingleton<IMessageSession>(endpoint);
24+
})
25+
.ConfigureFunctionsWorkerDefaults()
26+
.Build();
27+
28+
#endregion
29+
30+
await host.RunAsync();
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"version": "2.0",
3+
"extensions": {
4+
"serviceBus": {
5+
"enableCrossEntityTransactions": true
6+
}
7+
}
8+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<LangVersion>14.0</LangVersion>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.6" />
10+
</ItemGroup>
11+
12+
</Project>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using NServiceBus;
2+
3+
namespace AzureFunctions.Messages.NServiceBusMessages;
4+
5+
public class FollowUp : IMessage
6+
{
7+
public int CustomerId { get; set; }
8+
9+
public int UnitId { get; set; }
10+
11+
public string Description { get; set; }
12+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Text.Json;
2+
3+
namespace AzureFunctions.Messages.KafkaMessages;
4+
5+
public class ElectricityUsage
6+
{
7+
public int CustomerId { get; set; }
8+
9+
public int UnitId { get; set; }
10+
11+
public int CurrentUsage { get; set; }
12+
13+
public static string Serialize(ElectricityUsage electricityUsage)
14+
{
15+
return JsonSerializer.Serialize(electricityUsage);
16+
}
17+
18+
public static ElectricityUsage Deserialize(string value)
19+
{
20+
return JsonSerializer.Deserialize<ElectricityUsage>(value);
21+
}
22+
}

0 commit comments

Comments
 (0)