diff --git a/pom.xml b/pom.xml index b2c38dc..d96d939 100644 --- a/pom.xml +++ b/pom.xml @@ -253,12 +253,6 @@ com.amazonaws aws-java-sdk-s3 ${aws.sdk.version} - - - com.fasterxml.jackson.core - jackson-databind - - diff --git a/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java b/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java index f3d27da..6c531d7 100644 --- a/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java +++ b/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java @@ -16,6 +16,14 @@ package io.cdap.plugin.aws.s3.source; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.BasicSessionCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.google.common.base.Strings; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -183,15 +191,12 @@ public void validate(FailureCollector collector) { collector.addFailure("Connection credentials is not provided", "Please provide valid credentials"); } else { connection.validate(collector); + validatePath(collector); + collector.getOrThrowException(); + validateCredentials(collector, path); } } - if (!containsMacro("path") && (!path.startsWith("s3a://") && !path.startsWith("s3n://"))) { - collector.addFailure("Path must start with s3a:// or s3n://.", null).withConfigProperty(NAME_PATH); - } - if (!containsMacro("path") && path.startsWith("s3n://") && !Strings.isNullOrEmpty(connection.getSessionToken())) { - collector.addFailure("Temporary credentials are only supported for s3a:// paths.", null) - .withConfigProperty(NAME_PATH); - } + if (!containsMacro(NAME_FILE_SYSTEM_PROPERTIES)) { try { getFilesystemProperties(); @@ -202,6 +207,45 @@ public void validate(FailureCollector collector) { } } + void validatePath(FailureCollector collector) { + if (!containsMacro("path") && (!path.startsWith("s3a://") && !path.startsWith("s3n://"))) { + collector.addFailure("Path must start with s3a:// or s3n://.", null).withConfigProperty(NAME_PATH); + } + if (!containsMacro("path") && path.startsWith("s3n://") && !Strings.isNullOrEmpty(connection.getSessionToken())) { + collector.addFailure("Temporary credentials are only supported for s3a:// paths.", null) + .withConfigProperty(NAME_PATH); + } + } + + public void validateCredentials(FailureCollector collector, String path) { + AmazonS3 s3 = getS3Client(Regions.DEFAULT_REGION.getName()); + try { + String bucketRegion = s3.getBucketLocation(S3Path.from(path).getBucket()); + s3 = getS3Client(bucketRegion); + s3.getBucketAcl(S3Path.from(path).getBucket()); + } catch (AmazonS3Exception exception) { + String errorMessage = exception.getErrorMessage(); + collector.addFailure(String.format("Invalid values: %s.", errorMessage), + "Please provide valid values.").withStacktrace(exception.getStackTrace()); + } + } + + private AmazonS3 getS3Client(String region) { + AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); + if (region != null) { + builder.setRegion(region); + } + AWSCredentials creds; + if (connection.getSessionToken() == null) { + creds = new BasicAWSCredentials(connection.getAccessID(), connection.getAccessKey()); + } else { + creds = new BasicSessionCredentials(connection.getAccessID(), connection.getAccessKey(), + connection.getSessionToken()); + } + return builder.withCredentials(new AWSStaticCredentialsProvider(creds)) + .build(); + } + @Override public String getPath() { return path.startsWith(S3Path.OLD_SCHEME) ? S3Path.SCHEME + path.substring(S3Path.OLD_SCHEME.length()) : path;