diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index a3c0a07af..b572285ef 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -20,10 +20,9 @@ using System.Linq; using System.Net; using System.Text; +using System.Text.Json; using System.Threading.Tasks; using Confluent.Kafka; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; using NJsonSchema; using NJsonSchema.Generation; using NJsonSchema.Validation; @@ -55,6 +54,8 @@ public class JsonDeserializer : AsyncDeserializer where T : cl { private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings; + private readonly JsonSerializerOptions jsonSerializerOptions; + private JsonSchemaValidator validator = new JsonSchemaValidator(); private JsonSchema schema = null; @@ -84,6 +85,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserial : base(schemaRegistryClient, config, ruleRegistry) { this.jsonSchemaGeneratorSettings = jsonSchemaGeneratorSettings; + this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings); if (config == null) { return; } @@ -127,6 +129,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, Schema schem schemaRegistryClient, schema, this.jsonSchemaGeneratorSettings); JsonSchema jsonSchema = utils.GetResolvedSchema().Result; this.schema = jsonSchema; + this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings); } /// @@ -196,7 +199,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i } } } - + if (latestSchema != null) { migrations = await GetMigrations(subject, writerSchema, latestSchema) @@ -208,14 +211,16 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i using (var jsonStream = new MemoryStream(array, headerSize, array.Length - headerSize)) using (var jsonReader = new StreamReader(jsonStream, Encoding.UTF8)) { - JToken json = Newtonsoft.Json.JsonConvert.DeserializeObject(jsonReader.ReadToEnd(), this.jsonSchemaGeneratorSettings?.ActualSerializerSettings); - json = await ExecuteMigrations(migrations, isKey, subject, topic, context.Headers, json) - .ContinueWith(t => (JToken)t.Result) + string jsonString = await jsonReader.ReadToEndAsync(); + using JsonDocument jsonDocument = JsonDocument.Parse(jsonString); + JsonElement jsonElement = jsonDocument.RootElement; + jsonElement = (JsonElement)await ExecuteMigrations(migrations, isKey, subject, topic, context.Headers, jsonElement) + .ContinueWith(t => t.Result) .ConfigureAwait(continueOnCapturedContext: false); if (schema != null) { - var validationResult = validator.Validate(json, schema); + var validationResult = validator.Validate(jsonElement.GetRawText(), schema); if (validationResult.Count > 0) { @@ -224,7 +229,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i } } - value = json.ToObject(JsonSerializer.Create()); + value = jsonElement.Deserialize(this.jsonSerializerOptions); // TODO not sure why we need this here } } else @@ -233,7 +238,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i using (var jsonReader = new StreamReader(jsonStream, Encoding.UTF8)) { string serializedString = jsonReader.ReadToEnd(); - + if (schema != null) { var validationResult = validator.Validate(serializedString, schema); @@ -245,7 +250,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i } } - value = Newtonsoft.Json.JsonConvert.DeserializeObject(serializedString, this.jsonSchemaGeneratorSettings?.ActualSerializerSettings); + value = System.Text.Json.JsonSerializer.Deserialize(serializedString, this.jsonSerializerOptions); } } @@ -263,8 +268,8 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i writerSchema, value, fieldTransformer) .ContinueWith(t => (T)t.Result) .ConfigureAwait(continueOnCapturedContext: false); - } - + } + return value; } catch (AggregateException e) @@ -277,7 +282,7 @@ protected override async Task ParseSchema(Schema schema) { JsonSchemaResolver utils = new JsonSchemaResolver( schemaRegistryClient, schema, jsonSchemaGeneratorSettings); - + return await utils.GetResolvedSchema(); } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs index d7a1aa6de..91d68dda3 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs @@ -22,6 +22,7 @@ using System.IO; using System.Linq; using System.Net; +using System.Text.Json; using System.Threading.Tasks; using NJsonSchema; using NJsonSchema.Generation; @@ -55,8 +56,10 @@ namespace Confluent.SchemaRegistry.Serdes public class JsonSerializer : AsyncSerializer where T : class { private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings; + + private readonly JsonSerializerOptions jsonSerializerOptions; private readonly List ReferenceList = new List(); - + private JsonSchemaValidator validator = new JsonSchemaValidator(); /// @@ -81,18 +84,19 @@ public class JsonSerializer : AsyncSerializer where T : class /// /// JSON schema generator settings. /// - public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializerConfig config = null, + public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializerConfig config = null, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null, RuleRegistry ruleRegistry = null) : base(schemaRegistryClient, config, ruleRegistry) { this.jsonSchemaGeneratorSettings = jsonSchemaGeneratorSettings; + this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings); this.schema = this.jsonSchemaGeneratorSettings == null ? JsonSchema.FromType() : JsonSchema.FromType(this.jsonSchemaGeneratorSettings); this.schemaText = schema.ToJson(); this.schemaFullname = schema.Title; - + if (config == null) { return; } var nonJsonConfig = config @@ -135,7 +139,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer /// /// JSON schema generator settings. /// - public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema, JsonSerializerConfig config = null, + public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema, JsonSerializerConfig config = null, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null, RuleRegistry ruleRegistry = null) : this(schemaRegistryClient, config, jsonSchemaGeneratorSettings, ruleRegistry) { @@ -150,6 +154,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema, this.schema = jsonSchema; this.schemaText = schema.SchemaString; this.schemaFullname = jsonSchema.Title; + this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings); } /// @@ -186,7 +191,7 @@ public override async Task SerializeAsync(T value, SerializationContext subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, this.schemaFullname); latestSchema = await GetReaderSchema(subject, new Schema(schemaText, ReferenceList, SchemaType.Json)) .ConfigureAwait(continueOnCapturedContext: false); - + if (!subjectsRegistered.Contains(subject)) { if (latestSchema != null) @@ -211,7 +216,7 @@ public override async Task SerializeAsync(T value, SerializationContext { serdeMutex.Release(); } - + if (latestSchema != null) { var latestSchemaJson = await GetParsedSchema(latestSchema).ConfigureAwait(false); @@ -226,7 +231,7 @@ public override async Task SerializeAsync(T value, SerializationContext .ConfigureAwait(continueOnCapturedContext: false); } - var serializedString = Newtonsoft.Json.JsonConvert.SerializeObject(value, this.jsonSchemaGeneratorSettings?.ActualSerializerSettings); + var serializedString = JsonSerializer.Serialize(value, this.jsonSerializerOptions); var validationResult = validator.Validate(serializedString, this.schema); if (validationResult.Count > 0) { diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonUtils.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonUtils.cs index 243e598df..85b850419 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonUtils.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonUtils.cs @@ -19,12 +19,16 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; +using System.Text.Json; using System.Threading.Tasks; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Newtonsoft.Json.Serialization; using NJsonSchema; using NJsonSchema.Validation; +using NJsonSchema.Generation; +using Microsoft.Extensions.Options; +using System.Text.Json.Serialization; namespace Confluent.SchemaRegistry.Serdes @@ -41,13 +45,13 @@ public static async Task Transform(RuleContext ctx, JsonSchema schema, s { return message; } - + RuleContext.FieldContext fieldContext = ctx.CurrentField(); if (fieldContext != null) { fieldContext.Type = GetType(schema); } - + if (schema.AllOf.Count > 0 || schema.AnyOf.Count > 0 || schema.OneOf.Count > 0) { JToken jsonObject = JToken.FromObject(message); @@ -65,9 +69,9 @@ public static async Task Transform(RuleContext ctx, JsonSchema schema, s } else if (schema.IsArray) { - bool isList = typeof(IList).IsAssignableFrom(message.GetType()) - || (message.GetType().IsGenericType - && (message.GetType().GetGenericTypeDefinition() == typeof(List<>) + bool isList = typeof(IList).IsAssignableFrom(message.GetType()) + || (message.GetType().IsGenericType + && (message.GetType().GetGenericTypeDefinition() == typeof(List<>) || message.GetType().GetGenericTypeDefinition() == typeof(IList<>))); if (!isList) { @@ -125,7 +129,7 @@ public static async Task Transform(RuleContext ctx, JsonSchema schema, s ISet ruleTags = ctx.Rule.Tags ?? new HashSet(); ISet intersect = new HashSet(fieldContext.Tags); intersect.IntersectWith(ruleTags); - + if (ruleTags.Count == 0 || intersect.Count != 0) { return await fieldTransform.Transform(ctx, fieldContext, message) @@ -191,7 +195,7 @@ public FieldAccessor(Type type, string fieldName) SetValue = (instance, value) => propertyInfo.SetValue(instance, value); return; } - + var fieldInfo = type.GetField(fieldName, BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance); if (fieldInfo != null) @@ -200,7 +204,7 @@ public FieldAccessor(Type type, string fieldName) SetValue = (instance, value) => fieldInfo.SetValue(instance, value); return; } - + foreach (PropertyInfo prop in type.GetProperties()) { if (prop.IsDefined(typeof(JsonPropertyAttribute))) @@ -217,7 +221,7 @@ public FieldAccessor(Type type, string fieldName) } } } - + foreach (FieldInfo field in type.GetFields()) { if (field.IsDefined(typeof(JsonPropertyAttribute))) @@ -234,7 +238,7 @@ public FieldAccessor(Type type, string fieldName) } } } - + throw new ArgumentException("Could not find field " + fieldName); } @@ -248,5 +252,24 @@ public void SetFieldValue(object message, object value) SetValue(message, value); } } + + public static JsonSerializerOptions SettingsMapping(JsonSchemaGeneratorSettings settings) + { //TODO mapping settings using the jsonSchemaGeneratorSettings object + + var options = new JsonSerializerOptions(); + + // if (settings.SerializerSettings.ContractResolver is DefaultContractResolver contractResolver) + // { + // if (contractResolver.NamingStrategy is CamelCaseNamingStrategy) + // { + // options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase; + // } + // } + + options.PropertyNameCaseInsensitive = true; + options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull; + + return options; + } } } \ No newline at end of file