1616
1717package io .cdap .plugin .gcp .gcs .actions ;
1818
19- import com .google .api .pathtemplate .ValidationException ;
2019import com .google .auth .Credentials ;
21- import com .google .auth .oauth2 .ServiceAccountCredentials ;
2220import com .google .cloud .kms .v1 .CryptoKeyName ;
2321import com .google .cloud .storage .Bucket ;
2422import com .google .cloud .storage .Storage ;
25- import com .google .cloud .storage .StorageException ;
26- import com .google .common .annotations .VisibleForTesting ;
27- import com .google .common .base .Strings ;
2823import io .cdap .cdap .api .annotation .Description ;
2924import io .cdap .cdap .api .annotation .Macro ;
3025import io .cdap .cdap .api .annotation .Name ;
3126import io .cdap .cdap .api .annotation .Plugin ;
32- import io .cdap .cdap .etl .api .Arguments ;
27+ import io .cdap .cdap .api .exception .ErrorCategory ;
28+ import io .cdap .cdap .api .exception .ErrorType ;
29+ import io .cdap .cdap .api .exception .ErrorUtils ;
3330import io .cdap .cdap .etl .api .FailureCollector ;
3431import io .cdap .cdap .etl .api .PipelineConfigurer ;
3532import io .cdap .cdap .etl .api .action .Action ;
3633import io .cdap .cdap .etl .api .action .ActionContext ;
3734import io .cdap .plugin .gcp .common .CmekUtils ;
3835import io .cdap .plugin .gcp .common .GCPConfig ;
36+ import io .cdap .plugin .gcp .common .GCPErrorDetailsProviderUtil ;
3937import io .cdap .plugin .gcp .common .GCPUtils ;
4038import io .cdap .plugin .gcp .gcs .GCSPath ;
41- import io .cdap .plugin .gcp .gcs .sink .GCSBatchSink ;
4239import org .apache .hadoop .conf .Configuration ;
4340import org .apache .hadoop .fs .FileSystem ;
4441import org .apache .hadoop .fs .Path ;
@@ -86,9 +83,16 @@ public void run(ActionContext context) throws Exception {
8683 return ;
8784 }
8885 String serviceAccount = config .getServiceAccount ();
89- Credentials credentials = serviceAccount == null ?
90- null : GCPUtils .loadServiceAccountCredentials (serviceAccount , isServiceAccountFilePath );
91-
86+ Credentials credentials = null ;
87+ try {
88+ credentials = serviceAccount == null ? null : GCPUtils .loadServiceAccountCredentials (serviceAccount ,
89+ isServiceAccountFilePath );
90+ } catch (IOException e ) {
91+ String errorReason = "Failed to load service account credentials." ;
92+ collector .addFailure (String .format ("%s %s: %s" , errorReason , e .getClass ().getName (), e .getMessage ()), null )
93+ .withStacktrace (e .getStackTrace ());
94+ collector .getOrThrowException ();
95+ }
9296 Map <String , String > map = GCPUtils .generateGCSAuthProperties (serviceAccount , config .getServiceAccountType ());
9397 map .forEach (configuration ::set );
9498
@@ -125,19 +129,22 @@ public void run(ActionContext context) throws Exception {
125129 Bucket bucket = null ;
126130 try {
127131 bucket = storage .get (gcsPath .getBucket ());
128- } catch (StorageException e ) {
132+ } catch (Exception e ) {
129133 // Add more descriptive error message
130- throw new RuntimeException (
131- String .format ("Unable to access or create bucket %s. " , gcsPath .getBucket ())
132- + "Ensure you entered the correct bucket path and have permissions for it." , e );
134+ String errorReason = String .format ("Unable to access or create bucket %s. " , gcsPath .getBucket ()) +
135+ "Ensure you entered the correct bucket path and have permissions for it." ;
136+ throw GCPErrorDetailsProviderUtil .getHttpResponseExceptionDetailsFromChain (e , errorReason , ErrorType .UNKNOWN ,
137+ true , GCPUtils .GCS_SUPPORTED_DOC_URL );
133138 }
134139 if (bucket == null ) {
135140 GCPUtils .createBucket (storage , gcsPath .getBucket (), config .location , cmekKeyName );
136141 undoBucket .add (bucketPath );
137142 } else if (gcsPath .equals (bucketPath ) && config .failIfExists ()) {
138143 // if the gcs path is just a bucket, and it exists, fail the pipeline
139144 rollback = true ;
140- throw new Exception (String .format ("Path %s already exists" , gcsPath ));
145+ String errorReason = String .format ("Path %s already exists" , gcsPath );
146+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
147+ errorReason , errorReason , ErrorType .USER , true , null );
141148 }
142149 }
143150
@@ -146,7 +153,9 @@ public void run(ActionContext context) throws Exception {
146153 fs = gcsPath .getFileSystem (configuration );
147154 } catch (IOException e ) {
148155 rollback = true ;
149- throw new Exception ("Unable to get GCS filesystem handler. " + e .getMessage (), e );
156+ String errorReason = "Unable to get GCS filesystem handler." ;
157+ throw GCPErrorDetailsProviderUtil .getHttpResponseExceptionDetailsFromChain (e , errorReason , ErrorType .UNKNOWN ,
158+ true , GCPUtils .GCS_SUPPORTED_DOC_URL );
150159 }
151160 if (!fs .exists (gcsPath )) {
152161 try {
@@ -156,12 +165,16 @@ public void run(ActionContext context) throws Exception {
156165 } catch (IOException e ) {
157166 LOG .warn (String .format ("Failed to create path '%s'" , gcsPath ));
158167 rollback = true ;
159- throw e ;
168+ String errorReason = String .format ("Failed to create path %s." , gcsPath );
169+ throw GCPErrorDetailsProviderUtil .getHttpResponseExceptionDetailsFromChain (e , errorReason ,
170+ ErrorType .UNKNOWN , true , GCPUtils .GCS_SUPPORTED_DOC_URL );
160171 }
161172 } else {
162173 if (config .failIfExists ()) {
163174 rollback = true ;
164- throw new Exception (String .format ("Path %s already exists" , gcsPath ));
175+ String errorReason = String .format ("Path %s already exists" , gcsPath );
176+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
177+ errorReason , errorReason , ErrorType .SYSTEM , true , null );
165178 }
166179 }
167180 }
0 commit comments