Skip to content

Commit 631bcd0

Browse files
author
Matt Howlett
committed
all tests pass
1 parent 01cf6b3 commit 631bcd0

File tree

8 files changed

+24
-11
lines changed

8 files changed

+24
-11
lines changed

src/Confluent.Kafka.Avro/AvroSerdeProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public SerializerGenerator<T> GetSerializerGenerator<T>()
6262
return (forKey) =>
6363
{
6464
var serializer = new AvroSerializer<T>(schemaRegistryClient);
65-
serializer.Configure(config, true);
65+
serializer.Configure(config, forKey);
6666
return (string topic, T data) => serializer.Serialize(topic, data);
6767
};
6868
}

src/Confluent.Kafka/Consumer.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,13 @@ public Consumer(
237237
{
238238
Librdkafka.Initialize(null);
239239

240-
KeyDeserializer = keyDeserializerGenerator(true);
241-
ValueDeserializer = valueDeserializerGenerator(false);
240+
KeyDeserializer = keyDeserializerGenerator == null
241+
? null
242+
: keyDeserializerGenerator(true);
243+
244+
ValueDeserializer = valueDeserializerGenerator == null
245+
? null
246+
: valueDeserializerGenerator(false);
242247

243248
if (KeyDeserializer != null && (object)KeyDeserializer == (object)ValueDeserializer)
244249
{

src/Confluent.Kafka/Producer.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ public Producer(
166166

167167
this.handle = ownedClient.Handle;
168168
this.producer = ownedClient;
169-
setAndValidateSerializers(keySerializerGenerator(true), valueSerializerGenerator(false));
169+
setAndValidateSerializers(
170+
keySerializerGenerator == null ? null: keySerializerGenerator(true),
171+
valueSerializerGenerator == null ? null : valueSerializerGenerator(false));
170172
}
171173

172174

@@ -184,8 +186,8 @@ public Producer(
184186
/// </param>
185187
public Producer(
186188
Handle handle,
187-
Serializer<TKey> keySerializer,
188-
Serializer<TValue> valueSerializer)
189+
Serializer<TKey> keySerializer = null,
190+
Serializer<TValue> valueSerializer = null)
189191
{
190192
if (!(handle.Owner is Producer))
191193
{
@@ -224,7 +226,9 @@ public Producer(
224226
this.ownedClient = null;
225227
this.handle = handle;
226228
this.producer = (Producer)handle.Owner;
227-
setAndValidateSerializers(keySerializerGenerator(true), valueSerializerGenerator(false));
229+
setAndValidateSerializers(
230+
keySerializerGenerator == null ? null : keySerializerGenerator(true),
231+
valueSerializerGenerator == null ? null : valueSerializerGenerator(false));
228232
}
229233

230234

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
8383
var basicAuthInfo = Convert.ToString(config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo).Value);
8484
if (basicAuthInfo != "" && basicAuthSource == "")
8585
{
86-
throw new ArgumentException($"CachedSchemaRegistryClient: {SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo} was specified, but {SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource} was not.");
86+
// default to USER_INFO if no source specified, since the strongly typed config
87+
// class doesn't even expose the config source property.
88+
basicAuthSource = "USER_INFO";
8789
}
8890

8991
string username = null;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"bootstrapServer": "localhost:9092",
2+
"bootstrapServer": "10.200.7.144:9092",
33
"schemaRegistryServer": "localhost:8081"
44
}

test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public static void LogDelegate(string bootstrapServers, string singlePartitionTo
6767
consumer.OnLog += (_, m)
6868
=> logCount += 1;
6969

70+
consumer.Assign(new TopicPartition(singlePartitionTopic, 0));
71+
7072
consumer.Consume(TimeSpan.FromMilliseconds(100));
7173
}
7274
Assert.True(logCount > 0);

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null_DeliveryHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static void Producer_Produce_Null_DeliveryHandler(string bootstrapServers
4848
Assert.Null(dr.Message.Value);
4949
Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
5050
Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
51-
count += 1;
51+
count += 1;
5252
};
5353

5454
using (var producer = new Producer<Null, Null>(producerConfig))
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"bootstrapServers": "localhost:9092",
2+
"bootstrapServers": "10.200.7.144:9092",
33
"singlePartitionTopic": "test-topic-1",
44
"partitionedTopic": "test-topic-2"
55
}

0 commit comments

Comments
 (0)