Skip to content

Commit c8df4da

Browse files
authored
Add pipeline option to force GBEK (Java) (#36346)
* First pass at Java GBEK (AI generated) * Compile * Compiletest * checkstyle * tests passing * Move secret code into utils * Use secret manager from bom * Docs * Better docs * Updates * [WIP] Add pipeline option to force GBEK (Java) * Trigger some postcommits * Update triggers * Tests * test fixes * Move tests to IT * Randomized secret postfix * Update encryption mode * checkstyle * explicitly add dep * spotbugs: only create generator once * Gemini nits * Feedback * Syntax + format
1 parent 27ad139 commit c8df4da

File tree

15 files changed

+595
-11
lines changed

15 files changed

+595
-11
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 2
44
}

.github/trigger_files/beam_PostCommit_Java_DataflowV1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
44
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
55
"comment": "Modify this file in a trivial way to cause this test suite to run",
6-
"modification": 1,
6+
"modification": 2,
77
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
88
}
Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
{
2-
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
3-
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
4-
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
5-
"comment": "Modify this file in a trivial way to cause this test suite to run",
6-
"modification": 3,
2+
"modification": 4,
73
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
84
}

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,21 @@
2525
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
2626
import org.apache.beam.sdk.coders.IterableCoder;
2727
import org.apache.beam.sdk.coders.KvCoder;
28+
import org.apache.beam.sdk.options.PipelineOptions;
2829
import org.apache.beam.sdk.runners.AppliedPTransform;
30+
import org.apache.beam.sdk.transforms.GroupByEncryptedKey;
2931
import org.apache.beam.sdk.transforms.PTransform;
3032
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
3133
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
34+
import org.apache.beam.sdk.util.Secret;
3235
import org.apache.beam.sdk.util.construction.PTransformTranslation;
3336
import org.apache.beam.sdk.util.construction.SdkComponents;
3437
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
3538
import org.apache.beam.sdk.values.KV;
3639
import org.apache.beam.sdk.values.PCollection;
3740
import org.apache.beam.sdk.values.PCollection.IsBounded;
3841
import org.apache.beam.sdk.values.WindowingStrategy;
42+
import org.checkerframework.checker.nullness.qual.Nullable;
3943

4044
/**
4145
* Specialized implementation of {@code GroupByKey} for translating Redistribute transform into
@@ -46,9 +50,13 @@ public class DataflowGroupByKey<K, V>
4650

4751
// Plumbed from Redistribute transform.
4852
private final boolean allowDuplicates;
53+
private boolean insideGBEK;
54+
private boolean surroundsGBEK;
4955

5056
private DataflowGroupByKey(boolean allowDuplicates) {
5157
this.allowDuplicates = allowDuplicates;
58+
this.insideGBEK = false;
59+
this.surroundsGBEK = false;
5260
}
5361

5462
/**
@@ -79,6 +87,22 @@ public boolean allowDuplicates() {
7987
return allowDuplicates;
8088
}
8189

90+
/**
91+
* For Beam internal use only. Tells runner that this is an inner GBK inside of a
92+
* GroupByEncryptedKey
93+
*/
94+
public void setInsideGBEK() {
95+
this.insideGBEK = true;
96+
}
97+
98+
/**
99+
* For Beam internal use only. Tells runner that this is a GBK wrapped around of a
100+
* GroupByEncryptedKey
101+
*/
102+
public boolean surroundsGBEK() {
103+
return this.surroundsGBEK;
104+
}
105+
82106
/////////////////////////////////////////////////////////////////////////////
83107

84108
public static void applicableTo(PCollection<?> input) {
@@ -117,6 +141,20 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
117141
"the keyCoder of a DataflowGroupByKey must be deterministic", e);
118142
}
119143

144+
PipelineOptions options = input.getPipeline().getOptions();
145+
String gbekOveride = options.getGbek();
146+
if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {
147+
this.surroundsGBEK = true;
148+
Secret hmacSecret = Secret.parseSecretOption(gbekOveride);
149+
DataflowGroupByKey<byte[], KV<byte[], byte[]>> gbk = DataflowGroupByKey.create();
150+
if (this.allowDuplicates) {
151+
gbk = DataflowGroupByKey.createWithAllowDuplicates();
152+
}
153+
gbk.setInsideGBEK();
154+
GroupByEncryptedKey<K, V> gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk);
155+
return input.apply(gbek);
156+
}
157+
120158
// This primitive operation groups by the combination of key and window,
121159
// merging windows as needed, using the windows assigned to the
122160
// key/value input elements and the window merge operation of the
@@ -171,10 +209,22 @@ public String getUrn() {
171209
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
172210
}
173211

212+
@Override
213+
public String getUrn(DataflowGroupByKey<?, ?> transform) {
214+
if (transform.surroundsGBEK()) {
215+
return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN;
216+
}
217+
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
218+
}
219+
174220
@Override
175221
@SuppressWarnings("nullness")
176-
public RunnerApi.FunctionSpec translate(
222+
public RunnerApi.@Nullable FunctionSpec translate(
177223
AppliedPTransform<?, ?, DataflowGroupByKey<?, ?>> transform, SdkComponents components) {
224+
if (transform.getTransform().surroundsGBEK()) {
225+
// Can use null for spec for empty composite.
226+
return null;
227+
}
178228
return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build();
179229
}
180230
}

sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
5959
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*protobuf.*" />
6060
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
61+
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyTest.*" />
62+
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyIT.*" />
6163
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*" />
6264
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
6365
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />

sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.beam.sdk.util.ReleaseInfo;
3838
import org.apache.beam.sdk.util.common.ReflectHelpers;
3939
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
40+
import org.checkerframework.checker.nullness.qual.Nullable;
4041
import org.joda.time.DateTimeUtils;
4142
import org.joda.time.DateTimeZone;
4243
import org.joda.time.format.DateTimeFormat;
@@ -413,6 +414,40 @@ public Long create(PipelineOptions options) {
413414

414415
void setUserAgent(String userAgent);
415416

417+
/**
418+
* A string defining whether GroupByKey transforms should be replaced by GroupByEncryptedKey
419+
*
420+
* <p>Beam will infer the secret type and value based on the secret itself. This guarantees that
421+
* any data at rest during the performing a GBK, so this can be used to guarantee that data is not
422+
* unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The
423+
* option should be structured like:
424+
*
425+
* <pre><code>
426+
* --gbek=type:<secret_type>;<secret_param>:<value>
427+
* </code></pre>
428+
*
429+
* for example:
430+
*
431+
* <pre><code>
432+
* --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
433+
* </code></pre>
434+
*
435+
* All variables should use snake case to allow consistency across languages.
436+
*/
437+
@Description(
438+
"When set, will replace all GroupByKey transforms in the pipeline the option. Beam will"
439+
+ " infer the secret type and value based on the secret itself. This guarantees that"
440+
+ " any data at rest during the performing a GBK, so this can be used to guarantee"
441+
+ " that data is not unencrypted. Runners with this behavior include the Dataflow,"
442+
+ " Flink, and Spark runners. The option should be structured like:"
443+
+ " --gbek=type:<secret_type>;<secret_param>:<value>, for example "
444+
+ " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables "
445+
+ " should use snake case to allow consistency across languages.")
446+
@Nullable
447+
String getGbek();
448+
449+
void setGbek(String gbek);
450+
416451
/**
417452
* Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link
418453
* ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}.

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,19 @@ public class GroupByEncryptedKey<K, V>
5353
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
5454

5555
private final Secret hmacKey;
56+
private final PTransform<
57+
PCollection<KV<byte[], KV<byte[], byte[]>>>,
58+
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
59+
gbk;
5660

57-
private GroupByEncryptedKey(Secret hmacKey) {
61+
private GroupByEncryptedKey(
62+
Secret hmacKey,
63+
PTransform<
64+
PCollection<KV<byte[], KV<byte[], byte[]>>>,
65+
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
66+
gbk) {
5867
this.hmacKey = hmacKey;
68+
this.gbk = gbk;
5969
}
6070

6171
/**
@@ -67,7 +77,25 @@ private GroupByEncryptedKey(Secret hmacKey) {
6777
* @return A {@link GroupByEncryptedKey} transform.
6878
*/
6979
public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) {
70-
return new GroupByEncryptedKey<>(hmacKey);
80+
return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create());
81+
}
82+
83+
/**
84+
* Creates a {@link GroupByEncryptedKey} transform with a custom GBK in the middle.
85+
*
86+
* @param hmacKey The {@link Secret} key to use for encryption.
87+
* @param gbk The custom GBK transform to use in the middle of the GBEK.
88+
* @param <K> The type of the keys in the input PCollection.
89+
* @param <V> The type of the values in the input PCollection.
90+
* @return A {@link GroupByEncryptedKey} transform.
91+
*/
92+
public static <K, V> GroupByEncryptedKey<K, V> createWithCustomGbk(
93+
Secret hmacKey,
94+
PTransform<
95+
PCollection<KV<byte[], KV<byte[], byte[]>>>,
96+
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
97+
gbk) {
98+
return new GroupByEncryptedKey<>(hmacKey, gbk);
7199
}
72100

73101
@Override
@@ -93,7 +121,7 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
93121
.apply(
94122
"EncryptMessage",
95123
ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder)))
96-
.apply(GroupByKey.create());
124+
.apply(this.gbk);
97125

98126
return grouped
99127
.apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder)))

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
3333
import org.apache.beam.sdk.transforms.windowing.Window;
3434
import org.apache.beam.sdk.transforms.windowing.WindowFn;
35+
import org.apache.beam.sdk.util.Secret;
3536
import org.apache.beam.sdk.values.KV;
3637
import org.apache.beam.sdk.values.PCollection;
3738
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -115,9 +116,13 @@ public class GroupByKey<K, V>
115116
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
116117

117118
private final boolean fewKeys;
119+
private boolean insideGBEK;
120+
private boolean surroundsGBEK;
118121

119122
private GroupByKey(boolean fewKeys) {
120123
this.fewKeys = fewKeys;
124+
this.insideGBEK = false;
125+
surroundsGBEK = false;
121126
}
122127

123128
/**
@@ -148,6 +153,21 @@ public boolean fewKeys() {
148153
return fewKeys;
149154
}
150155

156+
/**
157+
* For Beam internal use only. Tells runner that this is an inner GBK inside a GroupByEncryptedKey
158+
*/
159+
public void setInsideGBEK() {
160+
this.insideGBEK = true;
161+
}
162+
163+
/**
164+
* For Beam internal use only. Tells runner that this is a GBK wrapped around of a
165+
* GroupByEncryptedKey
166+
*/
167+
public boolean surroundsGBEK() {
168+
return this.surroundsGBEK;
169+
}
170+
151171
/////////////////////////////////////////////////////////////////////////////
152172

153173
public static void applicableTo(PCollection<?> input) {
@@ -244,6 +264,20 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
244264
throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e);
245265
}
246266

267+
PipelineOptions options = input.getPipeline().getOptions();
268+
String gbekOveride = options.getGbek();
269+
if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {
270+
this.surroundsGBEK = true;
271+
Secret hmacSecret = Secret.parseSecretOption(gbekOveride);
272+
GroupByKey<byte[], KV<byte[], byte[]>> gbk = GroupByKey.create();
273+
if (this.fewKeys) {
274+
gbk = GroupByKey.createWithFewKeys();
275+
}
276+
gbk.setInsideGBEK();
277+
GroupByEncryptedKey<K, V> gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk);
278+
return input.apply(gbek);
279+
}
280+
247281
// This primitive operation groups by the combination of key and window,
248282
// merging windows as needed, using the windows assigned to the
249283
// key/value input elements and the window merge operation of the

sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,13 @@ public byte[] getSecretBytes() {
5555
throw new RuntimeException("Failed to retrieve secret bytes", e);
5656
}
5757
}
58+
59+
/**
60+
* Returns the version name of the secret.
61+
*
62+
* @return The version name as a String.
63+
*/
64+
public String getVersionName() {
65+
return versionName;
66+
}
5867
}

sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
package org.apache.beam.sdk.util;
1919

2020
import java.io.Serializable;
21+
import java.util.Arrays;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Set;
2126

2227
/**
2328
* A secret management interface used for handling sensitive data.
@@ -33,4 +38,48 @@ public interface Secret extends Serializable {
3338
* @return The secret as a byte array.
3439
*/
3540
byte[] getSecretBytes();
41+
42+
static Secret parseSecretOption(String secretOption) {
43+
Map<String, String> paramMap = new HashMap<>();
44+
for (String param : secretOption.split(";", -1)) {
45+
String[] parts = param.split(":", 2);
46+
if (parts.length == 2) {
47+
paramMap.put(parts[0], parts[1]);
48+
}
49+
}
50+
51+
if (!paramMap.containsKey("type")) {
52+
throw new RuntimeException("Secret string must contain a valid type parameter");
53+
}
54+
55+
String secretType = paramMap.get("type");
56+
paramMap.remove("type");
57+
58+
if (secretType == null) {
59+
throw new RuntimeException("Secret string must contain a valid value for type parameter");
60+
}
61+
62+
switch (secretType.toLowerCase()) {
63+
case "gcpsecret":
64+
Set<String> gcpSecretParams = new HashSet<>(Arrays.asList("version_name"));
65+
for (String paramName : paramMap.keySet()) {
66+
if (!gcpSecretParams.contains(paramName)) {
67+
throw new RuntimeException(
68+
String.format(
69+
"Invalid secret parameter %s, GcpSecret only supports the following parameters: %s",
70+
paramName, gcpSecretParams));
71+
}
72+
}
73+
String versionName = paramMap.get("version_name");
74+
if (versionName == null) {
75+
throw new RuntimeException(
76+
"version_name must contain a valid value for versionName parameter");
77+
}
78+
return new GcpSecret(versionName);
79+
default:
80+
throw new RuntimeException(
81+
String.format(
82+
"Invalid secret type %s, currently only GcpSecret is supported", secretType));
83+
}
84+
}
3685
}

0 commit comments

Comments
 (0)