Skip to content

Commit f8bf616

Browse files
chore: Fixed constructor argument.
1 parent 3bc834f commit f8bf616

File tree

9 files changed

+263
-236
lines changed

9 files changed

+263
-236
lines changed

v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,21 @@
2626
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
2727
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
2828
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
29-
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
3029
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
31-
import java.util.Collections;
30+
import java.io.IOException;
31+
import java.security.GeneralSecurityException;
3232
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
3333
import org.apache.beam.sdk.Pipeline;
3434
import org.apache.beam.sdk.PipelineResult;
35-
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3635
import org.apache.beam.sdk.options.Default;
3736
import org.apache.beam.sdk.options.Description;
3837
import org.apache.beam.sdk.options.PipelineOptions;
3938
import org.apache.beam.sdk.options.PipelineOptionsFactory;
4039
import org.apache.beam.sdk.options.ValueProvider;
4140
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
4241
import org.apache.beam.sdk.transforms.SerializableFunction;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
4344

4445
/**
4546
* Dataflow template that exports a Cloud Spanner database to Avro files in GCS.
@@ -78,6 +79,7 @@
7879
"In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
7980
})
8081
public class ExportPipeline {
82+
private static final Logger LOG = LoggerFactory.getLogger(ExportPipeline.class);
8183

8284
/** Options for Export pipeline. */
8385
public interface ExportPipelineOptions extends PipelineOptions {
@@ -282,7 +284,7 @@ public static void main(String[] args) {
282284
options.getShouldExportRelatedTables(),
283285
options.getShouldExportTimestampAsLogicalType(),
284286
options.getAvroTempDirectory()));
285-
validateRequiredPermissions(options);
287+
validateRequiredPermissions(spannerConfig);
286288
PipelineResult result = p.run();
287289
if (options.getWaitUntilFinish()
288290
&&
@@ -294,25 +296,36 @@ public static void main(String[] args) {
294296
}
295297
}
296298

297-
private static void validateRequiredPermissions(ExportPipelineOptions options) {
298-
IAMResourceRequirements spannerRequirements =
299-
IAMRequirementsCreator.createSpannerResourceRequirement();
300-
301-
GcpOptions gcpOptions = options.as(GcpOptions.class);
302-
303-
IAMPermissionsChecker iamPermissionsChecker =
304-
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
305-
IAMCheckResult missingPermission =
306-
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
307-
if (missingPermission.isSuccess()) {
299+
private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
300+
if (!spannerConfig.getInstanceId().isAccessible()) {
301+
// If instance id is not accessible, then the current context is not runtime. Hence,
302+
// validation is not applicable.
308303
return;
309304
}
310-
String errorString =
311-
"For resource: "
312-
+ missingPermission.getResourceName()
313-
+ ", missing permissions: "
314-
+ missingPermission.getMissingPermissions()
315-
+ ";";
316-
throw new RuntimeException(errorString);
305+
String instanceId = spannerConfig.getInstanceId().get();
306+
String databaseId = spannerConfig.getDatabaseId().get();
307+
IAMCheckResult iamCheckResult;
308+
try {
309+
IAMPermissionsChecker iamPermissionsChecker =
310+
new IAMPermissionsChecker(spannerConfig.getProjectId().get());
311+
iamCheckResult =
312+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
313+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
314+
instanceId,
315+
databaseId);
316+
if (iamCheckResult.isPermissionsAvailable()) {
317+
return;
318+
}
319+
String errorString =
320+
"For resource: "
321+
+ iamCheckResult.getResourceName()
322+
+ ", missing permissions: "
323+
+ iamCheckResult.getMissingPermissions()
324+
+ ";";
325+
throw new RuntimeException(errorString);
326+
} catch (GeneralSecurityException | IOException e) {
327+
LOG.error("Error while validating permissions for spanner", e);
328+
throw new RuntimeException(e);
329+
}
317330
}
318331
}

v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.teleport.spanner;
1717

18+
import com.google.cloud.spanner.DatabaseNotFoundException;
1819
import com.google.cloud.spanner.Options.RpcPriority;
1920
import com.google.cloud.spanner.SpannerOptions;
2021
import com.google.cloud.teleport.metadata.Template;
@@ -26,20 +27,21 @@
2627
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
2728
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
2829
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
29-
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
3030
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
31-
import java.util.Collections;
31+
import java.io.IOException;
32+
import java.security.GeneralSecurityException;
3233
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
3334
import org.apache.beam.sdk.Pipeline;
3435
import org.apache.beam.sdk.PipelineResult;
35-
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3636
import org.apache.beam.sdk.options.Default;
3737
import org.apache.beam.sdk.options.Description;
3838
import org.apache.beam.sdk.options.PipelineOptions;
3939
import org.apache.beam.sdk.options.PipelineOptionsFactory;
4040
import org.apache.beam.sdk.options.ValueProvider;
4141
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
4242
import org.apache.beam.sdk.transforms.SerializableFunction;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4345

4446
/**
4547
* Avro to Cloud Spanner Import pipeline.
@@ -65,6 +67,7 @@
6567
"The input Cloud Storage path must exist, and it must include a <a href=\"https://cloud.google.com/spanner/docs/import-non-spanner#create-export-json\">spanner-export.json</a> file that contains a JSON description of files to import."
6668
})
6769
public class ImportPipeline {
70+
private static final Logger LOG = LoggerFactory.getLogger(ImportPipeline.class);
6871

6972
/** Options for {@link ImportPipeline}. */
7073
public interface Options extends PipelineOptions {
@@ -258,7 +261,6 @@ public static void main(String[] args) {
258261
options.getDdlCreationTimeoutInMinutes(),
259262
options.getEarlyIndexCreateThreshold()));
260263

261-
validateRequiredPermissions(options);
262264
PipelineResult result = p.run();
263265

264266
if (options.getWaitUntilFinish()
@@ -271,24 +273,45 @@ public static void main(String[] args) {
271273
}
272274
}
273275

274-
private static void validateRequiredPermissions(Options options) {
275-
IAMResourceRequirements spannerRequirements =
276-
IAMRequirementsCreator.createSpannerResourceRequirement();
277-
GcpOptions gcpOptions = options.as(GcpOptions.class);
278-
279-
IAMPermissionsChecker iamPermissionsChecker =
280-
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
281-
IAMCheckResult missingPermission =
282-
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
283-
if (missingPermission.isSuccess()) {
276+
private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
277+
if (!spannerConfig.getInstanceId().isAccessible()) {
278+
// If instance id is not accessible, then the current context is not runtime. Hence,
279+
// validation is not applicable.
284280
return;
285281
}
286-
String errorString =
287-
"For resource: "
288-
+ missingPermission.getResourceName()
289-
+ ", missing permissions: "
290-
+ missingPermission.getMissingPermissions()
291-
+ ";";
292-
throw new RuntimeException(errorString);
282+
String instanceId = spannerConfig.getInstanceId().get();
283+
String databaseId = spannerConfig.getDatabaseId().get();
284+
IAMCheckResult iamCheckResult;
285+
try {
286+
IAMPermissionsChecker iamPermissionsChecker =
287+
new IAMPermissionsChecker(spannerConfig.getProjectId().get());
288+
try {
289+
290+
iamCheckResult =
291+
iamPermissionsChecker.checkSpannerDatabaseRequirements(
292+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
293+
instanceId,
294+
databaseId);
295+
} catch (DatabaseNotFoundException e) {
296+
// If DatabaseNotFoundException exception is thrown, then validation at instance level need
297+
// to be performed.
298+
iamCheckResult =
299+
iamPermissionsChecker.checkSpannerInstanceRequirements(
300+
IAMRequirementsCreator.createSpannerWriteResourceRequirement(), instanceId);
301+
}
302+
if (iamCheckResult.isPermissionsAvailable()) {
303+
return;
304+
}
305+
String errorString =
306+
"For resource: "
307+
+ iamCheckResult.getResourceName()
308+
+ ", missing permissions: "
309+
+ iamCheckResult.getMissingPermissions()
310+
+ ";";
311+
throw new RuntimeException(errorString);
312+
} catch (GeneralSecurityException | IOException e) {
313+
LOG.error("Error while validating permissions for spanner", e);
314+
throw new RuntimeException(e);
315+
}
293316
}
294317
}

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,23 @@ public ImportTransform(
141141

142142
@Override
143143
public PDone expand(PBegin begin) {
144+
145+
PCollection<Void> validationSignal =
146+
begin
147+
.apply("Trigger Validation", Create.of((Void) null))
148+
.apply(
149+
"Validate Config",
150+
ParDo.of(
151+
new DoFn<Void, Void>() {
152+
@ProcessElement
153+
public void processElement(ProcessContext c) {
154+
throw new RuntimeException("Catch 2222");
155+
}
156+
}));
144157
PCollectionView<Dialect> dialectView =
145158
begin
146159
.apply("Read Dialect", new ReadDialect(spannerConfig))
160+
.apply("Validate", Wait.on(validationSignal))
147161
.apply("Dialect As PCollectionView", View.asSingleton());
148162

149163
PCollection<Export> manifest =

v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public void teardown() throws Exception {
6161

6262
@ProcessElement
6363
public void processElement(ProcessContext c) {
64+
6465
DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
6566
Dialect dialect = databaseClient.getDialect();
6667
c.output(dialect);

v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public List<String> getMissingPermissions() {
3636
return new ArrayList<>(missingPermissions);
3737
}
3838

39-
public boolean isSuccess() {
39+
public boolean isPermissionsAvailable() {
4040
return missingPermissions.isEmpty();
4141
}
4242

@@ -49,7 +49,7 @@ public String toString() {
4949
+ ", missingPermissions="
5050
+ missingPermissions
5151
+ ", success="
52-
+ isSuccess()
52+
+ isPermissionsAvailable()
5353
+ '}';
5454
}
5555
}

0 commit comments

Comments
 (0)