Skip to content

Commit b03c76a

Browse files
namelessvoidmhowlett
authored andcommitted
Validate config values for 'not-null' (#544)
1 parent 57192f3 commit b03c76a

File tree

4 files changed

+83
-14
lines changed

4 files changed

+83
-14
lines changed

src/Confluent.Kafka/Consumer.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -819,15 +819,21 @@ public Consumer(IEnumerable<KeyValuePair<string, object>> config)
819819
config
820820
.Where(prop => prop.Key != "default.topic.config")
821821
.ToList()
822-
.ForEach((kvp) => { configHandle.Set(kvp.Key, kvp.Value.ToString()); });
822+
.ForEach((kvp) => {
823+
if (kvp.Value == null) throw new ArgumentException($"'{kvp.Key}' configuration parameter must not be null.");
824+
configHandle.Set(kvp.Key, kvp.Value.ToString());
825+
});
823826

824827
// Note: Setting default topic configuration properties via default.topic.config is depreciated
825828
// and this functionality will be removed in a future version of the library.
826829
var defaultTopicConfig = (IEnumerable<KeyValuePair<string, object>>)config.FirstOrDefault(prop => prop.Key == "default.topic.config").Value;
827830
if (defaultTopicConfig != null)
828831
{
829832
defaultTopicConfig.ToList().ForEach(
830-
(kvp) => { configHandle.Set(kvp.Key, kvp.Value.ToString()); }
833+
(kvp) => {
834+
if (kvp.Value == null) throw new ArgumentException($"'{kvp.Key}' configuration parameter in 'default.topic.config' must not be null.");
835+
configHandle.Set(kvp.Key, kvp.Value.ToString());
836+
}
831837
);
832838
}
833839

src/Confluent.Kafka/Producer.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,10 @@ public Producer(IEnumerable<KeyValuePair<string, object>> config, bool manualPol
279279
modifiedConfig = modifiedConfig.Concat(new KeyValuePair<string, object>[] { new KeyValuePair<string, object>("produce.offset.report", "true") });
280280
}
281281

282-
modifiedConfig.ToList().ForEach((kvp) => { configHandle.Set(kvp.Key, kvp.Value.ToString()); });
282+
modifiedConfig.ToList().ForEach((kvp) => {
283+
if (kvp.Value == null) throw new ArgumentException($"'{kvp.Key}' configuration parameter must not be null.");
284+
configHandle.Set(kvp.Key, kvp.Value.ToString());
285+
});
283286

284287
IntPtr configPtr = configHandle.DangerousGetHandle();
285288

test/Confluent.Kafka.UnitTests/Consumer.cs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,34 +25,52 @@ namespace Confluent.Kafka.UnitTests
2525
{
2626
public class ConsumerTests
2727
{
28-
/// <summary>
29-
/// Test that the Consumer constructor throws an exception if
30-
/// the group.id configuration parameter is not set and that
31-
/// the message of the exception mentions group.id (i.e. is
32-
/// not some unrelated exception).
33-
/// </summary>
3428
[Fact]
3529
public void Constuctor()
3630
{
31+
// Throw exception if 'group.id' is not set in config and ensure that exception
32+
// mentions 'group.id'.
3733
var config = new Dictionary<string, object>();
3834
var e = Assert.Throws<ArgumentException>(() => { var c = new Consumer(config); });
3935
Assert.True(e.Message.Contains("group.id"));
4036
e = Assert.Throws<ArgumentException>(() => { var c = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)); });
4137
Assert.True(e.Message.Contains("group.id"));
4238

39+
// Throw exception if a config value is null and ensure that exception mentions the
40+
// respective config key.
41+
var configWithNullValue = CreateValidConfiguration();
42+
configWithNullValue["sasl.password"] = null;
43+
e = Assert.Throws<ArgumentException>(() => { var c = new Consumer<byte[], byte[]>(configWithNullValue, new ByteArrayDeserializer(), new ByteArrayDeserializer()); });
44+
Assert.Contains("sasl.password", e.Message);
45+
46+
// Throw exception if a config value within default.topic.config is null and
47+
// ensure that exception mentions the respective config key.
48+
var configWithDefaultTopicNullValue = CreateValidConfiguration();
49+
configWithDefaultTopicNullValue["default.topic.config"] = new Dictionary<string, object>() { { "auto.offset.reset", null } };
50+
e = Assert.Throws<ArgumentException>(() => { var c = new Consumer<byte[], byte[]>(configWithDefaultTopicNullValue, new ByteArrayDeserializer(), new ByteArrayDeserializer()); });
51+
Assert.Contains("default.topic.config", e.Message);
52+
Assert.Contains("auto.offset.reset", e.Message);
53+
54+
// Throw exception when serializer and deserializer are equal and ensure that exception
55+
// message indicates the issue.
4356
e = Assert.Throws<ArgumentException>(() =>
4457
{
45-
var validConfig = new Dictionary<string, object>
46-
{
47-
{ "bootstrap.servers", "localhost:9092" },
48-
{ "group.id", "my-group" }
49-
};
58+
var validConfig = CreateValidConfiguration();
5059
var deserializer = new StringDeserializer(Encoding.UTF8);
5160
var c = new Consumer<string, string>(validConfig, deserializer, deserializer);
5261
});
5362
Assert.True(e.Message.Contains("must not be the same object"));
5463

5564
// positve case covered by integration tests. here, avoiding creating a rd_kafka_t instance.
5665
}
66+
67+
private static Dictionary<string, object> CreateValidConfiguration()
68+
{
69+
return new Dictionary<string, object>
70+
{
71+
{ "bootstrap.servers", "localhost:9092" },
72+
{ "group.id", "my-group" }
73+
};
74+
}
5775
}
5876
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright 2016-2017 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Xunit;
18+
using System;
19+
using System.Text;
20+
using System.Collections.Generic;
21+
using Confluent.Kafka.Serialization;
22+
23+
24+
namespace Confluent.Kafka.UnitTests
25+
{
26+
public class ProducerTests
27+
{
28+
[Fact]
29+
public void Constuctor()
30+
{
31+
// Throw exception if a config value is null and ensure that exception mentions the
32+
// respective config key.
33+
var configWithNullValue = new Dictionary<string, object>
34+
{
35+
{ "sasl.password", null }
36+
};
37+
configWithNullValue["sasl.password"] = null;
38+
var e = Assert.Throws<ArgumentException>(() => { var c = new Producer<byte[], byte[]>(configWithNullValue, new ByteArraySerializer(), new ByteArraySerializer()); });
39+
Assert.Contains("sasl.password", e.Message);
40+
}
41+
}
42+
}

0 commit comments

Comments
 (0)