Skip to content

Commit dd431bd

Browse files
PrasanthV454Matt Howlett
andauthored
Bug/latest version compatibility check (#1929)
* adedd compatibility check on schema generated with latest version * NJsonSchema Update to 10.6.3 Co-authored-by: Matt Howlett <[email protected]>
1 parent 8974218 commit dd431bd

File tree

7 files changed

+142
-22
lines changed

7 files changed

+142
-22
lines changed

CHANGELOG.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1+
# 2.0.0
2+
3+
## Enhancements
4+
5+
- Upgraded `NJsonSchema` to v10.6.3
6+
- Added `LatestCompatibilityStrict` configuration property to JsonSerializerConfig to check the compatibility with latest schema
7+
when `UseLatestVersion` is set to true.
8+
9+
110
# 1.9.4
211

312
## Fixes
413

514
- References librdkafka.redist 1.9.3-RC2 which resolves a transaction related issue. Refer to the [librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.3-RC2) for more information.
615

716

8-
# 1.9.3
17+
# 1.9.3
918

1019
## Enhancements
1120

examples/JsonSerialization/Program.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,14 @@ static async Task Main(string[] args)
149149
while ((text = Console.ReadLine()) != "q")
150150
{
151151
User user = new User { Name = text, FavoriteColor = "blue", FavoriteNumber = i++ };
152-
await producer
153-
.ProduceAsync(topicName, new Message<string, User> { Value = user })
154-
.ContinueWith(task => task.IsFaulted
155-
? $"error producing message: {task.Exception.Message}"
156-
: $"produced to: {task.Result.TopicPartitionOffset}");
152+
try
153+
{
154+
await producer.ProduceAsync(topicName, new Message<string, User> { Value = user });
155+
}
156+
catch (Exception e)
157+
{
158+
Console.WriteLine($"error producing message: {e.Message}");
159+
}
157160
}
158161
}
159162

src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
</PropertyGroup>
2323

2424
<ItemGroup>
25-
<PackageReference Include="NJsonSchema" Version="10.5.2" />
25+
<PackageReference Include="NJsonSchema" Version="10.6.3" />
2626
</ItemGroup>
2727

2828
<ItemGroup>

src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class JsonSerializer<T> : IAsyncSerializer<T> where T : class
6060
private bool autoRegisterSchema = true;
6161
private bool normalizeSchemas = false;
6262
private bool useLatestVersion = false;
63+
private bool latestCompatibilityStrict = false;
6364
private int initialBufferSize = DefaultInitialBufferSize;
6465
private SubjectNameStrategyDelegate subjectNameStrategy = null;
6566
private ISchemaRegistryClient schemaRegistryClient;
@@ -116,6 +117,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer
116117
if (config.AutoRegisterSchemas != null) { this.autoRegisterSchema = config.AutoRegisterSchemas.Value; }
117118
if (config.NormalizeSchemas != null) { this.normalizeSchemas = config.NormalizeSchemas.Value; }
118119
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
120+
if (config.LatestCompatibilityStrict != null) { this.latestCompatibilityStrict = config.LatestCompatibilityStrict.Value; }
119121
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
120122

121123
if (this.useLatestVersion && this.autoRegisterSchema)
@@ -171,26 +173,33 @@ public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
171173

172174
if (!subjectsRegistered.Contains(subject))
173175
{
174-
if (useLatestVersion)
176+
if (autoRegisterSchema)
177+
{
178+
schemaId = await schemaRegistryClient.RegisterSchemaAsync(subject,
179+
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json), normalizeSchemas)
180+
.ConfigureAwait(continueOnCapturedContext: false);
181+
}
182+
else if (useLatestVersion)
175183
{
176184
var latestSchema = await schemaRegistryClient.GetLatestSchemaAsync(subject)
177185
.ConfigureAwait(continueOnCapturedContext: false);
186+
if (latestCompatibilityStrict)
187+
{
188+
var isCompatible = await schemaRegistryClient.IsCompatibleAsync(subject, new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json))
189+
.ConfigureAwait(continueOnCapturedContext: false);
190+
if (!isCompatible)
191+
{
192+
throw new InvalidDataException("Schema not compatible with latest schema : " + latestSchema.SchemaString);
193+
}
194+
}
178195
schemaId = latestSchema.Id;
179196
}
180197
else
181198
{
182-
// first usage: register/get schema to check compatibility
183-
schemaId = autoRegisterSchema
184-
? await schemaRegistryClient.RegisterSchemaAsync(subject,
185-
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json), normalizeSchemas)
186-
.ConfigureAwait(continueOnCapturedContext: false)
187-
: await schemaRegistryClient.GetSchemaIdAsync(subject,
199+
schemaId = await schemaRegistryClient.GetSchemaIdAsync(subject,
188200
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json), normalizeSchemas)
189201
.ConfigureAwait(continueOnCapturedContext: false);
190-
191-
// TODO: It may be better to fail fast if conflicting values for schemaId are seen here.
192202
}
193-
194203
subjectsRegistered.Add(subject);
195204
}
196205
}

src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,22 @@ public static class PropertyNames
6464
/// <summary>
6565
/// Specifies whether or not the JSON serializer should use the latest subject
6666
/// version for serialization.
67-
/// WARNING: There is no check that the latest schema is backwards compatible
68-
/// with the schema of the object being serialized.
67+
/// WARNING: There is no check that the latest schema is compatible
68+
/// with the schema of the object being serialized by default.
69+
/// Use the LatestCompatibilityStrict config property to enable this.
6970
///
7071
/// default: false
7172
/// </summary>
7273
public const string UseLatestVersion = "json.serializer.use.latest.version";
7374

75+
/// <summary>
76+
/// Specifies whether or not the JSON serializer should check the compatibility
77+
/// with the latest schema of the subject if use.latest.version is set to true.
78+
///
79+
/// default: false
80+
/// </summary>
81+
public const string LatestCompatibilityStrict = "json.serializer.latest.compatibility.strict";
82+
7483
/// <summary>
7584
/// The subject name strategy to use for schema registration / lookup.
7685
/// Possible values: <see cref="Confluent.SchemaRegistry.SubjectNameStrategy" />
@@ -137,8 +146,9 @@ public bool? NormalizeSchemas
137146
/// <summary>
138147
/// Specifies whether or not the JSON serializer should use the latest subject
139148
/// version for serialization.
140-
/// WARNING: There is no check that the latest schema is backwards compatible
141-
/// with the schema of the object being serialized.
149+
/// WARNING: There is no check that the latest schema is compatible
150+
/// with the schema of the object being serialized by default.
151+
/// Use the LatestCompatibilityStrict config property to enable this.
142152
///
143153
/// default: false
144154
/// </summary>
@@ -147,6 +157,19 @@ public bool? UseLatestVersion
147157
get { return GetBool(PropertyNames.UseLatestVersion); }
148158
set { SetObject(PropertyNames.UseLatestVersion, value); }
149159
}
160+
161+
162+
/// <summary>
163+
/// Specifies whether or not the JSON serializer should check the backwards compatibility
164+
/// with the latest schema of the subject.
165+
///
166+
/// default: false
167+
/// </summary>
168+
public bool? LatestCompatibilityStrict
169+
{
170+
get { return GetBool(PropertyNames.LatestCompatibilityStrict); }
171+
set { SetObject(PropertyNames.LatestCompatibilityStrict, value); }
172+
}
150173

151174

152175
/// <summary>

test/Confluent.SchemaRegistry.IntegrationTests/Confluent.SchemaRegistry.IntegrationTests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<PackageReference Include="xunit" Version="2.3.1" />
2626
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
2727
<PackageReference Include="Google.Protobuf" Version="3.15.0" />
28-
<PackageReference Include="NJsonSchema" Version="10.5.2" />
28+
<PackageReference Include="NJsonSchema" Version="10.6.3" />
2929
</ItemGroup>
3030

3131
<ItemGroup>
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2020 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Xunit;
18+
using System;
19+
using Confluent.Kafka;
20+
21+
namespace Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses1
22+
{
23+
public class TestPoco
24+
{
25+
public int IntField { get; set; }
26+
}
27+
}
28+
29+
namespace Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses2
30+
{
31+
public class TestPoco
32+
{
33+
public string StringField { get; set; }
34+
}
35+
}
36+
37+
38+
namespace Confluent.SchemaRegistry.Serdes.IntegrationTests
39+
{
40+
public static partial class Tests
41+
{
42+
/// <summary>
43+
/// Test Use Latest Version on when AutoRegister enabled and disabled.
44+
/// </summary>
45+
[Theory, MemberData(nameof(TestParameters))]
46+
public static void UseLatestVersionCheck(string bootstrapServers, string schemaRegistryServers)
47+
{
48+
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
49+
var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaRegistryServers };
50+
51+
using (var topic = new TemporaryTopic(bootstrapServers, 1))
52+
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
53+
{
54+
using (var producer =
55+
new ProducerBuilder<string, Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses1.TestPoco>(producerConfig)
56+
.SetValueSerializer(new JsonSerializer<Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses1.TestPoco>(schemaRegistry))
57+
.Build())
58+
{
59+
var c = new Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses1.TestPoco { IntField = 1 };
60+
producer.ProduceAsync(topic.Name, new Message<string, Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses1.TestPoco> { Key = "test1", Value = c }).Wait();
61+
}
62+
63+
using (var producer =
64+
new ProducerBuilder<string, Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses2.TestPoco>(producerConfig)
65+
.SetValueSerializer(new JsonSerializer<Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses2.TestPoco>(
66+
schemaRegistry, new JsonSerializerConfig { UseLatestVersion = true, AutoRegisterSchemas = false, LatestCompatibilityStrict = true }))
67+
.Build())
68+
{
69+
var c = new Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses2.TestPoco { StringField = "Test" };
70+
Assert.Throws<AggregateException>(
71+
() => producer.ProduceAsync(topic.Name, new Message<string, Confluent.SchemaRegistry.Serdes.IntegrationTests.TestClasses2.TestPoco> { Key = "test1", Value = c }).Wait());
72+
}
73+
}
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)