Skip to content

Commit 4c9d55b

Browse files
committed
Revert "Add pipeline option to force GBEK (Java) (#36346)"
This reverts commit c8df4da.
1 parent 8325d1b commit 4c9d55b

File tree

15 files changed

+11
-595
lines changed

15 files changed

+11
-595
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 2
3+
"modification": 1
44
}

.github/trigger_files/beam_PostCommit_Java_DataflowV1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
44
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
55
"comment": "Modify this file in a trivial way to cause this test suite to run",
6-
"modification": 2,
6+
"modification": 1,
77
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
88
}
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
{
2-
"modification": 4,
2+
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
3+
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
4+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
5+
"comment": "Modify this file in a trivial way to cause this test suite to run",
6+
"modification": 3,
37
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
48
}

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,17 @@
2525
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
2626
import org.apache.beam.sdk.coders.IterableCoder;
2727
import org.apache.beam.sdk.coders.KvCoder;
28-
import org.apache.beam.sdk.options.PipelineOptions;
2928
import org.apache.beam.sdk.runners.AppliedPTransform;
30-
import org.apache.beam.sdk.transforms.GroupByEncryptedKey;
3129
import org.apache.beam.sdk.transforms.PTransform;
3230
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
3331
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
34-
import org.apache.beam.sdk.util.Secret;
3532
import org.apache.beam.sdk.util.construction.PTransformTranslation;
3633
import org.apache.beam.sdk.util.construction.SdkComponents;
3734
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
3835
import org.apache.beam.sdk.values.KV;
3936
import org.apache.beam.sdk.values.PCollection;
4037
import org.apache.beam.sdk.values.PCollection.IsBounded;
4138
import org.apache.beam.sdk.values.WindowingStrategy;
42-
import org.checkerframework.checker.nullness.qual.Nullable;
4339

4440
/**
4541
* Specialized implementation of {@code GroupByKey} for translating Redistribute transform into
@@ -50,13 +46,9 @@ public class DataflowGroupByKey<K, V>
5046

5147
// Plumbed from Redistribute transform.
5248
private final boolean allowDuplicates;
53-
private boolean insideGBEK;
54-
private boolean surroundsGBEK;
5549

5650
private DataflowGroupByKey(boolean allowDuplicates) {
5751
this.allowDuplicates = allowDuplicates;
58-
this.insideGBEK = false;
59-
this.surroundsGBEK = false;
6052
}
6153

6254
/**
@@ -87,22 +79,6 @@ public boolean allowDuplicates() {
8779
return allowDuplicates;
8880
}
8981

90-
/**
91-
* For Beam internal use only. Tells runner that this is an inner GBK inside of a
92-
* GroupByEncryptedKey
93-
*/
94-
public void setInsideGBEK() {
95-
this.insideGBEK = true;
96-
}
97-
98-
/**
99-
* For Beam internal use only. Tells runner that this is a GBK wrapped around of a
100-
* GroupByEncryptedKey
101-
*/
102-
public boolean surroundsGBEK() {
103-
return this.surroundsGBEK;
104-
}
105-
10682
/////////////////////////////////////////////////////////////////////////////
10783

10884
public static void applicableTo(PCollection<?> input) {
@@ -141,20 +117,6 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
141117
"the keyCoder of a DataflowGroupByKey must be deterministic", e);
142118
}
143119

144-
PipelineOptions options = input.getPipeline().getOptions();
145-
String gbekOveride = options.getGbek();
146-
if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {
147-
this.surroundsGBEK = true;
148-
Secret hmacSecret = Secret.parseSecretOption(gbekOveride);
149-
DataflowGroupByKey<byte[], KV<byte[], byte[]>> gbk = DataflowGroupByKey.create();
150-
if (this.allowDuplicates) {
151-
gbk = DataflowGroupByKey.createWithAllowDuplicates();
152-
}
153-
gbk.setInsideGBEK();
154-
GroupByEncryptedKey<K, V> gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk);
155-
return input.apply(gbek);
156-
}
157-
158120
// This primitive operation groups by the combination of key and window,
159121
// merging windows as needed, using the windows assigned to the
160122
// key/value input elements and the window merge operation of the
@@ -209,22 +171,10 @@ public String getUrn() {
209171
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
210172
}
211173

212-
@Override
213-
public String getUrn(DataflowGroupByKey<?, ?> transform) {
214-
if (transform.surroundsGBEK()) {
215-
return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN;
216-
}
217-
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
218-
}
219-
220174
@Override
221175
@SuppressWarnings("nullness")
222-
public RunnerApi.@Nullable FunctionSpec translate(
176+
public RunnerApi.FunctionSpec translate(
223177
AppliedPTransform<?, ?, DataflowGroupByKey<?, ?>> transform, SdkComponents components) {
224-
if (transform.getTransform().surroundsGBEK()) {
225-
// Can use null for spec for empty composite.
226-
return null;
227-
}
228178
return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build();
229179
}
230180
}

sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
5959
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*protobuf.*" />
6060
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
61-
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyTest.*" />
62-
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyIT.*" />
6361
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*" />
6462
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
6563
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />

sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.beam.sdk.util.ReleaseInfo;
3838
import org.apache.beam.sdk.util.common.ReflectHelpers;
3939
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
40-
import org.checkerframework.checker.nullness.qual.Nullable;
4140
import org.joda.time.DateTimeUtils;
4241
import org.joda.time.DateTimeZone;
4342
import org.joda.time.format.DateTimeFormat;
@@ -414,40 +413,6 @@ public Long create(PipelineOptions options) {
414413

415414
void setUserAgent(String userAgent);
416415

417-
/**
418-
* A string defining whether GroupByKey transforms should be replaced by GroupByEncryptedKey
419-
*
420-
* <p>Beam will infer the secret type and value based on the secret itself. This guarantees that
421-
* any data at rest during the performing a GBK, so this can be used to guarantee that data is not
422-
* unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The
423-
* option should be structured like:
424-
*
425-
* <pre><code>
426-
* --gbek=type:<secret_type>;<secret_param>:<value>
427-
* </code></pre>
428-
*
429-
* for example:
430-
*
431-
* <pre><code>
432-
* --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
433-
* </code></pre>
434-
*
435-
* All variables should use snake case to allow consistency across languages.
436-
*/
437-
@Description(
438-
"When set, will replace all GroupByKey transforms in the pipeline the option. Beam will"
439-
+ " infer the secret type and value based on the secret itself. This guarantees that"
440-
+ " any data at rest during the performing a GBK, so this can be used to guarantee"
441-
+ " that data is not unencrypted. Runners with this behavior include the Dataflow,"
442-
+ " Flink, and Spark runners. The option should be structured like:"
443-
+ " --gbek=type:<secret_type>;<secret_param>:<value>, for example "
444-
+ " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables "
445-
+ " should use snake case to allow consistency across languages.")
446-
@Nullable
447-
String getGbek();
448-
449-
void setGbek(String gbek);
450-
451416
/**
452417
* Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link
453418
* ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}.

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,9 @@ public class GroupByEncryptedKey<K, V>
5353
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
5454

5555
private final Secret hmacKey;
56-
private final PTransform<
57-
PCollection<KV<byte[], KV<byte[], byte[]>>>,
58-
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
59-
gbk;
6056

61-
private GroupByEncryptedKey(
62-
Secret hmacKey,
63-
PTransform<
64-
PCollection<KV<byte[], KV<byte[], byte[]>>>,
65-
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
66-
gbk) {
57+
private GroupByEncryptedKey(Secret hmacKey) {
6758
this.hmacKey = hmacKey;
68-
this.gbk = gbk;
6959
}
7060

7161
/**
@@ -77,25 +67,7 @@ private GroupByEncryptedKey(
7767
* @return A {@link GroupByEncryptedKey} transform.
7868
*/
7969
public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) {
80-
return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create());
81-
}
82-
83-
/**
84-
* Creates a {@link GroupByEncryptedKey} transform with a custom GBK in the middle.
85-
*
86-
* @param hmacKey The {@link Secret} key to use for encryption.
87-
* @param gbk The custom GBK transform to use in the middle of the GBEK.
88-
* @param <K> The type of the keys in the input PCollection.
89-
* @param <V> The type of the values in the input PCollection.
90-
* @return A {@link GroupByEncryptedKey} transform.
91-
*/
92-
public static <K, V> GroupByEncryptedKey<K, V> createWithCustomGbk(
93-
Secret hmacKey,
94-
PTransform<
95-
PCollection<KV<byte[], KV<byte[], byte[]>>>,
96-
PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>>>
97-
gbk) {
98-
return new GroupByEncryptedKey<>(hmacKey, gbk);
70+
return new GroupByEncryptedKey<>(hmacKey);
9971
}
10072

10173
@Override
@@ -121,7 +93,7 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
12193
.apply(
12294
"EncryptMessage",
12395
ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder)))
124-
.apply(this.gbk);
96+
.apply(GroupByKey.create());
12597

12698
return grouped
12799
.apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder)))

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
3333
import org.apache.beam.sdk.transforms.windowing.Window;
3434
import org.apache.beam.sdk.transforms.windowing.WindowFn;
35-
import org.apache.beam.sdk.util.Secret;
3635
import org.apache.beam.sdk.values.KV;
3736
import org.apache.beam.sdk.values.PCollection;
3837
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -116,13 +115,9 @@ public class GroupByKey<K, V>
116115
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
117116

118117
private final boolean fewKeys;
119-
private boolean insideGBEK;
120-
private boolean surroundsGBEK;
121118

122119
private GroupByKey(boolean fewKeys) {
123120
this.fewKeys = fewKeys;
124-
this.insideGBEK = false;
125-
surroundsGBEK = false;
126121
}
127122

128123
/**
@@ -153,21 +148,6 @@ public boolean fewKeys() {
153148
return fewKeys;
154149
}
155150

156-
/**
157-
* For Beam internal use only. Tells runner that this is an inner GBK inside a GroupByEncryptedKey
158-
*/
159-
public void setInsideGBEK() {
160-
this.insideGBEK = true;
161-
}
162-
163-
/**
164-
* For Beam internal use only. Tells runner that this is a GBK wrapped around of a
165-
* GroupByEncryptedKey
166-
*/
167-
public boolean surroundsGBEK() {
168-
return this.surroundsGBEK;
169-
}
170-
171151
/////////////////////////////////////////////////////////////////////////////
172152

173153
public static void applicableTo(PCollection<?> input) {
@@ -264,20 +244,6 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
264244
throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e);
265245
}
266246

267-
PipelineOptions options = input.getPipeline().getOptions();
268-
String gbekOveride = options.getGbek();
269-
if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {
270-
this.surroundsGBEK = true;
271-
Secret hmacSecret = Secret.parseSecretOption(gbekOveride);
272-
GroupByKey<byte[], KV<byte[], byte[]>> gbk = GroupByKey.create();
273-
if (this.fewKeys) {
274-
gbk = GroupByKey.createWithFewKeys();
275-
}
276-
gbk.setInsideGBEK();
277-
GroupByEncryptedKey<K, V> gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk);
278-
return input.apply(gbek);
279-
}
280-
281247
// This primitive operation groups by the combination of key and window,
282248
// merging windows as needed, using the windows assigned to the
283249
// key/value input elements and the window merge operation of the

sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,4 @@ public byte[] getSecretBytes() {
5555
throw new RuntimeException("Failed to retrieve secret bytes", e);
5656
}
5757
}
58-
59-
/**
60-
* Returns the version name of the secret.
61-
*
62-
* @return The version name as a String.
63-
*/
64-
public String getVersionName() {
65-
return versionName;
66-
}
6758
}

sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
package org.apache.beam.sdk.util;
1919

2020
import java.io.Serializable;
21-
import java.util.Arrays;
22-
import java.util.HashMap;
23-
import java.util.HashSet;
24-
import java.util.Map;
25-
import java.util.Set;
2621

2722
/**
2823
* A secret management interface used for handling sensitive data.
@@ -38,48 +33,4 @@ public interface Secret extends Serializable {
3833
* @return The secret as a byte array.
3934
*/
4035
byte[] getSecretBytes();
41-
42-
static Secret parseSecretOption(String secretOption) {
43-
Map<String, String> paramMap = new HashMap<>();
44-
for (String param : secretOption.split(";", -1)) {
45-
String[] parts = param.split(":", 2);
46-
if (parts.length == 2) {
47-
paramMap.put(parts[0], parts[1]);
48-
}
49-
}
50-
51-
if (!paramMap.containsKey("type")) {
52-
throw new RuntimeException("Secret string must contain a valid type parameter");
53-
}
54-
55-
String secretType = paramMap.get("type");
56-
paramMap.remove("type");
57-
58-
if (secretType == null) {
59-
throw new RuntimeException("Secret string must contain a valid value for type parameter");
60-
}
61-
62-
switch (secretType.toLowerCase()) {
63-
case "gcpsecret":
64-
Set<String> gcpSecretParams = new HashSet<>(Arrays.asList("version_name"));
65-
for (String paramName : paramMap.keySet()) {
66-
if (!gcpSecretParams.contains(paramName)) {
67-
throw new RuntimeException(
68-
String.format(
69-
"Invalid secret parameter %s, GcpSecret only supports the following parameters: %s",
70-
paramName, gcpSecretParams));
71-
}
72-
}
73-
String versionName = paramMap.get("version_name");
74-
if (versionName == null) {
75-
throw new RuntimeException(
76-
"version_name must contain a valid value for versionName parameter");
77-
}
78-
return new GcpSecret(versionName);
79-
default:
80-
throw new RuntimeException(
81-
String.format(
82-
"Invalid secret type %s, currently only GcpSecret is supported", secretType));
83-
}
84-
}
8536
}

0 commit comments

Comments
 (0)