Skip to content

Commit 57192f3

Browse files
NikolayDekovmhowlett
authored andcommitted
Fixed GenericSerializerImpl to work with multiple topics. (#538)
* Fixed GenericSerializerImpl to work with multiple topics. Added UnitTests * Revert "Fixed GenericSerializerImpl to work with multiple topics." This reverts commit b231938. * GenericSerializerImpl fixed to work with multiple topics * GenericSerializerImpl - added clearing of local schema cache and trowing and exception in case of schemaId mismatch * GenericSerializerImpl fixed error message when throwing exception on schema mismatch
1 parent 4e8cdb8 commit 57192f3

File tree

2 files changed

+78
-6
lines changed

2 files changed

+78
-6
lines changed

src/Confluent.Kafka.Avro/GenericSerializerImpl.cs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,26 @@ public byte[] Serialize(string topic, GenericRecord data)
111111
var subjectSchemaPair = new KeyValuePair<string, string>(subject, writerSchemaString);
112112
if (!registeredSchemas.Contains(subjectSchemaPair))
113113
{
114+
int newSchemaId;
114115
// first usage: register/get schema to check compatibility
115116
if (autoRegisterSchema)
116117
{
117-
schemaIds.Add(
118-
writerSchemaString,
119-
schemaRegistryClient.RegisterSchemaAsync(subject, writerSchemaString).ConfigureAwait(false).GetAwaiter().GetResult());
118+
newSchemaId = schemaRegistryClient.RegisterSchemaAsync(subject, writerSchemaString).ConfigureAwait(false).GetAwaiter().GetResult();
120119
}
121120
else
122121
{
123-
schemaIds.Add(
124-
writerSchemaString,
125-
schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString).ConfigureAwait(false).GetAwaiter().GetResult());
122+
newSchemaId = schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString).ConfigureAwait(false).GetAwaiter().GetResult();
123+
}
124+
125+
if (!schemaIds.ContainsKey(writerSchemaString))
126+
{
127+
schemaIds.Add(writerSchemaString, newSchemaId);
128+
}
129+
else if (schemaIds[writerSchemaString] != newSchemaId)
130+
{
131+
schemaIds.Clear();
132+
registeredSchemas.Clear();
133+
throw new KafkaException(new Error(isKey ? ErrorCode.Local_KeySerialization : ErrorCode.Local_ValueSerialization, $"Duplicate schema registration encountered: Schema ids {schemaIds[writerSchemaString]} and {newSchemaId} are associated with the same schema."));
126134
}
127135

128136
registeredSchemas.Add(subjectSchemaPair);
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using Confluent.Kafka.Examples.AvroSpecific;
4+
using Confluent.Kafka.Serialization;
5+
using Avro;
6+
using Avro.Generic;
7+
using Xunit;
8+
9+
namespace Confluent.Kafka.Avro.IntegrationTests
10+
{
11+
public static partial class Tests
12+
{
13+
/// <summary>
14+
/// Test that messages produced with the avro serializer can be consumed with the
15+
/// avro deserializer.
16+
/// </summary>
17+
[Theory, MemberData(nameof(TestParameters))]
18+
public static void ProduceGenericMultipleTopics(string bootstrapServers, string schemaRegistryServers)
19+
{
20+
var s = (RecordSchema)Schema.Parse(
21+
@"{
22+
""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
23+
""type"": ""record"",
24+
""name"": ""User"",
25+
""fields"": [
26+
{""name"": ""name"", ""type"": ""string""},
27+
{""name"": ""favorite_number"", ""type"": [""int"", ""null""]},
28+
{""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
29+
]
30+
}"
31+
);
32+
33+
var config = new Dictionary<string, object>()
34+
{
35+
{ "bootstrap.servers", bootstrapServers },
36+
{ "schema.registry.url", schemaRegistryServers }
37+
};
38+
39+
var topic = Guid.NewGuid().ToString();
40+
var topic2 = Guid.NewGuid().ToString();
41+
42+
Message<Null, GenericRecord> dr;
43+
Message<Null, GenericRecord> dr2;
44+
45+
using (var p = new Producer<Null, GenericRecord>(config, null, new AvroSerializer<GenericRecord>()))
46+
{
47+
var record = new GenericRecord(s);
48+
record.Add("name", "my name 2");
49+
record.Add("favorite_number", 44);
50+
record.Add("favorite_color", null);
51+
dr = p.ProduceAsync(topic, null, record).Result;
52+
dr2 = p.ProduceAsync(topic2, null, record).Result;
53+
}
54+
55+
Assert.Null(dr.Key);
56+
Assert.NotNull(dr.Value);
57+
58+
Assert.Null(dr2.Key);
59+
Assert.NotNull(dr2.Value);
60+
61+
}
62+
63+
}
64+
}

0 commit comments

Comments
 (0)