Skip to content

Commit 04aa50d

Browse files
committed
Update service-bus-kafka sample
1 parent 0b7e87b commit 04aa50d

File tree

25 files changed

+274
-313
lines changed

25 files changed

+274
-313
lines changed
Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,29 @@
11
<Project Sdk="Microsoft.NET.Sdk">
2+
23
<PropertyGroup>
34
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
5+
<OutputType>Exe</OutputType>
46
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
7+
<ImplicitUsings>enable</ImplicitUsings>
58
<LangVersion>12.0</LangVersion>
6-
<OutputType>Exe</OutputType>
79
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\AzureFunctions.Messages\AzureFunctions.Messages.csproj" />
13+
</ItemGroup>
14+
815
<ItemGroup>
9-
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.19.0" />
16+
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.*" />
17+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.*" />
1018
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Kafka" Version="3.*" />
11-
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.0" />
12-
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.1.0" />
19+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.*" />
1320
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
1421
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="3.*" />
1522
</ItemGroup>
16-
<ItemGroup>
17-
<ProjectReference Include="..\AzureFunctions.Messages\AzureFunctions.Messages.csproj" />
18-
</ItemGroup>
23+
1924
<ItemGroup>
2025
<Content Include="..\local.settings.json" CopyToOutputDirectory="PreserveNewest" />
2126
<None Update="host.json" CopyToOutputDirectory="PreserveNewest" />
2227
</ItemGroup>
28+
2329
</Project>

samples/azure-functions/service-bus-kafka/ASBS_3/AzureFunctions.KafkaTrigger.FunctionsHostBuilder/KafkaTrigger.cs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,19 @@
1-
using System.Threading.Tasks;
2-
using AzureFunctions.Messages.NServiceBusMessages;
1+
using AzureFunctions.Messages.NServiceBusMessages;
32
using Microsoft.Azure.Functions.Worker;
43
using Microsoft.Extensions.Logging;
54
using Newtonsoft.Json.Linq;
65
using NServiceBus;
76

87
namespace AzureFunctions.KafkaTrigger.FunctionsHostBuilder;
98

10-
public class KafkaTrigger
9+
public class KafkaTrigger(IMessageSession messageSession, ILogger<KafkaTrigger> logger)
1110
{
12-
readonly IMessageSession messageSession;
13-
readonly ILogger<KafkaTrigger> logger;
14-
15-
public KafkaTrigger(IMessageSession messageSession, ILogger<KafkaTrigger> logger)
16-
{
17-
this.messageSession = messageSession;
18-
this.logger = logger;
19-
}
20-
2111
#region KafkaTrigger
2212

2313
[Function(nameof(ElectricityUsage))]
2414
public async Task ElectricityUsage([KafkaTrigger("LocalKafkaBroker", "topic", ConsumerGroup = "$Default")] string eventData,
2515
FunctionContext context)
2616
{
27-
2817
var eventValue = JObject.Parse(eventData)["Value"]?.ToString();
2918
var electricityUsage = Messages.KafkaMessages.ElectricityUsage.Deserialize(eventValue);
3019

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,31 @@
1-
using System;
2-
using System.Threading.Tasks;
3-
using AzureFunctions.Messages.NServiceBusMessages;
1+
using AzureFunctions.Messages.NServiceBusMessages;
42
using Microsoft.Extensions.DependencyInjection;
53
using Microsoft.Extensions.Hosting;
64
using NServiceBus;
75

8-
namespace AzureFunctions.KafkaTrigger.FunctionsHostBuilder;
6+
#region SetupNServiceBusSendOnly
97

10-
public class Program
11-
{
12-
public static async Task Main()
8+
var host = new HostBuilder()
9+
.ConfigureServices(async services =>
1310
{
14-
#region SetupNServiceBusSendOnly
11+
var cfg = new EndpointConfiguration("SendOnly");
12+
cfg.SendOnly();
13+
cfg.UseSerialization<SystemJsonSerializer>();
1514

16-
var host = new HostBuilder()
17-
.ConfigureServices(async services =>
18-
{
19-
var cfg = new EndpointConfiguration("SendOnly");
20-
cfg.SendOnly();
21-
cfg.UseSerialization<SystemJsonSerializer>();
15+
var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsServiceBus");
16+
var transport = new AzureServiceBusTransport(connectionString);
17+
var routing = cfg.UseTransport(transport);
2218

23-
var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsServiceBus");
24-
var transport = new AzureServiceBusTransport(connectionString);
25-
var routing = cfg.UseTransport(transport);
19+
routing.RouteToEndpoint(typeof(FollowUp), "Samples.KafkaTrigger.ConsoleEndpoint");
2620

27-
routing.RouteToEndpoint(typeof(FollowUp), "Samples.KafkaTrigger.ConsoleEndpoint");
21+
var endpoint = await Endpoint.Start(cfg);
2822

29-
var endpoint = await Endpoint.Start(cfg);
23+
// Inject the endpoint in the DI container
24+
services.AddSingleton<IMessageSession>(endpoint);
25+
})
26+
.ConfigureFunctionsWorkerDefaults()
27+
.Build();
3028

31-
// Inject the endpoint in the DI container
32-
services.AddSingleton<IMessageSession>(endpoint);
33-
})
34-
.ConfigureFunctionsWorkerDefaults()
35-
.Build();
29+
#endregion
3630

37-
#endregion
38-
39-
await host.RunAsync();
40-
}
41-
}
31+
await host.RunAsync();
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
22
"version": "2.0",
33
"extensions": {
4-
"ServiceBus": {
5-
"EnableCrossEntityTransactions": true
4+
"serviceBus": {
5+
"enableCrossEntityTransactions": true
66
}
77
}
88
}
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
23
<PropertyGroup>
34
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
5+
<ImplicitUsings>enable</ImplicitUsings>
46
<LangVersion>12.0</LangVersion>
57
</PropertyGroup>
8+
69
<ItemGroup>
710
<PackageReference Include="NServiceBus" Version="8.*" />
811
</ItemGroup>
12+
913
</Project>
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
using NServiceBus;
2-
3-
namespace AzureFunctions.Messages.NServiceBusMessages;
1+
namespace AzureFunctions.Messages.NServiceBusMessages;
42

53
public class FollowUp : IMessage
64
{
75
public int CustomerId { get; set; }
6+
87
public int UnitId { get; set; }
8+
99
public string Description { get; set; }
1010
}

samples/azure-functions/service-bus-kafka/ASBS_3/AzureFunctions.Messages/KafkaMessages/ElectricityUsage.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ namespace AzureFunctions.Messages.KafkaMessages;
55
public class ElectricityUsage
66
{
77
public int CustomerId { get; set; }
8+
89
public int UnitId { get; set; }
10+
911
public int CurrentUsage { get; set; }
1012

1113
public static string Serialize(ElectricityUsage electricityUsage)

samples/azure-functions/service-bus-kafka/ASBS_3/ConsoleEndpoint/Program.cs

Lines changed: 46 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,71 +2,65 @@
22
using Confluent.Kafka;
33
using NServiceBus;
44

5-
class Program
6-
{
7-
static async Task Main()
8-
{
9-
const string endpointName = "Samples.KafkaTrigger.ConsoleEndpoint";
10-
Console.Title = endpointName;
5+
const string endpointName = "Samples.KafkaTrigger.ConsoleEndpoint";
6+
Console.Title = endpointName;
117

12-
var endpointConfiguration = new EndpointConfiguration(endpointName);
13-
endpointConfiguration.EnableInstallers();
14-
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
8+
var endpointConfiguration = new EndpointConfiguration(endpointName);
9+
endpointConfiguration.EnableInstallers();
10+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
1511

16-
var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");
17-
if (string.IsNullOrWhiteSpace(connectionString))
18-
{
19-
throw new Exception("Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
20-
}
12+
var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");
13+
if (string.IsNullOrWhiteSpace(connectionString))
14+
{
15+
throw new Exception("Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
16+
}
2117

22-
var transport = new AzureServiceBusTransport(connectionString);
23-
endpointConfiguration.UseTransport(transport);
18+
var transport = new AzureServiceBusTransport(connectionString);
19+
endpointConfiguration.UseTransport(transport);
2420

25-
var endpointInstance = await Endpoint.Start(endpointConfiguration);
21+
var endpointInstance = await Endpoint.Start(endpointConfiguration);
2622

27-
var config = new ProducerConfig
28-
{
29-
BootstrapServers = "localhost:9094",
30-
ClientId = "producer-1",
31-
BatchSize = 50
32-
};
23+
var config = new ProducerConfig
24+
{
25+
BootstrapServers = "localhost:9094",
26+
ClientId = "producer-1",
27+
BatchSize = 50
28+
};
3329

34-
Console.WriteLine("Press '[enter]' to send a 100 events using Kafka and wait for a possible response...");
35-
Console.WriteLine("Press any other key to exit");
30+
Console.WriteLine("Press '[enter]' to send a 100 events using Kafka and wait for a possible response...");
31+
Console.WriteLine("Press any other key to exit");
3632

37-
using (var producer = new ProducerBuilder<string, string>(config)
38-
.Build())
39-
{
40-
while (true)
41-
{
42-
var key = Console.ReadKey();
43-
Console.WriteLine();
44-
45-
if (key.Key != ConsoleKey.Enter)
46-
{
47-
break;
48-
}
33+
using (var producer = new ProducerBuilder<string, string>(config)
34+
.Build())
35+
{
36+
while (true)
37+
{
38+
var key = Console.ReadKey();
39+
Console.WriteLine();
4940

50-
for (int i = 0; i < 100; i++)
51-
{
52-
#region ProduceEvent
41+
if (key.Key != ConsoleKey.Enter)
42+
{
43+
break;
44+
}
5345

54-
var electricityUsage = new ElectricityUsage() { CustomerId = 42, CurrentUsage = i, UnitId = 1337 };
46+
for (int i = 0; i < 100; i++)
47+
{
48+
#region ProduceEvent
5549

56-
var message = new Message<string, string>
57-
{
58-
Value = ElectricityUsage.Serialize(electricityUsage)
59-
};
50+
var electricityUsage = new ElectricityUsage() { CustomerId = 42, CurrentUsage = i, UnitId = 1337 };
6051

61-
var deliveryResult = await producer.ProduceAsync("myKafkaTopic", message);
52+
var message = new Message<string, string>
53+
{
54+
Value = ElectricityUsage.Serialize(electricityUsage)
55+
};
6256

63-
#endregion
64-
}
57+
var deliveryResult = await producer.ProduceAsync("myKafkaTopic", message);
6558

66-
Console.WriteLine("100 messages sent");
67-
}
59+
#endregion
6860
}
6961

70-
await endpointInstance.Stop();
62+
Console.WriteLine("100 messages sent");
7163
}
72-
}
64+
}
65+
66+
await endpointInstance.Stop();
Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
11
<Project Sdk="Microsoft.NET.Sdk">
2+
23
<PropertyGroup>
34
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
5+
<OutputType>Exe</OutputType>
46
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
7+
<ImplicitUsings>enable</ImplicitUsings>
58
<LangVersion>12.0</LangVersion>
6-
<OutputType>Exe</OutputType>
79
</PropertyGroup>
10+
811
<ItemGroup>
9-
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.0.0" />
10-
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Kafka" Version="4.1.2" />
11-
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.5" />
12-
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
13-
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="4.*" />
12+
<ProjectReference Include="..\AzureFunctions.Messages\AzureFunctions.Messages.csproj" />
1413
</ItemGroup>
14+
1515
<ItemGroup>
16-
<ProjectReference Include="..\AzureFunctions.Messages\AzureFunctions.Messages.csproj" />
16+
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.*" />
17+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.*" />
18+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Kafka" Version="4.*" />
19+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.*" />
20+
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="4.*" />
1721
</ItemGroup>
22+
1823
<ItemGroup>
1924
<Content Include="..\local.settings.json" CopyToOutputDirectory="PreserveNewest" />
2025
<None Update="host.json" CopyToOutputDirectory="PreserveNewest" />
2126
</ItemGroup>
27+
2228
</Project>

samples/azure-functions/service-bus-kafka/ASBS_4/AzureFunctions.KafkaTrigger.FunctionsHostBuilder/KafkaTrigger.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,18 @@
1-
using Microsoft.Azure.Functions.Worker;
2-
using Microsoft.Extensions.Logging;
3-
using NServiceBus;
4-
using System.Threading.Tasks;
1+
using System.Text.Json.Nodes;
52
using AzureFunctions.Messages.NServiceBusMessages;
6-
using System.Text.Json.Nodes;
3+
using Microsoft.Azure.Functions.Worker;
4+
using Microsoft.Extensions.Logging;
75

86
namespace AzureFunctions.KafkaTrigger.FunctionsHostBuilder;
97

108
public class KafkaTrigger(IMessageSession messageSession, ILogger<KafkaTrigger> logger)
119
{
12-
readonly IMessageSession messageSession = messageSession;
13-
readonly ILogger<KafkaTrigger> logger = logger;
14-
1510
#region KafkaTrigger
1611

1712
[Function(nameof(ElectricityUsage))]
1813
public async Task ElectricityUsage([KafkaTrigger("LocalKafkaBroker", "topic", ConsumerGroup = "$Default")] string eventData,
1914
FunctionContext context)
2015
{
21-
2216
var eventValue = JsonNode.Parse(eventData)["Value"]?.ToString();
2317
var electricityUsage = Messages.KafkaMessages.ElectricityUsage.Deserialize(eventValue);
2418

0 commit comments

Comments
 (0)