Commit e410175

Added documentation, removed some unnecessary checks, added a new file for GCSOutputCommitter
1 parent 2a5b522 commit e410175

File tree

6 files changed: +165 −281 lines

docs/GCS-batchsink.md — 11 additions, 1 deletion

@@ -46,7 +46,17 @@ The delimiter will be ignored if the format is anything other than 'delimited'.
 **Location:** The location where the gcs bucket will get created. This value is ignored if the bucket already exists.

 **Content Type:** The Content Type entity is used to indicate the media type of the resource.
-Defaults to 'application/octet-stream'.
+Defaults to 'application/octet-stream'. The following table shows valid content types for each format.
+
+| Format type | Content type                                                               |
+|-------------|----------------------------------------------------------------------------|
+| avro        | application/avro, application/octet-stream                                 |
+| csv         | text/csv, application/csv, text/plain, application/octet-stream            |
+| delimited   | text/csv, application/csv, text/tsv, text/plain, application/octet-stream  |
+| json        | application/json, text/plain, application/octet-stream                     |
+| orc         | application/octet-stream                                                   |
+| parquet     | application/octet-stream                                                   |
+| tsv         | text/tab-separated-values, application/octet-stream                        |

 **Custom Content Type:** The Custom Content Type is used when the value of Content-Type is set to other.
 User can provide specific Content-Type, different from the options in the dropdown.
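The table above is effectively a per-format allow-list. As a minimal sketch of how such a lookup could be expressed in code (the class and method names here are illustrative, not the plugin's actual API):

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative allow-list mirroring the documentation table above. */
public class ContentTypeAllowList {
  private static final Map<String, List<String>> VALID = new HashMap<>();
  static {
    VALID.put("avro", Arrays.asList("application/avro", "application/octet-stream"));
    VALID.put("csv", Arrays.asList("text/csv", "application/csv", "text/plain", "application/octet-stream"));
    VALID.put("delimited", Arrays.asList("text/csv", "application/csv", "text/tsv", "text/plain",
                                         "application/octet-stream"));
    VALID.put("json", Arrays.asList("application/json", "text/plain", "application/octet-stream"));
    VALID.put("orc", Arrays.asList("application/octet-stream"));
    VALID.put("parquet", Arrays.asList("application/octet-stream"));
    VALID.put("tsv", Arrays.asList("text/tab-separated-values", "application/octet-stream"));
  }

  /** Case-insensitive check of a content type against the allow-list for a format. */
  public static boolean isValid(String format, String contentType) {
    List<String> allowed = VALID.getOrDefault(format.toLowerCase(), Collections.emptyList());
    return allowed.stream().anyMatch(c -> c.equalsIgnoreCase(contentType));
  }
}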

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java — 3 additions, 14 deletions

@@ -357,14 +357,6 @@ public void validate(FailureCollector collector) {
       }
     }

-    if (containsMacro(NAME_CONTENT_TYPE)) {
-      contentType = null;
-    }
-
-    if (containsMacro(NAME_CUSTOM_CONTENT_TYPE)) {
-      customContentType = null;
-    }
-
     if (!containsMacro(NAME_CONTENT_TYPE) && !containsMacro(NAME_CUSTOM_CONTENT_TYPE)
         && !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)) {
       if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
@@ -401,8 +393,7 @@ public void validateContentType(FailureCollector failureCollector) {
             && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
           failureCollector.addFailure(String.format(
             "Valid content types for json are %s, %s", CONTENT_TYPE_APPLICATION_JSON,
-            CONTENT_TYPE_TEXT_PLAIN),
-            null
+            CONTENT_TYPE_TEXT_PLAIN), null
           ).withConfigProperty(NAME_CONTENT_TYPE);
         }
         break;
@@ -413,8 +404,7 @@ public void validateContentType(FailureCollector failureCollector) {
           failureCollector.addFailure(String.format(
             "Valid content types for csv are %s, %s, %s", CONTENT_TYPE_APPLICATION_CSV,
             CONTENT_TYPE_TEXT_PLAIN,
-            CONTENT_TYPE_TEXT_CSV),
-            null
+            CONTENT_TYPE_TEXT_CSV), null
           ).withConfigProperty(NAME_CONTENT_TYPE);
         }
         break;
@@ -446,8 +436,7 @@ public void validateContentType(FailureCollector failureCollector) {
         if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
             && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
           failureCollector.addFailure(String.format(
-            "Valid content types for tsv are %s, %s", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN),
-            null
+            "Valid content types for tsv are %s, %s", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN), null
           ).withConfigProperty(NAME_CONTENT_TYPE);
         }
         break;
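The deleted null-assignments appear to be the "unnecessary checks" from the commit message: the validation block right below them already guards with !containsMacro(...), so nulling the fields first had no effect. A minimal standalone sketch of that guard pattern, with hypothetical names and values:

/** Sketch of macro-aware validation: properties still holding unresolved macros are skipped, not nulled. */
public class MacroAwareValidationSketch {
  static void validateContentType(String contentType, boolean containsMacro) {
    // If the value is an unresolved macro, defer validation to pipeline runtime
    // instead of mutating the field to null (the redundant step the commit removed).
    if (containsMacro || contentType == null || contentType.isEmpty()) {
      return;
    }
    if (!contentType.equalsIgnoreCase("application/octet-stream")) {
      System.out.println("Validating non-default content type: " + contentType);
    }
  }

  public static void main(String[] args) {
    validateContentType("${content-type-macro}", true);  // skipped: macro not yet resolved
    validateContentType("text/csv", false);              // validated at configure time
  }
}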
src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java (new file) — 149 additions, 0 deletions

@@ -0,0 +1,149 @@
+package io.cdap.plugin.gcp.gcs.sink;
+
+import com.google.cloud.storage.Blob;
+import com.google.common.annotations.VisibleForTesting;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import io.cdap.plugin.gcp.gcs.StorageClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * OutputCommitter for GCS.
+ */
+public class GCSOutputCommitter extends OutputCommitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputCommitter.class);
+  public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
+
+  private final OutputCommitter delegate;
+
+  public GCSOutputCommitter(OutputCommitter delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    delegate.setupJob(jobContext);
+  }
+
+  @Override
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    delegate.cleanupJob(jobContext);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    delegate.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+    delegate.abortJob(jobContext, state);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.setupTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+    return delegate.needsTaskCommit(taskAttemptContext);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    /* On commit task, there seems to be some inconsistency across different hadoop implementations regarding the
+       path where the output file is stored. For some implementations it appears in the path returned by
+       FileOutputCommitter getCommittedTaskPath and for some it does not. Before commit, the files appear to be
+       consistently present in the path returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output
+       file from taskAttemptPath and add metadata before the commit happens. After commit, the file would have been
+       moved out of the taskAttemptPath. */
+    try {
+      updateMetricMetaData(taskAttemptContext);
+    } catch (Exception exception) {
+      LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
+               exception);
+    }
+
+    delegate.commitTask(taskAttemptContext);
+  }
+
+  private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
+    if (!(delegate instanceof FileOutputCommitter)) {
+      return;
+    }
+
+    FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    // Task is not yet committed, so the output should still be available in the attempt path.
+    Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
+    if (configuration == null || taskAttemptPath == null) {
+      return;
+    }
+
+    // Read the record count from the configuration.
+    String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
+    Map<String, String> metaData = new HashMap<>();
+    metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
+    StorageClient storageClient = getStorageClient(configuration);
+    // Update metadata on the output file present in the directory for this task.
+    Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
+    if (blob == null) {
+      LOG.info("Could not find a file in path {} to apply count metadata.", taskAttemptPath);
+      return;
+    }
+    blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
+      .update();
+  }
+
+  @VisibleForTesting
+  StorageClient getStorageClient(Configuration configuration) throws IOException {
+    String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
+    String serviceAccount;
+    boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
+      .equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
+    if (isServiceAccountFile) {
+      serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
+    } else {
+      serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
+                                                       GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
+    }
+    return StorageClient.create(project, serviceAccount, isServiceAccountFile);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.abortTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
+    return delegate.isCommitJobRepeatable(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return delegate.isRecoverySupported(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported() {
+    return delegate.isRecoverySupported();
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+    delegate.recoverTask(taskContext);
+  }
+}
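The new class is a pure decorator: every method forwards to the wrapped committer, and only commitTask adds behavior. As a usage sketch of where it plugs in (the delegating format below is hypothetical and assumed to live in the same package, mirroring how the provider's getOutputCommitter wraps its delegate):

import java.io.IOException;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/** Hypothetical delegating output format showing where GCSOutputCommitter plugs in. */
public abstract class DelegatingOutputFormatSketch<K, V> extends OutputFormat<K, V> {
  /** The wrapped format whose committer is decorated, e.g. an Avro or text output format. */
  protected abstract OutputFormat<K, V> getDelegateFormat();

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Wrap the delegate's committer so commitTask() can stamp record-count
    // metadata on the GCS object before the task output is moved into place.
    return new GCSOutputCommitter(getDelegateFormat().getOutputCommitter(context));
  }
}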

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java — 0 additions, 138 deletions

@@ -1,25 +1,16 @@
 package io.cdap.plugin.gcp.gcs.sink;

-import com.google.cloud.storage.Blob;
-import com.google.common.annotations.VisibleForTesting;
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.etl.api.validation.FormatContext;
 import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
-import io.cdap.plugin.gcp.common.GCPUtils;
-import io.cdap.plugin.gcp.gcs.StorageClient;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.HashMap;
@@ -29,8 +20,6 @@
  * OutputFormatProvider for GCSSink
  */
 public class GCSOutputFormatProvider implements ValidatingOutputFormat {
-
-  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
   private static final String DELEGATE_OUTPUTFORMAT_CLASSNAME = "gcssink.delegate.outputformat.classname";
   private static final String OUTPUT_FOLDER = "gcssink.metric.output.folder";
   public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
@@ -102,133 +91,6 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
     }
   }

-  /**
-   * OutputCommitter for GCS
-   */
-  public static class GCSOutputCommitter extends OutputCommitter {
-
-    private final OutputCommitter delegate;
-
-    public GCSOutputCommitter(OutputCommitter delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public void setupJob(JobContext jobContext) throws IOException {
-      delegate.setupJob(jobContext);
-    }
-
-    @Override
-    public void cleanupJob(JobContext jobContext) throws IOException {
-      delegate.cleanupJob(jobContext);
-    }
-
-    @Override
-    public void commitJob(JobContext jobContext) throws IOException {
-      delegate.commitJob(jobContext);
-    }
-
-    @Override
-    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
-      delegate.abortJob(jobContext, state);
-    }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      delegate.setupTask(taskAttemptContext);
-    }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
-      return delegate.needsTaskCommit(taskAttemptContext);
-    }
-
-    @Override
-    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      /*On commit task, there seems to be some inconsistency across different hadoop implementations regarding the path
-      where output file is stored. For some implementations it appears in the path returned by FileOutputCommitter
-      getCommittedTaskPath and for some it does not.Before commit, the files appear to be consistently present in path
-      returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output file from taskAttemptPath and add
-      metadata before commit happens. After commit, file would have been moved out of the taskAttemptPath. */
-      try {
-        updateMetricMetaData(taskAttemptContext);
-      } catch (Exception exception) {
-        LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
-                 exception);
-      }
-
-      delegate.commitTask(taskAttemptContext);
-    }
-
-    private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
-      if (!(delegate instanceof FileOutputCommitter)) {
-        return;
-      }
-
-      FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
-      Configuration configuration = taskAttemptContext.getConfiguration();
-      //Task is not yet committed, so should be available in attempt path
-      Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
-      if (configuration == null || taskAttemptPath == null) {
-        return;
-      }
-
-      //read the count from configuration
-      String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
-      Map<String, String> metaData = new HashMap<>();
-      metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
-      StorageClient storageClient = getStorageClient(configuration);
-      //update metadata on the output file present in the directory for this task
-      Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
-      if (blob == null) {
-        LOG.info("Could not find a file in path %s to apply count metadata.", taskAttemptPath.toString());
-        return;
-      }
-      blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
-        .update();
-    }
-
-    @VisibleForTesting
-    StorageClient getStorageClient(Configuration configuration) throws IOException {
-      String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
-      String serviceAccount = null;
-      boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
-        .equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
-      if (isServiceAccountFile) {
-        serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
-      } else {
-        serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
-                                                         GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
-      }
-      return StorageClient.create(project, serviceAccount, isServiceAccountFile);
-    }
-
-    @Override
-    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      delegate.abortTask(taskAttemptContext);
-    }
-
-    @Override
-    public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
-      return delegate.isCommitJobRepeatable(jobContext);
-    }
-
-    @Override
-    public boolean isRecoverySupported(JobContext jobContext) throws IOException {
-      return delegate.isRecoverySupported(jobContext);
-    }
-
-    @Override
-    public boolean isRecoverySupported() {
-      return delegate.isRecoverySupported();
-    }
-
-    @Override
-    public void recoverTask(TaskAttemptContext taskContext) throws IOException {
-      delegate.recoverTask(taskContext);
-    }
-  }
-
   /**
    * RecordWriter for GCSSink
    */

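Since the committer moved from a nested class to its own file, any call site that referenced the old nested name would now use the top-level class instead. A hedged before/after sketch, assuming such call sites exist (the helper class below is hypothetical):

import java.io.IOException;
import io.cdap.plugin.gcp.gcs.sink.GCSOutputCommitter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

/** Illustrates the rename: GCSOutputFormatProvider.GCSOutputCommitter becomes top-level GCSOutputCommitter. */
public class CommitterReferenceExample {
  public static OutputCommitter wrap(Path output, TaskAttemptContext context) throws IOException {
    // Before this commit: new GCSOutputFormatProvider.GCSOutputCommitter(delegate)
    // After this commit: the top-level class, with the same delegating behavior.
    return new GCSOutputCommitter(new FileOutputCommitter(output, context));
  }
}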