Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@
<artifactId>protobuf-java</artifactId>
<version>4.33.2</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-cloudresourcemanager</artifactId>
<version>v3-rev20251103-2.0.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import com.google.cloud.teleport.spanner.ddl.Sequence;
import com.google.cloud.teleport.spanner.ddl.Table;
import com.google.cloud.teleport.spanner.ddl.Udf;
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
import com.google.cloud.teleport.spanner.proto.ExportProtos;
import com.google.cloud.teleport.spanner.proto.ExportProtos.Export;
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
Expand All @@ -53,6 +56,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -86,19 +90,8 @@
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
Expand Down Expand Up @@ -184,6 +177,19 @@ public ExportTransform(
public WriteFilesResult<String> expand(PBegin begin) {
Pipeline p = begin.getPipeline();

PCollection<Void> validationSignal =
begin
.apply("Trigger Validation", Create.of((Void) null))
.apply(
"Validate Config",
ParDo.of(
new DoFn<Void, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This validation will happen after the pipeline starts, which is going to be pretty similar to running the import itself.

validateRequiredPermissions(spannerConfig);
}
}));

/*
* Allow users to specify read timestamp.
* CreateTransaction and CreateTransactionFn classes in SpannerIO
Expand All @@ -198,6 +204,7 @@ public WriteFilesResult<String> expand(PBegin begin) {
.apply(
"Create transaction",
ParDo.of(new CreateTransactionFnWithTimestamp(spannerConfig, snapshotTime)))
.apply("Validate", Wait.on(validationSignal))
.apply("Tx As PCollectionView", View.asSingleton());

PCollectionView<Dialect> dialectView =
Expand Down Expand Up @@ -1120,4 +1127,33 @@ private TableManifest buildGcsManifest(ProcessContext c, Iterable<GcsPath> files
return result.build();
}
}


private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
String instanceId = spannerConfig.getInstanceId().get();
String databaseId = spannerConfig.getDatabaseId().get();
IAMCheckResult iamCheckResult;
try {
IAMPermissionsChecker iamPermissionsChecker =
new IAMPermissionsChecker(spannerConfig.getProjectId().get());
iamCheckResult =
iamPermissionsChecker.checkSpannerDatabaseRequirements(
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
instanceId,
databaseId);
if (iamCheckResult.isPermissionsAvailable()) {
return;
}
String errorString =
"For resource: "
+ iamCheckResult.getResourceName()
+ ", missing permissions: "
+ iamCheckResult.getMissingPermissions()
+ ";";
throw new RuntimeException(errorString);
} catch (GeneralSecurityException | IOException e) {
LOG.error("Error while validating permissions for spanner", e);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.DatabaseNotFoundException;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.teleport.spanner.ddl.ChangeStream;
Expand All @@ -29,6 +30,9 @@
import com.google.cloud.teleport.spanner.ddl.Sequence;
import com.google.cloud.teleport.spanner.ddl.Table;
import com.google.cloud.teleport.spanner.ddl.Udf;
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
import com.google.cloud.teleport.spanner.proto.ExportProtos.Export;
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
import com.google.cloud.teleport.spanner.proto.ExportProtos.TableManifest;
Expand All @@ -54,6 +58,7 @@
import java.nio.channels.Channels;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -141,9 +146,23 @@ public ImportTransform(

@Override
public PDone expand(PBegin begin) {

PCollection<Void> validationSignal =
begin
.apply("Trigger Validation", Create.of((Void) null))
.apply(
"Validate Config",
ParDo.of(
new DoFn<Void, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
validateRequiredPermissions(spannerConfig);
}
}));
PCollectionView<Dialect> dialectView =
begin
.apply("Read Dialect", new ReadDialect(spannerConfig))
.apply("Validate", Wait.on(validationSignal))
.apply("Dialect As PCollectionView", View.asSingleton());

PCollection<Export> manifest =
Expand Down Expand Up @@ -953,4 +972,36 @@ private void validateLocalFiles(ProcessContext c, String table, TableManifest ma
}
}
}

/**
* @throws DatabaseNotFoundException if database is not found.
*/
private static void validateRequiredPermissions(SpannerConfig spannerConfig) {
String instanceId = spannerConfig.getInstanceId().get();
String databaseId = spannerConfig.getDatabaseId().get();
IAMCheckResult iamCheckResult;
try {
IAMPermissionsChecker iamPermissionsChecker =
new IAMPermissionsChecker(spannerConfig.getProjectId().get());

iamCheckResult =
iamPermissionsChecker.checkSpannerDatabaseRequirements(
IAMRequirementsCreator.createSpannerWriteResourceRequirement(),
instanceId,
databaseId);
if (iamCheckResult.isPermissionsAvailable()) {
return;
}
String errorString =
"For resource: "
+ iamCheckResult.getResourceName()
+ ", missing permissions: "
+ iamCheckResult.getMissingPermissions()
+ ";";
throw new RuntimeException(errorString);
} catch (GeneralSecurityException | IOException e) {
LOG.error("Error while validating permissions for spanner", e);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.spanner.iam;

import java.util.ArrayList;
import java.util.List;

/** Represents the result of an IAM permission check on a specific resource. */
public class IAMCheckResult {
private final String resourceName;
private final List<String> missingPermissions;

public IAMCheckResult(String resourceName, List<String> missingPermissions) {
this.resourceName = resourceName;
this.missingPermissions = new ArrayList<>(missingPermissions);
}

public String getResourceName() {
return resourceName;
}

public List<String> getMissingPermissions() {
return new ArrayList<>(missingPermissions);
}

public boolean isPermissionsAvailable() {
return missingPermissions.isEmpty();
}

@Override
public String toString() {
return "IAMCheckResult{"
+ "resourceName='"
+ resourceName
+ '\''
+ ", missingPermissions="
+ missingPermissions
+ ", success="
+ isPermissionsAvailable()
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.spanner.iam;

import com.google.cloud.spanner.DatabaseNotFoundException;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility to check IAM permissions for various GCP resources. */
public class IAMPermissionsChecker {
private static final Logger LOG = LoggerFactory.getLogger(IAMPermissionsChecker.class);
private final String projectId;

private final Spanner spanner;
private static final String INSTANCE_STRING = "projects/%s/instances/%s";
private static final String DATABASE_STRING = "projects/%s/instances/%s/database/%s";

public IAMPermissionsChecker(String projectId) throws GeneralSecurityException, IOException {
SpannerOptions options = SpannerOptions.newBuilder().setProjectId(projectId).build();

this.spanner = options.getService();
this.projectId = projectId;
}

@VisibleForTesting
IAMPermissionsChecker(Spanner spanner, String projectId) {
this.projectId = projectId;
this.spanner = spanner;
}

/**
* Checks IAM permissions for a list of requirements. This api should be called once with all the
* requirements.
*
* @return List of results, only missing permissions are included. Empty list indicate all the
* requirements are met.
*/
public IAMCheckResult checkSpannerInstanceRequirements(
List<String> permissionList, String instanceId) {

Iterable<String> grantedPermissions =
spanner.getInstanceAdminClient().getInstance(instanceId).testIAMPermissions(permissionList);

return new IAMCheckResult(
String.format(INSTANCE_STRING, projectId, instanceId),
fetchMissingPermission(permissionList, grantedPermissions));
}

/**
* Checks IAM permissions for a list of requirements. This api should be called once with all the
* requirements.
*
* @return List of results, only missing permissions are included. Empty list indicate all the
* requirements are met.
* @throws DatabaseNotFoundException when database is not found.
*/
public IAMCheckResult checkSpannerDatabaseRequirements(
List<String> permissionList, String instanceId, String databaseId)
throws DatabaseNotFoundException {

Iterable<String> grantedPermissions =
spanner
.getDatabaseAdminClient()
.getDatabase(instanceId, databaseId)
.testIAMPermissions(permissionList);
return new IAMCheckResult(
String.format(DATABASE_STRING, projectId, instanceId, databaseId),
fetchMissingPermission(permissionList, grantedPermissions));
}

private List<String> fetchMissingPermission(
List<String> requiredPermission, Iterable<String> grantedPermissions) {

HashSet<String> grantedPermissionsSet =
StreamSupport.stream(grantedPermissions.spliterator(), false)
.collect(Collectors.toCollection(HashSet::new));

return requiredPermission.stream().filter(p -> !grantedPermissionsSet.contains(p)).toList();
}
}
Loading
Loading