diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index e8d4e8888da1..c103ab7f5b1d 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -52,10 +52,12 @@ + + diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e849ae597791..4a6d2f11973e 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,9 +100,13 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation library.java.google_cloud_secret_manager + implementation library.java.proto_google_cloud_secret_manager_v1 + implementation library.java.protobuf_java permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema + implementation library.java.guava implementation library.java.snake_yaml shadowTest library.java.everit_json_schema provided library.java.junit @@ -123,6 +127,7 @@ dependencies { shadowTest library.java.log4j shadowTest library.java.log4j2_api shadowTest library.java.jamm + shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0' testRuntimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java new file mode 100644 index 000000000000..e927efad44af --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.Arrays; +import javax.crypto.Cipher; +import javax.crypto.Mac; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.util.Secret; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * A {@link PTransform} that provides a secure alternative to {@link + * org.apache.beam.sdk.transforms.GroupByKey}. + * + *

This transform encrypts the keys of the input {@link PCollection}, performs a {@link + * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in + * the output. This is useful when the keys contain sensitive data that should not be stored at rest + * by the runner. + * + *

The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + * + *

Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this + * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) + * If using this transform in streaming mode, this transform may not properly handle update + * compatibility checks around coders. This means that an improper update could lead to invalid + * coders, causing pipeline failure or data corruption. If you need to update, make sure that the + * input type passed into this transform does not change. + */ +public class GroupByEncryptedKey + extends PTransform>, PCollection>>> { + + private final Secret hmacKey; + + private GroupByEncryptedKey(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + /** + * Creates a {@link GroupByEncryptedKey} transform. + * + * @param hmacKey The {@link Secret} key to use for encryption. + * @param The type of the keys in the input PCollection. + * @param The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. + */ + public static GroupByEncryptedKey create(Secret hmacKey) { + return new GroupByEncryptedKey<>(hmacKey); + } + + @Override + public PCollection>> expand(PCollection> input) { + Coder> inputCoder = input.getCoder(); + if (!(inputCoder instanceof KvCoder)) { + throw new IllegalStateException("GroupByEncryptedKey requires its input to use KvCoder"); + } + KvCoder inputKvCoder = (KvCoder) inputCoder; + Coder keyCoder = inputKvCoder.getKeyCoder(); + + try { + keyCoder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new IllegalStateException( + "the keyCoder of a GroupByEncryptedKey must be deterministic", e); + } + + Coder valueCoder = inputKvCoder.getValueCoder(); + + PCollection>>> grouped = + input + .apply( + "EncryptMessage", + ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .apply(GroupByKey.create()); + + return grouped + .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder))); + } + + /** + * A {@link PTransform} that encrypts the key and value of an element. + * + *

The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, + * and the value being a KV pair of the encrypted key and value. + */ + @SuppressWarnings("initialization.fields.uninitialized") + private static class EncryptMessage extends DoFn, KV>> { + private final Secret hmacKey; + private final Coder keyCoder; + private final Coder valueCoder; + private transient Mac mac; + private transient Cipher cipher; + private transient SecretKeySpec secretKeySpec; + private transient java.security.SecureRandom generator; + + EncryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { + this.hmacKey = hmacKey; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Setup + public void setup() { + try { + byte[] secretBytes = this.hmacKey.getSecretBytes(); + this.mac = Mac.getInstance("HmacSHA256"); + this.mac.init(new SecretKeySpec(secretBytes, "HmacSHA256")); + this.cipher = Cipher.getInstance("AES/GCM/NoPadding"); + this.secretKeySpec = new SecretKeySpec(secretBytes, "AES"); + } catch (Exception ex) { + throw new RuntimeException( + "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); + } + this.generator = new java.security.SecureRandom(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + byte[] encodedKey = encode(this.keyCoder, c.element().getKey()); + byte[] encodedValue = encode(this.valueCoder, c.element().getValue()); + + byte[] hmac = this.mac.doFinal(encodedKey); + + byte[] keyIv = new byte[12]; + byte[] valueIv = new byte[12]; + this.generator.nextBytes(keyIv); + this.generator.nextBytes(valueIv); + GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, keyIv); + this.cipher.init(Cipher.ENCRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + byte[] encryptedKey = this.cipher.doFinal(encodedKey); + gcmParameterSpec = new GCMParameterSpec(128, valueIv); + this.cipher.init(Cipher.ENCRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + byte[] encryptedValue = this.cipher.doFinal(encodedValue); + + c.output( + KV.of( + hmac, + KV.of( + com.google.common.primitives.Bytes.concat(keyIv, encryptedKey), + com.google.common.primitives.Bytes.concat(valueIv, encryptedValue)))); + } + + private byte[] encode(Coder coder, T value) throws Exception { + java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream(); + coder.encode(value, os); + return os.toByteArray(); + } + } + + /** + * A {@link PTransform} that decrypts the key and values of an element. + * + *

The input PCollection will be a KV pair with the key being the HMAC of the encoded key, and + * the value being a list of KV pairs of the encrypted key and value. + * + *

This will return a tuple containing the decrypted key and a list of decrypted values. + * + *

Since there is some loss of precision in the HMAC encoding of the key (but not the key + * encryption), there is some extra work done here to ensure that all key/value pairs are mapped + * out appropriately. + */ + @SuppressWarnings("initialization.fields.uninitialized") + private static class DecryptMessage + extends DoFn>>, KV>> { + private final Secret hmacKey; + private final Coder keyCoder; + private final Coder valueCoder; + private transient Cipher cipher; + private transient SecretKeySpec secretKeySpec; + + DecryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { + this.hmacKey = hmacKey; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Setup + public void setup() { + try { + this.cipher = Cipher.getInstance("AES/GCM/NoPadding"); + this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES"); + } catch (Exception ex) { + throw new RuntimeException( + "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); + } + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + java.util.Map> decryptedKvs = new java.util.HashMap<>(); + for (KV encryptedKv : c.element().getValue()) { + byte[] iv = Arrays.copyOfRange(encryptedKv.getKey(), 0, 12); + GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, iv); + this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + + byte[] encryptedKey = + Arrays.copyOfRange(encryptedKv.getKey(), 12, encryptedKv.getKey().length); + byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKey); + K key = decode(this.keyCoder, decryptedKeyBytes); + + if (key != null) { + if (!decryptedKvs.containsKey(key)) { + decryptedKvs.put(key, new java.util.ArrayList<>()); + } + + iv = Arrays.copyOfRange(encryptedKv.getValue(), 0, 12); + gcmParameterSpec = new GCMParameterSpec(128, iv); + this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + + byte[] encryptedValue = + Arrays.copyOfRange(encryptedKv.getValue(), 12, encryptedKv.getValue().length); + byte[] decryptedValueBytes = this.cipher.doFinal(encryptedValue); + V value = decode(this.valueCoder, decryptedValueBytes); + decryptedKvs.get(key).add(value); + } else { + throw new RuntimeException( + "Found null key when decoding " + Arrays.toString(decryptedKeyBytes)); + } + } + + for (java.util.Map.Entry> entry : decryptedKvs.entrySet()) { + c.output(KV.of(entry.getKey(), entry.getValue())); + } + } + + private T decode(Coder coder, byte[] bytes) throws Exception { + java.io.ByteArrayInputStream is = new java.io.ByteArrayInputStream(bytes); + return coder.decode(is); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java new file mode 100644 index 000000000000..80bc3a54535e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import java.io.IOException; + +/** + * A {@link Secret} manager implementation that retrieves secrets from Google Cloud Secret Manager. + */ +public class GcpSecret implements Secret { + private final String versionName; + + /** + * Initializes a {@link GcpSecret} object. + * + * @param versionName The full version name of the secret in Google Cloud Secret Manager. For + * example: projects//secrets//versions/1. For more info, see + * https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version + */ + public GcpSecret(String versionName) { + this.versionName = versionName; + } + + /** + * Returns the secret as a byte array. Assumes that the current active service account has + * permissions to read the secret. + * + * @return The secret as a byte array. + */ + @Override + public byte[] getSecretBytes() { + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.parse(versionName); + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve secret bytes", e); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java new file mode 100644 index 000000000000..fe476ef6cb1d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; + +/** + * A secret management interface used for handling sensitive data. + * + *

This interface provides a generic way to handle secrets. Implementations of this interface + * should handle fetching secrets from a secret management system. The underlying secret management + * system should be able to return a valid byte array representing the secret. + */ +public interface Secret extends Serializable { + /** + * Returns the secret as a byte array. + * + * @return The secret as a byte array. + */ + byte[] getSecretBytes(); +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java new file mode 100644 index 000000000000..ba4c50e5a41e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.junit.Assert.assertThrows; + +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.GcpSecret; +import org.apache.beam.sdk.util.Secret; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GroupByEncryptedKey}. */ +@RunWith(JUnit4.class) +public class GroupByEncryptedKeyTest implements Serializable { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + private static class FakeSecret implements Secret { + private final byte[] secret = + "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset()); + + @Override + public byte[] getSecretBytes() { + return secret; + } + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyFakeSecret() { + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(new FakeSecret())); + + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private static Secret gcpSecret; + + @BeforeClass + public static void setup() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, SECRET_ID, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); + } + + @AfterClass + public static void tearDown() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecret() { + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(gcpSecret)); + + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecretThrows() { + Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))) + .apply(GroupByEncryptedKey.create(gcpSecret)); + assertThrows(RuntimeException.class, () -> p.run()); + } + + private static class SortValues + extends SimpleFunction>, KV>> { + @Override + public KV> apply(KV> input) { + List sorted = + StreamSupport.stream(input.getValue().spliterator(), false) + .sorted() + .collect(Collectors.toList()); + return KV.of(input.getKey(), sorted); + } + } +}