Commit 39041a1

Removed some unnecessary code, added a missing content type to the docs, and skipped content-type validation when the format is provided as a macro
1 parent e410175 commit 39041a1

4 files changed: +33 −24 lines changed

docs/GCS-batchsink.md

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ Defaults to 'application/octet-stream'. The following table shows valid content
 | json | application/json, text/plain, application/octet-stream |
 | orc | application/octet-stream |
 | parquet | application/octet-stream |
-| tsv | text/tab-separated-values, application/octet-stream |
+| tsv | text/tab-separated-values, application/octet-stream, text/plain |

 **Custom Content Type:** The Custom Content Type is used when the value of Content-Type is set to other.
 User can provide specific Content-Type, different from the options in the dropdown.
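The context lines above also cover the "other" option: when Content-Type is set to other, whatever the user typed into Custom Content Type is used instead. A minimal, self-contained sketch of that resolution logic, using hypothetical class and method names rather than the plugin's actual API:

// Hypothetical sketch: resolve the effective content type when the
// dropdown is set to "other". Names are illustrative only.
public class ContentTypeResolver {
  private static final String CONTENT_TYPE_OTHER = "other";
  private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";

  /** Returns the custom value for "other", the selected value otherwise, or the default. */
  public static String resolve(String contentType, String customContentType) {
    if (contentType == null || contentType.isEmpty()) {
      return DEFAULT_CONTENT_TYPE;
    }
    if (CONTENT_TYPE_OTHER.equalsIgnoreCase(contentType)) {
      return (customContentType == null || customContentType.isEmpty())
        ? DEFAULT_CONTENT_TYPE : customContentType;
    }
    return contentType;
  }

  public static void main(String[] args) {
    // Prints "application/vnd.ms-excel": the custom value wins when "other" is selected.
    System.out.println(resolve("other", "application/vnd.ms-excel"));
  }
}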

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 15 additions & 15 deletions
@@ -358,7 +358,8 @@ public void validate(FailureCollector collector) {
       }

       if (!containsMacro(NAME_CONTENT_TYPE) && !containsMacro(NAME_CUSTOM_CONTENT_TYPE)
-        && !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)) {
+        && !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)
+        && !containsMacro(NAME_FORMAT)) {
         if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
           validateContentType(collector);
         }
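The new !containsMacro(NAME_FORMAT) clause skips the deploy-time content-type check whenever the format property is still an unresolved macro, because the set of valid content types depends on the concrete format, which is only known at runtime. A minimal, self-contained sketch of that "defer validation while a macro is unresolved" pattern; the class, property names, and the simple ${...} test below are illustrative, not CDAP's real macro handling:

import java.util.Map;

// Sketch of macro-aware validation: any property still wrapped in ${...}
// is treated as unresolved, so dependent checks are deferred to runtime.
public class MacroAwareValidation {
  private final Map<String, String> rawProperties;

  public MacroAwareValidation(Map<String, String> rawProperties) {
    this.rawProperties = rawProperties;
  }

  /** Naive check: treats any ${...} value as an unresolved macro. */
  private boolean containsMacro(String propertyName) {
    String value = rawProperties.get(propertyName);
    return value != null && value.startsWith("${") && value.endsWith("}");
  }

  /** Content-type rules depend on the format, so both must be concrete first. */
  public boolean shouldValidateContentType() {
    return !containsMacro("format") && !containsMacro("contentType");
  }

  public static void main(String[] args) {
    MacroAwareValidation deployTime = new MacroAwareValidation(
      Map.of("format", "${format}", "contentType", "application/json"));
    // Prints false: format is a macro, so the check is deferred to runtime.
    System.out.println(deployTime.shouldValidateContentType());
  }
}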
@@ -379,21 +380,22 @@ public void validate(FailureCollector collector) {
       }
     }

+    //This method validates the specified content type for the used format
     public void validateContentType(FailureCollector failureCollector) {
       switch (format) {
         case FORMAT_AVRO:
           if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_AVRO)) {
-            failureCollector.addFailure(String.format("Valid content type for avro is %s",
-                                                      CONTENT_TYPE_APPLICATION_AVRO), null)
+            failureCollector.addFailure(String.format("Valid content types for avro are %s, %s.",
+                                                      CONTENT_TYPE_APPLICATION_AVRO, DEFAULT_CONTENT_TYPE), null)
               .withConfigProperty(NAME_CONTENT_TYPE);
           }
           break;
         case FORMAT_JSON:
           if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_JSON)
               && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
             failureCollector.addFailure(String.format(
-              "Valid content types for json are %s, %s", CONTENT_TYPE_APPLICATION_JSON,
-              CONTENT_TYPE_TEXT_PLAIN), null
+              "Valid content types for json are %s, %s, %s.", CONTENT_TYPE_APPLICATION_JSON,
+              CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null
             ).withConfigProperty(NAME_CONTENT_TYPE);
           }
           break;
@@ -402,9 +404,8 @@ public void validateContentType(FailureCollector failureCollector) {
               && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
               && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
             failureCollector.addFailure(String.format(
-              "Valid content types for csv are %s, %s, %s", CONTENT_TYPE_APPLICATION_CSV,
-              CONTENT_TYPE_TEXT_PLAIN,
-              CONTENT_TYPE_TEXT_CSV), null
+              "Valid content types for csv are %s, %s, %s, %s.", CONTENT_TYPE_APPLICATION_CSV,
+              CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, DEFAULT_CONTENT_TYPE), null
             ).withConfigProperty(NAME_CONTENT_TYPE);
           }
           break;
@@ -414,30 +415,29 @@ public void validateContentType(FailureCollector failureCollector) {
              && !contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
              && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
             failureCollector.addFailure(String.format(
-              "Valid content types for delimited are %s, %s, %s, %s", CONTENT_TYPE_TEXT_PLAIN,
-              CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV), null
+              "Valid content types for delimited are %s, %s, %s, %s, %s.", CONTENT_TYPE_TEXT_PLAIN,
+              CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV, DEFAULT_CONTENT_TYPE), null
             ).withConfigProperty(NAME_CONTENT_TYPE);
           }
           break;
         case FORMAT_PARQUET:
           if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
-            failureCollector.addFailure(String.format("Valid content type for parquet is %s",
-                                                      DEFAULT_CONTENT_TYPE),
+            failureCollector.addFailure(String.format("Valid content type for parquet is %s.", DEFAULT_CONTENT_TYPE),
                                         null).withConfigProperty(NAME_CONTENT_TYPE);
           }
           break;
         case FORMAT_ORC:
           if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
-            failureCollector.addFailure(String.format("Valid content type for orc is %s", DEFAULT_CONTENT_TYPE),
+            failureCollector.addFailure(String.format("Valid content type for orc is %s.", DEFAULT_CONTENT_TYPE),
                                         null).withConfigProperty(NAME_CONTENT_TYPE);
           }
           break;
         case FORMAT_TSV:
           if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
               && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
             failureCollector.addFailure(String.format(
-              "Valid content types for tsv are %s, %s", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN), null
-            ).withConfigProperty(NAME_CONTENT_TYPE);
+              "Valid content types for tsv are %s, %s, %s.", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN,
+              DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
           }
           break;
       }
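Each case above is a per-format whitelist of content types, and because validate() only calls validateContentType for non-default values, application/octet-stream is implicitly accepted for every format; the updated messages now say so explicitly. For readability, here is a table-driven restatement of the resulting rules; the MIME strings follow the docs table and the constant names, but the class itself is a sketch, not the plugin's actual code:

import java.util.Map;
import java.util.Set;

// Table-driven restatement of the per-format content-type whitelist
// enforced after this commit. Illustrative only.
public class AllowedContentTypes {
  private static final String OCTET_STREAM = "application/octet-stream";

  private static final Map<String, Set<String>> ALLOWED = Map.of(
    "avro", Set.of("application/avro", OCTET_STREAM),
    "json", Set.of("application/json", "text/plain", OCTET_STREAM),
    "csv", Set.of("application/csv", "text/csv", "text/plain", OCTET_STREAM),
    "delimited", Set.of("text/plain", "text/csv", "application/csv",
                        "text/tab-separated-values", OCTET_STREAM),
    "parquet", Set.of(OCTET_STREAM),
    "orc", Set.of(OCTET_STREAM),
    "tsv", Set.of("text/tab-separated-values", "text/plain", OCTET_STREAM));

  /** Unknown formats are not restricted here, mirroring the switch's fall-through. */
  public static boolean isAllowed(String format, String contentType) {
    Set<String> allowed = ALLOWED.get(format.toLowerCase());
    return allowed == null || allowed.contains(contentType.toLowerCase());
  }

  public static void main(String[] args) {
    System.out.println(isAllowed("tsv", "text/plain"));           // true
    System.out.println(isAllowed("parquet", "application/json")); // false
  }
}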

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java

Lines changed: 17 additions & 1 deletion
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2015-2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package io.cdap.plugin.gcp.gcs.sink;

 import com.google.cloud.storage.Blob;
@@ -100,7 +116,7 @@ private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws
     //update metadata on the output file present in the directory for this task
     Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
     if (blob == null) {
-      LOG.info("Could not find a file in path %s to apply count metadata.", taskAttemptPath.toString());
+      LOG.info("Could not find a file in path {} to apply count metadata.", taskAttemptPath.toString());
       return;
     }
     blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
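The one-character fix above matters because SLF4J does not interpret printf-style %s: with the old message, the logger would print the literal "%s" and drop the path argument. A small self-contained demonstration of the placeholder behavior (the class name and path value are made up for illustration; requires slf4j-api plus a binding on the classpath):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Demonstrates SLF4J parameterized logging: {} is substituted, %s is not.
public class PlaceholderDemo {
  private static final Logger LOG = LoggerFactory.getLogger(PlaceholderDemo.class);

  public static void main(String[] args) {
    String path = "gs://bucket/output/attempt_001";
    // Wrong: logs the literal "%s" and silently ignores the argument.
    LOG.info("Could not find a file in path %s to apply count metadata.", path);
    // Right: {} is replaced with the path value.
    LOG.info("Could not find a file in path {} to apply count metadata.", path);
  }
}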

src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java

Lines changed: 0 additions & 7 deletions
@@ -25,8 +25,6 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.HashMap;
@@ -37,7 +35,6 @@
  * An OutputFormat that filters records before sending them to a delegate
  */
 public class RecordFilterOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
-  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
   public static final String FILTER_FIELD = "record.filter.field";
   public static final String PASS_VALUE = "record.filter.val";
   public static final String ORIGINAL_SCHEMA = "record.original.schema";
@@ -136,8 +133,4 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
       delegate.close(context);
     }
   }
-
-/**
- * OutputCommitter for GCS
- */
 }
