1515 */
1616package com .google .cloud .teleport .spanner ;
1717
18+ import com .google .cloud .spanner .DatabaseNotFoundException ;
1819import com .google .cloud .spanner .Options .RpcPriority ;
1920import com .google .cloud .spanner .SpannerOptions ;
2021import com .google .cloud .teleport .metadata .Template ;
2627import com .google .cloud .teleport .spanner .iam .IAMCheckResult ;
2728import com .google .cloud .teleport .spanner .iam .IAMPermissionsChecker ;
2829import com .google .cloud .teleport .spanner .iam .IAMRequirementsCreator ;
29- import com .google .cloud .teleport .spanner .iam .IAMResourceRequirements ;
3030import com .google .cloud .teleport .spanner .spannerio .SpannerConfig ;
31- import java .util .Collections ;
31+ import java .io .IOException ;
32+ import java .security .GeneralSecurityException ;
3233import org .apache .beam .runners .dataflow .options .DataflowPipelineOptions ;
3334import org .apache .beam .sdk .Pipeline ;
3435import org .apache .beam .sdk .PipelineResult ;
35- import org .apache .beam .sdk .extensions .gcp .options .GcpOptions ;
3636import org .apache .beam .sdk .options .Default ;
3737import org .apache .beam .sdk .options .Description ;
3838import org .apache .beam .sdk .options .PipelineOptions ;
3939import org .apache .beam .sdk .options .PipelineOptionsFactory ;
4040import org .apache .beam .sdk .options .ValueProvider ;
4141import org .apache .beam .sdk .options .ValueProvider .NestedValueProvider ;
4242import org .apache .beam .sdk .transforms .SerializableFunction ;
43+ import org .slf4j .Logger ;
44+ import org .slf4j .LoggerFactory ;
4345
4446/**
4547 * Avro to Cloud Spanner Import pipeline.
6567 "The input Cloud Storage path must exist, and it must include a <a href=\" https://cloud.google.com/spanner/docs/import-non-spanner#create-export-json\" >spanner-export.json</a> file that contains a JSON description of files to import."
6668 })
6769public class ImportPipeline {
70+ private static final Logger LOG = LoggerFactory .getLogger (ImportPipeline .class );
6871
6972 /** Options for {@link ImportPipeline}. */
7073 public interface Options extends PipelineOptions {
@@ -258,7 +261,8 @@ public static void main(String[] args) {
258261 options .getDdlCreationTimeoutInMinutes (),
259262 options .getEarlyIndexCreateThreshold ()));
260263
261- validateRequiredPermissions (options );
264+ validateRequiredPermissions (spannerConfig );
265+
262266 PipelineResult result = p .run ();
263267
264268 if (options .getWaitUntilFinish ()
@@ -271,24 +275,45 @@ public static void main(String[] args) {
271275 }
272276 }
273277
274- private static void validateRequiredPermissions (Options options ) {
275- IAMResourceRequirements spannerRequirements =
276- IAMRequirementsCreator .createSpannerResourceRequirement ();
277- GcpOptions gcpOptions = options .as (GcpOptions .class );
278-
279- IAMPermissionsChecker iamPermissionsChecker =
280- new IAMPermissionsChecker (gcpOptions .getProject (), gcpOptions );
281- IAMCheckResult missingPermission =
282- iamPermissionsChecker .check (Collections .singletonList (spannerRequirements ));
283- if (missingPermission .isSuccess ()) {
278+ private static void validateRequiredPermissions (SpannerConfig spannerConfig ) {
279+ if (!spannerConfig .getInstanceId ().isAccessible ()) {
280+ // If instance id is not accessible, then the current context is not runtime. Hence,
281+ // validation is not applicable.
284282 return ;
285283 }
286- String errorString =
287- "For resource: "
288- + missingPermission .getResourceName ()
289- + ", missing permissions: "
290- + missingPermission .getMissingPermissions ()
291- + ";" ;
292- throw new RuntimeException (errorString );
284+ String instanceId = spannerConfig .getInstanceId ().get ();
285+ String databaseId = spannerConfig .getDatabaseId ().get ();
286+ IAMCheckResult iamCheckResult ;
287+ try {
288+ IAMPermissionsChecker iamPermissionsChecker =
289+ new IAMPermissionsChecker (spannerConfig .getProjectId ().get ());
290+ try {
291+
292+ iamCheckResult =
293+ iamPermissionsChecker .checkSpannerDatabaseRequirements (
294+ IAMRequirementsCreator .createSpannerWriteResourceRequirement (),
295+ instanceId ,
296+ databaseId );
297+ } catch (DatabaseNotFoundException e ) {
298+ // If DatabaseNotFoundException exception is thrown, then validation at instance level need
299+ // to be performed.
300+ iamCheckResult =
301+ iamPermissionsChecker .checkSpannerInstanceRequirements (
302+ IAMRequirementsCreator .createSpannerWriteResourceRequirement (), instanceId );
303+ }
304+ if (iamCheckResult .isPermissionsAvailable ()) {
305+ return ;
306+ }
307+ String errorString =
308+ "For resource: "
309+ + iamCheckResult .getResourceName ()
310+ + ", missing permissions: "
311+ + iamCheckResult .getMissingPermissions ()
312+ + ";" ;
313+ throw new RuntimeException (errorString );
314+ } catch (GeneralSecurityException | IOException e ) {
315+ LOG .error ("Error while validating permissions for spanner" , e );
316+ throw new RuntimeException (e );
317+ }
293318 }
294319}
0 commit comments