From c36b86514a12796e8f17a2226eb3028e5d6f72a5 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 4 Aug 2025 09:49:25 -0700 Subject: [PATCH 1/5] First cut --- .../EncryptionExecutor.cs | 20 +--- .../KmsClientWrapper.cs | 106 ++++++++++++++++++ 2 files changed, 109 insertions(+), 17 deletions(-) create mode 100644 src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs diff --git a/src/Confluent.SchemaRegistry.Encryption/EncryptionExecutor.cs b/src/Confluent.SchemaRegistry.Encryption/EncryptionExecutor.cs index 939ccd992..22ee77028 100644 --- a/src/Confluent.SchemaRegistry.Encryption/EncryptionExecutor.cs +++ b/src/Confluent.SchemaRegistry.Encryption/EncryptionExecutor.cs @@ -38,6 +38,7 @@ public static void Register() public static readonly string EncryptKmsType = "encrypt.kms.type"; public static readonly string EncryptDekAlgorithm = "encrypt.dek.algorithm"; public static readonly string EncryptDekExpiryDays = "encrypt.dek.expiry.days"; + public static readonly string EncryptAlternateKmsKeyIds = "encrypt.alternate.kms.key.ids"; public static readonly string KmsTypeSuffix = "://"; @@ -337,7 +338,7 @@ private async Task GetOrCreateDek(RuleContext ctx, int? version) byte[] encryptedDek = null; if (!kek.Shared) { - kmsClient = GetKmsClient(executor.Configs, kek); + kmsClient = new KmsClientWrapper(executor.Configs, kek); // Generate new dek byte[] rawDek = cryptor.GenerateKey(); encryptedDek = await kmsClient.Encrypt(rawDek) @@ -363,7 +364,7 @@ private async Task GetOrCreateDek(RuleContext ctx, int? version) { if (kmsClient == null) { - kmsClient = GetKmsClient(executor.Configs, kek); + kmsClient = new KmsClientWrapper(executor.Configs, kek); } byte[] rawDek = await kmsClient.Decrypt(dek.EncryptedKeyMaterialBytes) @@ -566,21 +567,6 @@ private byte[] PrefixVersion(int version, byte[] ciphertext) } } } - - private static IKmsClient GetKmsClient(IEnumerable> configs, RegisteredKek kek) - { - string keyUrl = kek.KmsType + EncryptionExecutor.KmsTypeSuffix + kek.KmsKeyId; - IKmsClient kmsClient = KmsRegistry.GetKmsClient(keyUrl); - if (kmsClient == null) - { - IKmsDriver kmsDriver = KmsRegistry.GetKmsDriver(keyUrl); - kmsClient = kmsDriver.NewKmsClient( - configs.ToDictionary(it => it.Key, it => it.Value), keyUrl); - KmsRegistry.RegisterKmsClient(kmsClient); - } - - return kmsClient; - } } public interface IClock diff --git a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs new file mode 100644 index 000000000..47d38a45a --- /dev/null +++ b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Confluent.SchemaRegistry.Encryption +{ + public class KmsClientWrapper : IKmsClient + { + public IEnumerable> Configs { get; } + + public RegisteredKek Kek { get; } + + public string KekId { get; } + + public IList KmsKeyIds { get; } + + public KmsClientWrapper(IEnumerable> configs, RegisteredKek kek) + { + Configs = configs; + Kek = kek; + KekId = kek.KmsType + EncryptionExecutor.KmsTypeSuffix + kek.KmsKeyId; + KmsKeyIds = GetKmsKeyIds(); + } + + public bool DoesSupport(string uri) + { + return KekId == uri; + } + + public async Task Encrypt(byte[] plaintext) + { + for (int i = 0; i < KmsKeyIds.Count; i++) + { + try + { + IKmsClient kmsClient = GetKmsClient(Configs, Kek.KmsType, KmsKeyIds[i]); + return await kmsClient.Encrypt(plaintext).ConfigureAwait(false); + } + catch (Exception e) + { + if (i == KmsKeyIds.Count - 1) + { + throw new RuleException("Failed to encrypt with all KEKs", e); + } + } + } + return null; + } + + public async Task Decrypt(byte[] ciphertext) + { + for (int i = 0; i < KmsKeyIds.Count; i++) + { + try + { + IKmsClient kmsClient = GetKmsClient(Configs, Kek.KmsType, KmsKeyIds[i]); + return await kmsClient.Decrypt(ciphertext).ConfigureAwait(false); + } + catch (Exception e) + { + if (i == KmsKeyIds.Count - 1) + { + throw new RuleException("Failed to decrypt with all KEKs", e); + } + } + } + return null; + } + + private IList GetKmsKeyIds() + { + IList kmsKeyIds = new List(); + kmsKeyIds.Add(Kek.KmsKeyId); + if (Kek.KmsProps != null) + { + if (Kek.KmsProps.TryGetValue(EncryptionExecutor.EncryptAlternateKmsKeyIds, out string alternateKmsKeyIds)) + { + char[] separators = { ',' }; + string[] ids = alternateKmsKeyIds.Split(separators, StringSplitOptions.RemoveEmptyEntries); + foreach (string id in ids) { + if (!string.IsNullOrEmpty(id)) { + kmsKeyIds.Add(id); + } + } + } + } + return kmsKeyIds; + } + + private static IKmsClient GetKmsClient(IEnumerable> configs, string kmsType, string kmsKeyId) + { + string keyUrl = kmsType + EncryptionExecutor.KmsTypeSuffix + kmsKeyId; + IKmsClient kmsClient = KmsRegistry.GetKmsClient(keyUrl); + if (kmsClient == null) + { + IKmsDriver kmsDriver = KmsRegistry.GetKmsDriver(keyUrl); + kmsClient = kmsDriver.NewKmsClient( + configs.ToDictionary(it => it.Key, it => it.Value), keyUrl); + KmsRegistry.RegisterKmsClient(kmsClient); + } + + return kmsClient; + } + } +} \ No newline at end of file From 2211f0dda24fef74be838f591ddb80d9730f9ebc Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Thu, 7 Aug 2025 14:47:02 -0700 Subject: [PATCH 2/5] Minor cleanup --- src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs index 47d38a45a..9d423276c 100644 --- a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs +++ b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs @@ -45,7 +45,7 @@ public async Task Encrypt(byte[] plaintext) } } } - return null; + throw new RuleException("No KEK found for encryption"); } public async Task Decrypt(byte[] ciphertext) @@ -65,7 +65,7 @@ public async Task Decrypt(byte[] ciphertext) } } } - return null; + throw new RuleException("No KEK found for decryption"); } private IList GetKmsKeyIds() From a8ef24523c7a2dce60b0a231e0ec653911bf7ec6 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 8 Aug 2025 15:48:38 -0700 Subject: [PATCH 3/5] Add tests --- .../KmsClientWrapper.cs | 32 +++++++--- .../SerializeDeserialize.cs | 63 +++++++++++++++++++ 2 files changed, 87 insertions(+), 8 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs index 9d423276c..aca7f7423 100644 --- a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs +++ b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs @@ -72,16 +72,32 @@ private IList GetKmsKeyIds() { IList kmsKeyIds = new List(); kmsKeyIds.Add(Kek.KmsKeyId); - if (Kek.KmsProps != null) + string alternateKmsKeyIds = null; + if (Kek.KmsProps != null && Kek.KmsProps.TryGetValue(EncryptionExecutor.EncryptAlternateKmsKeyIds, out alternateKmsKeyIds)) { - if (Kek.KmsProps.TryGetValue(EncryptionExecutor.EncryptAlternateKmsKeyIds, out string alternateKmsKeyIds)) + char[] separators = { ',' }; + string[] ids = alternateKmsKeyIds.Split(separators, StringSplitOptions.RemoveEmptyEntries); + foreach (string id in ids) { + if (!string.IsNullOrEmpty(id)) { + kmsKeyIds.Add(id); + } + } + } else + { + var kvp = Configs.FirstOrDefault(x => + x.Key == EncryptionExecutor.EncryptAlternateKmsKeyIds); + if (!kvp.Equals(default(KeyValuePair))) { - char[] separators = { ',' }; - string[] ids = alternateKmsKeyIds.Split(separators, StringSplitOptions.RemoveEmptyEntries); - foreach (string id in ids) { - if (!string.IsNullOrEmpty(id)) { - kmsKeyIds.Add(id); - } + alternateKmsKeyIds = kvp.Value; + } + } + if (alternateKmsKeyIds != null) + { + char[] separators = { ',' }; + string[] ids = alternateKmsKeyIds.Split(separators, StringSplitOptions.RemoveEmptyEntries); + foreach (string id in ids) { + if (!string.IsNullOrEmpty(id)) { + kmsKeyIds.Add(id); } } } diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index 1e0a958a5..2a43e49f2 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -649,6 +649,69 @@ public void ISpecificRecordPayloadEncryption() Assert.True(pic.SequenceEqual(result.picture)); } + [Fact] + public void ISpecificRecordEncryptionAlternateKeks() + { + var schemaStr = "{\"type\":\"record\",\"name\":\"UserWithPic\",\"namespace\":\"Confluent.Kafka.Examples.AvroSpecific" + + "\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\"," + + "\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"picture\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}"; + + var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null); + schema.Metadata = new Metadata(new Dictionary> + { + ["Confluent.Kafka.Examples.AvroSpecific.UserWithPic.name"] = new HashSet { "PII" }, + ["Confluent.Kafka.Examples.AvroSpecific.UserWithPic.picture"] = new HashSet { "PII" } + + }, new Dictionary(), new HashSet() + ); + schema.RuleSet = new RuleSet(new List(), new List(), + new List + { + new Rule("encryptPII", RuleKind.Transform, RuleMode.WriteRead, "ENCRYPT_PAYLOAD", null, + new Dictionary + { + ["encrypt.kek.name"] = "kek1", + ["encrypt.kms.type"] = "local-kms", + ["encrypt.kms.key.id"] = "mykey" + }) + } + ); + store[schemaStr] = 1; + subjectStore["topic-value"] = new List { schema }; + var config = new AvroSerializerConfig + { + AutoRegisterSchemas = false, + UseLatestVersion = true + }; + config.Set("rules.secret", "mysecret"); + config.Set("rules.encrypt.alternate.kms.key.ids", "mykey2,mykey3"); + RuleRegistry ruleRegistry = new RuleRegistry(); + IRuleExecutor ruleExecutor = new EncryptionExecutor(dekRegistryClient, clock); + ruleRegistry.RegisterExecutor(ruleExecutor); + var serializer = new AvroSerializer(schemaRegistryClient, config, ruleRegistry); + var deserializer = new AvroDeserializer(schemaRegistryClient, null, ruleRegistry); + + var pic = new byte[] { 1, 2, 3 }; + var user = new UserWithPic() + { + favorite_color = "blue", + favorite_number = 100, + name = "awesome", + picture = pic + }; + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + // The user name has been modified + Assert.Equal("awesome", result.name); + Assert.Equal(user.favorite_color, result.favorite_color); + Assert.Equal(user.favorite_number, result.favorite_number); + Assert.True(pic.SequenceEqual(result.picture)); + } + [Fact] public void ISpecificRecordFieldEncryptionDekRotation() { From 7e4a106ba40453d2550a9b32dea49fd5d2967da8 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 8 Aug 2025 16:07:49 -0700 Subject: [PATCH 4/5] Add test --- .../KmsClientWrapper.cs | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs index aca7f7423..2c2714a2f 100644 --- a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs +++ b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs @@ -73,16 +73,12 @@ private IList GetKmsKeyIds() IList kmsKeyIds = new List(); kmsKeyIds.Add(Kek.KmsKeyId); string alternateKmsKeyIds = null; - if (Kek.KmsProps != null && Kek.KmsProps.TryGetValue(EncryptionExecutor.EncryptAlternateKmsKeyIds, out alternateKmsKeyIds)) + if (Kek.KmsProps != null) { - char[] separators = { ',' }; - string[] ids = alternateKmsKeyIds.Split(separators, StringSplitOptions.RemoveEmptyEntries); - foreach (string id in ids) { - if (!string.IsNullOrEmpty(id)) { - kmsKeyIds.Add(id); - } - } - } else + Kek.KmsProps.TryGetValue(EncryptionExecutor.EncryptAlternateKmsKeyIds, + out alternateKmsKeyIds); + } + if (string.IsNullOrEmpty(alternateKmsKeyIds)) { var kvp = Configs.FirstOrDefault(x => x.Key == EncryptionExecutor.EncryptAlternateKmsKeyIds); @@ -91,12 +87,14 @@ private IList GetKmsKeyIds() alternateKmsKeyIds = kvp.Value; } } - if (alternateKmsKeyIds != null) + if (!string.IsNullOrEmpty(alternateKmsKeyIds)) { char[] separators = { ',' }; string[] ids = alternateKmsKeyIds.Split(separators, StringSplitOptions.RemoveEmptyEntries); - foreach (string id in ids) { - if (!string.IsNullOrEmpty(id)) { + foreach (string id in ids) + { + if (!string.IsNullOrEmpty(id)) + { kmsKeyIds.Add(id); } } From eb23e0b5ae36b02c634eeefb060ee93313ba6356 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 8 Aug 2025 16:21:27 -0700 Subject: [PATCH 5/5] Minor cleanup --- src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs index 2c2714a2f..fd13e2d06 100644 --- a/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs +++ b/src/Confluent.SchemaRegistry.Encryption/KmsClientWrapper.cs @@ -89,8 +89,7 @@ private IList GetKmsKeyIds() } if (!string.IsNullOrEmpty(alternateKmsKeyIds)) { - char[] separators = { ',' }; - string[] ids = alternateKmsKeyIds.Split(separators, StringSplitOptions.RemoveEmptyEntries); + string[] ids = alternateKmsKeyIds.Split(',', StringSplitOptions.RemoveEmptyEntries); foreach (string id in ids) { if (!string.IsNullOrEmpty(id))