Skip to content

Commit 2028dfe

Browse files
chore: Fixed constructor argument.
1 parent 3bc834f commit 2028dfe

File tree

9 files changed

+274
-265
lines changed

9 files changed

+274
-265
lines changed

v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,10 @@
2323
import com.google.cloud.teleport.metadata.TemplateParameter;
2424
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
2525
import com.google.cloud.teleport.spanner.ExportPipeline.ExportPipelineOptions;
26-
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
27-
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
28-
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
29-
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
3026
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
31-
import java.util.Collections;
3227
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
3328
import org.apache.beam.sdk.Pipeline;
3429
import org.apache.beam.sdk.PipelineResult;
35-
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3630
import org.apache.beam.sdk.options.Default;
3731
import org.apache.beam.sdk.options.Description;
3832
import org.apache.beam.sdk.options.PipelineOptions;
@@ -269,7 +263,6 @@ public static void main(String[] args) {
269263
.withDatabaseId(options.getDatabaseId())
270264
.withRpcPriority(options.getSpannerPriority())
271265
.withDataBoostEnabled(options.getDataBoostEnabled());
272-
273266
p.begin()
274267
.apply(
275268
"Run Export",
@@ -282,7 +275,6 @@ public static void main(String[] args) {
282275
options.getShouldExportRelatedTables(),
283276
options.getShouldExportTimestampAsLogicalType(),
284277
options.getAvroTempDirectory()));
285-
validateRequiredPermissions(options);
286278
PipelineResult result = p.run();
287279
if (options.getWaitUntilFinish()
288280
&&
@@ -293,26 +285,4 @@ public static void main(String[] args) {
293285
result.waitUntilFinish();
294286
}
295287
}
296-
297-
private static void validateRequiredPermissions(ExportPipelineOptions options) {
298-
IAMResourceRequirements spannerRequirements =
299-
IAMRequirementsCreator.createSpannerResourceRequirement();
300-
301-
GcpOptions gcpOptions = options.as(GcpOptions.class);
302-
303-
IAMPermissionsChecker iamPermissionsChecker =
304-
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
305-
IAMCheckResult missingPermission =
306-
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
307-
if (missingPermission.isSuccess()) {
308-
return;
309-
}
310-
String errorString =
311-
"For resource: "
312-
+ missingPermission.getResourceName()
313-
+ ", missing permissions: "
314-
+ missingPermission.getMissingPermissions()
315-
+ ";";
316-
throw new RuntimeException(errorString);
317-
}
318288
}

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import com.google.cloud.teleport.spanner.ddl.Sequence;
3131
import com.google.cloud.teleport.spanner.ddl.Table;
3232
import com.google.cloud.teleport.spanner.ddl.Udf;
33+
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
34+
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
35+
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
3336
import com.google.cloud.teleport.spanner.proto.ExportProtos;
3437
import com.google.cloud.teleport.spanner.proto.ExportProtos.Export;
3538
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
@@ -53,6 +56,7 @@
5356
import java.nio.file.Files;
5457
import java.nio.file.Path;
5558
import java.nio.file.Paths;
59+
import java.security.GeneralSecurityException;
5660
import java.util.ArrayList;
5761
import java.util.Arrays;
5862
import java.util.Collection;
@@ -86,19 +90,8 @@
8690
import org.apache.beam.sdk.io.fs.ResolveOptions;
8791
import org.apache.beam.sdk.io.fs.ResourceId;
8892
import org.apache.beam.sdk.options.ValueProvider;
89-
import org.apache.beam.sdk.transforms.Combine;
93+
import org.apache.beam.sdk.transforms.*;
9094
import org.apache.beam.sdk.transforms.Combine.CombineFn;
91-
import org.apache.beam.sdk.transforms.Contextful;
92-
import org.apache.beam.sdk.transforms.Create;
93-
import org.apache.beam.sdk.transforms.DoFn;
94-
import org.apache.beam.sdk.transforms.Flatten;
95-
import org.apache.beam.sdk.transforms.GroupByKey;
96-
import org.apache.beam.sdk.transforms.PTransform;
97-
import org.apache.beam.sdk.transforms.ParDo;
98-
import org.apache.beam.sdk.transforms.Requirements;
99-
import org.apache.beam.sdk.transforms.SerializableFunction;
100-
import org.apache.beam.sdk.transforms.SerializableFunctions;
101-
import org.apache.beam.sdk.transforms.View;
10295
import org.apache.beam.sdk.transforms.join.CoGbkResult;
10396
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
10497
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
@@ -184,6 +177,19 @@ public ExportTransform(
184177
public WriteFilesResult<String> expand(PBegin begin) {
185178
Pipeline p = begin.getPipeline();
186179

180+
PCollection<Void> validationSignal =
181+
begin
182+
.apply("Trigger Validation", Create.of((Void) null))
183+
.apply(
184+
"Validate Config",
185+
ParDo.of(
186+
new DoFn<Void, Void>() {
187+
@ProcessElement
188+
public void processElement(ProcessContext c) {
189+
validateRequiredPermissions(spannerConfig);
190+
}
191+
}));
192+
187193
/*
188194
* Allow users to specify read timestamp.
189195
* CreateTransaction and CreateTransactionFn classes in SpannerIO
@@ -198,6 +204,7 @@ public WriteFilesResult<String> expand(PBegin begin) {
198204
.apply(
199205
"Create transaction",
200206
ParDo.of(new CreateTransactionFnWithTimestamp(spannerConfig, snapshotTime)))
207+
.apply("Validate", Wait.on(validationSignal))
201208
.apply("Tx As PCollectionView", View.asSingleton());
202209

203210
PCollectionView<Dialect> dialectView =
@@ -1120,4 +1127,33 @@ private TableManifest buildGcsManifest(ProcessContext c, Iterable<GcsPath> files
11201127
return result.build();
11211128
}
11221129
}
1130+
1131+
1132+
private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
1133+
String instanceId = spannerConfig.getInstanceId().get();
1134+
String databaseId = spannerConfig.getDatabaseId().get();
1135+
IAMCheckResult iamCheckResult;
1136+
try {
1137+
IAMPermissionsChecker iamPermissionsChecker =
1138+
new IAMPermissionsChecker(spannerConfig.getProjectId().get());
1139+
iamCheckResult =
1140+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
1141+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
1142+
instanceId,
1143+
databaseId);
1144+
if (iamCheckResult.isPermissionsAvailable()) {
1145+
return;
1146+
}
1147+
String errorString =
1148+
"For resource: "
1149+
+ iamCheckResult.getResourceName()
1150+
+ ", missing permissions: "
1151+
+ iamCheckResult.getMissingPermissions()
1152+
+ ";";
1153+
throw new RuntimeException(errorString);
1154+
} catch (GeneralSecurityException | IOException e) {
1155+
LOG.error("Error while validating permissions for spanner", e);
1156+
throw new RuntimeException(e);
1157+
}
1158+
}
11231159
}

v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,10 @@
2323
import com.google.cloud.teleport.metadata.TemplateParameter;
2424
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
2525
import com.google.cloud.teleport.spanner.ImportPipeline.Options;
26-
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
27-
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
28-
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
29-
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
3026
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
31-
import java.util.Collections;
3227
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
3328
import org.apache.beam.sdk.Pipeline;
3429
import org.apache.beam.sdk.PipelineResult;
35-
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3630
import org.apache.beam.sdk.options.Default;
3731
import org.apache.beam.sdk.options.Description;
3832
import org.apache.beam.sdk.options.PipelineOptions;
@@ -246,6 +240,7 @@ public static void main(String[] args) {
246240
.withInstanceId(options.getInstanceId())
247241
.withDatabaseId(options.getDatabaseId())
248242
.withRpcPriority(options.getSpannerPriority());
243+
249244
p.apply(
250245
new ImportTransform(
251246
spannerConfig,
@@ -258,7 +253,6 @@ public static void main(String[] args) {
258253
options.getDdlCreationTimeoutInMinutes(),
259254
options.getEarlyIndexCreateThreshold()));
260255

261-
validateRequiredPermissions(options);
262256
PipelineResult result = p.run();
263257

264258
if (options.getWaitUntilFinish()
@@ -270,25 +264,4 @@ public static void main(String[] args) {
270264
result.waitUntilFinish();
271265
}
272266
}
273-
274-
private static void validateRequiredPermissions(Options options) {
275-
IAMResourceRequirements spannerRequirements =
276-
IAMRequirementsCreator.createSpannerResourceRequirement();
277-
GcpOptions gcpOptions = options.as(GcpOptions.class);
278-
279-
IAMPermissionsChecker iamPermissionsChecker =
280-
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
281-
IAMCheckResult missingPermission =
282-
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
283-
if (missingPermission.isSuccess()) {
284-
return;
285-
}
286-
String errorString =
287-
"For resource: "
288-
+ missingPermission.getResourceName()
289-
+ ", missing permissions: "
290-
+ missingPermission.getMissingPermissions()
291-
+ ";";
292-
throw new RuntimeException(errorString);
293-
}
294267
}

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.cloud.spanner.Database;
2020
import com.google.cloud.spanner.DatabaseAdminClient;
2121
import com.google.cloud.spanner.DatabaseId;
22+
import com.google.cloud.spanner.DatabaseNotFoundException;
2223
import com.google.cloud.spanner.Dialect;
2324
import com.google.cloud.spanner.Mutation;
2425
import com.google.cloud.teleport.spanner.ddl.ChangeStream;
@@ -29,6 +30,9 @@
2930
import com.google.cloud.teleport.spanner.ddl.Sequence;
3031
import com.google.cloud.teleport.spanner.ddl.Table;
3132
import com.google.cloud.teleport.spanner.ddl.Udf;
33+
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
34+
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
35+
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
3236
import com.google.cloud.teleport.spanner.proto.ExportProtos.Export;
3337
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
3438
import com.google.cloud.teleport.spanner.proto.ExportProtos.TableManifest;
@@ -54,6 +58,7 @@
5458
import java.nio.channels.Channels;
5559
import java.nio.file.Path;
5660
import java.nio.file.Paths;
61+
import java.security.GeneralSecurityException;
5762
import java.util.ArrayList;
5863
import java.util.Arrays;
5964
import java.util.List;
@@ -141,9 +146,23 @@ public ImportTransform(
141146

142147
@Override
143148
public PDone expand(PBegin begin) {
149+
150+
PCollection<Void> validationSignal =
151+
begin
152+
.apply("Trigger Validation", Create.of((Void) null))
153+
.apply(
154+
"Validate Config",
155+
ParDo.of(
156+
new DoFn<Void, Void>() {
157+
@ProcessElement
158+
public void processElement(ProcessContext c) {
159+
validateRequiredPermissions(spannerConfig);
160+
}
161+
}));
144162
PCollectionView<Dialect> dialectView =
145163
begin
146164
.apply("Read Dialect", new ReadDialect(spannerConfig))
165+
.apply("Validate", Wait.on(validationSignal))
147166
.apply("Dialect As PCollectionView", View.asSingleton());
148167

149168
PCollection<Export> manifest =
@@ -953,4 +972,41 @@ private void validateLocalFiles(ProcessContext c, String table, TableManifest ma
953972
}
954973
}
955974
}
975+
976+
private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
977+
String instanceId = spannerConfig.getInstanceId().get();
978+
String databaseId = spannerConfig.getDatabaseId().get();
979+
IAMCheckResult iamCheckResult;
980+
try {
981+
IAMPermissionsChecker iamPermissionsChecker =
982+
new IAMPermissionsChecker(spannerConfig.getProjectId().get());
983+
try {
984+
985+
iamCheckResult =
986+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
987+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
988+
instanceId,
989+
databaseId);
990+
} catch (DatabaseNotFoundException e) {
991+
// If DatabaseNotFoundException exception is thrown, then validation at instance level need
992+
// to be performed.
993+
iamCheckResult =
994+
iamPermissionsChecker.checkSpannerInstanceRequirements(
995+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(), instanceId);
996+
}
997+
if (iamCheckResult.isPermissionsAvailable()) {
998+
return;
999+
}
1000+
String errorString =
1001+
"For resource: "
1002+
+ iamCheckResult.getResourceName()
1003+
+ ", missing permissions: "
1004+
+ iamCheckResult.getMissingPermissions()
1005+
+ ";";
1006+
throw new RuntimeException(errorString);
1007+
} catch (GeneralSecurityException | IOException e) {
1008+
LOG.error("Error while validating permissions for spanner", e);
1009+
throw new RuntimeException(e);
1010+
}
1011+
}
9561012
}

v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public List<String> getMissingPermissions() {
3636
return new ArrayList<>(missingPermissions);
3737
}
3838

39-
public boolean isSuccess() {
39+
public boolean isPermissionsAvailable() {
4040
return missingPermissions.isEmpty();
4141
}
4242

@@ -49,7 +49,7 @@ public String toString() {
4949
+ ", missingPermissions="
5050
+ missingPermissions
5151
+ ", success="
52-
+ isSuccess()
52+
+ isPermissionsAvailable()
5353
+ '}';
5454
}
5555
}

0 commit comments

Comments
 (0)