From 13ea866e34e7c4f982dad51918bb65962593d479 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 18 Nov 2024 10:38:18 -0800 Subject: [PATCH 01/30] Fix validation of SSL CA Certificate for DEK Registry client (#2348) --- .../CachedDekRegistryClient.cs | 25 ++++++++----------- .../FieldEncryptionExecutor.cs | 12 +++++++-- .../Rest/DekRestService.cs | 5 ++-- .../CelExecutor.cs | 3 ++- .../CelFieldExecutor.cs | 3 ++- .../JsonataExecutor.cs | 3 ++- src/Confluent.SchemaRegistry/AsyncSerde.cs | 3 ++- .../CachedSchemaRegistryClient.cs | 21 +++++++++++++--- src/Confluent.SchemaRegistry/ErrorAction.cs | 3 ++- .../FieldRuleExecutor.cs | 3 ++- src/Confluent.SchemaRegistry/IRuleBase.cs | 4 ++- .../ISchemaRegistryClient.cs | 21 +++++++++++++++- src/Confluent.SchemaRegistry/NoneAction.cs | 3 ++- .../Rest/RestService.cs | 1 - 14 files changed, 79 insertions(+), 31 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs b/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs index cbd631b5f..47b4d06f3 100644 --- a/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs @@ -18,7 +18,7 @@ using System.Threading.Tasks; using System.Linq; using System; -using System.ComponentModel; +using System.Net; using System.Threading; using System.Security.Cryptography.X509Certificates; @@ -31,7 +31,7 @@ public record DekId(string KekName, string Subject, int? Version, DekFormat? Dek /// /// A caching DEK Registry client. /// - public class CachedDekRegistryClient : IDekRegistryClient, IDisposable + public class CachedDekRegistryClient : IDekRegistryClient { private DekRestService restService; @@ -71,12 +71,16 @@ public int MaxCachedKeys /// /// The authentication header value provider /// + /// + /// The proxy server to use for connections + /// public CachedDekRegistryClient(IEnumerable> config, - IAuthenticationHeaderValueProvider authenticationHeaderValueProvider) + IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, + IWebProxy proxy = null) { if (config == null) { - throw new ArgumentNullException("config properties must be specified."); + throw new ArgumentNullException("config"); } var schemaRegistryUrisMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl); @@ -236,8 +240,9 @@ public CachedDekRegistryClient(IEnumerable> config, $"Configured value for {SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification} must be a bool."); } - this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, - SetSslConfig(config), sslVerify); + var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value; + var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation); + this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify, sslCaCertificate, proxy); } /// @@ -291,14 +296,6 @@ private List SetSslConfig(IEnumerable prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation) - .Value ?? ""; - if (!String.IsNullOrEmpty(caLocation)) - { - certificates.Add(new X509Certificate2(caLocation)); - } - return certificates; } diff --git a/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs b/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs index 3ec805e56..5b8e0c7de 100644 --- a/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs +++ b/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs @@ -45,12 +45,20 @@ public FieldEncryptionExecutor(IDekRegistryClient client, IClock clock) Clock = clock ?? new Clock(); } - public override void Configure(IEnumerable> config) + public override void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { Configs = config; if (Client == null) { - Client = new CachedDekRegistryClient(Configs); + if (client != null) + { + Client = new CachedDekRegistryClient(Configs, client.AuthHeaderProvider, client.Proxy); + } + else + { + Client = new CachedDekRegistryClient(Configs); + } } } diff --git a/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs b/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs index be7b2bc2c..a6eda49ab 100644 --- a/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs +++ b/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Generic; +using System.Net; using System.Net.Http; using System.Threading.Tasks; using System.Security.Cryptography.X509Certificates; @@ -29,9 +30,9 @@ public class DekRestService : RestService /// public DekRestService(string schemaRegistryUrl, int timeoutMs, IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List certificates, - bool enableSslCertificateVerification) : + bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null) : base(schemaRegistryUrl, timeoutMs, authenticationHeaderValueProvider, certificates, - enableSslCertificateVerification) + enableSslCertificateVerification, sslCaCertificate, proxy) { } diff --git a/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs b/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs index b3843c3e2..346607a59 100644 --- a/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs +++ b/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs @@ -34,7 +34,8 @@ public CelExecutor() { } - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs b/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs index c51c71c9b..5cd329a20 100644 --- a/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs +++ b/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs @@ -22,7 +22,8 @@ public CelFieldExecutor() public override string Type() => RuleType; - public override void Configure(IEnumerable> config) + public override void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs b/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs index 33dd64b5f..c34fe9a4a 100644 --- a/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs +++ b/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs @@ -20,7 +20,8 @@ public JsonataExecutor() { } - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index 082f2dfd9..bfc32638e 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -59,7 +59,7 @@ protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig con foreach (IRuleExecutor executor in this.ruleRegistry.GetExecutors()) { - executor.Configure(ruleConfigs); + executor.Configure(ruleConfigs, schemaRegistryClient); } } @@ -297,6 +297,7 @@ protected async Task ExecuteMigrations( /// /// /// + /// /// /// /// diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index 8cfb31649..20dc1bd58 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -42,7 +42,7 @@ namespace Confluent.SchemaRegistry /// - /// - /// - - /// - + /// - /// /// The following method calls do NOT cache results: /// - @@ -54,11 +54,13 @@ namespace Confluent.SchemaRegistry /// - /// - /// - public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable + public class CachedSchemaRegistryClient : ISchemaRegistryClient { private readonly List EmptyReferencesList = new List(); private IEnumerable> config; + private IAuthenticationHeaderValueProvider authHeaderProvider; + private IWebProxy proxy; private IRestService restService; private int identityMapCapacity; @@ -117,6 +119,16 @@ public IEnumerable> Config => config; + /// + public IAuthenticationHeaderValueProvider AuthHeaderProvider + => authHeaderProvider; + + + /// + public IWebProxy Proxy + => proxy; + + /// public int MaxCachedSchemas => identityMapCapacity; @@ -176,10 +188,12 @@ public CachedSchemaRegistryClient(IEnumerable> conf { if (config == null) { - throw new ArgumentNullException("config properties must be specified."); + throw new ArgumentNullException("config"); } this.config = config; + this.authHeaderProvider = authenticationHeaderValueProvider; + this.proxy = proxy; keySubjectNameStrategy = GetKeySubjectNameStrategy(config); valueSubjectNameStrategy = GetValueSubjectNameStrategy(config); @@ -663,6 +677,7 @@ public async Task GetLatestSchemaAsync(string subject) return schema; } + /// public async Task GetLatestWithMetadataAsync(string subject, IDictionary metadata, bool ignoreDeletedSchemas) { diff --git a/src/Confluent.SchemaRegistry/ErrorAction.cs b/src/Confluent.SchemaRegistry/ErrorAction.cs index 598d7522c..7698e86db 100644 --- a/src/Confluent.SchemaRegistry/ErrorAction.cs +++ b/src/Confluent.SchemaRegistry/ErrorAction.cs @@ -27,7 +27,8 @@ public class ErrorAction : IRuleAction { public static readonly string ActionType = "ERROR"; - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs b/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs index 0be208719..21ac9ff92 100644 --- a/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs +++ b/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs @@ -21,7 +21,8 @@ namespace Confluent.SchemaRegistry { public abstract class FieldRuleExecutor : IRuleExecutor { - public abstract void Configure(IEnumerable> config); + public abstract void Configure(IEnumerable> config, + ISchemaRegistryClient client = null); public abstract string Type(); diff --git a/src/Confluent.SchemaRegistry/IRuleBase.cs b/src/Confluent.SchemaRegistry/IRuleBase.cs index 9086e1e50..5aa66b7fd 100644 --- a/src/Confluent.SchemaRegistry/IRuleBase.cs +++ b/src/Confluent.SchemaRegistry/IRuleBase.cs @@ -28,7 +28,9 @@ public interface IRuleBase : IDisposable /// Configure the rule executor or action /// /// - void Configure(IEnumerable> config); + /// + void Configure(IEnumerable> config, + ISchemaRegistryClient client = null); /// /// The type of rule executor or action diff --git a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs index 3876dfd1e..293e9c284 100644 --- a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Generic; +using System.Net; using System.Threading.Tasks; @@ -32,6 +33,18 @@ public interface ISchemaRegistryClient : IDisposable IEnumerable> Config { get; } + /// + /// The authentication header provider. + /// + IAuthenticationHeaderValueProvider AuthHeaderProvider { get; } + + + /// + /// The web proxy. + /// + IWebProxy Proxy { get; } + + /// /// The maximum capacity of the local schema cache. /// @@ -141,7 +154,7 @@ public interface ISchemaRegistryClient : IDisposable /// - /// Gets the schema uniquely identified by and . + /// Gets the schema uniquely identified by and . /// /// /// The subject. @@ -238,6 +251,12 @@ public interface ISchemaRegistryClient : IDisposable /// /// The subject to get the latest associated schema for. /// + /// + /// The metadata to search for. + /// + /// + /// Whether to ignore deleted schemas. + /// /// /// The latest schema with the given metadata registered against . /// diff --git a/src/Confluent.SchemaRegistry/NoneAction.cs b/src/Confluent.SchemaRegistry/NoneAction.cs index 6d8e02bff..556170302 100644 --- a/src/Confluent.SchemaRegistry/NoneAction.cs +++ b/src/Confluent.SchemaRegistry/NoneAction.cs @@ -26,7 +26,8 @@ public class NoneAction : IRuleAction { public static readonly string ActionType = "NONE"; - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index af694d3d1..9c54d8203 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -24,7 +24,6 @@ using System.Threading.Tasks; using X509Certificate2 = System.Security.Cryptography.X509Certificates.X509Certificate2; -using System.Security.Cryptography.X509Certificates; using System.Net.Security; namespace Confluent.SchemaRegistry From cfafad4004737eb5badf25de93183395bf78ba2b Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Tue, 19 Nov 2024 01:11:53 +0530 Subject: [PATCH 02/30] Include librdkafka 2.6.1 changelog link (#2354) --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae3755874..e473a042c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Enhancements +* References librdkafka.redist 2.6.1. Refer to the [librdkafka v2.6.1 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.1) for more information. * Fix to continue supporting .NET Framework 4.6.2+ in core client library (#2342). From b2e4bfdca029c87a4d9cf8b57ea6f4591c509c8d Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 18 Nov 2024 11:58:39 -0800 Subject: [PATCH 03/30] Add retry logic to RestService (#2353) * Add retry logic to RestService * Minor fix * Minor fix --- .../CachedDekRegistryClient.cs | 60 ++++++++++++++- .../Rest/DekRestService.cs | 6 +- .../CachedSchemaRegistryClient.cs | 60 ++++++++++++++- .../Rest/RestService.cs | 75 ++++++++++++++++--- .../SchemaRegistryConfig.cs | 55 ++++++++++++++ 5 files changed, 241 insertions(+), 15 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs b/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs index 47b4d06f3..f11798380 100644 --- a/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs @@ -48,6 +48,21 @@ public class CachedDekRegistryClient : IDekRegistryClient /// public const int DefaultTimeout = 30000; + /// + /// The default maximum number of retries. + /// + public const int DefaultMaxRetries = RestService.DefaultMaxRetries; + + /// + /// The default time to wait for the first retry. + /// + public const int DefaultRetriesWaitMs = RestService.DefaultRetriesWaitMs; + + /// + /// The default time to wait for any retry. + /// + public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs; + /// /// The default maximum capacity of the local cache. /// @@ -105,6 +120,45 @@ public CachedDekRegistryClient(IEnumerable> config, $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs} must be an integer."); } + var maxRetriesMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries); + int maxRetries; + try + { + maxRetries = maxRetriesMaybe.Value == null ? DefaultMaxRetries : Convert.ToInt32(maxRetriesMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries} must be an integer."); + } + + var retriesWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs); + int retriesWaitMs; + try + { + retriesWaitMs = retriesWaitMsMaybe.Value == null ? DefaultRetriesWaitMs : Convert.ToInt32(retriesWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs} must be an integer."); + } + + var retriesMaxWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs); + int retriesMaxWaitMs; + try + { + retriesMaxWaitMs = retriesMaxWaitMsMaybe.Value == null ? DefaultRetriesMaxWaitMs : Convert.ToInt32(retriesMaxWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer."); + } + var identityMapCapacityMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas); try @@ -210,6 +264,9 @@ public CachedDekRegistryClient(IEnumerable> config, if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource && @@ -242,7 +299,8 @@ public CachedDekRegistryClient(IEnumerable> config, var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value; var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation); - this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify, sslCaCertificate, proxy); + this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, + SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs); } /// diff --git a/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs b/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs index a6eda49ab..ebda3d756 100644 --- a/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs +++ b/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs @@ -30,9 +30,11 @@ public class DekRestService : RestService /// public DekRestService(string schemaRegistryUrl, int timeoutMs, IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List certificates, - bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null) : + bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null, + int maxRetries = DefaultMaxRetries, int retriesWaitMs = DefaultRetriesWaitMs, + int retriesMaxWaitMs = DefaultRetriesMaxWaitMs) : base(schemaRegistryUrl, timeoutMs, authenticationHeaderValueProvider, certificates, - enableSslCertificateVerification, sslCaCertificate, proxy) + enableSslCertificateVerification, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs) { } diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index 20dc1bd58..0379454c7 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -88,6 +88,21 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient /// public const int DefaultTimeout = 30000; + /// + /// The default maximum number of retries. + /// + public const int DefaultMaxRetries = RestService.DefaultMaxRetries; + + /// + /// The default time to wait for the first retry. + /// + public const int DefaultRetriesWaitMs = RestService.DefaultRetriesWaitMs; + + /// + /// The default time to wait for any retry. + /// + public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs; + /// /// The default maximum capacity of the local schema cache. /// @@ -221,6 +236,45 @@ public CachedSchemaRegistryClient(IEnumerable> conf $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs} must be an integer."); } + var maxRetriesMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries); + int maxRetries; + try + { + maxRetries = maxRetriesMaybe.Value == null ? DefaultMaxRetries : Convert.ToInt32(maxRetriesMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries} must be an integer."); + } + + var retriesWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs); + int retriesWaitMs; + try + { + retriesWaitMs = retriesWaitMsMaybe.Value == null ? DefaultRetriesWaitMs : Convert.ToInt32(retriesWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs} must be an integer."); + } + + var retriesMaxWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs); + int retriesMaxWaitMs; + try + { + retriesMaxWaitMs = retriesMaxWaitMsMaybe.Value == null ? DefaultRetriesMaxWaitMs : Convert.ToInt32(retriesMaxWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer."); + } + var identityMapCapacityMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas); try @@ -340,6 +394,9 @@ public CachedSchemaRegistryClient(IEnumerable> conf if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource && @@ -372,7 +429,8 @@ public CachedSchemaRegistryClient(IEnumerable> conf var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value; var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation); - this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify, sslCaCertificate, proxy); + this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, + SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs); } /// diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index 9c54d8203..25e24d6d8 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -34,6 +34,14 @@ public class RestService : IRestService private static readonly string acceptHeader = string.Join(", ", Versions.PreferredResponseTypes); + public const int DefaultMaxRetries = 2; + + public const int DefaultRetriesWaitMs = 1000; + + public const int DefaultRetriesMaxWaitMs = 20000; + + private static Random random = new Random(); + /// /// The index of the last client successfully used (or random if none worked). /// @@ -51,15 +59,25 @@ public class RestService : IRestService /// private readonly IAuthenticationHeaderValueProvider authenticationHeaderValueProvider; + private int maxRetries; + + private int retriesWaitMs; + + private int retriesMaxWaitMs; /// /// Initializes a new instance of the RestService class. /// public RestService(string schemaRegistryUrl, int timeoutMs, IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List certificates, - bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null) + bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null, + int maxRetries = DefaultMaxRetries, int retriesWaitMs = DefaultRetriesWaitMs, + int retriesMaxWaitMs = DefaultRetriesMaxWaitMs) { this.authenticationHeaderValueProvider = authenticationHeaderValueProvider; + this.maxRetries = maxRetries; + this.retriesWaitMs = retriesWaitMs; + this.retriesMaxWaitMs = retriesMaxWaitMs; this.clients = schemaRegistryUrl .Split(',') @@ -91,11 +109,11 @@ private static HttpClientHandler CreateHandler(List certificat if (!enableSslCertificateVerification) { handler.ServerCertificateCustomValidationCallback = (_, __, ___, ____) => { return true; }; - } + } else if (sslCaCertificate != null) { - handler.ServerCertificateCustomValidationCallback = (_, __, chain, policyErrors) => { - + handler.ServerCertificateCustomValidationCallback = (_, __, chain, policyErrors) => { + if (policyErrors == SslPolicyErrors.None) { return true; @@ -105,7 +123,7 @@ private static HttpClientHandler CreateHandler(List certificat if (chain.ChainElements.Count < 2) { return false; - } + } var connectionCertHash = chain.ChainElements[1].Certificate.GetCertHash(); @@ -123,7 +141,7 @@ private static HttpClientHandler CreateHandler(List certificat return false; } } - return true; + return true; }; } @@ -205,12 +223,9 @@ private async Task ExecuteOnOneInstanceAsync(Func SendRequest( + HttpClient client, Func createRequest) + { + HttpResponseMessage response = null; + for (int i = 0; i < maxRetries; i++) + { + response = await client + .SendAsync(createRequest()) + .ConfigureAwait(continueOnCapturedContext: false); + if (IsSuccess((int)response.StatusCode) || !IsRetriable((int)response.StatusCode) || i >= maxRetries) + { + return response; + } + + await Task.Delay(CalculateRetryDelay(retriesWaitMs, retriesMaxWaitMs, i)); + } + return response; + } + + private static bool IsSuccess(int statusCode) + { + return statusCode >= 200 && statusCode < 300; + } + + private static bool IsRetriable(int statusCode) + { + return statusCode == 408 || statusCode == 429 || + statusCode == 500 || statusCode == 502 || statusCode == 503 || statusCode == 504; + } + + protected static int CalculateRetryDelay(int baseDelayMs, int maxDelayMs, int retriesAttempted) + { + double jitter; + lock (random) { + jitter = random.NextDouble(); + } + return Convert.ToInt32(Math.Min(jitter * Math.Pow(2, retriesAttempted) * baseDelayMs, maxDelayMs)); + } /// /// Used for end points that return a json object { ... } diff --git a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs index 568b2b77a..60dbac5c5 100644 --- a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs +++ b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs @@ -44,6 +44,28 @@ public static class PropertyNames /// public const string SchemaRegistryRequestTimeoutMs = "schema.registry.request.timeout.ms"; + /// + /// Specifies the maximum number of retries for a request. + /// + /// default: 3 + /// + public const string SchemaRegistryMaxRetries = "schema.registry.max.retries"; + + /// + /// Specifies the maximum time to wait for the first retry. + /// When jitter is applied, the actual wait may be less. + /// + /// default: 1000 + /// + public const string SchemaRegistryRetriesWaitMs = "schema.registry.retries.wait.ms"; + + /// + /// Specifies the maximum time to wait any retry. + /// + /// default: 20000 + /// + public const string SchemaRegistryRetriesMaxWaitMs = "schema.registry.retries.max.wait.ms"; + /// /// Specifies the maximum number of schemas CachedSchemaRegistryClient /// should cache locally. @@ -187,6 +209,39 @@ public int? RequestTimeoutMs set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs, value?.ToString()); } } + /// + /// Specifies the maximum number of retries for a request. + /// + /// default: 3 + /// + public int? MaxRetries + { + get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries); } + set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries, value?.ToString()); } + } + + /// + /// Specifies the time to wait for the first retry. + /// + /// default: 1000 + /// + public int? RetriesWaitMs + { + get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs); } + set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs, value?.ToString()); } + } + + /// + /// Specifies the time to wait for any retry. + /// + /// default: 20000 + /// + public int? RetriesMaxWaitMs + { + get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs); } + set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs, value?.ToString()); } + } + /// /// File or directory path to CA certificate(s) for verifying the schema registry's key. /// From 0e4765b569ab898dbdffdf308a7093493327a27a Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 18 Nov 2024 12:01:03 -0800 Subject: [PATCH 04/30] Add config for validating payloads against JSON Schema (#2350) --- .../JsonDeserializer.cs | 6 ++++-- .../JsonDeserializerConfig.cs | 20 +++++++++++++++++++ .../JsonSerializer.cs | 12 ++++++++--- .../JsonSerializerConfig.cs | 19 ++++++++++++++++++ 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index 21f0d7a8c..dfcac9488 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -63,6 +63,8 @@ public class JsonDeserializer : AsyncDeserializer where T : cl private JsonSchema schema = null; + private bool validate = true; + private JsonSerializerSettings jsonSchemaGeneratorSettingsSerializerSettings { get => #if NET8_0_OR_GREATER @@ -110,6 +112,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserial if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; } if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; } if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); } + if (config.Validate!= null) { this.validate= config.Validate.Value; } } /// @@ -228,10 +231,9 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i .ContinueWith(t => (JToken)t.Result) .ConfigureAwait(continueOnCapturedContext: false); - if (schema != null) + if (schema != null && validate) { var validationResult = validator.Validate(json, schema); - if (validationResult.Count > 0) { throw new InvalidDataException("Schema validation failed for properties: [" + diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs index 26361dcef..9fd8baf7b 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs @@ -53,6 +53,13 @@ public static class PropertyNames /// Possible values: /// public const string SubjectNameStrategy = "json.deserializer.subject.name.strategy"; + + /// + /// Specifies whether to validate payloads against the schema. + /// + /// default: true + /// + public const string Validate= "json.serializer.validate"; } @@ -124,5 +131,18 @@ public SubjectNameStrategy? SubjectNameStrategy else { this.properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } } } + + + /// + /// Specifies whether or not the JSON serializer should attempt to + /// validate the payload against the schema. + /// + /// default: true + /// + public bool? Validate + { + get { return GetBool(PropertyNames.Validate); } + set { SetObject(PropertyNames.Validate, value); } + } } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs index 643c0f207..229af15fe 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs @@ -75,6 +75,8 @@ public class JsonSerializer : AsyncSerializer where T : class private string schemaText; private string schemaFullname; + private bool validate = true; + private JsonSerializerSettings jsonSchemaGeneratorSettingsSerializerSettings { get => #if NET8_0_OR_GREATER @@ -129,6 +131,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer if (config.LatestCompatibilityStrict != null) { this.latestCompatibilityStrict = config.LatestCompatibilityStrict.Value; } if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; } if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); } + if (config.Validate!= null) { this.validate= config.Validate.Value; } if (this.useLatestVersion && this.autoRegisterSchema) { @@ -245,10 +248,13 @@ public override async Task SerializeAsync(T value, SerializationContext } var serializedString = Newtonsoft.Json.JsonConvert.SerializeObject(value, jsonSchemaGeneratorSettingsSerializerSettings); - var validationResult = validator.Validate(serializedString, this.schema); - if (validationResult.Count > 0) + if (validate) { - throw new InvalidDataException("Schema validation failed for properties: [" + string.Join(", ", validationResult.Select(r => r.Path)) + "]"); + var validationResult = validator.Validate(serializedString, this.schema); + if (validationResult.Count > 0) + { + throw new InvalidDataException("Schema validation failed for properties: [" + string.Join(", ", validationResult.Select(r => r.Path)) + "]"); + } } using (var stream = new MemoryStream(initialBufferSize)) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs index 8886299fc..13af2dea3 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs @@ -92,6 +92,13 @@ public static class PropertyNames /// Possible values: /// public const string SubjectNameStrategy = "json.serializer.subject.name.strategy"; + + /// + /// Specifies whether to validate payloads against the schema. + /// + /// default: true + /// + public const string Validate= "json.serializer.validate"; } @@ -220,5 +227,17 @@ public SubjectNameStrategy? SubjectNameStrategy } } + + /// + /// Specifies whether or not the JSON serializer should attempt to + /// validate the payload against the schema. + /// + /// default: true + /// + public bool? Validate + { + get { return GetBool(PropertyNames.Validate); } + set { SetObject(PropertyNames.Validate, value); } + } } } From 092376122bd9956d90f8976ba9ef9df779c57f21 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 19 Nov 2024 03:32:35 -0800 Subject: [PATCH 05/30] Update changelog for SR 2.6.1 (#2355) --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e473a042c..8d3377608 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,16 @@ ## Enhancements * References librdkafka.redist 2.6.1. Refer to the [librdkafka v2.6.1 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.1) for more information. + +## Fixes + * Fix to continue supporting .NET Framework 4.6.2+ in core client library (#2342). +* Fix JSON Schema handling to not require use of `$id` (#2339). +* Update Caching.Memory to 8.0.1 to address CVE (#23440. +* Added Qualified and Custom reference name strategy approaches for protobuf references (#2345). +* Fix validate of SSL CA certs in Schema Registry client (#2346). +* Skip SSL certs validation when configured in Schema Registry client (#2347). +* Allow proxy to be specified in Schema Registry client (#2348). # 2.6.0 From 4fe8bbda3f35963b5f0510574b9ec678766c81d6 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 19 Nov 2024 11:30:14 -0800 Subject: [PATCH 06/30] Retrieve version for writer schema when getting migrations (#2358) * Retrieve version for writer schema when getting migrations Fixes https://github.com/confluentinc/confluent-kafka-dotnet/issues/2356 * Minor cleanup --- .../GenericDeserializerImpl.cs | 2 +- .../SpecificDeserializerImpl.cs | 2 +- .../JsonDeserializer.cs | 2 +- .../ProtobufDeserializer.cs | 2 +- src/Confluent.SchemaRegistry/AsyncSerde.cs | 11 +++++++---- .../BaseSerializeDeserialize.cs | 10 +++++++++- 6 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs index 6d026fa0a..96b938973 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs @@ -72,7 +72,7 @@ public async Task Deserialize(string topic, Headers headers, byte } string subject = GetSubjectName(topic, isKey, null); - Schema latestSchema = null; + RegisteredSchema latestSchema = null; if (subject != null) { latestSchema = await GetReaderSchema(subject) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs index f15e7b2e6..5d25eceed 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs @@ -126,7 +126,7 @@ public async Task Deserialize(string topic, Headers headers, byte[] array, bo } string subject = GetSubjectName(topic, isKey, null); - Schema latestSchema = null; + RegisteredSchema latestSchema = null; if (subject != null) { latestSchema = await GetReaderSchema(subject) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index dfcac9488..8a2e47300 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -175,7 +175,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i bool isKey = context.Component == MessageComponentType.Key; string topic = context.Topic; string subject = GetSubjectName(topic, isKey, null); - Schema latestSchema = null; + RegisteredSchema latestSchema = null; if (subject != null) { latestSchema = await GetReaderSchema(subject) diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs index b58fc7817..3ff582484 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs @@ -128,7 +128,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i // Currently Protobuf does not support migration rules because of lack of support for DynamicMessage // See https://github.com/protocolbuffers/protobuf/issues/658 /* - Schema latestSchema = await SerdeUtils.GetReaderSchema(schemaRegistryClient, subject, useLatestWithMetadata, useLatestVersion) + RegisteredSchema latestSchema = await SerdeUtils.GetReaderSchema(schemaRegistryClient, subject, useLatestWithMetadata, useLatestVersion) .ConfigureAwait(continueOnCapturedContext: false); */ diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index bfc32638e..70e1ee047 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -166,11 +166,14 @@ await ResolveReferences(s, schemas, visited) return schemas; } - protected async Task> GetMigrations(string subject, Schema writerSchema, Schema readerSchema) + protected async Task> GetMigrations(string subject, Schema writer, RegisteredSchema readerSchema) { + var writerSchema = await schemaRegistryClient.LookupSchemaAsync(subject, writer, false, false) + .ConfigureAwait(continueOnCapturedContext: false); + RuleMode migrationMode; - Schema first; - Schema last; + RegisteredSchema first; + RegisteredSchema last; IList migrations = new List(); if (writerSchema.Version < readerSchema.Version) { @@ -217,7 +220,7 @@ protected async Task> GetMigrations(string subject, Schema writ return migrations; } - private async Task> GetSchemasBetween(string subject, Schema first, Schema last) + private async Task> GetSchemasBetween(string subject, RegisteredSchema first, RegisteredSchema last) { if (last.Version - first.Version <= 1) { diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs index 8fff067a6..0dbd34b67 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs @@ -45,7 +45,15 @@ public BaseSerializeDeserializeTests() var schemaRegistryMock = new Mock(); schemaRegistryMock.Setup(x => x.ConstructValueSubjectName(testTopic, It.IsAny())).Returns($"{testTopic}-value"); schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( - (string topic, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1 + (string subject, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1 + ); + schemaRegistryMock.Setup(x => x.LookupSchemaAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( + (string subject, Schema schema, bool ignoreDeleted, bool normalize) => + { + return subjectStore[subject].First(x => + x.SchemaString == schema.SchemaString + ); + } ); schemaRegistryMock.Setup(x => x.GetSchemaBySubjectAndIdAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( (string subject, int id, string format) => From d0f87530f8ad5af01958b5e2c4a134b74f8192bb Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 19 Nov 2024 12:28:54 -0800 Subject: [PATCH 07/30] Mark some deprecated properties as obsolete in Schema (#2359) These properties will be removed in a future major release. These were actually marked as obsolete in the past, but were temporarily removed. This just re-adds the annotations. --- .../Rest/DataContracts/RegisteredSchema.cs | 6 +++--- .../Rest/DataContracts/Schema.cs | 15 +++++++++------ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs index c0efb4442..41545a14e 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs @@ -41,19 +41,19 @@ public class RegisteredSchema : Schema, IComparable, IEquatabl /// The subject the schema is registered against. /// [DataMember(Name = "subject")] - public override string Subject { get; set; } + public new string Subject { get; set; } /// /// The schema version. /// [DataMember(Name = "version")] - public override int Version { get; set; } + public new int Version { get; set; } /// /// Unique identifier of the schema. /// [DataMember(Name = "id")] - public override int Id { get; set; } + public new int Id { get; set; } /// /// The unregistered schema corresponding to this schema. diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs index 3b5746a70..08d901e9a 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs @@ -30,19 +30,22 @@ public class Schema : IComparable, IEquatable #region API backwards-compatibility hack /// - /// The subject the schema is registered against. + /// DEPRECATED. The subject the schema is registered against. /// - public virtual string Subject { get; set; } + [Obsolete("Included to maintain API backwards compatibility only. Use RegisteredSchema instead. This property will be removed in a future version of the library.")] + public string Subject { get; set; } /// - /// The schema version. + /// DEPRECATED. The schema version. /// - public virtual int Version { get; set; } + [Obsolete("Included to maintain API backwards compatibility only. Use RegisteredSchema instead. This property will be removed in a future version of the library.")] + public int Version { get; set; } /// - /// Unique identifier of the schema. + /// DEPRECATED. Unique identifier of the schema. /// - public virtual int Id { get; set; } + [Obsolete("Included to maintain API backwards compatibility only. Use RegisteredSchema instead. This property will be removed in a future version of the library.")] + public int Id { get; set; } /// /// DEPRECATED. Initializes a new instance of the Schema class. From 97cdbd66bb2654f58c1f1068faac8d950476e3dc Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 25 Nov 2024 15:38:28 -0800 Subject: [PATCH 08/30] Add test for JSON with definition (#2363) --- .../JsonSerializeDeserialize.cs | 91 ++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs index c1fdab1ce..efcc049ae 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs @@ -577,12 +577,12 @@ public void CELFieldTransform() schema.RuleSet = new RuleSet(new List(), new List { - new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, "typeName == 'STRING' ; value + '-suffix'", null, null, false) } ); store[schemaStr] = 1; - subjectStore["topic-value"] = new List { schema }; + subjectStore["topic-value"] = new List { schema }; var config = new JsonSerializerConfig { AutoRegisterSchemas = false, @@ -607,6 +607,80 @@ public void CELFieldTransform() Assert.Equal(user.FavoriteNumber, result.FavoriteNumber); } + [Fact] + public void CELFieldTransformWithDef() + { + var schemaStr = @"{ + ""$schema"" : ""http://json-schema.org/draft-07/schema#"", + ""additionalProperties"" : false, + ""definitions"" : { + ""Address"" : { + ""additionalProperties"" : false, + ""properties"" : { + ""DoorNumber"" : { + ""type"" : ""integer"" + }, + ""DoorPin"" : { + ""confluent:tags"" : [ ""PII"" ], + ""type"" : ""string"" + } + }, + ""type"" : ""object"" + } + }, + ""properties"" : { + ""Address"" : { + ""$ref"" : ""#/definitions/Address"" + }, + ""Name"" : { + ""confluent:tags"" : [ ""PII"" ], + ""type"" : ""string"" + } + }, + ""title"" : ""Sample Event"", + ""type"" : ""object"" + }"; + var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Json, null); + schema.RuleSet = new RuleSet(new List(), + new List + { + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", new HashSet + { + "PII" + + }, null, + "value + '-suffix'", null, null, false) + } + ); + store[schemaStr] = 1; + subjectStore["topic-value"] = new List { schema }; + var config = new JsonSerializerConfig + { + AutoRegisterSchemas = false, + UseLatestVersion = true + }; + var serializer = new JsonSerializer(schemaRegistryClient, config); + var deserializer = new JsonDeserializer(schemaRegistryClient); + + var address = new Address + { + DoorNumber = 100, + DoorPin = "1234" + }; + var person = new JsonPerson() + { + Address = address, + Name = "bob" + }; + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync(person, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + Assert.Equal("bob-suffix", result.Name); + Assert.Equal("1234-suffix", result.Address.DoorPin); + } + [Fact] public void CELFieldCondition() { @@ -1004,4 +1078,17 @@ class NewerCustomer [JsonProperty("title")] public string Title { get; set; } } + + class JsonPerson + { + public string Name { get; set; } + public Address Address { get; set; } + + } + + class Address + { + public int DoorNumber { get; set; } + public string DoorPin { get; set; } + } } From 84f4e32bf3509bd0cb460e1a8eb00fa82ed81c70 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 26 Nov 2024 14:59:30 -0800 Subject: [PATCH 09/30] MINOR refactor env var lookup for consistency with other repos (#2368) --- .../HcVaultKmsClient.cs | 5 ----- .../HcVaultKmsDriver.cs | 5 +++++ src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs | 8 -------- src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs | 8 ++++++++ 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs index 128a531b1..b63fc703f 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs @@ -21,11 +21,6 @@ public class HcVaultKmsClient : IKmsClient public HcVaultKmsClient(string kekId, string ns, string tokenId) { - if (tokenId == null) - { - tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN"); - ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE"); - } KekId = kekId; Namespace = ns; TokenId = tokenId; diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs index e220afc7d..b2cfd17ca 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs @@ -23,6 +23,11 @@ public IKmsClient NewKmsClient(IDictionary config, string keyUrl { config.TryGetValue(TokenId, out string tokenId); config.TryGetValue(Namespace, out string ns); + if (tokenId == null) + { + tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN"); + ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE"); + } return new HcVaultKmsClient(keyUrl, ns, tokenId); } } diff --git a/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs index 66afb08ef..5b05dac89 100644 --- a/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs @@ -16,14 +16,6 @@ public class LocalKmsClient : IKmsClient public LocalKmsClient(string secret) { - if (secret == null) - { - secret = Environment.GetEnvironmentVariable("LOCAL_SECRET"); - } - if (secret == null) - { - throw new ArgumentNullException("Cannot load secret"); - } Secret = secret; cryptor = new Cryptor(DekFormat.AES128_GCM); byte[] rawKey = Hkdf.DeriveKey( diff --git a/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs b/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs index 87ad7bd91..2def8c054 100644 --- a/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs +++ b/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs @@ -22,6 +22,14 @@ public string GetKeyUrlPrefix() public IKmsClient NewKmsClient(IDictionary config, string keyUrl) { config.TryGetValue(Secret, out string secret); + if (secret == null) + { + secret = Environment.GetEnvironmentVariable("LOCAL_SECRET"); + } + if (secret == null) + { + throw new ArgumentNullException("Cannot load secret"); + } return new LocalKmsClient(secret); } } From 7d82888458f4d7ace35f850eb46408d8a944a554 Mon Sep 17 00:00:00 2001 From: Fernando Luiz de Lima Date: Fri, 29 Nov 2024 03:58:12 -0300 Subject: [PATCH 10/30] Add Kafka Core to Third-Party Libraries (#2361) * Add Kafka Core to 3rd party libraries. --- 3RD_PARTY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/3RD_PARTY.md b/3RD_PARTY.md index 6a4a20f57..df00ef05c 100644 --- a/3RD_PARTY.md +++ b/3RD_PARTY.md @@ -10,3 +10,4 @@ To add your project, open a pull request! - [Multi Schema Avro Deserializer](https://github.com/ycherkes/multi-schema-avro-desrializer) - Avro deserializer for reading messages serialized with multiple schemas. - [OpenSleigh.Transport.Kafka](https://github.com/mizrael/OpenSleigh/tree/develop/src/OpenSleigh.Transport.Kafka) - A Kafka Transport for OpenSleigh, a distributed saga management library. - [SlimMessageBus.Host.Kafka](https://github.com/zarusz/SlimMessageBus) - Apache Kafka transport for SlimMessageBus (lightweight message bus for .NET) +- [Kafka Core](https://github.com/ffernandolima/confluent-kafka-core-dotnet) - Kafka Core empowers developers to build robust .NET applications on top of Confluent Kafka, focusing on simplicity, maintainability, and extensibility with intuitive abstractions and builders. \ No newline at end of file From 224a56110ca9d0bb977efcc2c25d838e7d2b9fc7 Mon Sep 17 00:00:00 2001 From: Krzysztof Porebski Date: Mon, 2 Dec 2024 22:59:40 +0100 Subject: [PATCH 11/30] Improve lock utilization on the consumer hot path (#2370) --- src/Confluent.SchemaRegistry/AsyncSerde.cs | 10 ++++++++-- .../CachedSchemaRegistryClient.cs | 11 ++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index 70e1ee047..d63d3e2d9 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -18,6 +18,7 @@ #pragma warning disable CS0618 using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -41,7 +42,7 @@ public abstract class AsyncSerde protected SemaphoreSlim serdeMutex = new SemaphoreSlim(1); - private readonly IDictionary parsedSchemaCache = new Dictionary(); + private readonly IDictionary parsedSchemaCache = new ConcurrentDictionary(); private SemaphoreSlim parsedSchemaMutex = new SemaphoreSlim(1); protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig config, RuleRegistry ruleRegistry = null) @@ -98,10 +99,15 @@ protected string GetSubjectName(string topic, bool isKey, string recordType) protected async Task GetParsedSchema(Schema schema) { + if (parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema)) + { + return parsedSchema; + } + await parsedSchemaMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (!parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema)) + if (!parsedSchemaCache.TryGetValue(schema, out parsedSchema)) { if (parsedSchemaCache.Count > schemaRegistryClient.MaxCachedSchemas) { diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index 0379454c7..fe3ed684b 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -23,6 +23,7 @@ using System.Threading.Tasks; using System.Linq; using System; +using System.Collections.Concurrent; using System.Net; using System.Threading; using System.Security.Cryptography.X509Certificates; @@ -65,7 +66,7 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient private IRestService restService; private int identityMapCapacity; private int latestCacheTtlSecs; - private readonly Dictionary schemaById = new Dictionary(); + private readonly ConcurrentDictionary schemaById = new ConcurrentDictionary(); private readonly Dictionary> idBySchemaBySubject = new Dictionary>(); @@ -657,11 +658,15 @@ public async Task GetSchemaAsync(int id, string format = null) /// public async Task GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null) { + if (this.schemaById.TryGetValue(id, out Schema schema) && checkSchemaMatchesFormat(format, schema.SchemaString)) + { + return schema; + } + await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (!this.schemaById.TryGetValue(id, out Schema schema) || - !checkSchemaMatchesFormat(format, schema.SchemaString)) + if (!this.schemaById.TryGetValue(id, out schema) || !checkSchemaMatchesFormat(format, schema.SchemaString)) { CleanCacheIfFull(); schema = (await restService.GetSchemaBySubjectAndIdAsync(subject, id, format) From 35b8f9938ff786c10912144becab37be509efe7f Mon Sep 17 00:00:00 2001 From: Krzysztof Porebski Date: Mon, 2 Dec 2024 23:00:55 +0100 Subject: [PATCH 12/30] Improve perf of base64 encoding check on a consumer hot path (#2371) Co-authored-by: Robert Yokota --- .../CachedSchemaRegistryClient.cs | 25 +++---------------- src/Confluent.SchemaRegistry/Utils.cs | 20 +++++++++++++++ 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index fe3ed684b..949b0f3ba 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -593,16 +593,8 @@ private bool checkSchemaMatchesFormat(string format, string schemaString) // if a format isn't specified, then assume text is desired. if (format == null) { - try - { - Convert.FromBase64String(schemaString); - } - catch (Exception) - { - return true; // Base64 conversion failed, infer the schemaString format is text. - } - - return false; // Base64 conversion succeeded, so infer the schamaString format is base64. + // If schemaString is not Base64, infer the schemaString format is text. + return !Utils.IsBase64String(schemaString); } else { @@ -610,17 +602,8 @@ private bool checkSchemaMatchesFormat(string format, string schemaString) { throw new ArgumentException($"Invalid schema format was specified: {format}."); } - - try - { - Convert.FromBase64String(schemaString); - } - catch (Exception) - { - return false; - } - - return true; + + return Utils.IsBase64String(schemaString); } } diff --git a/src/Confluent.SchemaRegistry/Utils.cs b/src/Confluent.SchemaRegistry/Utils.cs index 5c6e16cfc..f652f7b5f 100644 --- a/src/Confluent.SchemaRegistry/Utils.cs +++ b/src/Confluent.SchemaRegistry/Utils.cs @@ -20,6 +20,9 @@ using System.Linq; using Confluent.Kafka; +#if NET8_0_OR_GREATER +using System.Buffers.Text; +#endif namespace Confluent.SchemaRegistry { @@ -70,5 +73,22 @@ public static bool ListEquals(IList a, IList b) if (a == null || b == null) return false; return a.SequenceEqual(b); } + + internal static bool IsBase64String(string value) + { +#if NET8_0_OR_GREATER + return Base64.IsValid(value); +#else + try + { + _ = Convert.FromBase64String(value); + return true; + } + catch (FormatException) + { + return false; + } +#endif + } } } From 8656b582f2264cec51e7f9e0454af8c4dda659ca Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 2 Dec 2024 19:49:14 -0800 Subject: [PATCH 13/30] Add missing validate check (#2372) This branch was missed when adding the validate config --- src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index 8a2e47300..9d75e708d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -251,7 +251,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i { string serializedString = jsonReader.ReadToEnd(); - if (schema != null) + if (schema != null && validate) { var validationResult = validator.Validate(serializedString, schema); From 1a336456147c57298390ea14bff1cd62c243c56a Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 4 Dec 2024 08:44:14 -0800 Subject: [PATCH 14/30] Fix null reference exception (#2373) --- .../Rest/DataContracts/RuleSet.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs index 822ff74d7..6a9154020 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs @@ -45,14 +45,14 @@ public bool HasRules(RuleMode mode) { switch (mode) { case RuleMode.Upgrade: case RuleMode.Downgrade: - return MigrationRules.Any(r => r.Mode == mode || r.Mode == RuleMode.UpDown); + return MigrationRules != null && MigrationRules.Any(r => r.Mode == mode || r.Mode == RuleMode.UpDown); case RuleMode.UpDown: - return MigrationRules.Any(r => r.Mode == mode); + return MigrationRules != null && MigrationRules.Any(r => r.Mode == mode); case RuleMode.Write: case RuleMode.Read: - return DomainRules.Any(r => r.Mode == mode || r.Mode == RuleMode.Write); + return DomainRules != null && DomainRules.Any(r => r.Mode == mode || r.Mode == RuleMode.Write); case RuleMode.WriteRead: - return DomainRules.Any(r => r.Mode == mode); + return DomainRules != null && DomainRules.Any(r => r.Mode == mode); default: return false; } From 16367a7b701e990df72de1e6ab4390aa473df622 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 4 Dec 2024 11:29:37 -0800 Subject: [PATCH 15/30] Ensure different key ids use different client instances (#2374) * Ensure different key ids use different client instances * Minor cleanup * Minor cleanup --- src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs | 4 ++-- .../AzureKmsClient.cs | 2 +- src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs | 2 +- .../HcVaultKmsClient.cs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs index 0b4a0ff94..5a3c14ac8 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs @@ -12,7 +12,7 @@ public class AwsKmsClient : IKmsClient { private AmazonKeyManagementServiceClient kmsClient; private string keyId; - + public string KekId { get; } public AwsKmsClient(string kekId, AWSCredentials credentials) @@ -36,7 +36,7 @@ public AwsKmsClient(string kekId, AWSCredentials credentials) public bool DoesSupport(string uri) { - return uri.StartsWith(AwsKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs index 2ef3f1cd7..1a70524c6 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs @@ -25,7 +25,7 @@ public AzureKmsClient(string kekId, TokenCredential tokenCredential) public bool DoesSupport(string uri) { - return uri.StartsWith(AzureKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs index db5fd4683..c3a995bfe 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs @@ -36,7 +36,7 @@ public GcpKmsClient(string kekId, GoogleCredential credential) public bool DoesSupport(string uri) { - return uri.StartsWith(GcpKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs index b63fc703f..781af5714 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs @@ -48,7 +48,7 @@ public HcVaultKmsClient(string kekId, string ns, string tokenId) public bool DoesSupport(string uri) { - return uri.StartsWith(HcVaultKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) From 30671534eab07df0d2d62755237b79425e9cc588 Mon Sep 17 00:00:00 2001 From: Kunal Gupta Date: Thu, 5 Dec 2024 11:22:32 -0600 Subject: [PATCH 16/30] DGS-19008: Add PR template (#2375) --- .github/pull_request_template.md | 43 ++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 .github/pull_request_template.md diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 000000000..c4d1853ee --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,43 @@ + +What +---- + + +Checklist +------------------ +- [ ] Contains customer facing changes? Including API/behavior changes +- [ ] Did you add sufficient unit test and/or integration test coverage for this PR? + - If not, please explain why it is not required + +References +---------- +JIRA: + + +Test & Review +------------ + + +Open questions / Follow-ups +-------------------------- + + + From 16456fddbb1abd439d1c526bb9caeb092d36d3a2 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 6 Dec 2024 14:27:44 -0800 Subject: [PATCH 17/30] Set skipKnownTypes default to be consistent with other clients (#2376) * Change skipKnownTypes to be consistent with other clients * Enhance test --- .../ProtobufSerializer.cs | 11 +++++++++-- .../ProtobufSerializerConfig.cs | 2 +- src/Confluent.SchemaRegistry/AsyncSerde.cs | 7 ++++++- .../Tests_Protobuf/ProduceConsumeGoogleRef.cs | 3 ++- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs index 898ec21da..53ad08c4d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs @@ -54,7 +54,7 @@ namespace Confluent.SchemaRegistry.Serdes /// public class ProtobufSerializer : AsyncSerializer where T : IMessage, new() { - private bool skipKnownTypes; + private bool skipKnownTypes = true; private bool useDeprecatedFormat; private ReferenceSubjectNameStrategyDelegate referenceSubjectNameStrategy; @@ -190,7 +190,7 @@ private async Task> RegisterOrGetReferences(FileDescriptor for (int i=0; i ParseSchema(Schema schema) .ConfigureAwait(continueOnCapturedContext: false); return ProtobufUtils.Parse(schema.SchemaString, references); } + + protected override bool IgnoreReference(string name) + { + return name.StartsWith("confluent/") || + name.StartsWith("google/protobuf/") || + name.StartsWith("google/type/"); + } } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs index b40a0d9ca..72071509b 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs @@ -195,7 +195,7 @@ public IDictionary UseLatestWithMetadata /// Specifies whether or not the Protobuf serializer should skip known types /// when resolving dependencies. /// - /// default: false + /// default: true /// public bool? SkipKnownTypes { diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index d63d3e2d9..70934d291 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -142,6 +142,11 @@ protected async Task> ResolveReferences(Schema schem .ConfigureAwait(continueOnCapturedContext: false); return result; } + + protected virtual bool IgnoreReference(string name) + { + return false; + } private async Task> ResolveReferences( Schema schema, IDictionary schemas, ISet visited) @@ -149,7 +154,7 @@ private async Task> ResolveReferences( IList references = schema.References; foreach (SchemaReference reference in references) { - if (visited.Contains(reference.Name)) + if (IgnoreReference(reference.Name) || visited.Contains(reference.Name)) { continue; } diff --git a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs index 427e609d6..f29048e2e 100644 --- a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs +++ b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs @@ -33,12 +33,13 @@ public static void ProduceConsumeGoogleRefProtobuf(string bootstrapServers, stri { var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers }; var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaRegistryServers }; + var serializerConfig = new ProtobufSerializerConfig() { SkipKnownTypes = false }; using (var topic = new TemporaryTopic(bootstrapServers, 1)) using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig)) using (var producer = new ProducerBuilder(producerConfig) - .SetValueSerializer(new ProtobufSerializer(schemaRegistry)) + .SetValueSerializer(new ProtobufSerializer(schemaRegistry, serializerConfig)) .Build()) { var u = new WithGoogleRefs.TheRecord(); From 87979d8ffa6cbaa42a35ed323ff7b2b83ed4f514 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 10 Dec 2024 10:12:08 -0800 Subject: [PATCH 18/30] Add ability to override disable flag and actions on a rule (#2377) * Add ability to override disable flag and actions on a rule * Add test --- src/Confluent.SchemaRegistry/AsyncSerde.cs | 47 ++++++++++++++-- src/Confluent.SchemaRegistry/RuleOverride.cs | 40 ++++++++++++++ src/Confluent.SchemaRegistry/RuleRegistry.cs | 54 +++++++++++++++++-- .../SerializeDeserialize.cs | 44 ++++++++++++++- 4 files changed, 174 insertions(+), 11 deletions(-) create mode 100644 src/Confluent.SchemaRegistry/RuleOverride.cs diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index 70934d291..698e6873b 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -359,7 +359,7 @@ protected async Task ExecuteRules( for (int i = 0; i < rules.Count; i++) { Rule rule = rules[i]; - if (rule.Disabled) + if (IsDisabled(rule)) { continue; } @@ -406,21 +406,21 @@ protected async Task ExecuteRules( default: throw new ArgumentException("Unsupported rule kind " + rule.Kind); } - await RunAction(ctx, ruleMode, rule, message != null ? rule.OnSuccess : rule.OnFailure, + await RunAction(ctx, ruleMode, rule, message != null ? GetOnSuccess(rule) : GetOnFailure(rule), message, null, message != null ? null : ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); } catch (RuleException ex) { - await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, + await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message, ex, ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); } } else { - await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, + await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message, new RuleException("Could not find rule executor of type " + rule.Type), ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); @@ -429,6 +429,45 @@ await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, return message; } + private string GetOnSuccess(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.OnSuccess != null) + { + return ruleOverride.OnSuccess; + } + } + + return rule.OnSuccess; + } + + private string GetOnFailure(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.OnFailure != null) + { + return ruleOverride.OnFailure; + } + } + + return rule.OnFailure; + } + + private bool IsDisabled(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.Disabled.HasValue) + { + return ruleOverride.Disabled.Value; + } + } + + return rule.Disabled; + } + private static IRuleExecutor GetRuleExecutor(RuleRegistry ruleRegistry, string type) { if (ruleRegistry.TryGetExecutor(type, out IRuleExecutor result)) diff --git a/src/Confluent.SchemaRegistry/RuleOverride.cs b/src/Confluent.SchemaRegistry/RuleOverride.cs new file mode 100644 index 000000000..cc00e4316 --- /dev/null +++ b/src/Confluent.SchemaRegistry/RuleOverride.cs @@ -0,0 +1,40 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.SchemaRegistry +{ + /// + /// A rule override. + /// + public class RuleOverride + { + public string Type { get; set; } + + public string OnSuccess { get; set; } + + public string OnFailure { get; set; } + + public bool? Disabled { get; set; } + + public RuleOverride(string type, string onSuccess, string onFailure, bool? disabled) + { + Type = type; + OnSuccess = onSuccess; + OnFailure = onFailure; + Disabled = disabled; + } + } +} \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/RuleRegistry.cs b/src/Confluent.SchemaRegistry/RuleRegistry.cs index af71421a0..16ae348c7 100644 --- a/src/Confluent.SchemaRegistry/RuleRegistry.cs +++ b/src/Confluent.SchemaRegistry/RuleRegistry.cs @@ -26,19 +26,16 @@ public class RuleRegistry { private readonly SemaphoreSlim ruleExecutorsMutex = new SemaphoreSlim(1); private readonly SemaphoreSlim ruleActionsMutex = new SemaphoreSlim(1); + private readonly SemaphoreSlim ruleOverridesMutex = new SemaphoreSlim(1); private IDictionary ruleExecutors = new Dictionary(); private IDictionary ruleActions = new Dictionary(); + private IDictionary ruleOverrides = new Dictionary(); private static readonly RuleRegistry GLOBAL_INSTANCE = new RuleRegistry(); public static RuleRegistry GlobalInstance => GLOBAL_INSTANCE; - public static List GetRuleActions() - { - return GlobalInstance.GetActions(); - } - public void RegisterExecutor(IRuleExecutor executor) { ruleExecutorsMutex.Wait(); @@ -123,6 +120,48 @@ public List GetActions() } } + public void RegisterOverride(RuleOverride ruleOverride) + { + ruleOverridesMutex.Wait(); + try + { + if (!ruleOverrides.ContainsKey(ruleOverride.Type)) + { + ruleOverrides.Add(ruleOverride.Type, ruleOverride); + } + } + finally + { + ruleOverridesMutex.Release(); + } + } + + public bool TryGetOverride(string name, out RuleOverride ruleOverride) + { + ruleOverridesMutex.Wait(); + try + { + return ruleOverrides.TryGetValue(name, out ruleOverride); + } + finally + { + ruleOverridesMutex.Release(); + } + } + + public List GetOverrides() + { + ruleOverridesMutex.Wait(); + try + { + return new List(ruleOverrides.Values); + } + finally + { + ruleOverridesMutex.Release(); + } + } + public static void RegisterRuleExecutor(IRuleExecutor executor) { GlobalInstance.RegisterExecutor(executor); @@ -132,5 +171,10 @@ public static void RegisterRuleAction(IRuleAction action) { GlobalInstance.RegisterAction(action); } + + public static void RegisterRuleOverride(RuleOverride ruleOverride) + { + GlobalInstance.RegisterOverride(ruleOverride); + } } } diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index 8843f882d..db021d651 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -275,12 +275,12 @@ public void ISpecificRecordCELFieldTransform() schema.RuleSet = new RuleSet(new List(), new List { - new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, "typeName == 'STRING' ; value + '-suffix'", null, null, false) } ); store[schemaStr] = 1; - subjectStore["topic-value"] = new List { schema }; + subjectStore["topic-value"] = new List { schema }; var config = new AvroSerializerConfig { AutoRegisterSchemas = false, @@ -305,6 +305,46 @@ public void ISpecificRecordCELFieldTransform() Assert.Equal(user.favorite_number, result.favorite_number); } + [Fact] + public void ISpecificRecordCELFieldTransformDisable() + { + var schemaStr = User._SCHEMA.ToString(); + var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null); + schema.RuleSet = new RuleSet(new List(), + new List + { + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + "typeName == 'STRING' ; value + '-suffix'", null, null, false) + } + ); + store[schemaStr] = 1; + subjectStore["topic-value"] = new List { schema }; + var config = new AvroSerializerConfig + { + AutoRegisterSchemas = false, + UseLatestVersion = true + }; + RuleRegistry registry = new RuleRegistry(); + registry.RegisterOverride(new RuleOverride("CEL_FIELD", null, null, true)); + var serializer = new AvroSerializer(schemaRegistryClient, config, registry); + var deserializer = new AvroDeserializer(schemaRegistryClient, null); + + var user = new User + { + favorite_color = "blue", + favorite_number = 100, + name = "awesome" + }; + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + Assert.Equal("awesome", result.name); + Assert.Equal("blue", result.favorite_color); + Assert.Equal(user.favorite_number, result.favorite_number); + } + [Fact] public void ISpecificRecordCELFieldCondition() { From e759ac5e6b77e0646cd9bd60d9e3fc089c1f4ad8 Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 09:21:12 +0100 Subject: [PATCH 19/30] ci --- .github/workflows/build.yml | 99 +++++++++++++++++++++++++++++++ .github/workflows/integration.yml | 62 +++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 .github/workflows/build.yml create mode 100644 .github/workflows/integration.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 000000000..99c35b0d8 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,99 @@ +name: 'confluent-kafka-dotnet build pipeline' + +env: + CONFIGURATION: Release + DOTNET_CLI_TELEMETRY_OPTOUT: 'true' + +on: + push: + branches: [ main ] + tags: [ '*' ] + pull_request: + branches: [ main ] + +jobs: + linux-build: + name: 'Linux x64' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Build and test + run: | + dotnet restore + make build + make test + + osx-build: + name: 'OSX x64' + runs-on: macos-13 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Set ulimit + run: ulimit -n 1024 + - name: Build and test + run: | + dotnet restore + make build + make test + + windows-build: + name: 'Windows x64' + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.403' + - name: Build and test + run: | + dotnet restore + dotnet test -c $env:CONFIGURATION --no-build test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj + dotnet test -c $env:CONFIGURATION --no-build test/Confluent.SchemaRegistry.UnitTests/Confluent.SchemaRegistry.UnitTests.csproj + dotnet test -c $env:CONFIGURATION --no-build test/Confluent.SchemaRegistry.Serdes.UnitTests/Confluent.SchemaRegistry.Serdes.UnitTests.csproj + + windows-artifacts: + name: 'Windows Artifacts' + needs: windows-build + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.403' + - name: Install DocFX + run: dotnet tool update -g docfx + - name: Build and create packages + run: | + dotnet restore + dotnet build Confluent.Kafka.sln -c $env:CONFIGURATION + + # Different packaging for tagged vs untagged builds + if ($env:GITHUB_REF -match '^refs/tags/') { + $suffix = "" + } else { + $suffix = "--version-suffix ci-$env:GITHUB_RUN_ID" + } + + dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj -c $env:CONFIGURATION $suffix --output artifacts + + docfx doc/docfx.json + tar -czf artifacts/docs-$env:GITHUB_RUN_ID.zip doc/_site/* + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: build-artifacts + path: artifacts/ diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 000000000..def0cde88 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,62 @@ +name: 'Integration Tests' + +env: + CONFIGURATION: Release + DOTNET_CLI_TELEMETRY_OPTOUT: 'true' + SEMAPHORE_SKIP_FLAKY_TESTS: 'true' + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + integration-tests: + name: 'Integration tests' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USER }} + password: ${{ secrets.DOCKERHUB_APIKEY }} + + - name: Classic Protocol Tests + run: | + cd test/docker && docker-compose up -d && sleep 30 && cd ../.. + dotnet restore + cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. + + - name: Consumer Protocol Tests + run: | + cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && cd ../.. + sleep 300 + export TEST_CONSUMER_GROUP_PROTOCOL=consumer + dotnet restore + cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. + + schema-registry-tests: + name: 'Schema registry and serdes integration tests' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USER }} + password: ${{ secrets.DOCKERHUB_APIKEY }} + - name: Run Tests + run: | + cd test/docker && docker-compose up -d && cd ../.. + dotnet restore + cd test/Confluent.SchemaRegistry.Serdes.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. \ No newline at end of file From 331f8671f62946ccd13d159d96064636f48efde9 Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 09:24:43 +0100 Subject: [PATCH 20/30] ci (#1) --- .github/workflows/build.yml | 99 +++++++++++++++++++++++++++++++ .github/workflows/integration.yml | 62 +++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 .github/workflows/build.yml create mode 100644 .github/workflows/integration.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 000000000..99c35b0d8 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,99 @@ +name: 'confluent-kafka-dotnet build pipeline' + +env: + CONFIGURATION: Release + DOTNET_CLI_TELEMETRY_OPTOUT: 'true' + +on: + push: + branches: [ main ] + tags: [ '*' ] + pull_request: + branches: [ main ] + +jobs: + linux-build: + name: 'Linux x64' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Build and test + run: | + dotnet restore + make build + make test + + osx-build: + name: 'OSX x64' + runs-on: macos-13 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Set ulimit + run: ulimit -n 1024 + - name: Build and test + run: | + dotnet restore + make build + make test + + windows-build: + name: 'Windows x64' + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.403' + - name: Build and test + run: | + dotnet restore + dotnet test -c $env:CONFIGURATION --no-build test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj + dotnet test -c $env:CONFIGURATION --no-build test/Confluent.SchemaRegistry.UnitTests/Confluent.SchemaRegistry.UnitTests.csproj + dotnet test -c $env:CONFIGURATION --no-build test/Confluent.SchemaRegistry.Serdes.UnitTests/Confluent.SchemaRegistry.Serdes.UnitTests.csproj + + windows-artifacts: + name: 'Windows Artifacts' + needs: windows-build + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.403' + - name: Install DocFX + run: dotnet tool update -g docfx + - name: Build and create packages + run: | + dotnet restore + dotnet build Confluent.Kafka.sln -c $env:CONFIGURATION + + # Different packaging for tagged vs untagged builds + if ($env:GITHUB_REF -match '^refs/tags/') { + $suffix = "" + } else { + $suffix = "--version-suffix ci-$env:GITHUB_RUN_ID" + } + + dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj -c $env:CONFIGURATION $suffix --output artifacts + + docfx doc/docfx.json + tar -czf artifacts/docs-$env:GITHUB_RUN_ID.zip doc/_site/* + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: build-artifacts + path: artifacts/ diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 000000000..def0cde88 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,62 @@ +name: 'Integration Tests' + +env: + CONFIGURATION: Release + DOTNET_CLI_TELEMETRY_OPTOUT: 'true' + SEMAPHORE_SKIP_FLAKY_TESTS: 'true' + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + integration-tests: + name: 'Integration tests' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USER }} + password: ${{ secrets.DOCKERHUB_APIKEY }} + + - name: Classic Protocol Tests + run: | + cd test/docker && docker-compose up -d && sleep 30 && cd ../.. + dotnet restore + cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. + + - name: Consumer Protocol Tests + run: | + cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && cd ../.. + sleep 300 + export TEST_CONSUMER_GROUP_PROTOCOL=consumer + dotnet restore + cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. + + schema-registry-tests: + name: 'Schema registry and serdes integration tests' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USER }} + password: ${{ secrets.DOCKERHUB_APIKEY }} + - name: Run Tests + run: | + cd test/docker && docker-compose up -d && cd ../.. + dotnet restore + cd test/Confluent.SchemaRegistry.Serdes.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. \ No newline at end of file From 88681afd2440f142e5168956c862a8372b66621a Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 09:27:54 +0100 Subject: [PATCH 21/30] ci2 --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 99c35b0d8..04adc84a5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -97,3 +97,4 @@ jobs: with: name: build-artifacts path: artifacts/ + From 5448e5c9133bceb1569a860a0f5cc485394bcd60 Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 09:32:33 +0100 Subject: [PATCH 22/30] no branches --- .github/workflows/build.yml | 3 --- .github/workflows/integration.yml | 2 -- 2 files changed, 5 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 99c35b0d8..3ceea34de 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -6,10 +6,7 @@ env: on: push: - branches: [ main ] - tags: [ '*' ] pull_request: - branches: [ main ] jobs: linux-build: diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index def0cde88..65a970b62 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -7,9 +7,7 @@ env: on: push: - branches: [ main ] pull_request: - branches: [ main ] jobs: integration-tests: From d8bae45119f4865f6b18483884d3b65cac7e4e2a Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 10:19:55 +0100 Subject: [PATCH 23/30] build --- .github/workflows/build.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3ceea34de..665cf1e05 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -53,9 +53,9 @@ jobs: - name: Build and test run: | dotnet restore - dotnet test -c $env:CONFIGURATION --no-build test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj - dotnet test -c $env:CONFIGURATION --no-build test/Confluent.SchemaRegistry.UnitTests/Confluent.SchemaRegistry.UnitTests.csproj - dotnet test -c $env:CONFIGURATION --no-build test/Confluent.SchemaRegistry.Serdes.UnitTests/Confluent.SchemaRegistry.Serdes.UnitTests.csproj + dotnet test -c $env:CONFIGURATION test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj + dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.UnitTests/Confluent.SchemaRegistry.UnitTests.csproj + dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.Serdes.UnitTests/Confluent.SchemaRegistry.Serdes.UnitTests.csproj windows-artifacts: name: 'Windows Artifacts' From 3526d6ea470211d2a21351294cdebe708e71a7ea Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 10:39:26 +0100 Subject: [PATCH 24/30] removing docfx --- .github/workflows/build.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 665cf1e05..5ad85012d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -86,9 +86,7 @@ jobs: dotnet pack src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj -c $env:CONFIGURATION $suffix --output artifacts dotnet pack src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj -c $env:CONFIGURATION $suffix --output artifacts dotnet pack src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj -c $env:CONFIGURATION $suffix --output artifacts - - docfx doc/docfx.json - tar -czf artifacts/docs-$env:GITHUB_RUN_ID.zip doc/_site/* + - name: Upload artifacts uses: actions/upload-artifact@v3 with: From b72aa5c738ec7779e434fcfa3bf5b58aa010c7a8 Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 10:46:55 +0100 Subject: [PATCH 25/30] only Confluent.Kafka --- .github/workflows/build.yml | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5ad85012d..69bfd6b1d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -54,8 +54,8 @@ jobs: run: | dotnet restore dotnet test -c $env:CONFIGURATION test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj - dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.UnitTests/Confluent.SchemaRegistry.UnitTests.csproj - dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.Serdes.UnitTests/Confluent.SchemaRegistry.Serdes.UnitTests.csproj + # dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.UnitTests/Confluent.SchemaRegistry.UnitTests.csproj + # dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.Serdes.UnitTests/Confluent.SchemaRegistry.Serdes.UnitTests.csproj windows-artifacts: name: 'Windows Artifacts' @@ -82,11 +82,13 @@ jobs: } dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj -c $env:CONFIGURATION $suffix --output artifacts - dotnet pack src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj -c $env:CONFIGURATION $suffix --output artifacts - dotnet pack src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj -c $env:CONFIGURATION $suffix --output artifacts - dotnet pack src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj -c $env:CONFIGURATION $suffix --output artifacts - dotnet pack src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj -c $env:CONFIGURATION $suffix --output artifacts + # dotnet pack src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj -c $env:CONFIGURATION $suffix --output artifacts + # dotnet pack src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj -c $env:CONFIGURATION $suffix --output artifacts + # dotnet pack src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj -c $env:CONFIGURATION $suffix --output artifacts + # dotnet pack src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj -c $env:CONFIGURATION $suffix --output artifacts + # docfx doc/docfx.json + # tar -czf artifacts/docs-$env:GITHUB_RUN_ID.zip doc/_site/* - name: Upload artifacts uses: actions/upload-artifact@v3 with: From 7f499d8490109691cc1263e2a96b834658eef76c Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 10:50:10 +0100 Subject: [PATCH 26/30] explicit dotnet test --- .github/workflows/build.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 69bfd6b1d..8126f7141 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -21,8 +21,7 @@ jobs: - name: Build and test run: | dotnet restore - make build - make test + dotnet test -c $env:CONFIGURATION test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj osx-build: name: 'OSX x64' @@ -38,8 +37,7 @@ jobs: - name: Build and test run: | dotnet restore - make build - make test + dotnet test -c $env:CONFIGURATION test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj windows-build: name: 'Windows x64' From 9fc39e553790036b2ea8dd9b3c44185c5d205be8 Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 11:02:23 +0100 Subject: [PATCH 27/30] make --- .github/workflows/build.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8126f7141..69bfd6b1d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -21,7 +21,8 @@ jobs: - name: Build and test run: | dotnet restore - dotnet test -c $env:CONFIGURATION test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj + make build + make test osx-build: name: 'OSX x64' @@ -37,7 +38,8 @@ jobs: - name: Build and test run: | dotnet restore - dotnet test -c $env:CONFIGURATION test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj + make build + make test windows-build: name: 'Windows x64' From ca66707aff1c602875e0610e60f35ec25d7c7f71 Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Thu, 12 Dec 2024 11:16:52 +0100 Subject: [PATCH 28/30] suffix vsuffix --- .github/workflows/build.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 69bfd6b1d..f6105fb6e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -77,11 +77,13 @@ jobs: # Different packaging for tagged vs untagged builds if ($env:GITHUB_REF -match '^refs/tags/') { $suffix = "" + $vsuffix = "" } else { - $suffix = "--version-suffix ci-$env:GITHUB_RUN_ID" + $suffix = "ci-$env:GITHUB_RUN_ID" + $vsuffix = "--version-suffix" } - dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj -c $env:CONFIGURATION $suffix --output artifacts + dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj --output artifacts -c $env:CONFIGURATION $vsuffix $suffix # dotnet pack src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj -c $env:CONFIGURATION $suffix --output artifacts # dotnet pack src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj -c $env:CONFIGURATION $suffix --output artifacts # dotnet pack src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj -c $env:CONFIGURATION $suffix --output artifacts From ae85d5e68a80a6512ea6aca652816229a328410e Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Fri, 13 Dec 2024 13:34:09 +0100 Subject: [PATCH 29/30] rename --- src/Confluent.Kafka/Confluent.Kafka.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj index 4c9467ddc..93302a70d 100644 --- a/src/Confluent.Kafka/Confluent.Kafka.csproj +++ b/src/Confluent.Kafka/Confluent.Kafka.csproj @@ -12,10 +12,10 @@ https://raw.githubusercontent.com/confluentinc/confluent-kafka-dotnet/master/confluent-logo.png https://github.com/confluentinc/confluent-kafka-dotnet/releases Kafka;Confluent;librdkafka - Confluent.Kafka + Confluent.Kafka.MK README.md Confluent.Kafka - Confluent.Kafka + Confluent.Kafka.MK 2.6.1 netstandard2.0;net462;net6.0;net8.0 true From 33e66139bcd2fdfcdd5f50d4a46c080b668fbee5 Mon Sep 17 00:00:00 2001 From: Marcin Krystianc Date: Fri, 13 Dec 2024 14:54:26 +0100 Subject: [PATCH 30/30] no assembly signing --- src/Confluent.Kafka/Confluent.Kafka.csproj | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj index 93302a70d..39bcabed4 100644 --- a/src/Confluent.Kafka/Confluent.Kafka.csproj +++ b/src/Confluent.Kafka/Confluent.Kafka.csproj @@ -20,8 +20,6 @@ netstandard2.0;net462;net6.0;net8.0 true true - true - Confluent.Kafka.snk