Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/KafkaFlow/Configuration/ProducerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal class ProducerConfigurationBuilder : IProducerConfigurationBuilder
private string _topic;
private ProducerConfig _producerConfig;
private Acks? _acks;
private int _statisticsInterval;
private int? _statisticsInterval;
private double? _lingerMs;
private ProducerCustomFactory _customFactory = (producer, _) => producer;

Expand Down Expand Up @@ -86,8 +86,15 @@ public IProducerConfiguration Build(ClusterConfiguration clusterConfiguration)
{
_producerConfig ??= new ProducerConfig();

_producerConfig.StatisticsIntervalMs = _statisticsInterval;
_producerConfig.LingerMs = _lingerMs;
if (_statisticsInterval.HasValue)
{
_producerConfig.StatisticsIntervalMs = _statisticsInterval;
}

if (_lingerMs.HasValue)
{
_producerConfig.LingerMs = _lingerMs;
}

_producerConfig.ReadSecurityInformationFrom(clusterConfiguration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,78 @@ public void Build_UseCompressionWithoutCompressionLevel_ReturnDefaultValues()
configuration.BaseProducerConfig.CompressionType.Should().Be(compressionType);
configuration.BaseProducerConfig.CompressionLevel.Should().Be(-1);
}

[TestMethod]
public void Build_UseProducerConfig_ShouldNotOverrideLingerMsAndStatisticsIntervalMs()
{
// Arrange
var clusterConfiguration = _fixture.Create<ClusterConfiguration>();

const double lingerMs = 1000;
const int statisticsMs = 2000;
var producerConfig = new ProducerConfig
{
LingerMs = lingerMs,
StatisticsIntervalMs = statisticsMs
};

_target.WithProducerConfig(producerConfig);

// Act
var configuration = _target.Build(clusterConfiguration);

// Assert
configuration.Cluster.Should().Be(clusterConfiguration);
configuration.BaseProducerConfig.LingerMs.Should().Be(lingerMs);
configuration.BaseProducerConfig.StatisticsIntervalMs.Should().Be(statisticsMs);
}

[TestMethod]
public void Build_UseProducerConfigAndUseLingerMs_ShouldOverrideLingerMs()
{
// Arrange
var clusterConfiguration = _fixture.Create<ClusterConfiguration>();

const double producerConfigLingerMs = 1000;
const double expectedLingerMs = 2000;
var producerConfig = new ProducerConfig { LingerMs = producerConfigLingerMs };

_target
.WithProducerConfig(producerConfig)
.WithLingerMs(expectedLingerMs);

// Act
var configuration = _target.Build(clusterConfiguration);

// Assert
configuration.Cluster.Should().Be(clusterConfiguration);
configuration.BaseProducerConfig.LingerMs.Should().Be(expectedLingerMs);
}

[TestMethod]
public void Build_UseProducerConfigAndUseStatisticsIntervalMs_ShouldOverrideStatisticsIntervalMs()
{
// Arrange
var clusterConfiguration = _fixture.Create<ClusterConfiguration>();

const int producerConfigStatisticsIntervalMs = 1000;
const int expectedStatisticsIntervalMs = 2000;
var producerConfig = new ProducerConfig
{
StatisticsIntervalMs = producerConfigStatisticsIntervalMs
};

_target
.WithProducerConfig(producerConfig)
.WithStatisticsIntervalMs(expectedStatisticsIntervalMs);

// Act
var configuration = _target.Build(clusterConfiguration);

// Assert
configuration.Cluster.Should().Be(clusterConfiguration);
configuration
.BaseProducerConfig.StatisticsIntervalMs.Should()
.Be(expectedStatisticsIntervalMs);
}
}