|
61 | 61 | import com.google.common.base.Strings; |
62 | 62 | import com.google.common.collect.Lists; |
63 | 63 | import io.cdap.cdap.api.data.format.StructuredRecord; |
| 64 | +import io.cdap.cdap.api.exception.ErrorCategory; |
| 65 | +import io.cdap.cdap.api.exception.ErrorType; |
| 66 | +import io.cdap.cdap.api.exception.ErrorUtils; |
| 67 | +import io.cdap.cdap.etl.api.exception.ErrorPhase; |
64 | 68 | import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings; |
65 | 69 | import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes; |
66 | 70 | import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; |
|
103 | 107 | */ |
104 | 108 | public class BigQueryOutputFormat extends ForwardingBigQueryFileOutputFormat<StructuredRecord, NullWritable> { |
105 | 109 | private static final Logger LOG = LoggerFactory.getLogger(BigQueryOutputFormat.class); |
| 110 | + private static final String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; |
106 | 111 |
|
107 | 112 | @Override |
108 | 113 | public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) |
@@ -165,19 +170,30 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, |
165 | 170 | // Error if the output path already exists. |
166 | 171 | FileSystem outputFileSystem = outputPath.getFileSystem(conf); |
167 | 172 | if (outputFileSystem.exists(outputPath)) { |
168 | | - throw new IOException("The output path '" + outputPath + "' already exists."); |
| 173 | + String errorMessage = String.format("The output path '%s' already exists.", outputPath); |
| 174 | + throw ErrorUtils.getProgramFailureException( |
| 175 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, |
| 176 | + String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, |
| 177 | + new IOException(errorMessage)); |
169 | 178 | } |
170 | 179 |
|
171 | 180 | // Error if compression is set as there's mixed support in BigQuery. |
172 | 181 | if (FileOutputFormat.getCompressOutput(job)) { |
173 | | - throw new IOException("Compression isn't supported for this OutputFormat."); |
| 182 | + String errorMessage = "Compression isn't supported for this OutputFormat."; |
| 183 | + throw ErrorUtils.getProgramFailureException( |
| 184 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, |
| 185 | + String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, |
| 186 | + new IOException(errorMessage)); |
174 | 187 | } |
175 | 188 |
|
176 | 189 | // Error if unable to create a BigQuery helper. |
177 | 190 | try { |
178 | 191 | new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf); |
179 | 192 | } catch (GeneralSecurityException gse) { |
180 | | - throw new IOException("Failed to create BigQuery client", gse); |
| 193 | + String errorMessage = "Failed to create BigQuery client"; |
| 194 | + throw ErrorUtils.getProgramFailureException( |
| 195 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, String.format(errorMessageFormat, |
| 196 | + ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, gse); |
181 | 197 | } |
182 | 198 |
|
183 | 199 | // Let delegate process its checks. |
@@ -208,7 +224,10 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput |
208 | 224 | BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES); |
209 | 225 | this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration()); |
210 | 226 | } catch (GeneralSecurityException e) { |
211 | | - throw new IOException("Failed to create Bigquery client.", e); |
| 227 | + String errorMessage = "Failed to create BigQuery client"; |
| 228 | + throw ErrorUtils.getProgramFailureException( |
| 229 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, |
| 230 | + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e); |
212 | 231 | } |
213 | 232 | } |
214 | 233 |
|
@@ -266,7 +285,10 @@ public void commitJob(JobContext jobContext) throws IOException { |
266 | 285 | writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField, |
267 | 286 | requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf); |
268 | 287 | } catch (Exception e) { |
269 | | - throw new IOException("Failed to import GCS into BigQuery. ", e); |
| 288 | + String errorMessage = "Failed to import GCS into BigQuery."; |
| 289 | + throw ErrorUtils.getProgramFailureException( |
| 290 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, |
| 291 | + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e); |
270 | 292 | } |
271 | 293 |
|
272 | 294 | cleanup(jobContext); |
@@ -566,26 +588,34 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p |
566 | 588 | int numOfErrors; |
567 | 589 | String errorMessage; |
568 | 590 | if (errors == null || errors.isEmpty()) { |
569 | | - errorMessage = pollJob.getStatus().getErrorResult().getMessage(); |
| 591 | + errorMessage = String.format("reason: %s, %s", pollJob.getStatus().getErrorResult().getReason(), |
| 592 | + pollJob.getStatus().getErrorResult().getMessage()); |
570 | 593 | numOfErrors = 1; |
571 | 594 | } else { |
572 | | - errorMessage = errors.get(errors.size() - 1).getMessage(); |
| 595 | + errorMessage = String.format("reason: %s, %s", errors.get(errors.size() - 1).getReason(), |
| 596 | + errors.get(errors.size() - 1).getMessage()); |
573 | 597 | numOfErrors = errors.size(); |
574 | 598 | } |
575 | 599 | // Only add first error message in the exception. For other errors user should look at BigQuery job logs. |
576 | | - throw new IOException(String.format("Error occurred while importing data to BigQuery '%s'." + |
577 | | - " There are total %s error(s) for BigQuery job %s. Please look at " + |
578 | | - "BigQuery job logs for more information.", |
579 | | - errorMessage, numOfErrors, jobReference.getJobId())); |
| 600 | + String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." + |
| 601 | + " There are total %s error(s) for BigQuery job %s. Please look at " + |
| 602 | + "BigQuery job logs for more information.", |
| 603 | + errorMessage, numOfErrors, jobReference.getJobId()); |
| 604 | + throw ErrorUtils.getProgramFailureException( |
| 605 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException, |
| 606 | + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true, |
| 607 | + new IOException(errorMessageException)); |
| 608 | + |
580 | 609 | } |
581 | 610 | } else { |
582 | 611 | long millisToWait = pollBackOff.nextBackOffMillis(); |
583 | 612 | if (millisToWait == BackOff.STOP) { |
584 | | - throw new IOException( |
585 | | - String.format( |
586 | | - "Job %s failed to complete after %s millis.", |
587 | | - jobReference.getJobId(), |
588 | | - elapsedTime)); |
| 613 | + String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId() |
| 614 | + , elapsedTime); |
| 615 | + throw ErrorUtils.getProgramFailureException( |
| 616 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, |
| 617 | + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true, |
| 618 | + new IOException(errorMessage)); |
589 | 619 | } |
590 | 620 | // Pause execution for the configured duration before polling job status again. |
591 | 621 | Thread.sleep(millisToWait); |
@@ -621,8 +651,11 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I |
621 | 651 | TableSchema tableSchema = createTableSchemaFromFields(fieldsJson); |
622 | 652 | return Optional.of(tableSchema); |
623 | 653 | } catch (IOException e) { |
624 | | - throw new IOException( |
625 | | - "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e); |
| 654 | + String errorMessage = String.format("Unable to parse key '%s'.", |
| 655 | + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey()); |
| 656 | + throw ErrorUtils.getProgramFailureException( |
| 657 | + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, |
| 658 | + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e); |
626 | 659 | } |
627 | 660 | } |
628 | 661 | return Optional.empty(); |
|
0 commit comments