Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1,
"modification": 2,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
6 changes: 1 addition & 5 deletions .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3,
"modification": 4,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByEncryptedKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.Secret;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Specialized implementation of {@code GroupByKey} for translating Redistribute transform into
Expand All @@ -46,9 +50,13 @@ public class DataflowGroupByKey<K, V>

// Plumbed from Redistribute transform.
private final boolean allowDuplicates;
private boolean insideGBEK;
private boolean surroundsGBEK;

// Private: instances are obtained through the static factory methods.
private DataflowGroupByKey(boolean allowDuplicates) {
  // Both GBEK markers start cleared; they are only flipped later, by setInsideGBEK() or while
  // expanding with the gbek pipeline option set.
  this.surroundsGBEK = false;
  this.insideGBEK = false;
  this.allowDuplicates = allowDuplicates;
}

/**
Expand Down Expand Up @@ -79,6 +87,22 @@ public boolean allowDuplicates() {
return allowDuplicates;
}

/**
 * For Beam internal use only. Marks this transform as the inner GBK of a {@code
 * GroupByEncryptedKey}, so that expanding it does not wrap it in yet another one.
 */
public void setInsideGBEK() {
  insideGBEK = true;
}

/**
 * For Beam internal use only. Reports whether this GBK is a wrapper around a {@code
 * GroupByEncryptedKey}.
 *
 * @return {@code true} once expansion has delegated this transform to a GBEK
 */
public boolean surroundsGBEK() {
  return surroundsGBEK;
}

/////////////////////////////////////////////////////////////////////////////

public static void applicableTo(PCollection<?> input) {
Expand Down Expand Up @@ -117,6 +141,20 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
"the keyCoder of a DataflowGroupByKey must be deterministic", e);
}

PipelineOptions options = input.getPipeline().getOptions();
String gbekOveride = options.getGbek();
if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {
this.surroundsGBEK = true;
Secret hmacSecret = Secret.parseSecretOption(gbekOveride);
DataflowGroupByKey<byte[], KV<byte[], byte[]>> gbk = DataflowGroupByKey.create();
if (this.allowDuplicates) {
gbk = DataflowGroupByKey.createWithAllowDuplicates();
}
gbk.setInsideGBEK();
GroupByEncryptedKey<K, V> gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk);
return input.apply(gbek);
}

// This primitive operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
Expand Down Expand Up @@ -171,10 +209,22 @@ public String getUrn() {
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
}

@Override
public String getUrn(DataflowGroupByKey<?, ?> transform) {
  // A GBK that wraps a GroupByEncryptedKey is reported with the wrapper URN; any other GBK
  // keeps the plain GroupByKey URN.
  return transform.surroundsGBEK()
      ? PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN
      : PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
}

@Override
@SuppressWarnings("nullness")
public RunnerApi.FunctionSpec translate(
public RunnerApi.@Nullable FunctionSpec translate(
AppliedPTransform<?, ?, DataflowGroupByKey<?, ?>> transform, SdkComponents components) {
if (transform.getTransform().surroundsGBEK()) {
// Can use null for spec for empty composite.
return null;
}
return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*protobuf.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyIT.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
Expand Down Expand Up @@ -413,6 +414,40 @@ public Long create(PipelineOptions options) {

void setUserAgent(String userAgent);

/**
 * A string describing the secret with which GroupByKey transforms should be replaced by
 * GroupByEncryptedKey transforms.
 *
 * <p>When set, all GroupByKey transforms in the pipeline are replaced with GroupByEncryptedKey
 * transforms using the secret passed in the option. Beam will infer the secret type and value
 * based on the secret itself. This guarantees that any data at rest during the GBK will be
 * encrypted. Many runners only store data at rest when performing a GBK, so this can be used to
 * guarantee that data is not unencrypted. Runners with this behavior include the Dataflow, Flink,
 * and Spark runners. The option should be structured like:
 *
 * <pre>{@code
 * --gbek=type:<secret_type>;<secret_param>:<value>
 * }</pre>
 *
 * <p>for example:
 *
 * <pre>{@code
 * --gbek=type:GcpSecret;version_name:my_secret/versions/latest
 * }</pre>
 *
 * <p>All variables should use snake case to allow consistency across languages.
 *
 * @return the raw option string, or {@code null} when the option has not been set
 */
@Description(
    "When set, will replace all GroupByKey transforms in the pipeline with GroupByEncryptedKey"
        + " transforms using the secret passed in the option. Beam will infer the secret type"
        + " and value based on the secret itself. This guarantees that any data at rest during"
        + " the GBK will be encrypted. Many runners only store data at rest when performing a"
        + " GBK, so this can be used to guarantee that data is not unencrypted. Runners with"
        + " this behavior include the Dataflow, Flink, and Spark runners. The option should be"
        + " structured like: --gbek=type:<secret_type>;<secret_param>:<value>, for example"
        + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables"
        + " should use snake case to allow consistency across languages.")
@Nullable
String getGbek();

void setGbek(String gbek);

/**
* Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link
* ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,19 @@ public class GroupByEncryptedKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

private final Secret hmacKey;
private final PTransform<
PCollection<KV<byte[], KV<byte[], byte[]>>>,
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
gbk;

private GroupByEncryptedKey(Secret hmacKey) {
private GroupByEncryptedKey(
Secret hmacKey,
PTransform<
PCollection<KV<byte[], KV<byte[], byte[]>>>,
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
gbk) {
this.hmacKey = hmacKey;
this.gbk = gbk;
}

/**
Expand All @@ -67,7 +77,25 @@ private GroupByEncryptedKey(Secret hmacKey) {
* @return A {@link GroupByEncryptedKey} transform.
*/
public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) {
return new GroupByEncryptedKey<>(hmacKey);
return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create());
}

/**
 * Builds a {@link GroupByEncryptedKey} whose internal grouping step is the supplied transform
 * instead of a default {@code GroupByKey}.
 *
 * @param hmacKey The {@link Secret} key to use for encryption.
 * @param gbk The grouping transform applied to the encrypted key/value pairs inside the GBEK.
 * @param <K> The type of the keys in the input PCollection.
 * @param <V> The type of the values in the input PCollection.
 * @return A {@link GroupByEncryptedKey} transform that groups with {@code gbk}.
 */
public static <K, V> GroupByEncryptedKey<K, V> createWithCustomGbk(
    Secret hmacKey,
    PTransform<
            PCollection<KV<byte[], KV<byte[], byte[]>>>,
            PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
        gbk) {
  GroupByEncryptedKey<K, V> transform = new GroupByEncryptedKey<>(hmacKey, gbk);
  return transform;
}

@Override
Expand All @@ -93,7 +121,7 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
.apply(
"EncryptMessage",
ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder)))
.apply(GroupByKey.create());
.apply(this.gbk);

return grouped
.apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.Secret;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
Expand Down Expand Up @@ -115,9 +116,13 @@ public class GroupByKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

private final boolean fewKeys;
private boolean insideGBEK;
private boolean surroundsGBEK;

// Private: instances are obtained through the static factory methods.
private GroupByKey(boolean fewKeys) {
  // Both GBEK markers start cleared; they are only flipped later, by setInsideGBEK() or while
  // expanding with the gbek pipeline option set.
  this.surroundsGBEK = false;
  this.insideGBEK = false;
  this.fewKeys = fewKeys;
}

/**
Expand Down Expand Up @@ -148,6 +153,21 @@ public boolean fewKeys() {
return fewKeys;
}

/**
 * For Beam internal use only. Marks this transform as the inner GBK of a {@code
 * GroupByEncryptedKey}, so that expanding it does not wrap it in yet another one.
 */
public void setInsideGBEK() {
  insideGBEK = true;
}

/**
 * For Beam internal use only. Reports whether this GBK is a wrapper around a {@code
 * GroupByEncryptedKey}.
 *
 * @return {@code true} once expansion has delegated this transform to a GBEK
 */
public boolean surroundsGBEK() {
  return surroundsGBEK;
}

/////////////////////////////////////////////////////////////////////////////

public static void applicableTo(PCollection<?> input) {
Expand Down Expand Up @@ -244,6 +264,20 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e);
}

PipelineOptions options = input.getPipeline().getOptions();
String gbekOveride = options.getGbek();
if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {
this.surroundsGBEK = true;
Secret hmacSecret = Secret.parseSecretOption(gbekOveride);
GroupByKey<byte[], KV<byte[], byte[]>> gbk = GroupByKey.create();
if (this.fewKeys) {
gbk = GroupByKey.createWithFewKeys();
}
gbk.setInsideGBEK();
GroupByEncryptedKey<K, V> gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk);
return input.apply(gbek);
}

// This primitive operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,13 @@ public byte[] getSecretBytes() {
throw new RuntimeException("Failed to retrieve secret bytes", e);
}
}

/**
 * Exposes the version name this secret was configured with.
 *
 * @return The secret's version name, as a String.
 */
public String getVersionName() {
  return this.versionName;
}
}
49 changes: 49 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package org.apache.beam.sdk.util;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
* A secret management interface used for handling sensitive data.
Expand All @@ -33,4 +38,48 @@ public interface Secret extends Serializable {
* @return The secret as a byte array.
*/
byte[] getSecretBytes();

/**
 * Parses a secret option string of the form {@code type:<secret_type>;<secret_param>:<value>}
 * into a {@link Secret}.
 *
 * <p>Segments are separated by {@code ;} and each segment is split on its first {@code :}, so a
 * value may itself contain colons. Segments without a {@code :} are ignored. The secret type is
 * matched case-insensitively; currently only {@code GcpSecret}, with a single {@code
 * version_name} parameter, is supported.
 *
 * @param secretOption the option string, for example {@code
 *     type:GcpSecret;version_name:my_secret/versions/latest}
 * @return the {@link Secret} described by the option string
 * @throws IllegalArgumentException if the type is missing or unsupported, if a parameter is not
 *     supported by the secret type, or if a required parameter is missing
 */
static Secret parseSecretOption(String secretOption) {
  Map<String, String> paramMap = new HashMap<>();
  for (String param : secretOption.split(";", -1)) {
    // Limit of 2 keeps any further ':' characters inside the value.
    String[] parts = param.split(":", 2);
    if (parts.length == 2) {
      paramMap.put(parts[0], parts[1]);
    }
  }

  // remove() fetches the type and leaves only the type-specific parameters in the map.
  String secretType = paramMap.remove("type");
  if (secretType == null) {
    throw new IllegalArgumentException("Secret string must contain a valid type parameter");
  }

  // Locale.ROOT keeps the case-insensitive match independent of the JVM's default locale.
  switch (secretType.toLowerCase(Locale.ROOT)) {
    case "gcpsecret":
      Set<String> gcpSecretParams = new HashSet<>(Arrays.asList("version_name"));
      for (String paramName : paramMap.keySet()) {
        if (!gcpSecretParams.contains(paramName)) {
          throw new IllegalArgumentException(
              String.format(
                  "Invalid secret parameter %s, GcpSecret only supports the following parameters: %s",
                  paramName, gcpSecretParams));
        }
      }
      String versionName = paramMap.get("version_name");
      if (versionName == null) {
        throw new IllegalArgumentException(
            "version_name must contain a valid value for versionName parameter");
      }
      return new GcpSecret(versionName);
    default:
      throw new IllegalArgumentException(
          String.format(
              "Invalid secret type %s, currently only GcpSecret is supported", secretType));
  }
}
}
Loading
Loading