Skip to content

Commit 5d38703

Browse files
jainruchiremasabanchitj
authored
JSON referenced schema support (#2042)
* jsonserialisation with references support * json deserialiser with validation against a passed schema * changes to reference resolver, using AddSchema * integration test added for JSON with references * "commit cleanup (1)" * "commit cleanup(2)" * "removed error: same reference used elsewhere" * "changes to sln file and Version" * commit cleanup (3) * NonGenericJsonSerializer added, along with Convertor in JsonDeserializer * added check for int enum while adding ref schema * added unit test for schema w json ref SerDes * moved schema resolution to separate utils class * changes as per review comments * after review changes * changerlogs updated * Use object mapping when serializing (#2088) * Use object mapping when serializing and deserializing with referenced schemas. * Test producing and consumer JObject * Rename JsonSerDesSchemaUtils to JsonSchemaResolver * Copyright * Documentation * Fix unit tests * Add comment about NJson mapping * Fix CHANGELOG --------- Co-authored-by: Emanuele Sabellico <[email protected]> Co-authored-by: Anchit Jain <[email protected]>
1 parent 19d4fa1 commit 5d38703

File tree

11 files changed

+1005
-41
lines changed

11 files changed

+1005
-41
lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# vNext
2+
3+
## Enhancements
4+
5+
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
6+
7+
18
# 2.2.0
29

310
## Enhancements
@@ -22,7 +29,6 @@
2229
- References librdkafka.redist 2.1.1. Refer to the [librdkafka v2.1.1 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.1.1) for more information.
2330
- Less heap allocations when calling Produce ([bjornbouetsmith](https://github.com/bjornbouetsmith), #2020)
2431

25-
2632
# 2.1.0
2733

2834
## Enhancements

Confluent.Kafka.sln

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuthOIDC", "examples\OAuth
6969
EndProject
7070
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuthProducer", "examples\OAuthProducer\OAuthProducer.csproj", "{E72DAB16-FAF7-4365-8151-9450007C93A0}"
7171
EndProject
72+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JsonWithReferences", "examples\JsonWithReferences\JsonWithReferences.csproj", "{2931D890-9420-4EA7-BCEE-AAD53108A629}"
73+
EndProject
7274
Global
7375
GlobalSection(SolutionConfigurationPlatforms) = preSolution
7476
Debug|Any CPU = Debug|Any CPU
@@ -442,6 +444,18 @@ Global
442444
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|x64.Build.0 = Release|Any CPU
443445
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|x86.ActiveCfg = Release|Any CPU
444446
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|x86.Build.0 = Release|Any CPU
447+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
448+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Debug|Any CPU.Build.0 = Debug|Any CPU
449+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Debug|x64.ActiveCfg = Debug|Any CPU
450+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Debug|x64.Build.0 = Debug|Any CPU
451+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Debug|x86.ActiveCfg = Debug|Any CPU
452+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Debug|x86.Build.0 = Debug|Any CPU
453+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|Any CPU.ActiveCfg = Release|Any CPU
454+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|Any CPU.Build.0 = Release|Any CPU
455+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|x64.ActiveCfg = Release|Any CPU
456+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|x64.Build.0 = Release|Any CPU
457+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|x86.ActiveCfg = Release|Any CPU
458+
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|x86.Build.0 = Release|Any CPU
445459
EndGlobalSection
446460
GlobalSection(NestedProjects) = preSolution
447461
{09C3255B-1972-4EB8-91D0-FB9F5CD82BCB} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
@@ -471,5 +485,6 @@ Global
471485
{3BE5B540-43FC-4945-ACE5-88BB6B0D846E} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
472486
{98D7F3E1-80EE-437C-8915-528BFD80E9B2} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
473487
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
488+
{2931D890-9420-4EA7-BCEE-AAD53108A629} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
474489
EndGlobalSection
475490
EndGlobal
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
5+
<AssemblyName>JsonWithReferences</AssemblyName>
6+
<OutputType>Exe</OutputType>
7+
<TargetFramework>net6.0</TargetFramework>
8+
<LangVersion>7.1</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.2.0" /> -->
13+
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj" />
14+
<ProjectReference Include="../../src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj" />
15+
</ItemGroup>
16+
17+
</Project>
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Confluent.Kafka.SyncOverAsync;
18+
using Confluent.SchemaRegistry;
19+
using Confluent.SchemaRegistry.Serdes;
20+
using System;
21+
using System.IO;
22+
using System.Collections.Generic;
23+
using System.Threading;
24+
using System.Threading.Tasks;
25+
using NJsonSchema.Generation;
26+
using Newtonsoft.Json;
27+
using Newtonsoft.Json.Serialization;
28+
29+
30+
/// <summary>
31+
/// An example of working with JSON schemas with external,
32+
/// references and Json data, Apache Kafka and
33+
/// Confluent Schema Registry (v5.5 or later required for
34+
/// JSON schema support).
35+
/// </summary>
36+
namespace Confluent.Kafka.Examples.JsonWithReferences
37+
{
38+
39+
/// <remarks>
40+
/// The deserializer allows multiple ways to consume data.
41+
///
42+
/// If the consumer is aware of the entire schema details,
43+
/// they can create a class corresponding to it and use the
44+
/// deserializer in these ways:
45+
/// - without passing a schema, the deserializer will convert
46+
/// the serialized string to the object of this class.
47+
/// - pass a schema and allow validating against it.
48+
///
49+
/// Note: The user can also pass JObject to the
50+
/// ConsumerBuilder<string, JObject> and JsonDeserializer<JObject>
51+
/// in order to get JObject instead in consumer, this is possible
52+
/// in the producer too.
53+
/// </remarks>
54+
public class Product
55+
{
56+
public long ProductId { get; set; }
57+
58+
public string ProductName { get; set; }
59+
60+
public decimal Price { get; set; }
61+
62+
public List<string> Tags { get; set; }
63+
64+
public Dimensions Dimensions { get; set; }
65+
66+
public GeographicalLocation WarehouseLocation { get; set; }
67+
}
68+
69+
public class Dimensions
70+
{
71+
public decimal Length { get; set; }
72+
73+
public decimal Width { get; set; }
74+
75+
public decimal Height { get; set; }
76+
}
77+
78+
public class GeographicalLocation
79+
{
80+
public decimal Latitude { get; set; }
81+
82+
public decimal Longitude { get; set; }
83+
}
84+
85+
/// <remarks>
86+
/// Internally, the JSON serializer uses Newtonsoft.Json for
87+
/// serialization and NJsonSchema for schema creation and
88+
/// validation.
89+
/// </remarks>
90+
class Program
91+
{
92+
// from: https://json-schema.org/learn/getting-started-step-by-step.html
93+
private static string S1;
94+
private static string S2;
95+
static async Task Main(string[] args)
96+
{
97+
if (args.Length != 3)
98+
{
99+
Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
100+
return;
101+
}
102+
103+
S1 = File.ReadAllText("geographical-location.json");
104+
S2 = File.ReadAllText("product.json");
105+
string bootstrapServers = args[0];
106+
string schemaRegistryUrl = args[1];
107+
string topicName = args[2];
108+
109+
var consumerConfig = new ConsumerConfig
110+
{
111+
BootstrapServers = bootstrapServers,
112+
GroupId = "json-example-consumer-group"
113+
};
114+
115+
var producerConfig = new ProducerConfig
116+
{
117+
BootstrapServers = bootstrapServers
118+
};
119+
120+
var schemaRegistryConfig = new SchemaRegistryConfig
121+
{
122+
Url = schemaRegistryUrl
123+
};
124+
125+
var sr = new CachedSchemaRegistryClient(schemaRegistryConfig);
126+
127+
var subject1 = $"{topicName}-CoordinatesOnMap";
128+
var subject2 = $"{topicName}-Product";
129+
130+
// Test there are no errors (exceptions) registering a schema that references another.
131+
var id1 = sr.RegisterSchemaAsync(subject1, new Schema(S1, SchemaType.Json)).Result;
132+
var s1 = sr.GetLatestSchemaAsync(subject1).Result;
133+
var refs = new List<SchemaReference> { new SchemaReference("geographical-location.json", subject1, s1.Version) };
134+
var id2 = sr.RegisterSchemaAsync(subject2, new Schema(S2, refs, SchemaType.Json)).Result;
135+
136+
// In fact, it seems references are not checked server side.
137+
var latestSchema2 = sr.GetLatestSchemaAsync(subject2).Result;
138+
var latestSchema2Unreg = latestSchema2.Schema;
139+
var latestSchema1 = sr.GetLatestSchemaAsync(subject1).Result;
140+
141+
var jsonSerializerConfig = new JsonSerializerConfig
142+
{
143+
BufferBytes = 100,
144+
UseLatestVersion = true,
145+
AutoRegisterSchemas = false,
146+
SubjectNameStrategy = SubjectNameStrategy.TopicRecord
147+
};
148+
149+
// This is needed only if you want to change attribute naming strategy
150+
// from default one to camelCase.
151+
// It's also possible to add JsonProperty attributes to customize
152+
// serialization mapping and all available NJson attributes.
153+
var jsonSchemaGeneratorSettings = new JsonSchemaGeneratorSettings
154+
{
155+
SerializerSettings = new JsonSerializerSettings
156+
{
157+
ContractResolver = new DefaultContractResolver
158+
{
159+
NamingStrategy = new CamelCaseNamingStrategy()
160+
}
161+
}
162+
};
163+
164+
CancellationTokenSource cts = new CancellationTokenSource();
165+
166+
var consumeTask = Task.Run(() =>
167+
{
168+
using (var consumer =
169+
new ConsumerBuilder<long, Product>(consumerConfig)
170+
.SetValueDeserializer(new JsonDeserializer<Product>(sr, latestSchema2Unreg, null, jsonSchemaGeneratorSettings).AsSyncOverAsync())
171+
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
172+
.Build())
173+
{
174+
consumer.Subscribe(topicName);
175+
176+
try
177+
{
178+
while (true)
179+
{
180+
try
181+
{
182+
var cr = consumer.Consume(cts.Token);
183+
var product = cr.Message.Value;
184+
185+
Console.WriteLine("CONSUMER: product name " + product.ProductName +
186+
$" Product id {product.ProductId} " +
187+
$"Price: {product.Price} " +
188+
$"Latitude: {product.WarehouseLocation.Latitude} " +
189+
$"Longitude: {product.WarehouseLocation.Longitude}");
190+
}
191+
catch (ConsumeException e)
192+
{
193+
Console.WriteLine($"Consume error: {e.Error.Reason}");
194+
}
195+
}
196+
}
197+
catch (OperationCanceledException)
198+
{
199+
consumer.Close();
200+
}
201+
}
202+
});
203+
204+
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
205+
using (var producer =
206+
new ProducerBuilder<long, Product>(producerConfig)
207+
.SetValueSerializer(new JsonSerializer<Product>(schemaRegistry, latestSchema2Unreg,
208+
jsonSerializerConfig, jsonSchemaGeneratorSettings))
209+
.Build())
210+
{
211+
Console.WriteLine($"PRODUCER: {producer.Name} producing on {topicName}. Enter product name, q to exit.");
212+
213+
long i = 1;
214+
string text;
215+
while ((text = Console.ReadLine()) != "q")
216+
{
217+
var product = new Product
218+
{
219+
ProductId = i++,
220+
ProductName = text,
221+
Price = 9.99M,
222+
Tags = new List<string> { "tag1", "tag2" },
223+
Dimensions = new Dimensions
224+
{
225+
Length = 10.0M,
226+
Width = 5.0M,
227+
Height = 2.0M
228+
},
229+
WarehouseLocation = new GeographicalLocation
230+
{
231+
Latitude = 37.7749M,
232+
Longitude = -122.4194M
233+
}
234+
};
235+
try
236+
{
237+
await producer.ProduceAsync(topicName, new Message<long, Product>
238+
{
239+
Key = product.ProductId,
240+
Value = product
241+
});
242+
}
243+
catch (Exception e)
244+
{
245+
Console.WriteLine($"error producing message: {e.Message}");
246+
}
247+
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter product name, q to exit.");
248+
}
249+
}
250+
cts.Cancel();
251+
}
252+
}
253+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"$id": "geographical-location.json",
3+
"$schema": "http://json-schema.org/draft-07/schema#",
4+
"title": "CoordinatesOnMap",
5+
"description": "A geographical coordinate on a planet (most commonly Earth).",
6+
"required": [
7+
"latitude",
8+
"longitude"
9+
],
10+
"type": "object",
11+
"properties": {
12+
"latitude": {
13+
"type": "number",
14+
"minimum": -90,
15+
"maximum": 90
16+
},
17+
"longitude": {
18+
"type": "number",
19+
"minimum": -180,
20+
"maximum": 180
21+
}
22+
}
23+
}
24+

0 commit comments

Comments
 (0)