1616
1717package io .cdap .plugin .aws .s3 .source ;
1818
19+ import com .amazonaws .auth .AWSCredentials ;
20+ import com .amazonaws .auth .AWSStaticCredentialsProvider ;
21+ import com .amazonaws .auth .BasicAWSCredentials ;
22+ import com .amazonaws .auth .BasicSessionCredentials ;
23+ import com .amazonaws .regions .Regions ;
24+ import com .amazonaws .services .s3 .AmazonS3 ;
25+ import com .amazonaws .services .s3 .AmazonS3ClientBuilder ;
26+ import com .amazonaws .services .s3 .model .AmazonS3Exception ;
1927import com .google .common .base .Strings ;
2028import com .google .gson .Gson ;
2129import com .google .gson .reflect .TypeToken ;
@@ -183,15 +191,12 @@ public void validate(FailureCollector collector) {
183191 collector .addFailure ("Connection credentials is not provided" , "Please provide valid credentials" );
184192 } else {
185193 connection .validate (collector );
194+ validatePath (collector );
195+ collector .getOrThrowException ();
196+ validateCredentials (collector , path );
186197 }
187198 }
188- if (!containsMacro ("path" ) && (!path .startsWith ("s3a://" ) && !path .startsWith ("s3n://" ))) {
189- collector .addFailure ("Path must start with s3a:// or s3n://." , null ).withConfigProperty (NAME_PATH );
190- }
191- if (!containsMacro ("path" ) && path .startsWith ("s3n://" ) && !Strings .isNullOrEmpty (connection .getSessionToken ())) {
192- collector .addFailure ("Temporary credentials are only supported for s3a:// paths." , null )
193- .withConfigProperty (NAME_PATH );
194- }
199+
195200 if (!containsMacro (NAME_FILE_SYSTEM_PROPERTIES )) {
196201 try {
197202 getFilesystemProperties ();
@@ -202,6 +207,45 @@ public void validate(FailureCollector collector) {
202207 }
203208 }
204209
210+ void validatePath (FailureCollector collector ) {
211+ if (!containsMacro ("path" ) && (!path .startsWith ("s3a://" ) && !path .startsWith ("s3n://" ))) {
212+ collector .addFailure ("Path must start with s3a:// or s3n://." , null ).withConfigProperty (NAME_PATH );
213+ }
214+ if (!containsMacro ("path" ) && path .startsWith ("s3n://" ) && !Strings .isNullOrEmpty (connection .getSessionToken ())) {
215+ collector .addFailure ("Temporary credentials are only supported for s3a:// paths." , null )
216+ .withConfigProperty (NAME_PATH );
217+ }
218+ }
219+
220+ public void validateCredentials (FailureCollector collector , String path ) {
221+ AmazonS3 s3 = getS3Client (Regions .DEFAULT_REGION .getName ());
222+ try {
223+ String bucketRegion = s3 .getBucketLocation (S3Path .from (path ).getBucket ());
224+ s3 = getS3Client (bucketRegion );
225+ s3 .getBucketAcl (S3Path .from (path ).getBucket ());
226+ } catch (AmazonS3Exception exception ) {
227+ String errorMessage = exception .getErrorMessage ();
228+ collector .addFailure (String .format ("Invalid values: %s." , errorMessage ),
229+ "Please provide valid values." ).withStacktrace (exception .getStackTrace ());
230+ }
231+ }
232+
233+ private AmazonS3 getS3Client (String region ) {
234+ AmazonS3ClientBuilder builder = AmazonS3ClientBuilder .standard ();
235+ if (region != null ) {
236+ builder .setRegion (region );
237+ }
238+ AWSCredentials creds ;
239+ if (connection .getSessionToken () == null ) {
240+ creds = new BasicAWSCredentials (connection .getAccessID (), connection .getAccessKey ());
241+ } else {
242+ creds = new BasicSessionCredentials (connection .getAccessID (), connection .getAccessKey (),
243+ connection .getSessionToken ());
244+ }
245+ return builder .withCredentials (new AWSStaticCredentialsProvider (creds ))
246+ .build ();
247+ }
248+
205249 @ Override
206250 public String getPath () {
207251 return path .startsWith (S3Path .OLD_SCHEME ) ? S3Path .SCHEME + path .substring (S3Path .OLD_SCHEME .length ()) : path ;
0 commit comments