Commit afcd30e

Merge pull request #517 from AdaptiveScale/cherrypick-release/0.15/PLUGIN-498
Cherrypick for release/0.15 - PLUGIN-498
2 parents d3a9084 + aeace51 commit afcd30e

File tree: 10 files changed (+460, -143 lines)


docs/GCS-batchsink.md

Lines changed: 17 additions & 0 deletions

@@ -43,6 +43,23 @@ The delimiter will be ignored if the format is anything other than 'delimited'.
 
 **Location:** The location where the gcs bucket will get created. This value is ignored if the bucket already exists.
 
+**Content Type:** The Content Type property is used to indicate the media type of the resource.
+Defaults to 'application/octet-stream'. The following table shows the valid content types for each format.
+
+| Format type | Content type                                                                                |
+|-------------|---------------------------------------------------------------------------------------------|
+| avro        | application/avro, application/octet-stream                                                  |
+| csv         | text/csv, application/csv, text/plain, application/octet-stream                             |
+| delimited   | text/csv, application/csv, text/tab-separated-values, text/plain, application/octet-stream  |
+| json        | application/json, text/plain, application/octet-stream                                      |
+| orc         | application/octet-stream                                                                    |
+| parquet     | application/octet-stream                                                                    |
+| tsv         | text/tab-separated-values, text/plain, application/octet-stream                             |
+
+**Custom Content Type:** The Custom Content Type is used when the value of Content-Type is set to 'other'.
+The user can provide a specific Content-Type, different from the options in the dropdown.
+More information about Content-Type can be found at https://cloud.google.com/storage/docs/metadata
+
 **Service Account** - service account key used for authorization
 
 * **File Path**: Path on the local file system of the service account key used for
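
Note: the Content Type configured here ultimately becomes the object's media type in Cloud Storage. A minimal standalone sketch of that effect, using the google-cloud-storage client directly (the bucket and object names are hypothetical, not part of this change):

    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.BlobInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    import java.nio.charset.StandardCharsets;

    public class ContentTypeExample {
      public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // Hypothetical bucket/object; the sink writes real pipeline output instead.
        BlobId blobId = BlobId.of("my-bucket", "output/part-00000.csv");
        // The content type set on the BlobInfo becomes the object's media type in GCS.
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/csv").build();
        storage.create(blobInfo, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8));
      }
    }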

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 128 additions & 1 deletion

@@ -67,6 +67,7 @@ public class GCSBatchSink extends AbstractFileSink<GCSBatchSink.GCSBatchSinkConf
   private static final String RECORDS_UPDATED_METRIC = "records.updated";
   public static final String AVRO_NAMED_OUTPUT = "avro.mo.config.namedOutput";
   public static final String COMMON_NAMED_OUTPUT = "mapreduce.output.basename";
+  public static final String CONTENT_TYPE = "io.cdap.gcs.batch.sink.content.type";
 
   private final GCSBatchSinkConfig config;
   private String outputPath;
@@ -125,6 +126,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
   @Override
   protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
     Map<String, String> properties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
+    properties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
     properties.putAll(config.getFileSystemProperties());
     String outputFileBaseName = config.getOutputFileNameBase();
     if (outputFileBaseName == null || outputFileBaseName.isEmpty()) {
@@ -242,6 +244,23 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
     private static final String NAME_LOCATION = "location";
     private static final String NAME_FS_PROPERTIES = "fileSystemProperties";
     private static final String NAME_FILE_NAME_BASE = "outputFileNameBase";
+    private static final String NAME_CONTENT_TYPE = "contentType";
+    private static final String NAME_CUSTOM_CONTENT_TYPE = "customContentType";
+    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
+    private static final String CONTENT_TYPE_OTHER = "other";
+    private static final String CONTENT_TYPE_APPLICATION_JSON = "application/json";
+    private static final String CONTENT_TYPE_APPLICATION_AVRO = "application/avro";
+    private static final String CONTENT_TYPE_APPLICATION_CSV = "application/csv";
+    private static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
+    private static final String CONTENT_TYPE_TEXT_CSV = "text/csv";
+    private static final String CONTENT_TYPE_TEXT_TSV = "text/tab-separated-values";
+    private static final String FORMAT_AVRO = "avro";
+    private static final String FORMAT_CSV = "csv";
+    private static final String FORMAT_JSON = "json";
+    private static final String FORMAT_TSV = "tsv";
+    private static final String FORMAT_DELIMITED = "delimited";
+    private static final String FORMAT_ORC = "orc";
+    private static final String FORMAT_PARQUET = "parquet";
 
     private static final String SCHEME = "gs://";
     @Name(NAME_PATH)
@@ -280,6 +299,18 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
       "This value is ignored if the bucket already exists")
     protected String location;
 
+    @Macro
+    @Description("The Content Type property is used to indicate the media type of the resource. " +
+      "Defaults to 'application/octet-stream'.")
+    @Nullable
+    protected String contentType;
+
+    @Macro
+    @Description("The Custom Content Type is used when the value of Content-Type is set to other. " +
+      "User can provide specific Content-Type, different from the options in the dropdown.")
+    @Nullable
+    protected String customContentType;
+
     @Name(NAME_FS_PROPERTIES)
     @Macro
     @Nullable
@@ -326,10 +357,19 @@ public void validate(FailureCollector collector) {
        }
      }
 
+      if (!containsMacro(NAME_CONTENT_TYPE) && !containsMacro(NAME_CUSTOM_CONTENT_TYPE)
+        && !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)
+        && !containsMacro(NAME_FORMAT)) {
+        if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+          validateContentType(collector);
+        }
+      }
+
       try {
         getSchema();
       } catch (IllegalArgumentException e) {
-        collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace());
+        collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA)
+          .withStacktrace(e.getStackTrace());
       }
 
       try {
@@ -340,6 +380,69 @@ public void validate(FailureCollector collector) {
       }
     }
 
+    // This method validates the specified content type for the used format.
+    public void validateContentType(FailureCollector failureCollector) {
+      switch (format) {
+        case FORMAT_AVRO:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_AVRO)) {
+            failureCollector.addFailure(String.format("Valid content types for avro are %s, %s.",
+                                                      CONTENT_TYPE_APPLICATION_AVRO, DEFAULT_CONTENT_TYPE), null)
+              .withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_JSON:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_JSON)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for json are %s, %s, %s.", CONTENT_TYPE_APPLICATION_JSON,
+              CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_CSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for csv are %s, %s, %s, %s.", CONTENT_TYPE_APPLICATION_CSV,
+              CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_DELIMITED:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for delimited are %s, %s, %s, %s, %s.", CONTENT_TYPE_TEXT_PLAIN,
+              CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_PARQUET:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for parquet is %s.", DEFAULT_CONTENT_TYPE),
+                                        null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_ORC:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for orc is %s.", DEFAULT_CONTENT_TYPE),
+                                        null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_TSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for tsv are %s, %s, %s.", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN,
+              DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+      }
+    }
+
     public String getBucket() {
       return GCSPath.from(path).getBucket();
     }
@@ -383,6 +486,30 @@ public String getLocation() {
       return location;
     }
 
+    /* This method gets the value of content type. Valid content types for each format are:
+     *
+     *  avro -> application/avro, application/octet-stream
+     *  json -> application/json, text/plain, application/octet-stream
+     *  csv -> application/csv, text/csv, text/plain, application/octet-stream
+     *  delimited -> application/csv, text/csv, text/plain, text/tab-separated-values, application/octet-stream
+     *  orc -> application/octet-stream
+     *  parquet -> application/octet-stream
+     *  tsv -> text/tab-separated-values, text/plain, application/octet-stream
+     */
+    @Nullable
+    public String getContentType() {
+      if (!Strings.isNullOrEmpty(contentType)) {
+        if (contentType.equals(CONTENT_TYPE_OTHER)) {
+          if (Strings.isNullOrEmpty(customContentType)) {
+            return DEFAULT_CONTENT_TYPE;
+          }
+          return customContentType;
+        }
+        return contentType;
+      }
+      return DEFAULT_CONTENT_TYPE;
+    }
+
     public Map<String, String> getFileSystemProperties() {
       if (fileSystemProperties == null || fileSystemProperties.isEmpty()) {
         return Collections.emptyMap();
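
The getContentType() accessor added above resolves the final media type: an explicit dropdown value wins, 'other' defers to the Custom Content Type, and a missing value falls back to 'application/octet-stream'. A standalone sketch of that same fallback chain, with the constants inlined and hypothetical inputs:

    public class ContentTypeResolution {
      static String resolveContentType(String contentType, String customContentType) {
        String defaultType = "application/octet-stream";
        // Property not set at all: fall back to the default media type.
        if (contentType == null || contentType.isEmpty()) {
          return defaultType;
        }
        // 'other' defers to the custom value, if one was actually provided.
        if (contentType.equals("other")) {
          return (customContentType == null || customContentType.isEmpty()) ? defaultType : customContentType;
        }
        // Otherwise the dropdown value is used as-is.
        return contentType;
      }

      public static void main(String[] args) {
        System.out.println(resolveContentType(null, null));             // application/octet-stream
        System.out.println(resolveContentType("other", ""));            // application/octet-stream
        System.out.println(resolveContentType("other", "text/x-log"));  // text/x-log
        System.out.println(resolveContentType("text/csv", null));       // text/csv
      }
    }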

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java

Lines changed: 1 addition & 2 deletions

@@ -95,7 +95,6 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
     collector.getOrThrowException();
 
     Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
-
     Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());
 
     String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
@@ -137,7 +136,7 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
       outputProperties.putAll(RecordFilterOutputFormat.configure(validatingOutputFormat.getOutputFormatClassName(),
                                                                  config.splitField, name, schema));
       outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputDir(context.getLogicalStartTime(), name));
-
+      outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
       context.addOutput(Output.of(
         config.getReferenceName() + "_" + name,
        new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), outputProperties)));
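
Both sinks hand the resolved content type to Hadoop through their output properties, keyed by GCSBatchSink.CONTENT_TYPE ("io.cdap.gcs.batch.sink.content.type"), so the committer can read it back from the job Configuration at commit time. A minimal sketch of that round trip with a bare Hadoop Configuration (the value shown is hypothetical):

    import org.apache.hadoop.conf.Configuration;

    public class ContentTypePropagation {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // What the sink sets at prepare time via its output properties.
        conf.set("io.cdap.gcs.batch.sink.content.type", "application/json");
        // What GCSOutputCommitter reads back before stamping the blob.
        System.out.println(conf.get("io.cdap.gcs.batch.sink.content.type"));
      }
    }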
src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java

Lines changed: 165 additions & 0 deletions

@@ -0,0 +1,165 @@
+/*
+ * Copyright © 2015-2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import com.google.cloud.storage.Blob;
+import com.google.common.annotations.VisibleForTesting;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import io.cdap.plugin.gcp.gcs.StorageClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * OutputCommitter for GCS.
+ */
+public class GCSOutputCommitter extends OutputCommitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
+  public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
+
+  private final OutputCommitter delegate;
+
+  public GCSOutputCommitter(OutputCommitter delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    delegate.setupJob(jobContext);
+  }
+
+  @Override
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    delegate.cleanupJob(jobContext);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    delegate.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+    delegate.abortJob(jobContext, state);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.setupTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+    return delegate.needsTaskCommit(taskAttemptContext);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    /* On commit task, there seems to be some inconsistency across different hadoop implementations regarding the
+       path where the output file is stored. For some implementations it appears in the path returned by
+       FileOutputCommitter getCommittedTaskPath and for some it does not. Before commit, the files appear to be
+       consistently present in the path returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output
+       file from taskAttemptPath and add metadata before commit happens. After commit, the file would have been
+       moved out of the taskAttemptPath. */
+    try {
+      updateMetricMetaData(taskAttemptContext);
+    } catch (Exception exception) {
+      LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
+               exception);
+    }
+
+    delegate.commitTask(taskAttemptContext);
+  }
+
+  private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
+    if (!(delegate instanceof FileOutputCommitter)) {
+      return;
+    }
+
+    FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    // Task is not yet committed, so the output should be available in the attempt path.
+    Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
+    if (configuration == null || taskAttemptPath == null) {
+      return;
+    }
+
+    // Read the record count from the configuration.
+    String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
+    Map<String, String> metaData = new HashMap<>();
+    metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
+    StorageClient storageClient = getStorageClient(configuration);
+    // Update metadata on the output file present in the directory for this task.
+    Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
+    if (blob == null) {
+      LOG.info("Could not find a file in path {} to apply count metadata.", taskAttemptPath.toString());
+      return;
+    }
+    blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
+      .update();
+  }
+
+  @VisibleForTesting
+  StorageClient getStorageClient(Configuration configuration) throws IOException {
+    String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
+    String serviceAccount = null;
+    boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
+      .equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
+    if (isServiceAccountFile) {
+      serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
+    } else {
+      serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
+                                                       GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
+    }
+    return StorageClient.create(project, serviceAccount, isServiceAccountFile);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.abortTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
+    return delegate.isCommitJobRepeatable(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return delegate.isRecoverySupported(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported() {
+    return delegate.isRecoverySupported();
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+    delegate.recoverTask(taskContext);
+  }
+}
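
GCSOutputCommitter is a pure delegating wrapper: every lifecycle method forwards to the wrapped committer, and the only added behavior is the content-type and record-count metadata update just before the task commits. A sketch of how an output format might install it by wrapping its delegate's committer (TextOutputFormat is stand-in scaffolding here; the plugin's real wiring lives in GCSOutputFormatProvider, which the logger above references but which is not shown in this excerpt):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WrappingOutputFormat extends TextOutputFormat<Object, Object> {
      @Override
      public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        // Wrap the committer the delegate format would normally return, so that
        // commitTask can stamp content type and record-count metadata first.
        return new GCSOutputCommitter(super.getOutputCommitter(context));
      }
    }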
