@@ -48,6 +48,7 @@
 import io.cdap.cdap.etl.api.batch.BatchSourceContext;
 import io.cdap.cdap.etl.api.connector.Connector;
 import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.plugin.common.Asset;
 import io.cdap.plugin.common.LineageRecorder;
@@ -57,6 +58,7 @@
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.CmekUtils;
+import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
 import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
@@ -135,7 +137,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {
 
     // Create BigQuery client
     String serviceAccount = config.getServiceAccount();
-    Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
+    Credentials credentials = null;
+    try {
+      credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
+    } catch (Exception e) {
+      String errorReason = "Unable to load service account credentials.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
+
+
     BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
     Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
     Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
@@ -144,19 +156,30 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     bucketPath = UUID.randomUUID().toString();
     CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
     collector.getOrThrowException();
-    configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
-                                                   config.getServiceAccountType());
+    try {
+      configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
+                                                     config.getServiceAccountType());
+    } catch (Exception e) {
+      String errorReason = "Failed to create BigQuery configuration.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
 
     String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), null,
                                                           dataset, config.getBucket());
 
     // Configure GCS Bucket to use
-    String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
-                                                          storage,
-                                                          bucketName,
-                                                          dataset,
-                                                          bucketPath,
-                                                          cmekKeyName);
+    String bucket = null;
+    try {
+      bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
+                                                     cmekKeyName);
+    } catch (Exception e) {
+      String errorReason = "Failed to create bucket.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
 
     // Configure Service account credentials
     BigQuerySourceUtils.configureServiceAccount(configuration, config.getConnection());
@@ -166,10 +189,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {
 
     // Configure BigQuery input format.
     String temporaryGcsPath = BigQuerySourceUtils.getTemporaryGcsPath(bucket, bucketPath, bucketPath);
-    BigQuerySourceUtils.configureBigQueryInput(configuration,
-                                               DatasetId.of(config.getDatasetProject(), config.getDataset()),
-                                               config.getTable(),
-                                               temporaryGcsPath);
+    try {
+      BigQuerySourceUtils.configureBigQueryInput(configuration,
+                                                 DatasetId.of(config.getDatasetProject(), config.getDataset()),
+                                                 config.getTable(),
+                                                 temporaryGcsPath);
+    } catch (Exception e) {
+      String errorReason = "Failed to configure BigQuery input.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
 
     // Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exist.
     // We call emitLineage before since it creates the dataset with schema.
@@ -178,6 +208,10 @@ public void prepareRun(BatchSourceContext context) throws Exception {
       .setFqn(BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()))
       .setLocation(dataset.getLocation())
       .build();
+
+    // set error details provider
+    context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName()));
+
     emitLineage(context, configuredSchema, sourceTableType, config.getTable(), asset);
     setInputFormat(context, configuredSchema);
   }
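
The three try/catch blocks added above all follow the same collect-and-fail shape: run the call, record the cause on the FailureCollector together with the original stack trace, and let getOrThrowException() abort prepareRun. A minimal sketch of that shape is below; the helper name runOrCollect and the Step interface are hypothetical and only illustrate the pattern, while the CDAP calls it relies on (addFailure, withStacktrace, getOrThrowException) are exactly the ones used in the change and the FailureCollector type is the one already available in this class.

  // Hypothetical helper sketching the shared collect-and-fail pattern; not part of this change.
  private interface Step<T> {
    T run() throws Exception;
  }

  private static <T> T runOrCollect(FailureCollector collector, String errorReason, Step<T> step) {
    try {
      return step.run();
    } catch (Exception e) {
      // Record the cause with its stack trace so pipeline validation can surface it.
      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
        .withStacktrace(e.getStackTrace());
      // A failure was just recorded, so this call throws a ValidationException.
      collector.getOrThrowException();
      return null; // unreachable
    }
  }

  // Example usage, equivalent to the first block above:
  // Credentials credentials = runOrCollect(collector, "Unable to load service account credentials.",
  //                                        () -> BigQuerySourceUtils.getCredentials(config.getConnection()));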