Skip to content

Commit 83bce82

Browse files
golanbzbrmagadutra
authored andcommitted
feat: add sample project to evaluate cooperative sticky strategy support
test: add unit tests and fix bug in ClusterConfiguration - Added 3 new test files to improve coverage: - ConsumerConfigurationBuilderTests.cs - KafkaConfigTests.cs - PartitionAssignmentStrategyTests.cs - Fixed a bug in ClusterConfiguration related to AutoCommitInterval initialization chore: update AutoCommitInterval to 100ms in Cooperative-sticky sample - Updated AutoCommitInterval in Cooperative-sticky sample Program.cs to 100ms. fix: resolve Codacy issues - Fixed various code quality issues flagged by Codacy.
1 parent b925601 commit 83bce82

File tree

11 files changed

+462
-3
lines changed

11 files changed

+462
-3
lines changed

KafkaFlow.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
9696
EndProject
9797
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{E9E8B374-4165-45F2-8DF5-F141E141AC1D}"
9898
EndProject
99+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.CooperativeSticky", "samples\KafkaFlow.Sample.CooperativeSticky\KafkaFlow.Sample.CooperativeSticky.csproj", "{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}"
100+
EndProject
99101
Global
100102
GlobalSection(SolutionConfigurationPlatforms) = preSolution
101103
Debug|Any CPU = Debug|Any CPU
@@ -222,6 +224,10 @@ Global
222224
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
223225
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
224226
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.Build.0 = Release|Any CPU
227+
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
228+
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.Build.0 = Debug|Any CPU
229+
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.ActiveCfg = Release|Any CPU
230+
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.Build.0 = Release|Any CPU
225231
EndGlobalSection
226232
GlobalSection(SolutionProperties) = preSolution
227233
HideSolutionNode = FALSE
@@ -265,6 +271,7 @@ Global
265271
{1755E8DB-970C-4A24-8B7C-A2BEC1410BEE} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
266272
{80080C1D-579E-4AB2-935D-5CFFC51843D8} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
267273
{E9E8B374-4165-45F2-8DF5-F141E141AC1D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
274+
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
268275
EndGlobalSection
269276
GlobalSection(ExtensibilityGlobals) = postSolution
270277
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using KafkaFlow.Producers;
5+
using Microsoft.Extensions.Hosting;
6+
7+
namespace KafkaFlow.Sample.CooperativeSticky;
8+
9+
public class HostedService : IHostedService
10+
{
11+
private IMessageProducer _producer;
12+
const string producerName = "PrintConsole";
13+
const string topicName = "sample-topic";
14+
15+
16+
public HostedService(IProducerAccessor producerAccessor)
17+
{
18+
_producer = producerAccessor.GetProducer(producerName);
19+
}
20+
21+
public async Task StartAsync(CancellationToken cancellationToken)
22+
{
23+
try
24+
{
25+
while (true)
26+
{
27+
await _producer.ProduceAsync(
28+
topicName,
29+
Guid.NewGuid().ToString(),
30+
new TestMessage { Text = $"Message: {Guid.NewGuid()}" });
31+
await Task.Delay(500, cancellationToken);
32+
}
33+
}
34+
catch (Exception e)
35+
{
36+
Console.WriteLine(e);
37+
}
38+
}
39+
40+
public Task StopAsync(CancellationToken cancellationToken)
41+
{
42+
return Task.CompletedTask;
43+
}
44+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net6.0</TargetFramework>
6+
<IsPackable>false</IsPackable>
7+
<GenerateDocumentationFile>false</GenerateDocumentationFile>
8+
<InvariantGlobalization>true</InvariantGlobalization>
9+
</PropertyGroup>
10+
11+
<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
12+
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
13+
</PropertyGroup>
14+
15+
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
16+
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
17+
</PropertyGroup>
18+
19+
20+
<ItemGroup>
21+
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
22+
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
23+
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
24+
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
25+
</ItemGroup>
26+
27+
<ItemGroup>
28+
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
29+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
30+
</ItemGroup>
31+
32+
33+
</Project>
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace KafkaFlow.Sample.CooperativeSticky;
5+
6+
public class PrintConsoleHandler : IMessageHandler<TestMessage>
7+
{
8+
public Task Handle(IMessageContext context, TestMessage message)
9+
{
10+
Console.WriteLine(
11+
"Partition: {0} | Offset: {1} | Message: {2}",
12+
context.ConsumerContext.Partition,
13+
context.ConsumerContext.Offset,
14+
message.Text);
15+
16+
return Task.CompletedTask;
17+
}
18+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using Confluent.Kafka;
2+
using KafkaFlow;
3+
using KafkaFlow.Sample.CooperativeSticky;
4+
using KafkaFlow.Serializer;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Microsoft.Extensions.Hosting;
7+
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;
8+
9+
const string producerName = "PrintConsole";
10+
const string topicName = "sample-topic";
11+
var hostBuilder = new HostBuilder();
12+
hostBuilder.ConfigureServices(services =>
13+
services.AddHostedService<HostedService>().AddKafka(
14+
kafka => kafka
15+
.UseConsoleLog()
16+
.AddCluster(
17+
cluster => cluster
18+
.WithBrokers(new[] { "localhost:9092" })
19+
.CreateTopicIfNotExists(topicName, 6, 1)
20+
.AddProducer(
21+
producerName,
22+
producer => producer
23+
.DefaultTopic(topicName)
24+
.AddMiddlewares(m => m.AddSerializer<ProtobufNetSerializer>())
25+
)
26+
.AddConsumer(
27+
consumer => consumer
28+
.WithConsumerConfig(new ConsumerConfig
29+
{
30+
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky})
31+
.Topic(topicName)
32+
.WithGroupId("print-console-handler")
33+
.WithBufferSize(100)
34+
.WithWorkersCount(3)
35+
.WithAutoCommitIntervalMs(100)
36+
.WithAutoOffsetReset(AutoOffsetReset.Latest)
37+
.AddMiddlewares(
38+
middlewares => middlewares
39+
.AddDeserializer<ProtobufNetDeserializer>()
40+
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
41+
)
42+
)
43+
)
44+
));
45+
46+
var build = hostBuilder.Build();
47+
var kafkaBus = build.Services.CreateKafkaBus();
48+
await kafkaBus.StartAsync();
49+
50+
await build.RunAsync();
51+
await kafkaBus.StopAsync();
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# KafkaFlow.Sample
2+
3+
This is a simple sample that shows how to produce and consume messages.
4+
5+
## How to run
6+
7+
### Requirements
8+
9+
- [.NET 6.0 SDK](https://dotnet.microsoft.com/en-us/download/dotnet/6.0)
10+
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)
11+
12+
### Start the cluster
13+
14+
Using your terminal of choice, start the cluster.
15+
You can find a docker-compose file at the root of this repository.
16+
Position the terminal in that folder and run the following command.
17+
18+
```bash
19+
docker-compose up -d
20+
```
21+
22+
### Run the Sample
23+
24+
Using your terminal of choice, start the sample for the sample folder.
25+
26+
```bash
27+
dotnet run
28+
```
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System.Runtime.Serialization;
2+
3+
namespace KafkaFlow.Sample.CooperativeSticky;
4+
5+
[DataContract]
6+
public class TestMessage
7+
{
8+
[DataMember(Order = 1)]
9+
public string Text { get; set; }
10+
}

src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
253253

254254
consumerConfigCopy.EnableAutoOffsetStore = false;
255255
consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false;
256-
consumerConfigCopy.AutoCommitIntervalMs = _consumerConfig.AutoCommitIntervalMs ?? 5000;
256+
consumerConfigCopy.AutoCommitIntervalMs = (int?)_autoCommitInterval.TotalMilliseconds;
257257

258258
consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);
259259

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System;
2+
using AutoFixture;
3+
using AutoFixture.AutoMoq;
4+
using Confluent.Kafka;
5+
using FluentAssertions;
6+
using KafkaFlow.Configuration;
7+
using Microsoft.VisualStudio.TestTools.UnitTesting;
8+
using Moq;
9+
10+
namespace KafkaFlow.UnitTests.Consumer;
11+
12+
[TestClass]
13+
public class ConsumerConfigurationBuilderTests
14+
{
15+
private readonly Fixture _fixture = new();
16+
17+
[TestInitialize]
18+
public void Setup()
19+
{
20+
_fixture.Customize(new AutoMoqCustomization());
21+
}
22+
23+
[TestMethod]
24+
public void ConfigurationBuild_CallBuild_WithSticky_EnableAutoCommit_True()
25+
{
26+
// Arrange
27+
var consumerConfigurationBuilder = _fixture.Create<ConsumerConfigurationBuilder>();
28+
consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig
29+
{
30+
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
31+
GroupId = "Test",
32+
}).WithAutoCommitIntervalMs(500)
33+
.WithBufferSize(3);
34+
35+
// Act
36+
var consumerConfiguration = consumerConfigurationBuilder.Build(_fixture.Create<ClusterConfiguration>());
37+
38+
// Assert
39+
var consumerConfig = consumerConfiguration.GetKafkaConfig();
40+
consumerConfig.EnableAutoCommit.Should().BeTrue();
41+
consumerConfig.AutoCommitIntervalMs.Should().Be(500);
42+
consumerConfiguration.AutoCommitInterval.Should().Be(TimeSpan.FromMilliseconds(500));
43+
}
44+
45+
[TestMethod]
46+
public void ConfigurationBuild_CallBuild_WithSRoundRobin_EnableAutoCommit_False()
47+
{
48+
// Arrange
49+
var consumerConfigurationBuilder = new ConsumerConfigurationBuilder(Mock.Of<IDependencyConfigurator>());
50+
consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig
51+
{
52+
PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin,
53+
GroupId = "Test"
54+
}).WithAutoCommitIntervalMs(500).WithBufferSize(3);
55+
// Act
56+
var consumerConfiguration = consumerConfigurationBuilder.Build(_fixture.Create<ClusterConfiguration>());
57+
58+
// Assert
59+
consumerConfiguration.GetKafkaConfig().EnableAutoCommit.Should().BeFalse();
60+
consumerConfiguration.AutoCommitInterval.Should().Be(TimeSpan.FromMilliseconds(500));
61+
}
62+
}

0 commit comments

Comments
 (0)