diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 000000000..c4d1853ee --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,43 @@ + +What +---- + + +Checklist +------------------ +- [ ] Contains customer facing changes? Including API/behavior changes +- [ ] Did you add sufficient unit test and/or integration test coverage for this PR? + - If not, please explain why it is not required + +References +---------- +JIRA: + + +Test & Review +------------ + + +Open questions / Follow-ups +-------------------------- + + + diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 000000000..f6105fb6e --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,98 @@ +name: 'confluent-kafka-dotnet build pipeline' + +env: + CONFIGURATION: Release + DOTNET_CLI_TELEMETRY_OPTOUT: 'true' + +on: + push: + pull_request: + +jobs: + linux-build: + name: 'Linux x64' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Build and test + run: | + dotnet restore + make build + make test + + osx-build: + name: 'OSX x64' + runs-on: macos-13 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Set ulimit + run: ulimit -n 1024 + - name: Build and test + run: | + dotnet restore + make build + make test + + windows-build: + name: 'Windows x64' + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.403' + - name: Build and test + run: | + dotnet restore + dotnet test -c $env:CONFIGURATION test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj + # dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.UnitTests/Confluent.SchemaRegistry.UnitTests.csproj + # dotnet test -c $env:CONFIGURATION test/Confluent.SchemaRegistry.Serdes.UnitTests/Confluent.SchemaRegistry.Serdes.UnitTests.csproj + + windows-artifacts: + name: 'Windows Artifacts' + needs: windows-build + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.403' + - name: Install DocFX + run: dotnet tool update -g docfx + - name: Build and create packages + run: | + dotnet restore + dotnet build Confluent.Kafka.sln -c $env:CONFIGURATION + + # Different packaging for tagged vs untagged builds + if ($env:GITHUB_REF -match '^refs/tags/') { + $suffix = "" + $vsuffix = "" + } else { + $suffix = "ci-$env:GITHUB_RUN_ID" + $vsuffix = "--version-suffix" + } + + dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj --output artifacts -c $env:CONFIGURATION $vsuffix $suffix + # dotnet pack src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj -c $env:CONFIGURATION $suffix --output artifacts + # dotnet pack src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj -c $env:CONFIGURATION $suffix --output artifacts + # dotnet pack src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj -c $env:CONFIGURATION $suffix --output artifacts + # dotnet pack src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj -c $env:CONFIGURATION $suffix --output artifacts + + # docfx doc/docfx.json + # tar -czf artifacts/docs-$env:GITHUB_RUN_ID.zip doc/_site/* + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: build-artifacts + path: artifacts/ diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 000000000..65a970b62 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,60 @@ +name: 'Integration Tests' + +env: + CONFIGURATION: Release + DOTNET_CLI_TELEMETRY_OPTOUT: 'true' + SEMAPHORE_SKIP_FLAKY_TESTS: 'true' + +on: + push: + pull_request: + +jobs: + integration-tests: + name: 'Integration tests' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USER }} + password: ${{ secrets.DOCKERHUB_APIKEY }} + + - name: Classic Protocol Tests + run: | + cd test/docker && docker-compose up -d && sleep 30 && cd ../.. + dotnet restore + cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. + + - name: Consumer Protocol Tests + run: | + cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && cd ../.. + sleep 300 + export TEST_CONSUMER_GROUP_PROTOCOL=consumer + dotnet restore + cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. + + schema-registry-tests: + name: 'Schema registry and serdes integration tests' + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: '6.0.x' + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USER }} + password: ${{ secrets.DOCKERHUB_APIKEY }} + - name: Run Tests + run: | + cd test/docker && docker-compose up -d && cd ../.. + dotnet restore + cd test/Confluent.SchemaRegistry.Serdes.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../.. \ No newline at end of file diff --git a/3RD_PARTY.md b/3RD_PARTY.md index 6a4a20f57..df00ef05c 100644 --- a/3RD_PARTY.md +++ b/3RD_PARTY.md @@ -10,3 +10,4 @@ To add your project, open a pull request! - [Multi Schema Avro Deserializer](https://github.com/ycherkes/multi-schema-avro-desrializer) - Avro deserializer for reading messages serialized with multiple schemas. - [OpenSleigh.Transport.Kafka](https://github.com/mizrael/OpenSleigh/tree/develop/src/OpenSleigh.Transport.Kafka) - A Kafka Transport for OpenSleigh, a distributed saga management library. - [SlimMessageBus.Host.Kafka](https://github.com/zarusz/SlimMessageBus) - Apache Kafka transport for SlimMessageBus (lightweight message bus for .NET) +- [Kafka Core](https://github.com/ffernandolima/confluent-kafka-core-dotnet) - Kafka Core empowers developers to build robust .NET applications on top of Confluent Kafka, focusing on simplicity, maintainability, and extensibility with intuitive abstractions and builders. \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index ae3755874..8d3377608 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,17 @@ ## Enhancements +* References librdkafka.redist 2.6.1. Refer to the [librdkafka v2.6.1 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.1) for more information. + +## Fixes + * Fix to continue supporting .NET Framework 4.6.2+ in core client library (#2342). +* Fix JSON Schema handling to not require use of `$id` (#2339). +* Update Caching.Memory to 8.0.1 to address CVE (#23440. +* Added Qualified and Custom reference name strategy approaches for protobuf references (#2345). +* Fix validate of SSL CA certs in Schema Registry client (#2346). +* Skip SSL certs validation when configured in Schema Registry client (#2347). +* Allow proxy to be specified in Schema Registry client (#2348). # 2.6.0 diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj index 4c9467ddc..39bcabed4 100644 --- a/src/Confluent.Kafka/Confluent.Kafka.csproj +++ b/src/Confluent.Kafka/Confluent.Kafka.csproj @@ -12,16 +12,14 @@ https://raw.githubusercontent.com/confluentinc/confluent-kafka-dotnet/master/confluent-logo.png https://github.com/confluentinc/confluent-kafka-dotnet/releases Kafka;Confluent;librdkafka - Confluent.Kafka + Confluent.Kafka.MK README.md Confluent.Kafka - Confluent.Kafka + Confluent.Kafka.MK 2.6.1 netstandard2.0;net462;net6.0;net8.0 true true - true - Confluent.Kafka.snk diff --git a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs index 0b4a0ff94..5a3c14ac8 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs @@ -12,7 +12,7 @@ public class AwsKmsClient : IKmsClient { private AmazonKeyManagementServiceClient kmsClient; private string keyId; - + public string KekId { get; } public AwsKmsClient(string kekId, AWSCredentials credentials) @@ -36,7 +36,7 @@ public AwsKmsClient(string kekId, AWSCredentials credentials) public bool DoesSupport(string uri) { - return uri.StartsWith(AwsKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs index 2ef3f1cd7..1a70524c6 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs @@ -25,7 +25,7 @@ public AzureKmsClient(string kekId, TokenCredential tokenCredential) public bool DoesSupport(string uri) { - return uri.StartsWith(AzureKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs index db5fd4683..c3a995bfe 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs @@ -36,7 +36,7 @@ public GcpKmsClient(string kekId, GoogleCredential credential) public bool DoesSupport(string uri) { - return uri.StartsWith(GcpKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs index 128a531b1..781af5714 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs @@ -21,11 +21,6 @@ public class HcVaultKmsClient : IKmsClient public HcVaultKmsClient(string kekId, string ns, string tokenId) { - if (tokenId == null) - { - tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN"); - ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE"); - } KekId = kekId; Namespace = ns; TokenId = tokenId; @@ -53,7 +48,7 @@ public HcVaultKmsClient(string kekId, string ns, string tokenId) public bool DoesSupport(string uri) { - return uri.StartsWith(HcVaultKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs index e220afc7d..b2cfd17ca 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs @@ -23,6 +23,11 @@ public IKmsClient NewKmsClient(IDictionary config, string keyUrl { config.TryGetValue(TokenId, out string tokenId); config.TryGetValue(Namespace, out string ns); + if (tokenId == null) + { + tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN"); + ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE"); + } return new HcVaultKmsClient(keyUrl, ns, tokenId); } } diff --git a/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs b/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs index cbd631b5f..f11798380 100644 --- a/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs @@ -18,7 +18,7 @@ using System.Threading.Tasks; using System.Linq; using System; -using System.ComponentModel; +using System.Net; using System.Threading; using System.Security.Cryptography.X509Certificates; @@ -31,7 +31,7 @@ public record DekId(string KekName, string Subject, int? Version, DekFormat? Dek /// /// A caching DEK Registry client. /// - public class CachedDekRegistryClient : IDekRegistryClient, IDisposable + public class CachedDekRegistryClient : IDekRegistryClient { private DekRestService restService; @@ -48,6 +48,21 @@ public class CachedDekRegistryClient : IDekRegistryClient, IDisposable /// public const int DefaultTimeout = 30000; + /// + /// The default maximum number of retries. + /// + public const int DefaultMaxRetries = RestService.DefaultMaxRetries; + + /// + /// The default time to wait for the first retry. + /// + public const int DefaultRetriesWaitMs = RestService.DefaultRetriesWaitMs; + + /// + /// The default time to wait for any retry. + /// + public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs; + /// /// The default maximum capacity of the local cache. /// @@ -71,12 +86,16 @@ public int MaxCachedKeys /// /// The authentication header value provider /// + /// + /// The proxy server to use for connections + /// public CachedDekRegistryClient(IEnumerable> config, - IAuthenticationHeaderValueProvider authenticationHeaderValueProvider) + IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, + IWebProxy proxy = null) { if (config == null) { - throw new ArgumentNullException("config properties must be specified."); + throw new ArgumentNullException("config"); } var schemaRegistryUrisMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl); @@ -101,6 +120,45 @@ public CachedDekRegistryClient(IEnumerable> config, $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs} must be an integer."); } + var maxRetriesMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries); + int maxRetries; + try + { + maxRetries = maxRetriesMaybe.Value == null ? DefaultMaxRetries : Convert.ToInt32(maxRetriesMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries} must be an integer."); + } + + var retriesWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs); + int retriesWaitMs; + try + { + retriesWaitMs = retriesWaitMsMaybe.Value == null ? DefaultRetriesWaitMs : Convert.ToInt32(retriesWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs} must be an integer."); + } + + var retriesMaxWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs); + int retriesMaxWaitMs; + try + { + retriesMaxWaitMs = retriesMaxWaitMsMaybe.Value == null ? DefaultRetriesMaxWaitMs : Convert.ToInt32(retriesMaxWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer."); + } + var identityMapCapacityMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas); try @@ -206,6 +264,9 @@ public CachedDekRegistryClient(IEnumerable> config, if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource && @@ -236,8 +297,10 @@ public CachedDekRegistryClient(IEnumerable> config, $"Configured value for {SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification} must be a bool."); } + var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value; + var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation); this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, - SetSslConfig(config), sslVerify); + SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs); } /// @@ -291,14 +354,6 @@ private List SetSslConfig(IEnumerable prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation) - .Value ?? ""; - if (!String.IsNullOrEmpty(caLocation)) - { - certificates.Add(new X509Certificate2(caLocation)); - } - return certificates; } diff --git a/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs b/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs index 3ec805e56..5b8e0c7de 100644 --- a/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs +++ b/src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs @@ -45,12 +45,20 @@ public FieldEncryptionExecutor(IDekRegistryClient client, IClock clock) Clock = clock ?? new Clock(); } - public override void Configure(IEnumerable> config) + public override void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { Configs = config; if (Client == null) { - Client = new CachedDekRegistryClient(Configs); + if (client != null) + { + Client = new CachedDekRegistryClient(Configs, client.AuthHeaderProvider, client.Proxy); + } + else + { + Client = new CachedDekRegistryClient(Configs); + } } } diff --git a/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs index 66afb08ef..5b05dac89 100644 --- a/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs @@ -16,14 +16,6 @@ public class LocalKmsClient : IKmsClient public LocalKmsClient(string secret) { - if (secret == null) - { - secret = Environment.GetEnvironmentVariable("LOCAL_SECRET"); - } - if (secret == null) - { - throw new ArgumentNullException("Cannot load secret"); - } Secret = secret; cryptor = new Cryptor(DekFormat.AES128_GCM); byte[] rawKey = Hkdf.DeriveKey( diff --git a/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs b/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs index 87ad7bd91..2def8c054 100644 --- a/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs +++ b/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs @@ -22,6 +22,14 @@ public string GetKeyUrlPrefix() public IKmsClient NewKmsClient(IDictionary config, string keyUrl) { config.TryGetValue(Secret, out string secret); + if (secret == null) + { + secret = Environment.GetEnvironmentVariable("LOCAL_SECRET"); + } + if (secret == null) + { + throw new ArgumentNullException("Cannot load secret"); + } return new LocalKmsClient(secret); } } diff --git a/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs b/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs index be7b2bc2c..ebda3d756 100644 --- a/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs +++ b/src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Generic; +using System.Net; using System.Net.Http; using System.Threading.Tasks; using System.Security.Cryptography.X509Certificates; @@ -29,9 +30,11 @@ public class DekRestService : RestService /// public DekRestService(string schemaRegistryUrl, int timeoutMs, IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List certificates, - bool enableSslCertificateVerification) : + bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null, + int maxRetries = DefaultMaxRetries, int retriesWaitMs = DefaultRetriesWaitMs, + int retriesMaxWaitMs = DefaultRetriesMaxWaitMs) : base(schemaRegistryUrl, timeoutMs, authenticationHeaderValueProvider, certificates, - enableSslCertificateVerification) + enableSslCertificateVerification, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs) { } diff --git a/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs b/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs index b3843c3e2..346607a59 100644 --- a/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs +++ b/src/Confluent.SchemaRegistry.Rules/CelExecutor.cs @@ -34,7 +34,8 @@ public CelExecutor() { } - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs b/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs index c51c71c9b..5cd329a20 100644 --- a/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs +++ b/src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs @@ -22,7 +22,8 @@ public CelFieldExecutor() public override string Type() => RuleType; - public override void Configure(IEnumerable> config) + public override void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs b/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs index 33dd64b5f..c34fe9a4a 100644 --- a/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs +++ b/src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs @@ -20,7 +20,8 @@ public JsonataExecutor() { } - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs index 6d026fa0a..96b938973 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs @@ -72,7 +72,7 @@ public async Task Deserialize(string topic, Headers headers, byte } string subject = GetSubjectName(topic, isKey, null); - Schema latestSchema = null; + RegisteredSchema latestSchema = null; if (subject != null) { latestSchema = await GetReaderSchema(subject) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs index f15e7b2e6..5d25eceed 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs @@ -126,7 +126,7 @@ public async Task Deserialize(string topic, Headers headers, byte[] array, bo } string subject = GetSubjectName(topic, isKey, null); - Schema latestSchema = null; + RegisteredSchema latestSchema = null; if (subject != null) { latestSchema = await GetReaderSchema(subject) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index 21f0d7a8c..9d75e708d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -63,6 +63,8 @@ public class JsonDeserializer : AsyncDeserializer where T : cl private JsonSchema schema = null; + private bool validate = true; + private JsonSerializerSettings jsonSchemaGeneratorSettingsSerializerSettings { get => #if NET8_0_OR_GREATER @@ -110,6 +112,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserial if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; } if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; } if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); } + if (config.Validate!= null) { this.validate= config.Validate.Value; } } /// @@ -172,7 +175,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i bool isKey = context.Component == MessageComponentType.Key; string topic = context.Topic; string subject = GetSubjectName(topic, isKey, null); - Schema latestSchema = null; + RegisteredSchema latestSchema = null; if (subject != null) { latestSchema = await GetReaderSchema(subject) @@ -228,10 +231,9 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i .ContinueWith(t => (JToken)t.Result) .ConfigureAwait(continueOnCapturedContext: false); - if (schema != null) + if (schema != null && validate) { var validationResult = validator.Validate(json, schema); - if (validationResult.Count > 0) { throw new InvalidDataException("Schema validation failed for properties: [" + @@ -249,7 +251,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i { string serializedString = jsonReader.ReadToEnd(); - if (schema != null) + if (schema != null && validate) { var validationResult = validator.Validate(serializedString, schema); diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs index 26361dcef..9fd8baf7b 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs @@ -53,6 +53,13 @@ public static class PropertyNames /// Possible values: /// public const string SubjectNameStrategy = "json.deserializer.subject.name.strategy"; + + /// + /// Specifies whether to validate payloads against the schema. + /// + /// default: true + /// + public const string Validate= "json.serializer.validate"; } @@ -124,5 +131,18 @@ public SubjectNameStrategy? SubjectNameStrategy else { this.properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } } } + + + /// + /// Specifies whether or not the JSON serializer should attempt to + /// validate the payload against the schema. + /// + /// default: true + /// + public bool? Validate + { + get { return GetBool(PropertyNames.Validate); } + set { SetObject(PropertyNames.Validate, value); } + } } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs index 643c0f207..229af15fe 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs @@ -75,6 +75,8 @@ public class JsonSerializer : AsyncSerializer where T : class private string schemaText; private string schemaFullname; + private bool validate = true; + private JsonSerializerSettings jsonSchemaGeneratorSettingsSerializerSettings { get => #if NET8_0_OR_GREATER @@ -129,6 +131,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer if (config.LatestCompatibilityStrict != null) { this.latestCompatibilityStrict = config.LatestCompatibilityStrict.Value; } if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; } if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); } + if (config.Validate!= null) { this.validate= config.Validate.Value; } if (this.useLatestVersion && this.autoRegisterSchema) { @@ -245,10 +248,13 @@ public override async Task SerializeAsync(T value, SerializationContext } var serializedString = Newtonsoft.Json.JsonConvert.SerializeObject(value, jsonSchemaGeneratorSettingsSerializerSettings); - var validationResult = validator.Validate(serializedString, this.schema); - if (validationResult.Count > 0) + if (validate) { - throw new InvalidDataException("Schema validation failed for properties: [" + string.Join(", ", validationResult.Select(r => r.Path)) + "]"); + var validationResult = validator.Validate(serializedString, this.schema); + if (validationResult.Count > 0) + { + throw new InvalidDataException("Schema validation failed for properties: [" + string.Join(", ", validationResult.Select(r => r.Path)) + "]"); + } } using (var stream = new MemoryStream(initialBufferSize)) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs index 8886299fc..13af2dea3 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs @@ -92,6 +92,13 @@ public static class PropertyNames /// Possible values: /// public const string SubjectNameStrategy = "json.serializer.subject.name.strategy"; + + /// + /// Specifies whether to validate payloads against the schema. + /// + /// default: true + /// + public const string Validate= "json.serializer.validate"; } @@ -220,5 +227,17 @@ public SubjectNameStrategy? SubjectNameStrategy } } + + /// + /// Specifies whether or not the JSON serializer should attempt to + /// validate the payload against the schema. + /// + /// default: true + /// + public bool? Validate + { + get { return GetBool(PropertyNames.Validate); } + set { SetObject(PropertyNames.Validate, value); } + } } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs index b58fc7817..3ff582484 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs @@ -128,7 +128,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i // Currently Protobuf does not support migration rules because of lack of support for DynamicMessage // See https://github.com/protocolbuffers/protobuf/issues/658 /* - Schema latestSchema = await SerdeUtils.GetReaderSchema(schemaRegistryClient, subject, useLatestWithMetadata, useLatestVersion) + RegisteredSchema latestSchema = await SerdeUtils.GetReaderSchema(schemaRegistryClient, subject, useLatestWithMetadata, useLatestVersion) .ConfigureAwait(continueOnCapturedContext: false); */ diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs index 898ec21da..53ad08c4d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs @@ -54,7 +54,7 @@ namespace Confluent.SchemaRegistry.Serdes /// public class ProtobufSerializer : AsyncSerializer where T : IMessage, new() { - private bool skipKnownTypes; + private bool skipKnownTypes = true; private bool useDeprecatedFormat; private ReferenceSubjectNameStrategyDelegate referenceSubjectNameStrategy; @@ -190,7 +190,7 @@ private async Task> RegisterOrGetReferences(FileDescriptor for (int i=0; i ParseSchema(Schema schema) .ConfigureAwait(continueOnCapturedContext: false); return ProtobufUtils.Parse(schema.SchemaString, references); } + + protected override bool IgnoreReference(string name) + { + return name.StartsWith("confluent/") || + name.StartsWith("google/protobuf/") || + name.StartsWith("google/type/"); + } } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs index b40a0d9ca..72071509b 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs @@ -195,7 +195,7 @@ public IDictionary UseLatestWithMetadata /// Specifies whether or not the Protobuf serializer should skip known types /// when resolving dependencies. /// - /// default: false + /// default: true /// public bool? SkipKnownTypes { diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index 082f2dfd9..698e6873b 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -18,6 +18,7 @@ #pragma warning disable CS0618 using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -41,7 +42,7 @@ public abstract class AsyncSerde protected SemaphoreSlim serdeMutex = new SemaphoreSlim(1); - private readonly IDictionary parsedSchemaCache = new Dictionary(); + private readonly IDictionary parsedSchemaCache = new ConcurrentDictionary(); private SemaphoreSlim parsedSchemaMutex = new SemaphoreSlim(1); protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig config, RuleRegistry ruleRegistry = null) @@ -59,7 +60,7 @@ protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig con foreach (IRuleExecutor executor in this.ruleRegistry.GetExecutors()) { - executor.Configure(ruleConfigs); + executor.Configure(ruleConfigs, schemaRegistryClient); } } @@ -98,10 +99,15 @@ protected string GetSubjectName(string topic, bool isKey, string recordType) protected async Task GetParsedSchema(Schema schema) { + if (parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema)) + { + return parsedSchema; + } + await parsedSchemaMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (!parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema)) + if (!parsedSchemaCache.TryGetValue(schema, out parsedSchema)) { if (parsedSchemaCache.Count > schemaRegistryClient.MaxCachedSchemas) { @@ -136,6 +142,11 @@ protected async Task> ResolveReferences(Schema schem .ConfigureAwait(continueOnCapturedContext: false); return result; } + + protected virtual bool IgnoreReference(string name) + { + return false; + } private async Task> ResolveReferences( Schema schema, IDictionary schemas, ISet visited) @@ -143,7 +154,7 @@ private async Task> ResolveReferences( IList references = schema.References; foreach (SchemaReference reference in references) { - if (visited.Contains(reference.Name)) + if (IgnoreReference(reference.Name) || visited.Contains(reference.Name)) { continue; } @@ -166,11 +177,14 @@ await ResolveReferences(s, schemas, visited) return schemas; } - protected async Task> GetMigrations(string subject, Schema writerSchema, Schema readerSchema) + protected async Task> GetMigrations(string subject, Schema writer, RegisteredSchema readerSchema) { + var writerSchema = await schemaRegistryClient.LookupSchemaAsync(subject, writer, false, false) + .ConfigureAwait(continueOnCapturedContext: false); + RuleMode migrationMode; - Schema first; - Schema last; + RegisteredSchema first; + RegisteredSchema last; IList migrations = new List(); if (writerSchema.Version < readerSchema.Version) { @@ -217,7 +231,7 @@ protected async Task> GetMigrations(string subject, Schema writ return migrations; } - private async Task> GetSchemasBetween(string subject, Schema first, Schema last) + private async Task> GetSchemasBetween(string subject, RegisteredSchema first, RegisteredSchema last) { if (last.Version - first.Version <= 1) { @@ -297,6 +311,7 @@ protected async Task ExecuteMigrations( /// /// /// + /// /// /// /// @@ -344,7 +359,7 @@ protected async Task ExecuteRules( for (int i = 0; i < rules.Count; i++) { Rule rule = rules[i]; - if (rule.Disabled) + if (IsDisabled(rule)) { continue; } @@ -391,21 +406,21 @@ protected async Task ExecuteRules( default: throw new ArgumentException("Unsupported rule kind " + rule.Kind); } - await RunAction(ctx, ruleMode, rule, message != null ? rule.OnSuccess : rule.OnFailure, + await RunAction(ctx, ruleMode, rule, message != null ? GetOnSuccess(rule) : GetOnFailure(rule), message, null, message != null ? null : ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); } catch (RuleException ex) { - await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, + await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message, ex, ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); } } else { - await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, + await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message, new RuleException("Could not find rule executor of type " + rule.Type), ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); @@ -414,6 +429,45 @@ await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, return message; } + private string GetOnSuccess(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.OnSuccess != null) + { + return ruleOverride.OnSuccess; + } + } + + return rule.OnSuccess; + } + + private string GetOnFailure(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.OnFailure != null) + { + return ruleOverride.OnFailure; + } + } + + return rule.OnFailure; + } + + private bool IsDisabled(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.Disabled.HasValue) + { + return ruleOverride.Disabled.Value; + } + } + + return rule.Disabled; + } + private static IRuleExecutor GetRuleExecutor(RuleRegistry ruleRegistry, string type) { if (ruleRegistry.TryGetExecutor(type, out IRuleExecutor result)) diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index 8cfb31649..949b0f3ba 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -23,6 +23,7 @@ using System.Threading.Tasks; using System.Linq; using System; +using System.Collections.Concurrent; using System.Net; using System.Threading; using System.Security.Cryptography.X509Certificates; @@ -42,7 +43,7 @@ namespace Confluent.SchemaRegistry /// - /// - /// - - /// - + /// - /// /// The following method calls do NOT cache results: /// - @@ -54,16 +55,18 @@ namespace Confluent.SchemaRegistry /// - /// - /// - public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable + public class CachedSchemaRegistryClient : ISchemaRegistryClient { private readonly List EmptyReferencesList = new List(); private IEnumerable> config; + private IAuthenticationHeaderValueProvider authHeaderProvider; + private IWebProxy proxy; private IRestService restService; private int identityMapCapacity; private int latestCacheTtlSecs; - private readonly Dictionary schemaById = new Dictionary(); + private readonly ConcurrentDictionary schemaById = new ConcurrentDictionary(); private readonly Dictionary> idBySchemaBySubject = new Dictionary>(); @@ -86,6 +89,21 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable /// public const int DefaultTimeout = 30000; + /// + /// The default maximum number of retries. + /// + public const int DefaultMaxRetries = RestService.DefaultMaxRetries; + + /// + /// The default time to wait for the first retry. + /// + public const int DefaultRetriesWaitMs = RestService.DefaultRetriesWaitMs; + + /// + /// The default time to wait for any retry. + /// + public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs; + /// /// The default maximum capacity of the local schema cache. /// @@ -117,6 +135,16 @@ public IEnumerable> Config => config; + /// + public IAuthenticationHeaderValueProvider AuthHeaderProvider + => authHeaderProvider; + + + /// + public IWebProxy Proxy + => proxy; + + /// public int MaxCachedSchemas => identityMapCapacity; @@ -176,10 +204,12 @@ public CachedSchemaRegistryClient(IEnumerable> conf { if (config == null) { - throw new ArgumentNullException("config properties must be specified."); + throw new ArgumentNullException("config"); } this.config = config; + this.authHeaderProvider = authenticationHeaderValueProvider; + this.proxy = proxy; keySubjectNameStrategy = GetKeySubjectNameStrategy(config); valueSubjectNameStrategy = GetValueSubjectNameStrategy(config); @@ -207,6 +237,45 @@ public CachedSchemaRegistryClient(IEnumerable> conf $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs} must be an integer."); } + var maxRetriesMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries); + int maxRetries; + try + { + maxRetries = maxRetriesMaybe.Value == null ? DefaultMaxRetries : Convert.ToInt32(maxRetriesMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries} must be an integer."); + } + + var retriesWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs); + int retriesWaitMs; + try + { + retriesWaitMs = retriesWaitMsMaybe.Value == null ? DefaultRetriesWaitMs : Convert.ToInt32(retriesWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs} must be an integer."); + } + + var retriesMaxWaitMsMaybe = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs); + int retriesMaxWaitMs; + try + { + retriesMaxWaitMs = retriesMaxWaitMsMaybe.Value == null ? DefaultRetriesMaxWaitMs : Convert.ToInt32(retriesMaxWaitMsMaybe.Value); + } + catch (FormatException) + { + throw new ArgumentException( + $"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer."); + } + var identityMapCapacityMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas); try @@ -326,6 +395,9 @@ public CachedSchemaRegistryClient(IEnumerable> conf if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs && + property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs && property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource && @@ -358,7 +430,8 @@ public CachedSchemaRegistryClient(IEnumerable> conf var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value; var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation); - this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify, sslCaCertificate, proxy); + this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, + SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs); } /// @@ -520,16 +593,8 @@ private bool checkSchemaMatchesFormat(string format, string schemaString) // if a format isn't specified, then assume text is desired. if (format == null) { - try - { - Convert.FromBase64String(schemaString); - } - catch (Exception) - { - return true; // Base64 conversion failed, infer the schemaString format is text. - } - - return false; // Base64 conversion succeeded, so infer the schamaString format is base64. + // If schemaString is not Base64, infer the schemaString format is text. + return !Utils.IsBase64String(schemaString); } else { @@ -537,17 +602,8 @@ private bool checkSchemaMatchesFormat(string format, string schemaString) { throw new ArgumentException($"Invalid schema format was specified: {format}."); } - - try - { - Convert.FromBase64String(schemaString); - } - catch (Exception) - { - return false; - } - - return true; + + return Utils.IsBase64String(schemaString); } } @@ -585,11 +641,15 @@ public async Task GetSchemaAsync(int id, string format = null) /// public async Task GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null) { + if (this.schemaById.TryGetValue(id, out Schema schema) && checkSchemaMatchesFormat(format, schema.SchemaString)) + { + return schema; + } + await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (!this.schemaById.TryGetValue(id, out Schema schema) || - !checkSchemaMatchesFormat(format, schema.SchemaString)) + if (!this.schemaById.TryGetValue(id, out schema) || !checkSchemaMatchesFormat(format, schema.SchemaString)) { CleanCacheIfFull(); schema = (await restService.GetSchemaBySubjectAndIdAsync(subject, id, format) @@ -663,6 +723,7 @@ public async Task GetLatestSchemaAsync(string subject) return schema; } + /// public async Task GetLatestWithMetadataAsync(string subject, IDictionary metadata, bool ignoreDeletedSchemas) { diff --git a/src/Confluent.SchemaRegistry/ErrorAction.cs b/src/Confluent.SchemaRegistry/ErrorAction.cs index 598d7522c..7698e86db 100644 --- a/src/Confluent.SchemaRegistry/ErrorAction.cs +++ b/src/Confluent.SchemaRegistry/ErrorAction.cs @@ -27,7 +27,8 @@ public class ErrorAction : IRuleAction { public static readonly string ActionType = "ERROR"; - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs b/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs index 0be208719..21ac9ff92 100644 --- a/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs +++ b/src/Confluent.SchemaRegistry/FieldRuleExecutor.cs @@ -21,7 +21,8 @@ namespace Confluent.SchemaRegistry { public abstract class FieldRuleExecutor : IRuleExecutor { - public abstract void Configure(IEnumerable> config); + public abstract void Configure(IEnumerable> config, + ISchemaRegistryClient client = null); public abstract string Type(); diff --git a/src/Confluent.SchemaRegistry/IRuleBase.cs b/src/Confluent.SchemaRegistry/IRuleBase.cs index 9086e1e50..5aa66b7fd 100644 --- a/src/Confluent.SchemaRegistry/IRuleBase.cs +++ b/src/Confluent.SchemaRegistry/IRuleBase.cs @@ -28,7 +28,9 @@ public interface IRuleBase : IDisposable /// Configure the rule executor or action /// /// - void Configure(IEnumerable> config); + /// + void Configure(IEnumerable> config, + ISchemaRegistryClient client = null); /// /// The type of rule executor or action diff --git a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs index 3876dfd1e..293e9c284 100644 --- a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Generic; +using System.Net; using System.Threading.Tasks; @@ -32,6 +33,18 @@ public interface ISchemaRegistryClient : IDisposable IEnumerable> Config { get; } + /// + /// The authentication header provider. + /// + IAuthenticationHeaderValueProvider AuthHeaderProvider { get; } + + + /// + /// The web proxy. + /// + IWebProxy Proxy { get; } + + /// /// The maximum capacity of the local schema cache. /// @@ -141,7 +154,7 @@ public interface ISchemaRegistryClient : IDisposable /// - /// Gets the schema uniquely identified by and . + /// Gets the schema uniquely identified by and . /// /// /// The subject. @@ -238,6 +251,12 @@ public interface ISchemaRegistryClient : IDisposable /// /// The subject to get the latest associated schema for. /// + /// + /// The metadata to search for. + /// + /// + /// Whether to ignore deleted schemas. + /// /// /// The latest schema with the given metadata registered against . /// diff --git a/src/Confluent.SchemaRegistry/NoneAction.cs b/src/Confluent.SchemaRegistry/NoneAction.cs index 6d8e02bff..556170302 100644 --- a/src/Confluent.SchemaRegistry/NoneAction.cs +++ b/src/Confluent.SchemaRegistry/NoneAction.cs @@ -26,7 +26,8 @@ public class NoneAction : IRuleAction { public static readonly string ActionType = "NONE"; - public void Configure(IEnumerable> config) + public void Configure(IEnumerable> config, + ISchemaRegistryClient client = null) { } diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs index c0efb4442..41545a14e 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs @@ -41,19 +41,19 @@ public class RegisteredSchema : Schema, IComparable, IEquatabl /// The subject the schema is registered against. /// [DataMember(Name = "subject")] - public override string Subject { get; set; } + public new string Subject { get; set; } /// /// The schema version. /// [DataMember(Name = "version")] - public override int Version { get; set; } + public new int Version { get; set; } /// /// Unique identifier of the schema. /// [DataMember(Name = "id")] - public override int Id { get; set; } + public new int Id { get; set; } /// /// The unregistered schema corresponding to this schema. diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs index 822ff74d7..6a9154020 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs @@ -45,14 +45,14 @@ public bool HasRules(RuleMode mode) { switch (mode) { case RuleMode.Upgrade: case RuleMode.Downgrade: - return MigrationRules.Any(r => r.Mode == mode || r.Mode == RuleMode.UpDown); + return MigrationRules != null && MigrationRules.Any(r => r.Mode == mode || r.Mode == RuleMode.UpDown); case RuleMode.UpDown: - return MigrationRules.Any(r => r.Mode == mode); + return MigrationRules != null && MigrationRules.Any(r => r.Mode == mode); case RuleMode.Write: case RuleMode.Read: - return DomainRules.Any(r => r.Mode == mode || r.Mode == RuleMode.Write); + return DomainRules != null && DomainRules.Any(r => r.Mode == mode || r.Mode == RuleMode.Write); case RuleMode.WriteRead: - return DomainRules.Any(r => r.Mode == mode); + return DomainRules != null && DomainRules.Any(r => r.Mode == mode); default: return false; } diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs index 3b5746a70..08d901e9a 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs @@ -30,19 +30,22 @@ public class Schema : IComparable, IEquatable #region API backwards-compatibility hack /// - /// The subject the schema is registered against. + /// DEPRECATED. The subject the schema is registered against. /// - public virtual string Subject { get; set; } + [Obsolete("Included to maintain API backwards compatibility only. Use RegisteredSchema instead. This property will be removed in a future version of the library.")] + public string Subject { get; set; } /// - /// The schema version. + /// DEPRECATED. The schema version. /// - public virtual int Version { get; set; } + [Obsolete("Included to maintain API backwards compatibility only. Use RegisteredSchema instead. This property will be removed in a future version of the library.")] + public int Version { get; set; } /// - /// Unique identifier of the schema. + /// DEPRECATED. Unique identifier of the schema. /// - public virtual int Id { get; set; } + [Obsolete("Included to maintain API backwards compatibility only. Use RegisteredSchema instead. This property will be removed in a future version of the library.")] + public int Id { get; set; } /// /// DEPRECATED. Initializes a new instance of the Schema class. diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index af694d3d1..25e24d6d8 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -24,7 +24,6 @@ using System.Threading.Tasks; using X509Certificate2 = System.Security.Cryptography.X509Certificates.X509Certificate2; -using System.Security.Cryptography.X509Certificates; using System.Net.Security; namespace Confluent.SchemaRegistry @@ -35,6 +34,14 @@ public class RestService : IRestService private static readonly string acceptHeader = string.Join(", ", Versions.PreferredResponseTypes); + public const int DefaultMaxRetries = 2; + + public const int DefaultRetriesWaitMs = 1000; + + public const int DefaultRetriesMaxWaitMs = 20000; + + private static Random random = new Random(); + /// /// The index of the last client successfully used (or random if none worked). /// @@ -52,15 +59,25 @@ public class RestService : IRestService /// private readonly IAuthenticationHeaderValueProvider authenticationHeaderValueProvider; + private int maxRetries; + + private int retriesWaitMs; + + private int retriesMaxWaitMs; /// /// Initializes a new instance of the RestService class. /// public RestService(string schemaRegistryUrl, int timeoutMs, IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List certificates, - bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null) + bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null, + int maxRetries = DefaultMaxRetries, int retriesWaitMs = DefaultRetriesWaitMs, + int retriesMaxWaitMs = DefaultRetriesMaxWaitMs) { this.authenticationHeaderValueProvider = authenticationHeaderValueProvider; + this.maxRetries = maxRetries; + this.retriesWaitMs = retriesWaitMs; + this.retriesMaxWaitMs = retriesMaxWaitMs; this.clients = schemaRegistryUrl .Split(',') @@ -92,11 +109,11 @@ private static HttpClientHandler CreateHandler(List certificat if (!enableSslCertificateVerification) { handler.ServerCertificateCustomValidationCallback = (_, __, ___, ____) => { return true; }; - } + } else if (sslCaCertificate != null) { - handler.ServerCertificateCustomValidationCallback = (_, __, chain, policyErrors) => { - + handler.ServerCertificateCustomValidationCallback = (_, __, chain, policyErrors) => { + if (policyErrors == SslPolicyErrors.None) { return true; @@ -106,7 +123,7 @@ private static HttpClientHandler CreateHandler(List certificat if (chain.ChainElements.Count < 2) { return false; - } + } var connectionCertHash = chain.ChainElements[1].Certificate.GetCertHash(); @@ -124,7 +141,7 @@ private static HttpClientHandler CreateHandler(List certificat return false; } } - return true; + return true; }; } @@ -206,12 +223,9 @@ private async Task ExecuteOnOneInstanceAsync(Func SendRequest( + HttpClient client, Func createRequest) + { + HttpResponseMessage response = null; + for (int i = 0; i < maxRetries; i++) + { + response = await client + .SendAsync(createRequest()) + .ConfigureAwait(continueOnCapturedContext: false); + if (IsSuccess((int)response.StatusCode) || !IsRetriable((int)response.StatusCode) || i >= maxRetries) + { + return response; + } + + await Task.Delay(CalculateRetryDelay(retriesWaitMs, retriesMaxWaitMs, i)); + } + return response; + } + + private static bool IsSuccess(int statusCode) + { + return statusCode >= 200 && statusCode < 300; + } + + private static bool IsRetriable(int statusCode) + { + return statusCode == 408 || statusCode == 429 || + statusCode == 500 || statusCode == 502 || statusCode == 503 || statusCode == 504; + } + + protected static int CalculateRetryDelay(int baseDelayMs, int maxDelayMs, int retriesAttempted) + { + double jitter; + lock (random) { + jitter = random.NextDouble(); + } + return Convert.ToInt32(Math.Min(jitter * Math.Pow(2, retriesAttempted) * baseDelayMs, maxDelayMs)); + } /// /// Used for end points that return a json object { ... } diff --git a/src/Confluent.SchemaRegistry/RuleOverride.cs b/src/Confluent.SchemaRegistry/RuleOverride.cs new file mode 100644 index 000000000..cc00e4316 --- /dev/null +++ b/src/Confluent.SchemaRegistry/RuleOverride.cs @@ -0,0 +1,40 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.SchemaRegistry +{ + /// + /// A rule override. + /// + public class RuleOverride + { + public string Type { get; set; } + + public string OnSuccess { get; set; } + + public string OnFailure { get; set; } + + public bool? Disabled { get; set; } + + public RuleOverride(string type, string onSuccess, string onFailure, bool? disabled) + { + Type = type; + OnSuccess = onSuccess; + OnFailure = onFailure; + Disabled = disabled; + } + } +} \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/RuleRegistry.cs b/src/Confluent.SchemaRegistry/RuleRegistry.cs index af71421a0..16ae348c7 100644 --- a/src/Confluent.SchemaRegistry/RuleRegistry.cs +++ b/src/Confluent.SchemaRegistry/RuleRegistry.cs @@ -26,19 +26,16 @@ public class RuleRegistry { private readonly SemaphoreSlim ruleExecutorsMutex = new SemaphoreSlim(1); private readonly SemaphoreSlim ruleActionsMutex = new SemaphoreSlim(1); + private readonly SemaphoreSlim ruleOverridesMutex = new SemaphoreSlim(1); private IDictionary ruleExecutors = new Dictionary(); private IDictionary ruleActions = new Dictionary(); + private IDictionary ruleOverrides = new Dictionary(); private static readonly RuleRegistry GLOBAL_INSTANCE = new RuleRegistry(); public static RuleRegistry GlobalInstance => GLOBAL_INSTANCE; - public static List GetRuleActions() - { - return GlobalInstance.GetActions(); - } - public void RegisterExecutor(IRuleExecutor executor) { ruleExecutorsMutex.Wait(); @@ -123,6 +120,48 @@ public List GetActions() } } + public void RegisterOverride(RuleOverride ruleOverride) + { + ruleOverridesMutex.Wait(); + try + { + if (!ruleOverrides.ContainsKey(ruleOverride.Type)) + { + ruleOverrides.Add(ruleOverride.Type, ruleOverride); + } + } + finally + { + ruleOverridesMutex.Release(); + } + } + + public bool TryGetOverride(string name, out RuleOverride ruleOverride) + { + ruleOverridesMutex.Wait(); + try + { + return ruleOverrides.TryGetValue(name, out ruleOverride); + } + finally + { + ruleOverridesMutex.Release(); + } + } + + public List GetOverrides() + { + ruleOverridesMutex.Wait(); + try + { + return new List(ruleOverrides.Values); + } + finally + { + ruleOverridesMutex.Release(); + } + } + public static void RegisterRuleExecutor(IRuleExecutor executor) { GlobalInstance.RegisterExecutor(executor); @@ -132,5 +171,10 @@ public static void RegisterRuleAction(IRuleAction action) { GlobalInstance.RegisterAction(action); } + + public static void RegisterRuleOverride(RuleOverride ruleOverride) + { + GlobalInstance.RegisterOverride(ruleOverride); + } } } diff --git a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs index 568b2b77a..60dbac5c5 100644 --- a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs +++ b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs @@ -44,6 +44,28 @@ public static class PropertyNames /// public const string SchemaRegistryRequestTimeoutMs = "schema.registry.request.timeout.ms"; + /// + /// Specifies the maximum number of retries for a request. + /// + /// default: 3 + /// + public const string SchemaRegistryMaxRetries = "schema.registry.max.retries"; + + /// + /// Specifies the maximum time to wait for the first retry. + /// When jitter is applied, the actual wait may be less. + /// + /// default: 1000 + /// + public const string SchemaRegistryRetriesWaitMs = "schema.registry.retries.wait.ms"; + + /// + /// Specifies the maximum time to wait any retry. + /// + /// default: 20000 + /// + public const string SchemaRegistryRetriesMaxWaitMs = "schema.registry.retries.max.wait.ms"; + /// /// Specifies the maximum number of schemas CachedSchemaRegistryClient /// should cache locally. @@ -187,6 +209,39 @@ public int? RequestTimeoutMs set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs, value?.ToString()); } } + /// + /// Specifies the maximum number of retries for a request. + /// + /// default: 3 + /// + public int? MaxRetries + { + get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries); } + set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries, value?.ToString()); } + } + + /// + /// Specifies the time to wait for the first retry. + /// + /// default: 1000 + /// + public int? RetriesWaitMs + { + get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs); } + set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs, value?.ToString()); } + } + + /// + /// Specifies the time to wait for any retry. + /// + /// default: 20000 + /// + public int? RetriesMaxWaitMs + { + get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs); } + set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs, value?.ToString()); } + } + /// /// File or directory path to CA certificate(s) for verifying the schema registry's key. /// diff --git a/src/Confluent.SchemaRegistry/Utils.cs b/src/Confluent.SchemaRegistry/Utils.cs index 5c6e16cfc..f652f7b5f 100644 --- a/src/Confluent.SchemaRegistry/Utils.cs +++ b/src/Confluent.SchemaRegistry/Utils.cs @@ -20,6 +20,9 @@ using System.Linq; using Confluent.Kafka; +#if NET8_0_OR_GREATER +using System.Buffers.Text; +#endif namespace Confluent.SchemaRegistry { @@ -70,5 +73,22 @@ public static bool ListEquals(IList a, IList b) if (a == null || b == null) return false; return a.SequenceEqual(b); } + + internal static bool IsBase64String(string value) + { +#if NET8_0_OR_GREATER + return Base64.IsValid(value); +#else + try + { + _ = Convert.FromBase64String(value); + return true; + } + catch (FormatException) + { + return false; + } +#endif + } } } diff --git a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs index 427e609d6..f29048e2e 100644 --- a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs +++ b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs @@ -33,12 +33,13 @@ public static void ProduceConsumeGoogleRefProtobuf(string bootstrapServers, stri { var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers }; var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaRegistryServers }; + var serializerConfig = new ProtobufSerializerConfig() { SkipKnownTypes = false }; using (var topic = new TemporaryTopic(bootstrapServers, 1)) using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig)) using (var producer = new ProducerBuilder(producerConfig) - .SetValueSerializer(new ProtobufSerializer(schemaRegistry)) + .SetValueSerializer(new ProtobufSerializer(schemaRegistry, serializerConfig)) .Build()) { var u = new WithGoogleRefs.TheRecord(); diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs index 8fff067a6..0dbd34b67 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs @@ -45,7 +45,15 @@ public BaseSerializeDeserializeTests() var schemaRegistryMock = new Mock(); schemaRegistryMock.Setup(x => x.ConstructValueSubjectName(testTopic, It.IsAny())).Returns($"{testTopic}-value"); schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( - (string topic, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1 + (string subject, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1 + ); + schemaRegistryMock.Setup(x => x.LookupSchemaAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( + (string subject, Schema schema, bool ignoreDeleted, bool normalize) => + { + return subjectStore[subject].First(x => + x.SchemaString == schema.SchemaString + ); + } ); schemaRegistryMock.Setup(x => x.GetSchemaBySubjectAndIdAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( (string subject, int id, string format) => diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs index c1fdab1ce..efcc049ae 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs @@ -577,12 +577,12 @@ public void CELFieldTransform() schema.RuleSet = new RuleSet(new List(), new List { - new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, "typeName == 'STRING' ; value + '-suffix'", null, null, false) } ); store[schemaStr] = 1; - subjectStore["topic-value"] = new List { schema }; + subjectStore["topic-value"] = new List { schema }; var config = new JsonSerializerConfig { AutoRegisterSchemas = false, @@ -607,6 +607,80 @@ public void CELFieldTransform() Assert.Equal(user.FavoriteNumber, result.FavoriteNumber); } + [Fact] + public void CELFieldTransformWithDef() + { + var schemaStr = @"{ + ""$schema"" : ""http://json-schema.org/draft-07/schema#"", + ""additionalProperties"" : false, + ""definitions"" : { + ""Address"" : { + ""additionalProperties"" : false, + ""properties"" : { + ""DoorNumber"" : { + ""type"" : ""integer"" + }, + ""DoorPin"" : { + ""confluent:tags"" : [ ""PII"" ], + ""type"" : ""string"" + } + }, + ""type"" : ""object"" + } + }, + ""properties"" : { + ""Address"" : { + ""$ref"" : ""#/definitions/Address"" + }, + ""Name"" : { + ""confluent:tags"" : [ ""PII"" ], + ""type"" : ""string"" + } + }, + ""title"" : ""Sample Event"", + ""type"" : ""object"" + }"; + var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Json, null); + schema.RuleSet = new RuleSet(new List(), + new List + { + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", new HashSet + { + "PII" + + }, null, + "value + '-suffix'", null, null, false) + } + ); + store[schemaStr] = 1; + subjectStore["topic-value"] = new List { schema }; + var config = new JsonSerializerConfig + { + AutoRegisterSchemas = false, + UseLatestVersion = true + }; + var serializer = new JsonSerializer(schemaRegistryClient, config); + var deserializer = new JsonDeserializer(schemaRegistryClient); + + var address = new Address + { + DoorNumber = 100, + DoorPin = "1234" + }; + var person = new JsonPerson() + { + Address = address, + Name = "bob" + }; + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync(person, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + Assert.Equal("bob-suffix", result.Name); + Assert.Equal("1234-suffix", result.Address.DoorPin); + } + [Fact] public void CELFieldCondition() { @@ -1004,4 +1078,17 @@ class NewerCustomer [JsonProperty("title")] public string Title { get; set; } } + + class JsonPerson + { + public string Name { get; set; } + public Address Address { get; set; } + + } + + class Address + { + public int DoorNumber { get; set; } + public string DoorPin { get; set; } + } } diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index 8843f882d..db021d651 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -275,12 +275,12 @@ public void ISpecificRecordCELFieldTransform() schema.RuleSet = new RuleSet(new List(), new List { - new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, "typeName == 'STRING' ; value + '-suffix'", null, null, false) } ); store[schemaStr] = 1; - subjectStore["topic-value"] = new List { schema }; + subjectStore["topic-value"] = new List { schema }; var config = new AvroSerializerConfig { AutoRegisterSchemas = false, @@ -305,6 +305,46 @@ public void ISpecificRecordCELFieldTransform() Assert.Equal(user.favorite_number, result.favorite_number); } + [Fact] + public void ISpecificRecordCELFieldTransformDisable() + { + var schemaStr = User._SCHEMA.ToString(); + var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null); + schema.RuleSet = new RuleSet(new List(), + new List + { + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + "typeName == 'STRING' ; value + '-suffix'", null, null, false) + } + ); + store[schemaStr] = 1; + subjectStore["topic-value"] = new List { schema }; + var config = new AvroSerializerConfig + { + AutoRegisterSchemas = false, + UseLatestVersion = true + }; + RuleRegistry registry = new RuleRegistry(); + registry.RegisterOverride(new RuleOverride("CEL_FIELD", null, null, true)); + var serializer = new AvroSerializer(schemaRegistryClient, config, registry); + var deserializer = new AvroDeserializer(schemaRegistryClient, null); + + var user = new User + { + favorite_color = "blue", + favorite_number = 100, + name = "awesome" + }; + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + Assert.Equal("awesome", result.name); + Assert.Equal("blue", result.favorite_color); + Assert.Equal(user.favorite_number, result.favorite_number); + } + [Fact] public void ISpecificRecordCELFieldCondition() {