diff --git a/v1/pom.xml b/v1/pom.xml index 708e1df2d7..f04cae15fd 100644 --- a/v1/pom.xml +++ b/v1/pom.xml @@ -558,6 +558,11 @@ protobuf-java 4.33.2 + + com.google.apis + google-api-services-cloudresourcemanager + v3-rev20251103-2.0.0 + diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java index 28323b23d9..bf6326b2b0 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java @@ -30,6 +30,9 @@ import com.google.cloud.teleport.spanner.ddl.Sequence; import com.google.cloud.teleport.spanner.ddl.Table; import com.google.cloud.teleport.spanner.ddl.Udf; +import com.google.cloud.teleport.spanner.iam.IAMCheckResult; +import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker; +import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; import com.google.cloud.teleport.spanner.proto.ExportProtos; import com.google.cloud.teleport.spanner.proto.ExportProtos.Export; import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect; @@ -53,6 +56,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -86,19 +90,8 @@ import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Contextful; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Requirements; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; @@ -184,6 +177,19 @@ public ExportTransform( public WriteFilesResult expand(PBegin begin) { Pipeline p = begin.getPipeline(); + PCollection validationSignal = + begin + .apply("Trigger Validation", Create.of((Void) null)) + .apply( + "Validate Config", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + validateRequiredPermissions(spannerConfig); + } + })); + /* * Allow users to specify read timestamp. * CreateTransaction and CreateTransactionFn classes in SpannerIO @@ -198,6 +204,7 @@ public WriteFilesResult expand(PBegin begin) { .apply( "Create transaction", ParDo.of(new CreateTransactionFnWithTimestamp(spannerConfig, snapshotTime))) + .apply("Validate", Wait.on(validationSignal)) .apply("Tx As PCollectionView", View.asSingleton()); PCollectionView dialectView = @@ -1120,4 +1127,33 @@ private TableManifest buildGcsManifest(ProcessContext c, Iterable files return result.build(); } } + + + private static void validateRequiredPermissions(SpannerConfig spannerConfig) { + String instanceId = spannerConfig.getInstanceId().get(); + String databaseId = spannerConfig.getDatabaseId().get(); + IAMCheckResult iamCheckResult; + try { + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(spannerConfig.getProjectId().get()); + iamCheckResult = + iamPermissionsChecker.checkSpannerDatabaseRequirements( + IAMRequirementsCreator.createSpannerWriteResourceRequirement(), + instanceId, + databaseId); + if (iamCheckResult.isPermissionsAvailable()) { + return; + } + String errorString = + "For resource: " + + iamCheckResult.getResourceName() + + ", missing permissions: " + + iamCheckResult.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } catch (GeneralSecurityException | IOException e) { + LOG.error("Error while validating permissions for spanner", e); + throw new RuntimeException(e); + } + } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java index af08abc3fe..e64ff13dd4 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java @@ -19,6 +19,7 @@ import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.DatabaseNotFoundException; import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.Mutation; import com.google.cloud.teleport.spanner.ddl.ChangeStream; @@ -29,6 +30,9 @@ import com.google.cloud.teleport.spanner.ddl.Sequence; import com.google.cloud.teleport.spanner.ddl.Table; import com.google.cloud.teleport.spanner.ddl.Udf; +import com.google.cloud.teleport.spanner.iam.IAMCheckResult; +import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker; +import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; import com.google.cloud.teleport.spanner.proto.ExportProtos.Export; import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect; import com.google.cloud.teleport.spanner.proto.ExportProtos.TableManifest; @@ -54,6 +58,7 @@ import java.nio.channels.Channels; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -141,9 +146,23 @@ public ImportTransform( @Override public PDone expand(PBegin begin) { + + PCollection validationSignal = + begin + .apply("Trigger Validation", Create.of((Void) null)) + .apply( + "Validate Config", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + validateRequiredPermissions(spannerConfig); + } + })); PCollectionView dialectView = begin .apply("Read Dialect", new ReadDialect(spannerConfig)) + .apply("Validate", Wait.on(validationSignal)) .apply("Dialect As PCollectionView", View.asSingleton()); PCollection manifest = @@ -953,4 +972,36 @@ private void validateLocalFiles(ProcessContext c, String table, TableManifest ma } } } + + /** + * @throws DatabaseNotFoundException if database is not found. + */ + private static void validateRequiredPermissions(SpannerConfig spannerConfig) { + String instanceId = spannerConfig.getInstanceId().get(); + String databaseId = spannerConfig.getDatabaseId().get(); + IAMCheckResult iamCheckResult; + try { + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(spannerConfig.getProjectId().get()); + + iamCheckResult = + iamPermissionsChecker.checkSpannerDatabaseRequirements( + IAMRequirementsCreator.createSpannerWriteResourceRequirement(), + instanceId, + databaseId); + if (iamCheckResult.isPermissionsAvailable()) { + return; + } + String errorString = + "For resource: " + + iamCheckResult.getResourceName() + + ", missing permissions: " + + iamCheckResult.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } catch (GeneralSecurityException | IOException e) { + LOG.error("Error while validating permissions for spanner", e); + throw new RuntimeException(e); + } + } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java new file mode 100644 index 0000000000..509c3904f1 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import java.util.ArrayList; +import java.util.List; + +/** Represents the result of an IAM permission check on a specific resource. */ +public class IAMCheckResult { + private final String resourceName; + private final List missingPermissions; + + public IAMCheckResult(String resourceName, List missingPermissions) { + this.resourceName = resourceName; + this.missingPermissions = new ArrayList<>(missingPermissions); + } + + public String getResourceName() { + return resourceName; + } + + public List getMissingPermissions() { + return new ArrayList<>(missingPermissions); + } + + public boolean isPermissionsAvailable() { + return missingPermissions.isEmpty(); + } + + @Override + public String toString() { + return "IAMCheckResult{" + + "resourceName='" + + resourceName + + '\'' + + ", missingPermissions=" + + missingPermissions + + ", success=" + + isPermissionsAvailable() + + '}'; + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java new file mode 100644 index 0000000000..62845d1023 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import com.google.cloud.spanner.DatabaseNotFoundException; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility to check IAM permissions for various GCP resources. */ +public class IAMPermissionsChecker { + private static final Logger LOG = LoggerFactory.getLogger(IAMPermissionsChecker.class); + private final String projectId; + + private final Spanner spanner; + private static final String INSTANCE_STRING = "projects/%s/instances/%s"; + private static final String DATABASE_STRING = "projects/%s/instances/%s/database/%s"; + + public IAMPermissionsChecker(String projectId) throws GeneralSecurityException, IOException { + SpannerOptions options = SpannerOptions.newBuilder().setProjectId(projectId).build(); + + this.spanner = options.getService(); + this.projectId = projectId; + } + + @VisibleForTesting + IAMPermissionsChecker(Spanner spanner, String projectId) { + this.projectId = projectId; + this.spanner = spanner; + } + + /** + * Checks IAM permissions for a list of requirements. This api should be called once with all the + * requirements. + * + * @return List of results, only missing permissions are included. Empty list indicate all the + * requirements are met. + */ + public IAMCheckResult checkSpannerInstanceRequirements( + List permissionList, String instanceId) { + + Iterable grantedPermissions = + spanner.getInstanceAdminClient().getInstance(instanceId).testIAMPermissions(permissionList); + + return new IAMCheckResult( + String.format(INSTANCE_STRING, projectId, instanceId), + fetchMissingPermission(permissionList, grantedPermissions)); + } + + /** + * Checks IAM permissions for a list of requirements. This api should be called once with all the + * requirements. + * + * @return List of results, only missing permissions are included. Empty list indicate all the + * requirements are met. + * @throws DatabaseNotFoundException when database is not found. + */ + public IAMCheckResult checkSpannerDatabaseRequirements( + List permissionList, String instanceId, String databaseId) + throws DatabaseNotFoundException { + + Iterable grantedPermissions = + spanner + .getDatabaseAdminClient() + .getDatabase(instanceId, databaseId) + .testIAMPermissions(permissionList); + return new IAMCheckResult( + String.format(DATABASE_STRING, projectId, instanceId, databaseId), + fetchMissingPermission(permissionList, grantedPermissions)); + } + + private List fetchMissingPermission( + List requiredPermission, Iterable grantedPermissions) { + + HashSet grantedPermissionsSet = + StreamSupport.stream(grantedPermissions.spliterator(), false) + .collect(Collectors.toCollection(HashSet::new)); + + return requiredPermission.stream().filter(p -> !grantedPermissionsSet.contains(p)).toList(); + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java new file mode 100644 index 0000000000..a4ff15dcd8 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import com.google.common.collect.ImmutableList; +import java.util.List; + +/** + * The goal of this class contain the list of permissions required by various templates. This will + * help in maintaining centralised lists for all templates. + */ +public class IAMRequirementsCreator { + /** Default permissions required by templates. */ + private static final List READ_SPANNER_PERMISSIONS = + ImmutableList.of( + "spanner.databases.beginReadOnlyTransaction", + "spanner.databases.get", + "spanner.databases.getDdl", + "spanner.databases.partitionQuery", + "spanner.databases.partitionRead", + "spanner.databases.read", + "spanner.databases.select"); + + private static final List WRITE_SPANNER_PERMISSIONS = + ImmutableList.of( + "spanner.databases.beginOrRollbackReadWriteTransaction", + "spanner.databases.beginPartitionedDmlTransaction", + "spanner.databases.beginReadOnlyTransaction", + "spanner.databases.getDdl", + "spanner.databases.partitionQuery", + "spanner.databases.partitionRead", + "spanner.databases.read", + "spanner.databases.select", + "spanner.databases.update", + "spanner.databases.updateDdl", + "spanner.databases.write"); + + public static List createSpannerWriteResourceRequirement() { + return WRITE_SPANNER_PERMISSIONS; + } + + public static List createSpannerReadResourceRequirement() { + return READ_SPANNER_PERMISSIONS; + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java new file mode 100644 index 0000000000..e65e4143f0 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2020 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** IAM validatory Utility classes for templates. */ +package com.google.cloud.teleport.spanner.iam; diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java new file mode 100644 index 0000000000..006de5358a --- /dev/null +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java @@ -0,0 +1,136 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseNotFoundException; +import com.google.cloud.spanner.Instance; +import com.google.cloud.spanner.InstanceAdminClient; +import com.google.cloud.spanner.Spanner; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for IAMPermissionsChecker. */ +@RunWith(JUnit4.class) +public class IAMPermissionsCheckerTest { + + private static final String PROJECT_ID = "test-project"; + private static final String INSTANCE_ID = "test-instance"; + private static final String DATABASE_ID = "test-database"; + + @Mock private Spanner spanner; + @Mock private InstanceAdminClient instanceAdminClient; + @Mock private DatabaseAdminClient databaseAdminClient; + @Mock private Instance instance; + @Mock private Database database; + + private IAMPermissionsChecker checker; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + checker = new IAMPermissionsChecker(spanner, PROJECT_ID); + + when(spanner.getInstanceAdminClient()).thenReturn(instanceAdminClient); + when(spanner.getDatabaseAdminClient()).thenReturn(databaseAdminClient); + when(instanceAdminClient.getInstance(INSTANCE_ID)).thenReturn(instance); + when(databaseAdminClient.getDatabase(INSTANCE_ID, DATABASE_ID)).thenReturn(database); + } + + @Test + public void testCheckSpannerInstanceRequirements_allPermissionsGranted() { + List requiredPermissions = + Arrays.asList("spanner.instances.get", "spanner.instances.update"); + when(instance.testIAMPermissions(requiredPermissions)).thenReturn(requiredPermissions); + + IAMCheckResult result = + checker.checkSpannerInstanceRequirements(requiredPermissions, INSTANCE_ID); + + assertTrue(result.getMissingPermissions().isEmpty()); + assertEquals( + String.format("projects/%s/instances/%s", PROJECT_ID, INSTANCE_ID), + result.getResourceName()); + } + + @Test + public void testCheckSpannerInstanceRequirements_somePermissionsMissing() { + List requiredPermissions = + Arrays.asList("spanner.instances.get", "spanner.instances.update"); + List grantedPermissions = Collections.singletonList("spanner.instances.get"); + when(instance.testIAMPermissions(requiredPermissions)).thenReturn(grantedPermissions); + + IAMCheckResult result = + checker.checkSpannerInstanceRequirements(requiredPermissions, INSTANCE_ID); + + assertEquals( + Collections.singletonList("spanner.instances.update"), result.getMissingPermissions()); + assertEquals( + String.format("projects/%s/instances/%s", PROJECT_ID, INSTANCE_ID), + result.getResourceName()); + } + + @Test + public void testCheckSpannerDatabaseRequirements_allPermissionsGranted() { + List requiredPermissions = + Arrays.asList("spanner.databases.get", "spanner.databases.update"); + when(database.testIAMPermissions(requiredPermissions)).thenReturn(requiredPermissions); + + IAMCheckResult result = + checker.checkSpannerDatabaseRequirements(requiredPermissions, INSTANCE_ID, DATABASE_ID); + + assertTrue(result.getMissingPermissions().isEmpty()); + assertEquals( + String.format("projects/%s/instances/%s/database/%s", PROJECT_ID, INSTANCE_ID, DATABASE_ID), + result.getResourceName()); + } + + @Test + public void testCheckSpannerDatabaseRequirements_somePermissionsMissing() { + List requiredPermissions = + Arrays.asList("spanner.databases.get", "spanner.databases.update"); + List grantedPermissions = Collections.singletonList("spanner.databases.get"); + when(database.testIAMPermissions(requiredPermissions)).thenReturn(grantedPermissions); + + IAMCheckResult result = + checker.checkSpannerDatabaseRequirements(requiredPermissions, INSTANCE_ID, DATABASE_ID); + + assertEquals( + Collections.singletonList("spanner.databases.update"), result.getMissingPermissions()); + assertEquals( + String.format("projects/%s/instances/%s/database/%s", PROJECT_ID, INSTANCE_ID, DATABASE_ID), + result.getResourceName()); + } + + @Test(expected = DatabaseNotFoundException.class) + public void testCheckSpannerDatabaseRequirements_databaseNotFound() { + when(databaseAdminClient.getDatabase(INSTANCE_ID, DATABASE_ID)) + .thenThrow(DatabaseNotFoundException.class); + checker.checkSpannerDatabaseRequirements( + Collections.singletonList("spanner.databases.get"), INSTANCE_ID, DATABASE_ID); + } +}