diff --git a/.gitignore b/.gitignore
index de02846e6..84deaf8b0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,3 +22,4 @@ UpgradeLog*.htm
 *~
 \#*
 core
+.idea/
diff --git a/src/Confluent.Kafka/ConsumerBuilder.cs b/src/Confluent.Kafka/ConsumerBuilder.cs
index 56549e783..d27307b5a 100644
--- a/src/Confluent.Kafka/ConsumerBuilder.cs
+++ b/src/Confluent.Kafka/ConsumerBuilder.cs
@@ -157,11 +157,7 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
         public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(
             Action<IConsumer<TKey, TValue>, string> statisticsHandler)
         {
-            if (this.StatisticsHandler != null)
-            {
-                throw new InvalidOperationException("Statistics handler may not be specified more than once.");
-            }
-            this.StatisticsHandler = statisticsHandler;
+            this.StatisticsHandler += statisticsHandler;
             return this;
         }

@@ -180,11 +176,7 @@ public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(
         public ConsumerBuilder<TKey, TValue> SetErrorHandler(
             Action<IConsumer<TKey, TValue>, Error> errorHandler)
         {
-            if (this.ErrorHandler != null)
-            {
-                throw new InvalidOperationException("Error handler may not be specified more than once.");
-            }
-            this.ErrorHandler = errorHandler;
+            this.ErrorHandler += errorHandler;
             return this;
         }

@@ -210,11 +202,7 @@ public ConsumerBuilder<TKey, TValue> SetErrorHandler(
         public ConsumerBuilder<TKey, TValue> SetLogHandler(
             Action<IConsumer<TKey, TValue>, LogMessage> logHandler)
         {
-            if (this.LogHandler != null)
-            {
-                throw new InvalidOperationException("Log handler may not be specified more than once.");
-            }
-            this.LogHandler = logHandler;
+            this.LogHandler += logHandler;
             return this;
         }

diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs
index e2a3b488b..bd9f439f8 100644
--- a/src/Confluent.Kafka/ProducerBuilder.cs
+++ b/src/Confluent.Kafka/ProducerBuilder.cs
@@ -167,11 +167,7 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
         /// </summary>
         public ProducerBuilder<TKey, TValue> SetStatisticsHandler(Action<IProducer<TKey, TValue>, string> statisticsHandler)
         {
-            if (this.StatisticsHandler != null)
-            {
-                throw new InvalidOperationException("Statistics handler may not be specified more than once.");
-            }
-            this.StatisticsHandler = statisticsHandler;
+            this.StatisticsHandler += statisticsHandler;
             return this;
         }

@@ -222,11 +218,7 @@ public ProducerBuilder<TKey, TValue> SetDefaultPartitioner(PartitionerDelegate p
         /// </summary>
         public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TValue>, Error> errorHandler)
         {
-            if (this.ErrorHandler != null)
-            {
-                throw new InvalidOperationException("Error handler may not be specified more than once.");
-            }
-            this.ErrorHandler = errorHandler;
+            this.ErrorHandler += errorHandler;
             return this;
         }

@@ -251,11 +243,7 @@ public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TVal
         /// </summary>
         public ProducerBuilder<TKey, TValue> SetLogHandler(Action<IProducer<TKey, TValue>, LogMessage> logHandler)
         {
-            if (this.LogHandler != null)
-            {
-                throw new InvalidOperationException("Log handler may not be specified more than once.");
-            }
-            this.LogHandler = logHandler;
+            this.LogHandler += logHandler;
             return this;
         }

diff --git a/test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs b/test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs
new file mode 100644
index 000000000..8d49b435b
--- /dev/null
+++ b/test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs
@@ -0,0 +1,16 @@
+using System;
+using Xunit;
+
+namespace Confluent.Kafka.IntegrationTests;
+
+public sealed class SkipWhenCITheory : TheoryAttribute
+{
+    private const string JenkinsBuildIdEnvVarName = "BUILD_ID";
+
+    public SkipWhenCITheory(string reason)
+    {
+        Skip = Environment.GetEnvironmentVariables().Contains(JenkinsBuildIdEnvVarName)
+            ? reason
+            : null;
+    }
+}
\ No newline at end of file
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs
new file mode 100644
index 000000000..5c4651f56
--- /dev/null
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs
@@ -0,0 +1,230 @@
+// Copyright 2016-2017 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+#pragma warning disable xUnit1026
+
+using System;
+using System.Linq;
+using System.Threading;
+using Xunit;
+
+
+namespace Confluent.Kafka.IntegrationTests
+{
+    /// <summary>
+    ///     Test multiple calls to SetLogHandler, SetStatisticsHandler and SetErrorHandler
+    /// </summary>
+    public partial class Tests
+    {
+        private const string UnreachableBootstrapServers = "localhost:9000";
+
+        [Theory, MemberData(nameof(KafkaParameters))]
+        public void ProducerBuilder_SetLogHandler(string bootstrapServers)
+        {
+            LogToFile("start ProducerBuilder_SetLogHandler");
+
+            var producerConfig = new ProducerConfig
+            {
+                BootstrapServers = bootstrapServers,
+                Debug = "all"
+            };
+
+            ManualResetEventSlim mres1 = new(), mres2 = new();
+
+            using var _ = new ProducerBuilder<Null, string>(producerConfig)
+                .SetLogHandler((_, _) => mres1.Set())
+                .SetLogHandler((_, _) => mres2.Set())
+                .Build();
+
+            Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
+            Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
+
+            LogToFile("end ProducerBuilder_SetLogHandler");
+        }
+
+        [Theory, MemberData(nameof(KafkaParameters))]
+        public void ProducerBuilder_SetStatisticsHandler(string bootstrapServers)
+        {
+            LogToFile("start ProducerBuilder_SetStatisticsHandler");
+
+            var producerConfig = new ProducerConfig
+            {
+                BootstrapServers = bootstrapServers,
+                StatisticsIntervalMs = 100
+            };
+
+            ManualResetEventSlim mres1 = new(), mres2 = new();
+
+            using var _ = new ProducerBuilder<Null, string>(producerConfig)
+                .SetStatisticsHandler((_, _) => mres1.Set())
+                .SetStatisticsHandler((_, _) => mres2.Set())
+                .Build();
+
+            Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
+            Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
+
+            LogToFile("end ProducerBuilder_SetStatisticsHandler");
+        }
+
+        [Theory, InlineData(UnreachableBootstrapServers)]
+        public void ProducerBuilder_SetErrorHandler(string bootstrapServers)
+        {
+            LogToFile("start ProducerBuilder_SetErrorHandler");
+
+            var producerConfig = new ProducerConfig
+            {
+                BootstrapServers = bootstrapServers
+            };
+
+            ManualResetEventSlim mres1 = new(), mres2 = new();
+
+            using var _ = new ProducerBuilder<Null, string>(producerConfig)
+                .SetErrorHandler((_, _) => mres1.Set())
+                .SetErrorHandler((_, _) => mres2.Set())
+                .Build();
+
+            Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
+            Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
+
+            LogToFile("end ProducerBuilder_SetErrorHandler");
+        }
+
+        [Theory, MemberData(nameof(KafkaParameters))]
+        public void ConsumerBuilder_SetLogHandler(string bootstrapServers)
+        {
+            LogToFile("start ConsumerBuilder_SetLogHandler");
+
+            int N = 2;
+            var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
+
+            var consumerConfig = new ConsumerConfig
+            {
+                GroupId = Guid.NewGuid().ToString(),
+                BootstrapServers = bootstrapServers,
+                SessionTimeoutMs = 6000,
+                EnablePartitionEof = true,
+                Debug = "all"
+            };
+
+            ManualResetEventSlim mres1 = new(), mres2 = new();
+
+            using var consumer = new ConsumerBuilder<Null, string>(consumerConfig)
+                .SetPartitionsAssignedHandler((c, partitions) =>
+                {
+                    Assert.Single(partitions);
+                    Assert.Equal(firstProduced.TopicPartition, partitions[0]);
+                    return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
+                })
+                .SetLogHandler((_, _) => mres1.Set())
+                .SetLogHandler((_, _) => mres2.Set())
+                .Build();
+            consumer.Subscribe(singlePartitionTopic);
+
+            Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
+            Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
+
+            LogToFile("end ConsumerBuilder_SetLogHandler");
+        }
+
+        [Theory, MemberData(nameof(KafkaParameters))]
+        public void ConsumerBuilder_SetStatisticsHandler(string bootstrapServers)
+        {
+            LogToFile("start ConsumerBuilder_SetStatisticsHandler");
+
+            int N = 2;
+            var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
+
+            var consumerConfig = new ConsumerConfig
+            {
+                GroupId = Guid.NewGuid().ToString(),
+                BootstrapServers = bootstrapServers,
+                SessionTimeoutMs = 6000,
+                EnablePartitionEof = true,
+                StatisticsIntervalMs = 100
+            };
+
+            ManualResetEventSlim mres1 = new(), mres2 = new();
+
+            using (var consumer = new ConsumerBuilder<Null, string>(consumerConfig)
+                .SetPartitionsAssignedHandler((c, partitions) =>
+                {
+                    Assert.Single(partitions);
+                    Assert.Equal(firstProduced.TopicPartition, partitions[0]);
+                    return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
+                })
+                .SetStatisticsHandler((_, _) => mres1.Set())
+                .SetStatisticsHandler((_, _) => mres2.Set())
+                .Build())
+            {
+                consumer.Subscribe(singlePartitionTopic);
+
+                while (true)
+                {
+                    var record = consumer.Consume(TimeSpan.FromMilliseconds(100));
+                    if (record == null) { continue; }
+                    if (record.IsPartitionEOF) { break; }
+                }
+
+                Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
+                Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
+                consumer.Close();
+            }
+
+            LogToFile("end ConsumerBuilder_SetStatisticsHandler");
+        }
+
+        [SkipWhenCITheory("Requires stopping the broker in the while loop to simulate the broker being down."), MemberData(nameof(KafkaParameters))]
+        public void ConsumerBuilder_SetErrorHandler(string bootstrapServers)
+        {
+            LogToFile("start ConsumerBuilder_SetErrorHandler");
+
+            int N = 2;
+            var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
+
+            var consumerConfig = new ConsumerConfig
+            {
+                GroupId = Guid.NewGuid().ToString(),
+                BootstrapServers = bootstrapServers,
+                SessionTimeoutMs = 6000
+            };
+
+            bool errorHandler1Called = false, errorHandler2Called = false;
+
+            using (var consumer = new ConsumerBuilder<Null, string>(consumerConfig)
+                .SetPartitionsAssignedHandler((c, partitions) =>
+                {
+                    Assert.Single(partitions);
+                    Assert.Equal(firstProduced.TopicPartition, partitions[0]);
+                    return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
+                })
+                .SetErrorHandler((_, _) => errorHandler1Called = true)
+                .SetErrorHandler((_, _) => errorHandler2Called = true)
+                .Build())
+            {
+                consumer.Subscribe(singlePartitionTopic);
+
+                while (!errorHandler1Called && !errorHandler2Called)
+                {
+                    consumer.Consume(TimeSpan.FromMilliseconds(100));
+                }
+
+                consumer.Close();
+            }
+
+            LogToFile("end ConsumerBuilder_SetErrorHandler");
+        }
+    }
+}
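
For reference, a minimal usage sketch of the behaviour this patch introduces (not part of the diff; the MultipleHandlersSketch class, the <Null, string> type arguments, and the bootstrap address are illustrative assumptions): because the builders now combine callbacks with +=, calling SetLogHandler, SetStatisticsHandler, or SetErrorHandler more than once registers every callback instead of throwing InvalidOperationException.

using System;
using Confluent.Kafka;

class MultipleHandlersSketch
{
    static void Main()
    {
        // Placeholder broker address, for illustration only.
        var config = new ProducerConfig { BootstrapServers = "localhost:9092", Debug = "all" };

        // Both log handlers and both error handlers are registered: the builder now
        // accumulates them as a multicast delegate rather than rejecting the second call.
        using var producer = new ProducerBuilder<Null, string>(config)
            .SetLogHandler((p, log) => Console.WriteLine($"log handler A: {log.Message}"))
            .SetLogHandler((p, log) => Console.WriteLine($"log handler B: {log.Message}"))
            .SetErrorHandler((p, err) => Console.WriteLine($"error handler A: {err.Reason}"))
            .SetErrorHandler((p, err) => Console.WriteLine($"error handler B: {err.Reason}"))
            .Build();

        // Give librdkafka a moment to emit debug logs so both log handlers fire.
        producer.Flush(TimeSpan.FromSeconds(1));
    }
}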