Skip to content

Commit 4c12002

Browse files
chore: Fixed constructor argument.
1 parent 3bc834f commit 4c12002

File tree

8 files changed

+235
-159
lines changed

8 files changed

+235
-159
lines changed

v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,21 @@
2626
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
2727
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
2828
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
29-
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
3029
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
31-
import java.util.Collections;
30+
import java.io.IOException;
31+
import java.security.GeneralSecurityException;
3232
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
3333
import org.apache.beam.sdk.Pipeline;
3434
import org.apache.beam.sdk.PipelineResult;
35-
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3635
import org.apache.beam.sdk.options.Default;
3736
import org.apache.beam.sdk.options.Description;
3837
import org.apache.beam.sdk.options.PipelineOptions;
3938
import org.apache.beam.sdk.options.PipelineOptionsFactory;
4039
import org.apache.beam.sdk.options.ValueProvider;
4140
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
4241
import org.apache.beam.sdk.transforms.SerializableFunction;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
4344

4445
/**
4546
* Dataflow template that exports a Cloud Spanner database to Avro files in GCS.
@@ -78,6 +79,7 @@
7879
"In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
7980
})
8081
public class ExportPipeline {
82+
private static final Logger LOG = LoggerFactory.getLogger(ExportPipeline.class);
8183

8284
/** Options for Export pipeline. */
8385
public interface ExportPipelineOptions extends PipelineOptions {
@@ -282,7 +284,7 @@ public static void main(String[] args) {
282284
options.getShouldExportRelatedTables(),
283285
options.getShouldExportTimestampAsLogicalType(),
284286
options.getAvroTempDirectory()));
285-
validateRequiredPermissions(options);
287+
validateRequiredPermissions(spannerConfig);
286288
PipelineResult result = p.run();
287289
if (options.getWaitUntilFinish()
288290
&&
@@ -294,25 +296,31 @@ public static void main(String[] args) {
294296
}
295297
}
296298

297-
private static void validateRequiredPermissions(ExportPipelineOptions options) {
298-
IAMResourceRequirements spannerRequirements =
299-
IAMRequirementsCreator.createSpannerResourceRequirement();
300-
301-
GcpOptions gcpOptions = options.as(GcpOptions.class);
302-
303-
IAMPermissionsChecker iamPermissionsChecker =
304-
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
305-
IAMCheckResult missingPermission =
306-
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
307-
if (missingPermission.isSuccess()) {
308-
return;
299+
private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
300+
try {
301+
if (!spannerConfig.getProjectId().isAccessible()) {
302+
throw new IllegalArgumentException("Spanner configs not accessible");
303+
}
304+
IAMPermissionsChecker iamPermissionsChecker =
305+
new IAMPermissionsChecker(spannerConfig.getProjectId().get());
306+
IAMCheckResult missingPermission =
307+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
308+
IAMRequirementsCreator.createSpannerReadResourceRequirement(),
309+
spannerConfig.getInstanceId().get(),
310+
spannerConfig.getDatabaseId().get());
311+
if (missingPermission.isPermissionsAvailable()) {
312+
return;
313+
}
314+
String errorString =
315+
"For resource: "
316+
+ missingPermission.getResourceName()
317+
+ ", missing permissions: "
318+
+ missingPermission.getMissingPermissions()
319+
+ ";";
320+
throw new RuntimeException(errorString);
321+
} catch (GeneralSecurityException | IOException e) {
322+
LOG.error("Error while validating permissions for spanner", e);
323+
throw new RuntimeException(e);
309324
}
310-
String errorString =
311-
"For resource: "
312-
+ missingPermission.getResourceName()
313-
+ ", missing permissions: "
314-
+ missingPermission.getMissingPermissions()
315-
+ ";";
316-
throw new RuntimeException(errorString);
317325
}
318326
}

v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.teleport.spanner;
1717

18+
import com.google.cloud.spanner.DatabaseNotFoundException;
1819
import com.google.cloud.spanner.Options.RpcPriority;
1920
import com.google.cloud.spanner.SpannerOptions;
2021
import com.google.cloud.teleport.metadata.Template;
@@ -26,20 +27,22 @@
2627
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
2728
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
2829
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
29-
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
3030
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
31+
import java.io.IOException;
32+
import java.security.GeneralSecurityException;
3133
import java.util.Collections;
3234
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
3335
import org.apache.beam.sdk.Pipeline;
3436
import org.apache.beam.sdk.PipelineResult;
35-
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3637
import org.apache.beam.sdk.options.Default;
3738
import org.apache.beam.sdk.options.Description;
3839
import org.apache.beam.sdk.options.PipelineOptions;
3940
import org.apache.beam.sdk.options.PipelineOptionsFactory;
4041
import org.apache.beam.sdk.options.ValueProvider;
4142
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
4243
import org.apache.beam.sdk.transforms.SerializableFunction;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
4346

4447
/**
4548
* Avro to Cloud Spanner Import pipeline.
@@ -65,6 +68,7 @@
6568
"The input Cloud Storage path must exist, and it must include a <a href=\"https://cloud.google.com/spanner/docs/import-non-spanner#create-export-json\">spanner-export.json</a> file that contains a JSON description of files to import."
6669
})
6770
public class ImportPipeline {
71+
private static final Logger LOG = LoggerFactory.getLogger(ImportPipeline.class);
6872

6973
/** Options for {@link ImportPipeline}. */
7074
public interface Options extends PipelineOptions {
@@ -258,7 +262,8 @@ public static void main(String[] args) {
258262
options.getDdlCreationTimeoutInMinutes(),
259263
options.getEarlyIndexCreateThreshold()));
260264

261-
validateRequiredPermissions(options);
265+
validateRequiredPermissions(spannerConfig);
266+
262267
PipelineResult result = p.run();
263268

264269
if (options.getWaitUntilFinish()
@@ -271,24 +276,43 @@ public static void main(String[] args) {
271276
}
272277
}
273278

274-
private static void validateRequiredPermissions(Options options) {
275-
IAMResourceRequirements spannerRequirements =
276-
IAMRequirementsCreator.createSpannerResourceRequirement();
277-
GcpOptions gcpOptions = options.as(GcpOptions.class);
278-
279-
IAMPermissionsChecker iamPermissionsChecker =
280-
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
281-
IAMCheckResult missingPermission =
282-
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
283-
if (missingPermission.isSuccess()) {
279+
private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
280+
if (!spannerConfig.getInstanceId().isAccessible()) {
284281
return;
285282
}
286-
String errorString =
287-
"For resource: "
288-
+ missingPermission.getResourceName()
289-
+ ", missing permissions: "
290-
+ missingPermission.getMissingPermissions()
291-
+ ";";
292-
throw new RuntimeException(errorString);
283+
String instanceId = spannerConfig.getInstanceId().get();
284+
String databaseId = spannerConfig.getDatabaseId().get();
285+
IAMCheckResult iamCheckResult = new IAMCheckResult("", Collections.emptyList());
286+
try {
287+
IAMPermissionsChecker iamPermissionsChecker =
288+
new IAMPermissionsChecker("span-cloud-migrations-testing");
289+
try {
290+
291+
iamCheckResult =
292+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
293+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
294+
instanceId,
295+
databaseId);
296+
} catch (DatabaseNotFoundException e) {
297+
iamCheckResult =
298+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
299+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
300+
instanceId,
301+
databaseId);
302+
}
303+
if (iamCheckResult.isPermissionsAvailable()) {
304+
return;
305+
}
306+
String errorString =
307+
"For resource: "
308+
+ iamCheckResult.getResourceName()
309+
+ ", missing permissions: "
310+
+ iamCheckResult.getMissingPermissions()
311+
+ ";";
312+
throw new RuntimeException(errorString);
313+
} catch (GeneralSecurityException | IOException e) {
314+
LOG.error("Error while validating permissions for spanner", e);
315+
throw new RuntimeException(e);
316+
}
293317
}
294318
}

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.cloud.spanner.Database;
2020
import com.google.cloud.spanner.DatabaseAdminClient;
2121
import com.google.cloud.spanner.DatabaseId;
22+
import com.google.cloud.spanner.DatabaseNotFoundException;
2223
import com.google.cloud.spanner.Dialect;
2324
import com.google.cloud.spanner.Mutation;
2425
import com.google.cloud.teleport.spanner.ddl.ChangeStream;
@@ -29,6 +30,9 @@
2930
import com.google.cloud.teleport.spanner.ddl.Sequence;
3031
import com.google.cloud.teleport.spanner.ddl.Table;
3132
import com.google.cloud.teleport.spanner.ddl.Udf;
33+
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
34+
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
35+
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
3236
import com.google.cloud.teleport.spanner.proto.ExportProtos.Export;
3337
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
3438
import com.google.cloud.teleport.spanner.proto.ExportProtos.TableManifest;
@@ -54,8 +58,10 @@
5458
import java.nio.channels.Channels;
5559
import java.nio.file.Path;
5660
import java.nio.file.Paths;
61+
import java.security.GeneralSecurityException;
5762
import java.util.ArrayList;
5863
import java.util.Arrays;
64+
import java.util.Collections;
5965
import java.util.List;
6066
import java.util.Set;
6167
import java.util.concurrent.ExecutionException;
@@ -72,6 +78,7 @@
7278
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
7379
import org.apache.beam.sdk.io.fs.MatchResult;
7480
import org.apache.beam.sdk.io.fs.ResourceId;
81+
import org.apache.beam.sdk.options.PipelineOptions;
7582
import org.apache.beam.sdk.options.ValueProvider;
7683
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
7784
import org.apache.beam.sdk.transforms.Combine;
@@ -93,6 +100,7 @@
93100
import org.apache.beam.sdk.values.TupleTag;
94101
import org.apache.beam.sdk.values.TupleTagList;
95102
import org.apache.beam.sdk.values.TypeDescriptor;
103+
import org.checkerframework.checker.nullness.qual.Nullable;
96104
import org.joda.time.Duration;
97105
import org.slf4j.Logger;
98106
import org.slf4j.LoggerFactory;
@@ -139,6 +147,14 @@ public ImportTransform(
139147
this.earlyIndexCreateThreshold = earlyIndexCreateThreshold;
140148
}
141149

150+
// @Override
151+
// public void validate(@Nullable PipelineOptions options) {
152+
// String instanceId = this.spannerConfig.getInstanceId().get();
153+
// String databaseId = this.spannerConfig.getDatabaseId().get();
154+
//
155+
// validateRequiredPermissions(instanceId, databaseId);
156+
// }
157+
142158
@Override
143159
public PDone expand(PBegin begin) {
144160
PCollectionView<Dialect> dialectView =
@@ -672,7 +688,7 @@ public void processElement(ProcessContext c) {
672688
"Create index early: {}",
673689
String.join(";", createIndexStatements));
674690
ddlStatements.addAll(createIndexStatements);
675-
c.output(pendingIndexesTag, new ArrayList<String>());
691+
c.output(pendingIndexesTag, new ArrayList<>());
676692
} else {
677693
LOG.info(
678694
"Pending index creation: {}",
@@ -953,4 +969,42 @@ private void validateLocalFiles(ProcessContext c, String table, TableManifest ma
953969
}
954970
}
955971
}
972+
973+
private static void validateRequiredPermissions(String instanceId, String databaseId) {
974+
if (instanceId == null) {
975+
return;
976+
}
977+
IAMCheckResult iamCheckResult = new IAMCheckResult("", Collections.emptyList());
978+
try {
979+
IAMPermissionsChecker iamPermissionsChecker =
980+
new IAMPermissionsChecker("span-cloud-migrations-testing");
981+
try {
982+
983+
iamCheckResult =
984+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
985+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
986+
instanceId,
987+
databaseId);
988+
} catch (DatabaseNotFoundException e) {
989+
iamCheckResult =
990+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
991+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
992+
instanceId,
993+
databaseId);
994+
}
995+
if (iamCheckResult.isPermissionsAvailable()) {
996+
return;
997+
}
998+
String errorString =
999+
"For resource: "
1000+
+ iamCheckResult.getResourceName()
1001+
+ ", missing permissions: "
1002+
+ iamCheckResult.getMissingPermissions()
1003+
+ ";";
1004+
throw new RuntimeException(errorString);
1005+
} catch (GeneralSecurityException | IOException e) {
1006+
LOG.error("Error while validating permissions for spanner", e);
1007+
throw new RuntimeException(e);
1008+
}
1009+
}
9561010
}

v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public List<String> getMissingPermissions() {
3636
return new ArrayList<>(missingPermissions);
3737
}
3838

39-
public boolean isSuccess() {
39+
public boolean isPermissionsAvailable() {
4040
return missingPermissions.isEmpty();
4141
}
4242

@@ -49,7 +49,7 @@ public String toString() {
4949
+ ", missingPermissions="
5050
+ missingPermissions
5151
+ ", success="
52-
+ isSuccess()
52+
+ isPermissionsAvailable()
5353
+ '}';
5454
}
5555
}

0 commit comments

Comments
 (0)