Skip to content

Commit db1302e

Browse files
Merge pull request #1517 from data-integrations/error-messages-fix-bq-cherrypick
[🍒][PLUGIN-1818] Fix error messages in BQ Output Format
2 parents 8117e76 + 7474ae1 commit db1302e

File tree

1 file changed

+20
-29
lines changed

1 file changed

+20
-29
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -170,30 +170,24 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
170170
// Error if the output path already exists.
171171
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
172172
if (outputFileSystem.exists(outputPath)) {
173-
String errorMessage = String.format("The output path '%s' already exists.", outputPath);
174-
throw ErrorUtils.getProgramFailureException(
175-
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
176-
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
177-
new IOException(errorMessage));
173+
throw new IOException("The output path '" + outputPath + "' already exists.");
178174
}
179175

180176
// Error if compression is set as there's mixed support in BigQuery.
181177
if (FileOutputFormat.getCompressOutput(job)) {
182178
String errorMessage = "Compression isn't supported for this OutputFormat.";
183179
throw ErrorUtils.getProgramFailureException(
184180
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
185-
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
186-
new IOException(errorMessage));
181+
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage),
182+
ErrorType.SYSTEM, true, new IOException(errorMessage));
187183
}
188184

189185
// Error if unable to create a BigQuery helper.
190186
try {
191187
new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
192188
} catch (GeneralSecurityException gse) {
193-
String errorMessage = "Failed to create BigQuery client";
194-
throw ErrorUtils.getProgramFailureException(
195-
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, String.format(errorMessageFormat,
196-
ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, gse);
189+
throw new IOException(String.format("Failed to create BigQuery client %s: %s",
190+
gse.getClass().getName(), gse.getMessage()), gse);
197191
}
198192

199193
// Let delegate process its checks.
@@ -223,11 +217,9 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
223217
try {
224218
BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
225219
this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
226-
} catch (GeneralSecurityException e) {
227-
String errorMessage = "Failed to create BigQuery client";
228-
throw ErrorUtils.getProgramFailureException(
229-
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
230-
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
220+
} catch (GeneralSecurityException gse) {
221+
throw new IOException(String.format("Failed to create BigQuery client %s: %s",
222+
gse.getClass().getName(), gse.getMessage()), gse);
231223
}
232224
}
233225

@@ -285,10 +277,8 @@ public void commitJob(JobContext jobContext) throws IOException {
285277
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
286278
requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
287279
} catch (Exception e) {
288-
String errorMessage = "Failed to import GCS into BigQuery.";
289-
throw ErrorUtils.getProgramFailureException(
290-
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
291-
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
280+
throw new IOException(String.format("Failed to import GCS into BigQuery %s: %s.",
281+
e.getClass().getName(), e.getMessage()), e);
292282
}
293283

294284
cleanup(jobContext);
@@ -597,25 +587,25 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
597587
numOfErrors = errors.size();
598588
}
599589
// Only add first error message in the exception. For other errors user should look at BigQuery job logs.
600-
String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." +
590+
String errorMessageException = String.format(
591+
"Error occurred while importing data to BigQuery '%s'." +
601592
" There are total %s error(s) for BigQuery job %s. Please look at " +
602593
"BigQuery job logs for more information.",
603594
errorMessage, numOfErrors, jobReference.getJobId());
604595
throw ErrorUtils.getProgramFailureException(
605596
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
606-
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true,
607-
new IOException(errorMessageException));
608-
597+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException),
598+
ErrorType.UNKNOWN, true, null);
609599
}
610600
} else {
611601
long millisToWait = pollBackOff.nextBackOffMillis();
612602
if (millisToWait == BackOff.STOP) {
613-
String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId()
614-
, elapsedTime);
603+
String errorMessage = String.format("Job %s failed to complete after %s millis.",
604+
jobReference.getJobId(), elapsedTime);
615605
throw ErrorUtils.getProgramFailureException(
616606
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
617-
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true,
618-
new IOException(errorMessage));
607+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage),
608+
ErrorType.UNKNOWN, true, null);
619609
}
620610
// Pause execution for the configured duration before polling job status again.
621611
Thread.sleep(millisToWait);
@@ -655,7 +645,8 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
655645
BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
656646
throw ErrorUtils.getProgramFailureException(
657647
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
658-
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
648+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage),
649+
ErrorType.SYSTEM, true, e);
659650
}
660651
}
661652
return Optional.empty();

0 commit comments

Comments
 (0)