From b98663c2fa8644364c639b9e08de94fc7fae47ec Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 14:03:00 -0400 Subject: [PATCH 01/14] First pass at Java GBEK (AI generated) --- sdks/java/core/build.gradle | 1 + .../apache/beam/sdk/transforms/GcpSecret.java | 52 ++++++ .../sdk/transforms/GroupByEncryptedKey.java | 138 ++++++++++++++ .../apache/beam/sdk/transforms/Secret.java | 31 ++++ .../transforms/GroupByEncryptedKeyTest.java | 169 ++++++++++++++++++ 5 files changed, 391 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e849ae597791..8ca227a6ae8f 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,6 +100,7 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation 'com.google.cloud:google-cloud-secretmanager' permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java new file mode 100644 index 000000000000..00271dd7ad86 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import java.io.IOException; + +/** + * A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. + */ +public class GcpSecret implements Secret { + private final String version_name; + + /** + * Initializes a GcpSecret object. + * + * @param version_name The full version name of the secret in Google Cloud Secret Manager. For + * example: projects//secrets//versions/1. For more info, see + * https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version + */ + public GcpSecret(String version_name) { + this.version_name = version_name; + } + + @Override + public byte[] getSecretBytes() { + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.parse(version_name); + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve secret bytes", e); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java new file mode 100644 index 000000000000..29fe57e5d856 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import javax.crypto.Cipher; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * A {@link PTransform} that provides a secure alternative to {@link GroupByKey}. + * + *

This transform encrypts the keys of the input {@link PCollection}, performs a {@link + * GroupByKey} on the encrypted keys, and then decrypts the keys in the output. This is useful when + * the keys contain sensitive data that should not be stored at rest by the runner. + */ +public class GroupByEncryptedKey + extends PTransform>, PCollection>>> { + + private final Secret hmacKey; + + private GroupByEncryptedKey(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + public static GroupByEncryptedKey create(Secret hmacKey) { + return new GroupByEncryptedKey<>(hmacKey); + } + + @Override + public PCollection>> expand(PCollection> input) { + return input + .apply("EncryptMessage", ParDo.of(new _EncryptMessage<>())) + .apply(GroupByKey.create()) + .apply("DecryptMessage", ParDo.of(new _DecryptMessage<>())); + } + + private static class _EncryptMessage extends DoFn, KV>> { + private final Secret hmacKey; + private transient Mac mac; + private transient Cipher cipher; + + _EncryptMessage(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + @Setup + public void setup() throws Exception { + mac = Mac.getInstance("HmacSHA256"); + mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); + cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); + Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); + + byte[] encodedKey = encode(keyCoder, c.element().getKey()); + byte[] encodedValue = encode(valueCoder, c.element().getValue()); + + byte[] hmac = mac.doFinal(encodedKey); + byte[] encryptedKey = cipher.doFinal(encodedKey); + byte[] encryptedValue = cipher.doFinal(encodedValue); + + c.output(KV.of(hmac, KV.of(encryptedKey, encryptedValue))); + } + + private byte[] encode(Coder coder, T value) throws Exception { + java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream(); + coder.encode(value, os); + return os.toByteArray(); + } + } + + private static class _DecryptMessage + extends DoFn>>, KV>> { + private final Secret hmacKey; + private transient Cipher cipher; + + _DecryptMessage(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + @Setup + public void setup() throws Exception { + cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); + Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); + + java.util.Map> decryptedKvs = new java.util.HashMap<>(); + for (KV encryptedKv : c.element().getValue()) { + byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey()); + K key = decode(keyCoder, decryptedKeyBytes); + + if (!decryptedKvs.containsKey(key)) { + decryptedKvs.put(key, new java.util.ArrayList<>()); + } + byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); + V value = decode(valueCoder, decryptedValueBytes); + decryptedKvs.get(key).add(value); + } + + for (java.util.Map.Entry> entry : decryptedKvs.entrySet()) { + c.output(KV.of(entry.getKey(), entry.getValue())); + } + } + + private T decode(Coder coder, byte[] bytes) throws Exception { + java.io.ByteArrayInputStream is = new java.io.ByteArrayInputStream(bytes); + return coder.decode(is); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java new file mode 100644 index 000000000000..ae34fb6ee630 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; + +/** + * A secret management interface used for handling sensitive data. + * + *

This interface provides a generic way to handle secrets. Implementations of this interface + * should handle fetching secrets from a secret management system. + */ +public interface Secret extends Serializable { + /** Returns the secret as a byte array. */ + byte[] getSecretBytes(); +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java new file mode 100644 index 000000000000..5860448140fd --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GroupByEncryptedKey}. */ +@RunWith(JUnit4.class) +public class GroupByEncryptedKeyTest implements Serializable { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + private static class FakeSecret implements Secret { + private final byte[] secret = "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(); + + @Override + public byte[] getSecretBytes() { + return secret; + } + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyFakeSecret() { + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(new FakeSecret())); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private Secret gcpSecret; + + @Before + public void setup() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, SECRET_ID, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); + } + + @After + public void tearDown() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecret() { + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(gcpSecret)); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecretThrows() { + Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))) + .apply(GroupByEncryptedKey.create(gcpSecret)); + assertThrows(RuntimeException.class, () -> p.run()); + } +} From c5aacfb2556367bb2261c49f6b414ff91d46fb4a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 16:39:30 -0400 Subject: [PATCH 02/14] Compile --- .../apache/beam/sdk/transforms/GcpSecret.java | 4 +- .../sdk/transforms/GroupByEncryptedKey.java | 103 +++++++++++------- .../transforms/GroupByEncryptedKeyTest.java | 21 ++-- 3 files changed, 74 insertions(+), 54 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java index 00271dd7ad86..b67431b5c00f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java @@ -22,9 +22,7 @@ import com.google.cloud.secretmanager.v1.SecretVersionName; import java.io.IOException; -/** - * A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. - */ +/** A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. */ public class GcpSecret implements Secret { private final String version_name; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 29fe57e5d856..204cddda4a84 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import java.util.Arrays; import javax.crypto.Cipher; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; @@ -26,11 +27,13 @@ import org.apache.beam.sdk.values.PCollection; /** - * A {@link PTransform} that provides a secure alternative to {@link GroupByKey}. + * A {@link PTransform} that provides a secure alternative to {@link + * org.apache.beam.sdk.transforms.GroupByKey}. * *

This transform encrypts the keys of the input {@link PCollection}, performs a {@link - * GroupByKey} on the encrypted keys, and then decrypts the keys in the output. This is useful when - * the keys contain sensitive data that should not be stored at rest by the runner. + * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in + * the output. This is useful when the keys contain sensitive data that should not be stored at rest + * by the runner. */ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { @@ -47,36 +50,48 @@ public static GroupByEncryptedKey create(Secret hmacKey) { @Override public PCollection>> expand(PCollection> input) { + Coder> inputCoder = input.getCoder(); + if (!(inputCoder instanceof KvCoder)) { + throw new IllegalStateException("GroupByEncryptedKey requires its input to use KvCoder"); + } + KvCoder inputKvCoder = (KvCoder) inputCoder; + Coder keyCoder = inputKvCoder.getKeyCoder(); + Coder valueCoder = inputKvCoder.getValueCoder(); + return input - .apply("EncryptMessage", ParDo.of(new _EncryptMessage<>())) + .apply("EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) .apply(GroupByKey.create()) - .apply("DecryptMessage", ParDo.of(new _DecryptMessage<>())); + .apply( + "DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))); } - private static class _EncryptMessage extends DoFn, KV>> { + private static class EncryptMessage extends DoFn, KV>> { private final Secret hmacKey; - private transient Mac mac; - private transient Cipher cipher; + private final Coder keyCoder; + private final Coder valueCoder; + private final Mac mac; + private final Cipher cipher; - _EncryptMessage(Secret hmacKey) { + EncryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; - } - - @Setup - public void setup() throws Exception { - mac = Mac.getInstance("HmacSHA256"); - mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); - cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + + try { + this.mac = Mac.getInstance("HmacSHA256"); + this.mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); + this.cipher = Cipher.getInstance("AES"); + this.cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } catch (Exception ex) { + throw new RuntimeException( + "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); + } } @ProcessElement public void processElement(ProcessContext c) throws Exception { - Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); - Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); - - byte[] encodedKey = encode(keyCoder, c.element().getKey()); - byte[] encodedValue = encode(valueCoder, c.element().getValue()); + byte[] encodedKey = encode(this.keyCoder, c.element().getKey()); + byte[] encodedValue = encode(this.valueCoder, c.element().getValue()); byte[] hmac = mac.doFinal(encodedKey); byte[] encryptedKey = cipher.doFinal(encodedKey); @@ -92,37 +107,45 @@ private byte[] encode(Coder coder, T value) throws Exception { } } - private static class _DecryptMessage + private static class DecryptMessage extends DoFn>>, KV>> { private final Secret hmacKey; + private final Coder keyCoder; + private final Coder valueCoder; private transient Cipher cipher; - _DecryptMessage(Secret hmacKey) { + DecryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; - } - - @Setup - public void setup() throws Exception { - cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + + try { + this.cipher = Cipher.getInstance("AES"); + this.cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } catch (Exception ex) { + throw new RuntimeException( + "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); + } } @ProcessElement public void processElement(ProcessContext c) throws Exception { - Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); - Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); - java.util.Map> decryptedKvs = new java.util.HashMap<>(); for (KV encryptedKv : c.element().getValue()) { byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey()); - K key = decode(keyCoder, decryptedKeyBytes); - - if (!decryptedKvs.containsKey(key)) { - decryptedKvs.put(key, new java.util.ArrayList<>()); + K key = decode(this.keyCoder, decryptedKeyBytes); + + if (key != null) { + if (!decryptedKvs.containsKey(key)) { + decryptedKvs.put(key, new java.util.ArrayList<>()); + } + byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); + V value = decode(this.valueCoder, decryptedValueBytes); + decryptedKvs.get(key).add(value); + } else { + throw new RuntimeException( + "Found null key when decoding " + Arrays.toString(decryptedKeyBytes)); } - byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); - V value = decode(valueCoder, decryptedValueBytes); - decryptedKvs.get(key).add(value); } for (java.util.Map.Entry> entry : decryptedKvs.entrySet()) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 5860448140fd..456f9d770da7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -17,7 +17,14 @@ */ package org.apache.beam.sdk.transforms; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; import java.io.Serializable; +import java.security.SecureRandom; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.coders.KvCoder; @@ -28,18 +35,11 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.security.SecureRandom; -import org.junit.After; -import org.junit.Before; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -162,8 +162,7 @@ public void testGroupByKeyGcpSecret() { @Category(NeedsRunner.class) public void testGroupByKeyGcpSecretThrows() { Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))) - .apply(GroupByEncryptedKey.create(gcpSecret)); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByEncryptedKey.create(gcpSecret)); assertThrows(RuntimeException.class, () -> p.run()); } } From 45ed98fd22c446e956569fe78a6b1cb6c3e94e5a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 16:54:49 -0400 Subject: [PATCH 03/14] Compiletest --- .../apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 456f9d770da7..a093e4d4a5fb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms; +import static org.junit.Assert.assertThrows; + import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretName; @@ -24,6 +26,7 @@ import com.google.protobuf.ByteString; import java.io.IOException; import java.io.Serializable; +import java.nio.charset.Charset; import java.security.SecureRandom; import java.util.Arrays; import java.util.List; @@ -50,7 +53,8 @@ public class GroupByEncryptedKeyTest implements Serializable { @Rule public transient TestPipeline p = TestPipeline.create(); private static class FakeSecret implements Secret { - private final byte[] secret = "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(); + private final byte[] secret = + "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(Charset.defaultCharset()); @Override public byte[] getSecretBytes() { From 4bf3c9a58169547a7e233de0670386245cb898e3 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 17:28:28 -0400 Subject: [PATCH 04/14] checkstyle --- .../main/resources/beam/checkstyle/suppressions.xml | 1 + .../java/org/apache/beam/sdk/transforms/GcpSecret.java | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index e8d4e8888da1..5ee8872d006e 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -56,6 +56,7 @@ + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java index b67431b5c00f..d7d04be23a83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java @@ -24,23 +24,23 @@ /** A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. */ public class GcpSecret implements Secret { - private final String version_name; + private final String versionName; /** * Initializes a GcpSecret object. * - * @param version_name The full version name of the secret in Google Cloud Secret Manager. For + * @param versionName The full version name of the secret in Google Cloud Secret Manager. For * example: projects//secrets//versions/1. For more info, see * https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version */ - public GcpSecret(String version_name) { - this.version_name = version_name; + public GcpSecret(String versionName) { + this.versionName = versionName; } @Override public byte[] getSecretBytes() { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { - SecretVersionName secretVersionName = SecretVersionName.parse(version_name); + SecretVersionName secretVersionName = SecretVersionName.parse(versionName); AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); return response.getPayload().getData().toByteArray(); } catch (IOException e) { From c62d8e0d68acbba733bada273636f9af294b3952 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 11:47:40 -0400 Subject: [PATCH 05/14] tests passing --- sdks/java/core/build.gradle | 3 +- .../sdk/transforms/GroupByEncryptedKey.java | 46 +++++++++++------ .../transforms/GroupByEncryptedKeyTest.java | 49 ++++++++++++------- 3 files changed, 65 insertions(+), 33 deletions(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 8ca227a6ae8f..9fbd9a70385d 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,7 +100,7 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) - implementation 'com.google.cloud:google-cloud-secretmanager' + implementation 'com.google.cloud:google-cloud-secretmanager:2.75.0' permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema @@ -124,6 +124,7 @@ dependencies { shadowTest library.java.log4j shadowTest library.java.log4j2_api shadowTest library.java.jamm + shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0' testRuntimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 204cddda4a84..ef7a21ee7e43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -22,6 +22,7 @@ import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -58,30 +59,40 @@ public PCollection>> expand(PCollection> input) { Coder keyCoder = inputKvCoder.getKeyCoder(); Coder valueCoder = inputKvCoder.getValueCoder(); - return input - .apply("EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) - .apply(GroupByKey.create()) - .apply( - "DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))); + PCollection>>> grouped = + input + .apply( + "EncryptMessage", + ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .apply(GroupByKey.create()); + + return grouped + .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder))); } + @SuppressWarnings("initialization.fields.uninitialized") private static class EncryptMessage extends DoFn, KV>> { private final Secret hmacKey; private final Coder keyCoder; private final Coder valueCoder; - private final Mac mac; - private final Cipher cipher; + private transient Mac mac; + private transient Cipher cipher; EncryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; this.keyCoder = keyCoder; this.valueCoder = valueCoder; + } + @Setup + public void setup() { try { this.mac = Mac.getInstance("HmacSHA256"); - this.mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); + this.mac.init(new SecretKeySpec(this.hmacKey.getSecretBytes(), "HmacSHA256")); this.cipher = Cipher.getInstance("AES"); - this.cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.cipher.init( + Cipher.ENCRYPT_MODE, new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES")); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); @@ -93,9 +104,9 @@ public void processElement(ProcessContext c) throws Exception { byte[] encodedKey = encode(this.keyCoder, c.element().getKey()); byte[] encodedValue = encode(this.valueCoder, c.element().getValue()); - byte[] hmac = mac.doFinal(encodedKey); - byte[] encryptedKey = cipher.doFinal(encodedKey); - byte[] encryptedValue = cipher.doFinal(encodedValue); + byte[] hmac = this.mac.doFinal(encodedKey); + byte[] encryptedKey = this.cipher.doFinal(encodedKey); + byte[] encryptedValue = this.cipher.doFinal(encodedValue); c.output(KV.of(hmac, KV.of(encryptedKey, encryptedValue))); } @@ -107,6 +118,7 @@ private byte[] encode(Coder coder, T value) throws Exception { } } + @SuppressWarnings("initialization.fields.uninitialized") private static class DecryptMessage extends DoFn>>, KV>> { private final Secret hmacKey; @@ -118,10 +130,14 @@ private static class DecryptMessage this.hmacKey = hmacKey; this.keyCoder = keyCoder; this.valueCoder = valueCoder; + } + @Setup + public void setup() { try { this.cipher = Cipher.getInstance("AES"); - this.cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.cipher.init( + Cipher.DECRYPT_MODE, new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES")); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); @@ -132,14 +148,14 @@ private static class DecryptMessage public void processElement(ProcessContext c) throws Exception { java.util.Map> decryptedKvs = new java.util.HashMap<>(); for (KV encryptedKv : c.element().getValue()) { - byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey()); + byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKv.getKey()); K key = decode(this.keyCoder, decryptedKeyBytes); if (key != null) { if (!decryptedKvs.containsKey(key)) { decryptedKvs.put(key, new java.util.ArrayList<>()); } - byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); + byte[] decryptedValueBytes = this.cipher.doFinal(encryptedKv.getValue()); V value = decode(this.valueCoder, decryptedValueBytes); decryptedKvs.get(key).add(value); } else { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index a093e4d4a5fb..5ccb1ec41cc5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -30,6 +30,8 @@ import java.security.SecureRandom; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -38,8 +40,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -54,7 +56,7 @@ public class GroupByEncryptedKeyTest implements Serializable { private static class FakeSecret implements Secret { private final byte[] secret = - "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(Charset.defaultCharset()); + "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset()); @Override public byte[] getSecretBytes() { @@ -81,13 +83,13 @@ public void testGroupByKeyFakeSecret() { .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); PCollection>> output = - input.apply(GroupByEncryptedKey.create(new FakeSecret())); + input.apply(GroupByEncryptedKey.create(new FakeSecret())); - PAssert.that(output) + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) .containsInAnyOrder( KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), KV.of("k3", Arrays.asList(0))); p.run(); @@ -95,10 +97,10 @@ public void testGroupByKeyFakeSecret() { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; - private Secret gcpSecret; + private static Secret gcpSecret; - @Before - public void setup() throws IOException { + @BeforeClass + public static void setup() throws IOException { SecretManagerServiceClient client = SecretManagerServiceClient.create(); ProjectName projectName = ProjectName.of(PROJECT_ID); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); @@ -124,8 +126,8 @@ public void setup() throws IOException { gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); } - @After - public void tearDown() throws IOException { + @AfterClass + public static void tearDown() throws IOException { SecretManagerServiceClient client = SecretManagerServiceClient.create(); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); client.deleteSecret(secretName); @@ -150,13 +152,13 @@ public void testGroupByKeyGcpSecret() { .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); PCollection>> output = - input.apply(GroupByEncryptedKey.create(gcpSecret)); + input.apply(GroupByEncryptedKey.create(gcpSecret)); - PAssert.that(output) + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) .containsInAnyOrder( KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), KV.of("k3", Arrays.asList(0))); p.run(); @@ -166,7 +168,20 @@ public void testGroupByKeyGcpSecret() { @Category(NeedsRunner.class) public void testGroupByKeyGcpSecretThrows() { Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByEncryptedKey.create(gcpSecret)); + p.apply(Create.of(KV.of("k1", 1))) + .apply(GroupByEncryptedKey.create(gcpSecret)); assertThrows(RuntimeException.class, () -> p.run()); } + + private static class SortValues + extends SimpleFunction>, KV>> { + @Override + public KV> apply(KV> input) { + List sorted = + StreamSupport.stream(input.getValue().spliterator(), false) + .sorted() + .collect(Collectors.toList()); + return KV.of(input.getKey(), sorted); + } + } } From 9230d2e87fd96d9f1e7687cc5904b2f39394c4cb Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 12:08:08 -0400 Subject: [PATCH 06/14] Move secret code into utils --- .../org/apache/beam/sdk/transforms/GroupByEncryptedKey.java | 1 + .../org/apache/beam/sdk/{transforms => util}/GcpSecret.java | 2 +- .../java/org/apache/beam/sdk/{transforms => util}/Secret.java | 2 +- .../org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 2 ++ 4 files changed, 5 insertions(+), 2 deletions(-) rename sdks/java/core/src/main/java/org/apache/beam/sdk/{transforms => util}/GcpSecret.java (98%) rename sdks/java/core/src/main/java/org/apache/beam/sdk/{transforms => util}/Secret.java (96%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index ef7a21ee7e43..de50626596fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java similarity index 98% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index d7d04be23a83..8a33f19d33d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.sdk.util; import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java similarity index 96% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index ae34fb6ee630..b9c974cee352 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.sdk.util; import java.io.Serializable; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 5ccb1ec41cc5..ba4c50e5a41e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -38,6 +38,8 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.GcpSecret; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.AfterClass; From 56b7130a083807452823385376654dffabc1af0b Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 13:24:54 -0400 Subject: [PATCH 07/14] Use secret manager from bom --- sdks/java/core/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 9fbd9a70385d..5319b5e3f82c 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,7 +100,7 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) - implementation 'com.google.cloud:google-cloud-secretmanager:2.75.0' + implementation library.java.google_cloud_secret_manager permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema From 9f2204fca4b71a5e21783a9bfe2930396e8765a6 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 13:29:02 -0400 Subject: [PATCH 08/14] Docs --- .../apache/beam/sdk/transforms/GroupByEncryptedKey.java | 8 ++++++++ .../src/main/java/org/apache/beam/sdk/util/GcpSecret.java | 6 ++++-- .../src/main/java/org/apache/beam/sdk/util/Secret.java | 6 +++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index de50626596fd..03e9ba41bbf7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -46,6 +46,14 @@ private GroupByEncryptedKey(Secret hmacKey) { this.hmacKey = hmacKey; } + /** + * Creates a {@link GroupByEncryptedKey} transform. + * + * @param hmacKey The secret key to use for HMAC. + * @param The type of the keys in the input PCollection. + * @param The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. + */ public static GroupByEncryptedKey create(Secret hmacKey) { return new GroupByEncryptedKey<>(hmacKey); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 8a33f19d33d9..2fcea6f0619f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -22,12 +22,14 @@ import com.google.cloud.secretmanager.v1.SecretVersionName; import java.io.IOException; -/** A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. */ +/** + * A {@link Secret} manager implementation that retrieves secrets from Google Cloud Secret Manager. + */ public class GcpSecret implements Secret { private final String versionName; /** - * Initializes a GcpSecret object. + * Initializes a {@link GcpSecret} object. * * @param versionName The full version name of the secret in Google Cloud Secret Manager. For * example: projects//secrets//versions/1. For more info, see diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index b9c974cee352..874fc4efd8b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -26,6 +26,10 @@ * should handle fetching secrets from a secret management system. */ public interface Secret extends Serializable { - /** Returns the secret as a byte array. */ + /** + * Returns the secret as a byte array. + * + * @return The secret as a byte array. + */ byte[] getSecretBytes(); } From 25d96bd2b788ff15112c695d3320042fe7ceff60 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 15:42:07 -0400 Subject: [PATCH 09/14] Better docs --- .../sdk/transforms/GroupByEncryptedKey.java | 31 ++++++++++++++++++- .../org/apache/beam/sdk/util/GcpSecret.java | 6 ++++ .../java/org/apache/beam/sdk/util/Secret.java | 3 +- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 03e9ba41bbf7..f7b2958de9c8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -36,6 +36,17 @@ * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in * the output. This is useful when the keys contain sensitive data that should not be stored at rest * by the runner. + * + * The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + * + * Note the following caveats: + * 1) Runners can implement arbitrary materialization steps, so this does not guarantee that the + * whole pipeline will not have unencrypted data at rest by itself. + * 2) If using this transform in streaming mode, this transform may not properly handle update + * compatibility checks around coders. This means that an improper update could lead to invalid + * coders, causing pipeline failure or data corruption. If you need to update, make sure that the + * input type passed into this transform does not change. */ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { @@ -49,7 +60,7 @@ private GroupByEncryptedKey(Secret hmacKey) { /** * Creates a {@link GroupByEncryptedKey} transform. * - * @param hmacKey The secret key to use for HMAC. + * @param hmacKey The {@link Secret} key to use for encryption. * @param The type of the keys in the input PCollection. * @param The type of the values in the input PCollection. * @return A {@link GroupByEncryptedKey} transform. @@ -80,6 +91,12 @@ public PCollection>> expand(PCollection> input) { .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder))); } + /** + * A {@link PTransform} that encrypts the key and value of an element. + * + * The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, + * and the value being a KV pair of the encrypted key and value. + */ @SuppressWarnings("initialization.fields.uninitialized") private static class EncryptMessage extends DoFn, KV>> { private final Secret hmacKey; @@ -127,6 +144,18 @@ private byte[] encode(Coder coder, T value) throws Exception { } } + /** + * A {@link PTransform} that decrypts the key and values of an element. + * + * The input PCollection will be a KV pair with the key being the HMAC of the encoded key, + * and the value being a list of KV pairs of the encrypted key and value. + * + * This will return a tuple containing the decrypted key and a list of decrypted values. + * + * Since there is some loss of precision in the HMAC encoding of the key (but not the key + * encryption), there is some extra work done here to ensure that all key/value pairs are + * mapped out appropriately. + */ @SuppressWarnings("initialization.fields.uninitialized") private static class DecryptMessage extends DoFn>>, KV>> { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 2fcea6f0619f..d19338b1afba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -39,6 +39,12 @@ public GcpSecret(String versionName) { this.versionName = versionName; } + /** + * Returns the secret as a byte array. Assumes that the current active service account + * has permissions to read the secret. + * + * @return The secret as a byte array. + */ @Override public byte[] getSecretBytes() { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index 874fc4efd8b3..269a94886311 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -23,7 +23,8 @@ * A secret management interface used for handling sensitive data. * *

This interface provides a generic way to handle secrets. Implementations of this interface - * should handle fetching secrets from a secret management system. + * should handle fetching secrets from a secret management system. The underlying secret + * management system should be able to return a valid byte array representing the secret. */ public interface Secret extends Serializable { /** From cbd441abd64dfd0776f8104acf8d8f579be00087 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 29 Sep 2025 13:59:19 -0400 Subject: [PATCH 10/14] Updates --- sdks/java/core/build.gradle | 2 + .../sdk/transforms/GroupByEncryptedKey.java | 44 +++++++++++-------- .../org/apache/beam/sdk/util/GcpSecret.java | 4 +- .../java/org/apache/beam/sdk/util/Secret.java | 4 +- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 5319b5e3f82c..953caee27793 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -101,6 +101,8 @@ dependencies { shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) implementation library.java.google_cloud_secret_manager + implementation library.java.proto_google_cloud_secret_manager_v1 + implementation library.java.protobuf_java permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index f7b2958de9c8..0e5a873799a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -22,6 +22,7 @@ import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.util.Secret; @@ -36,14 +37,13 @@ * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in * the output. This is useful when the keys contain sensitive data that should not be stored at rest * by the runner. - * - * The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * + *

The transform requires a {@link Secret} which returns a 32 byte secret which can be used to * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. - * - * Note the following caveats: - * 1) Runners can implement arbitrary materialization steps, so this does not guarantee that the - * whole pipeline will not have unencrypted data at rest by itself. - * 2) If using this transform in streaming mode, this transform may not properly handle update + * + *

Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this + * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) + * If using this transform in streaming mode, this transform may not properly handle update * compatibility checks around coders. This means that an improper update could lead to invalid * coders, causing pipeline failure or data corruption. If you need to update, make sure that the * input type passed into this transform does not change. @@ -77,6 +77,14 @@ public PCollection>> expand(PCollection> input) { } KvCoder inputKvCoder = (KvCoder) inputCoder; Coder keyCoder = inputKvCoder.getKeyCoder(); + + try { + keyCoder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new IllegalStateException( + "the keyCoder of a GroupByEncryptedKey must be deterministic", e); + } + Coder valueCoder = inputKvCoder.getValueCoder(); PCollection>>> grouped = @@ -93,8 +101,8 @@ public PCollection>> expand(PCollection> input) { /** * A {@link PTransform} that encrypts the key and value of an element. - * - * The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, + * + *

The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, * and the value being a KV pair of the encrypted key and value. */ @SuppressWarnings("initialization.fields.uninitialized") @@ -146,15 +154,15 @@ private byte[] encode(Coder coder, T value) throws Exception { /** * A {@link PTransform} that decrypts the key and values of an element. - * - * The input PCollection will be a KV pair with the key being the HMAC of the encoded key, - * and the value being a list of KV pairs of the encrypted key and value. - * - * This will return a tuple containing the decrypted key and a list of decrypted values. - * - * Since there is some loss of precision in the HMAC encoding of the key (but not the key - * encryption), there is some extra work done here to ensure that all key/value pairs are - * mapped out appropriately. + * + *

The input PCollection will be a KV pair with the key being the HMAC of the encoded key, and + * the value being a list of KV pairs of the encrypted key and value. + * + *

This will return a tuple containing the decrypted key and a list of decrypted values. + * + *

Since there is some loss of precision in the HMAC encoding of the key (but not the key + * encryption), there is some extra work done here to ensure that all key/value pairs are mapped + * out appropriately. */ @SuppressWarnings("initialization.fields.uninitialized") private static class DecryptMessage diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index d19338b1afba..80bc3a54535e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -40,8 +40,8 @@ public GcpSecret(String versionName) { } /** - * Returns the secret as a byte array. Assumes that the current active service account - * has permissions to read the secret. + * Returns the secret as a byte array. Assumes that the current active service account has + * permissions to read the secret. * * @return The secret as a byte array. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index 269a94886311..fe476ef6cb1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -23,8 +23,8 @@ * A secret management interface used for handling sensitive data. * *

This interface provides a generic way to handle secrets. Implementations of this interface - * should handle fetching secrets from a secret management system. The underlying secret - * management system should be able to return a valid byte array representing the secret. + * should handle fetching secrets from a secret management system. The underlying secret management + * system should be able to return a valid byte array representing the secret. */ public interface Secret extends Serializable { /** From db5670b56c0b30bbf281b83b194cf2f2ba9ec663 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 15:16:43 -0400 Subject: [PATCH 11/14] Update encryption mode --- .../sdk/transforms/GroupByEncryptedKey.java | 50 +++++++++++++++---- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 0e5a873799a2..ce4abef9e144 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -20,6 +20,7 @@ import java.util.Arrays; import javax.crypto.Cipher; import javax.crypto.Mac; +import javax.crypto.spec.GCMParameterSpec; import javax.crypto.spec.SecretKeySpec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; @@ -112,6 +113,7 @@ private static class EncryptMessage extends DoFn, KV valueCoder; private transient Mac mac; private transient Cipher cipher; + private transient SecretKeySpec secretKeySpec; EncryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; @@ -122,11 +124,11 @@ private static class EncryptMessage extends DoFn, KV byte[] encode(Coder coder, T value) throws Exception { @@ -171,6 +188,7 @@ private static class DecryptMessage private final Coder keyCoder; private final Coder valueCoder; private transient Cipher cipher; + private transient SecretKeySpec secretKeySpec; DecryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; @@ -181,9 +199,8 @@ private static class DecryptMessage @Setup public void setup() { try { - this.cipher = Cipher.getInstance("AES"); - this.cipher.init( - Cipher.DECRYPT_MODE, new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES")); + this.cipher = Cipher.getInstance("AES/GCM/NoPadding"); + this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES"); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); @@ -194,14 +211,27 @@ public void setup() { public void processElement(ProcessContext c) throws Exception { java.util.Map> decryptedKvs = new java.util.HashMap<>(); for (KV encryptedKv : c.element().getValue()) { - byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKv.getKey()); + byte[] iv = Arrays.copyOfRange(encryptedKv.getKey(), 0, 12); + GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, iv); + this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + + byte[] encryptedKey = + Arrays.copyOfRange(encryptedKv.getKey(), 12, encryptedKv.getKey().length); + byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKey); K key = decode(this.keyCoder, decryptedKeyBytes); if (key != null) { if (!decryptedKvs.containsKey(key)) { decryptedKvs.put(key, new java.util.ArrayList<>()); } - byte[] decryptedValueBytes = this.cipher.doFinal(encryptedKv.getValue()); + + iv = Arrays.copyOfRange(encryptedKv.getValue(), 0, 12); + gcmParameterSpec = new GCMParameterSpec(128, iv); + this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + + byte[] encryptedValue = + Arrays.copyOfRange(encryptedKv.getValue(), 12, encryptedKv.getValue().length); + byte[] decryptedValueBytes = this.cipher.doFinal(encryptedValue); V value = decode(this.valueCoder, decryptedValueBytes); decryptedKvs.get(key).add(value); } else { From 23d9a0153f277e08e6ff85bde036311c9b30cd75 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 16:04:41 -0400 Subject: [PATCH 12/14] checkstyle --- .../src/main/resources/beam/checkstyle/suppressions.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 5ee8872d006e..c103ab7f5b1d 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -52,6 +52,7 @@ + From c6f607085174d43bf52dddaa6c01772df1f67de6 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 21:02:47 -0400 Subject: [PATCH 13/14] explicitly add dep --- sdks/java/core/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 953caee27793..4a6d2f11973e 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -106,6 +106,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema + implementation library.java.guava implementation library.java.snake_yaml shadowTest library.java.everit_json_schema provided library.java.junit From 9b43d74b844a905c18fc30042a150f28733c0d52 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 21:39:30 -0400 Subject: [PATCH 14/14] spotbugs: only create generator once --- .../apache/beam/sdk/transforms/GroupByEncryptedKey.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index ce4abef9e144..e927efad44af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -114,6 +114,7 @@ private static class EncryptMessage extends DoFn, KV keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; @@ -133,6 +134,7 @@ public void setup() { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); } + this.generator = new java.security.SecureRandom(); } @ProcessElement @@ -144,9 +146,8 @@ public void processElement(ProcessContext c) throws Exception { byte[] keyIv = new byte[12]; byte[] valueIv = new byte[12]; - java.security.SecureRandom generator = new java.security.SecureRandom(); - generator.nextBytes(keyIv); - generator.nextBytes(valueIv); + this.generator.nextBytes(keyIv); + this.generator.nextBytes(valueIv); GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, keyIv); this.cipher.init(Cipher.ENCRYPT_MODE, this.secretKeySpec, gcmParameterSpec); byte[] encryptedKey = this.cipher.doFinal(encodedKey);