Skip to content

Commit 5f809d8

Browse files
changes done for GCS multisink issue.
changes done for GCS multisink issue. (cherry picked from commit 8c0ce83)
1 parent 896852c commit 5f809d8

File tree

4 files changed

+572
-57
lines changed

4 files changed

+572
-57
lines changed

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java

Lines changed: 144 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,69 @@
1616

1717
package io.cdap.plugin.gcp.gcs.sink;
1818

19+
import org.apache.hadoop.fs.FSDataInputStream;
20+
import org.apache.hadoop.fs.FileStatus;
21+
import org.apache.hadoop.fs.FileSystem;
22+
import org.apache.hadoop.fs.Path;
1923
import org.apache.hadoop.mapreduce.JobContext;
24+
import org.apache.hadoop.mapreduce.JobID;
2025
import org.apache.hadoop.mapreduce.JobStatus;
2126
import org.apache.hadoop.mapreduce.OutputCommitter;
2227
import org.apache.hadoop.mapreduce.OutputFormat;
2328
import org.apache.hadoop.mapreduce.TaskAttemptContext;
29+
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
2430
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
2531

32+
import java.io.DataOutputStream;
33+
import java.io.EOFException;
2634
import java.io.IOException;
27-
import java.util.HashMap;
28-
import java.util.Map;
35+
import java.util.HashSet;
36+
import java.util.Set;
37+
import javax.annotation.Nullable;
2938

3039
/**
3140
* Output Committer which creates and delegates operations to other GCS Output Committer instances.
32-
*
41+
* <p>
3342
* Delegated instances are created based on a supplied Output Format and Destination Table Names.
3443
*/
3544
public class DelegatingGCSOutputCommitter extends OutputCommitter {
36-
private final Map<String, OutputCommitter> committerMap;
3745

38-
public DelegatingGCSOutputCommitter() {
39-
committerMap = new HashMap<>();
46+
private final TaskAttemptContext taskAttemptContext;
47+
private boolean firstTable = true;
48+
private static final String PARTITIONS_FILE_SUFFIX = "_partitions.txt";
49+
50+
public DelegatingGCSOutputCommitter(TaskAttemptContext taskAttemptContext) {
51+
this.taskAttemptContext = taskAttemptContext;
4052
}
4153

4254
/**
4355
* Add a new GCSOutputCommitter based on a supplied Output Format and Table Name.
44-
*
56+
* <p>
4557
* This GCS Output Committer gets initialized when created.
4658
*/
4759
@SuppressWarnings("rawtypes")
4860
public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
49-
TaskAttemptContext context,
5061
String tableName) throws IOException, InterruptedException {
5162
//Set output directory
52-
context.getConfiguration().set(FileOutputFormat.OUTDIR,
53-
DelegatingGCSOutputUtils.buildOutputPath(context.getConfiguration(), tableName));
63+
taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR,
64+
DelegatingGCSOutputUtils.buildOutputPath(
65+
taskAttemptContext.getConfiguration(), tableName));
5466

5567
//Wrap output committer into the GCS Output Committer.
56-
GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(context));
68+
GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
5769

58-
//Initialize the new GCS Output Committer and add it to the Committer Map
59-
gcsOutputCommitter.setupJob(context);
60-
gcsOutputCommitter.setupTask(context);
61-
committerMap.put(tableName, gcsOutputCommitter);
70+
gcsOutputCommitter.setupJob(taskAttemptContext);
71+
gcsOutputCommitter.setupTask(taskAttemptContext);
72+
writePartitionFile(taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR), taskAttemptContext);
73+
firstTable = false;
6274
}
6375

6476
@Override
6577
public void setupJob(JobContext jobContext) throws IOException {
66-
//no-op
78+
Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
79+
FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
80+
Path tempPath = new Path(outputPath, getPendingDirPath(jobContext.getJobID()));
81+
fs.mkdirs(tempPath);
6782
}
6883

6984
@Override
@@ -73,39 +88,40 @@ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException
7388

7489
@Override
7590
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
76-
if (committerMap.isEmpty()) {
77-
return false;
78-
}
79-
80-
boolean needsTaskCommit = true;
81-
82-
for (OutputCommitter committer : committerMap.values()) {
83-
needsTaskCommit = needsTaskCommit && committer.needsTaskCommit(taskAttemptContext);
84-
}
85-
86-
return needsTaskCommit;
91+
return true;
8792
}
8893

8994
@Override
9095
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
91-
for (OutputCommitter committer : committerMap.values()) {
96+
for (String output : getOutputPaths(taskAttemptContext)) {
97+
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
9298
committer.commitTask(taskAttemptContext);
9399
}
94100
}
95101

96102
@Override
97103
public void commitJob(JobContext jobContext) throws IOException {
98-
for (OutputCommitter committer : committerMap.values()) {
104+
for (String output : getOutputPaths(jobContext)) {
105+
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
99106
committer.commitJob(jobContext);
100107
}
108+
cleanupJob(jobContext);
109+
}
110+
111+
@Override
112+
public void cleanupJob(JobContext jobContext) throws IOException {
113+
Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
114+
FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
115+
// delete the temporary directory that has partition information in text files.
116+
fs.delete(new Path(outputPath, getPendingDirPath(jobContext.getJobID())), true);
101117
}
102118

103119
@Override
104120
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
105121
IOException ioe = null;
106-
107-
for (OutputCommitter committer : committerMap.values()) {
122+
for (String output : getOutputPaths(taskAttemptContext)) {
108123
try {
124+
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
109125
committer.abortTask(taskAttemptContext);
110126
} catch (IOException e) {
111127
if (ioe == null) {
@@ -124,21 +140,108 @@ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException
124140
@Override
125141
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
126142
IOException ioe = null;
127-
128-
for (OutputCommitter committer : committerMap.values()) {
129-
try {
143+
try {
144+
for (String output : getOutputPaths(jobContext)) {
145+
taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR, output);
146+
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
130147
committer.abortJob(jobContext, state);
131-
} catch (IOException e) {
132-
if (ioe == null) {
133-
ioe = e;
134-
} else {
135-
ioe.addSuppressed(e);
136-
}
137148
}
149+
} catch (IOException e) {
150+
if (ioe == null) {
151+
ioe = e;
152+
} else {
153+
ioe.addSuppressed(e);
154+
}
155+
} finally {
156+
cleanupJob(jobContext);
138157
}
139-
140158
if (ioe != null) {
141159
throw ioe;
142160
}
143161
}
162+
163+
// return path lists based on JobContext configuration.
164+
private Set<String> getOutputPaths(JobContext jobContext) throws IOException {
165+
Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
166+
FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
167+
return getOutputPathsFromTempPartitionFile(outputPath, fs, null, jobContext.getJobID());
168+
}
169+
170+
private Set<String> getOutputPaths(TaskAttemptContext taskAttemptContext) throws IOException {
171+
Path outputPath = new Path(
172+
taskAttemptContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
173+
FileSystem fs = outputPath.getFileSystem(taskAttemptContext.getConfiguration());
174+
return getOutputPathsFromTempPartitionFile(outputPath, fs,
175+
taskAttemptContext.getTaskAttemptID().getTaskID().toString(),
176+
taskAttemptContext.getJobID());
177+
}
178+
179+
/**
180+
* This method returns the full paths up to the path suffix, read from the _partitions.txt files.
181+
* If this method is called from a task context, it returns the paths from that task's single file; otherwise it returns all paths.
182+
*
183+
* @param baseOutputPath
184+
* @param fs
185+
* @param taskId
186+
* @param jobID
187+
* @return
188+
* @throws IOException
189+
*/
190+
private Set<String> getOutputPathsFromTempPartitionFile(Path baseOutputPath, FileSystem fs, @Nullable String taskId,
191+
JobID jobID) throws IOException {
192+
Set<String> outputPaths = new HashSet<>();
193+
Path tempPath = taskId == null ? new Path(baseOutputPath, getPendingDirPath(jobID))
194+
: new Path(baseOutputPath, String.format("%s/%s%s", getPendingDirPath(jobID), taskId,
195+
PARTITIONS_FILE_SUFFIX));
196+
197+
if (!fs.exists(tempPath)) {
198+
return outputPaths;
199+
}
200+
201+
for (FileStatus status : fs.listStatus(tempPath)) {
202+
if (status.getPath().getName().endsWith(PARTITIONS_FILE_SUFFIX)) {
203+
try (FSDataInputStream dis = fs.open(status.getPath())) {
204+
while (true) {
205+
try {
206+
outputPaths.add(dis.readUTF());
207+
} catch (EOFException e) {
208+
break;
209+
}
210+
}
211+
}
212+
}
213+
}
214+
return outputPaths;
215+
}
216+
217+
/**
218+
* This method will create a _temporary_{jobID} directory in base directory path and will create a file with name
219+
* {taskid}_partitions.txt which will store the full path up to the path suffix, e.g. gs://basepath/tablename/path_suffix
220+
*
221+
* @param path Split file path up to the split field name
222+
* @param context
223+
* @throws IOException
224+
*/
225+
private void writePartitionFile(String path, TaskAttemptContext context) throws IOException {
226+
Path outputPath = new Path(context.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
227+
Path tempPath = new Path(outputPath, getPendingDirPath(context.getJobID()));
228+
FileSystem fs = tempPath.getFileSystem(context.getConfiguration());
229+
String taskId = context.getTaskAttemptID().getTaskID().toString();
230+
Path taskPartitionFile = new Path(tempPath, String.format("%s%s", taskId, PARTITIONS_FILE_SUFFIX));
231+
if (!fs.exists(taskPartitionFile)) {
232+
fs.createNewFile(taskPartitionFile);
233+
} else if (firstTable) {
234+
fs.create(taskPartitionFile, true);
235+
}
236+
try (DataOutputStream out = fs.append(taskPartitionFile)) {
237+
out.writeUTF(path);
238+
}
239+
}
240+
241+
// This will create a directory with name _temporary_{jobId} to write the partition files
242+
// Job ID added as a suffix, so that multiple pipelines can write to same path in parallel.
243+
private String getPendingDirPath(JobID jobId) {
244+
return String.format("%s_%s", FileOutputCommitter.PENDING_DIR_NAME, jobId);
245+
}
246+
144247
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.hadoop.conf.Configuration;
2121
import org.apache.hadoop.io.NullWritable;
2222
import org.apache.hadoop.mapreduce.JobContext;
23-
import org.apache.hadoop.mapreduce.OutputCommitter;
2423
import org.apache.hadoop.mapreduce.OutputFormat;
2524
import org.apache.hadoop.mapreduce.RecordWriter;
2625
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -39,10 +38,8 @@ public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, Struct
3938
public static final String DELEGATE_CLASS = "delegating_output_format.delegate";
4039
public static final String OUTPUT_PATH_BASE_DIR = "delegating_output_format.output.path.base";
4140
public static final String OUTPUT_PATH_SUFFIX = "delegating_output_format.output.path.suffix";
42-
private final DelegatingGCSOutputCommitter outputCommitter;
4341

4442
public DelegatingGCSOutputFormat() {
45-
this.outputCommitter = new DelegatingGCSOutputCommitter();
4643
}
4744

4845
/**
@@ -65,7 +62,7 @@ public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptC
6562
Configuration hConf = context.getConfiguration();
6663
String partitionField = hConf.get(PARTITION_FIELD);
6764

68-
return new DelegatingGCSRecordWriter(context, partitionField, outputCommitter);
65+
return new DelegatingGCSRecordWriter(context, partitionField, getOutputCommitter(context));
6966
}
7067

7168
@Override
@@ -74,8 +71,8 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
7471
}
7572

7673
@Override
77-
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
78-
return outputCommitter;
74+
public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext context) {
75+
return new DelegatingGCSOutputCommitter(context);
7976
}
8077

8178
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.apache.hadoop.mapreduce.OutputFormat;
2222
import org.apache.hadoop.mapreduce.RecordWriter;
2323
import org.apache.hadoop.mapreduce.TaskAttemptContext;
24-
import org.slf4j.Logger;
25-
import org.slf4j.LoggerFactory;
2624

2725
import java.io.IOException;
2826
import java.util.HashMap;
@@ -34,7 +32,6 @@
3432
* This Record Writer will initialize record writes and Output Committers as needed.
3533
*/
3634
public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
37-
private static final Logger LOG = LoggerFactory.getLogger(DelegatingGCSRecordWriter.class);
3835
private final TaskAttemptContext context;
3936
private final String partitionField;
4037
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
@@ -63,7 +60,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException,
6360
DelegatingGCSOutputUtils.getDelegateFormat(context.getConfiguration());
6461

6562
//Initialize GCS Output Committer for this format.
66-
delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, context, tableName);
63+
delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, tableName);
6764

6865
//Add record writer to delegate map.
6966
delegate = format.getRecordWriter(context);
@@ -79,12 +76,6 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
7976
for (RecordWriter<NullWritable, StructuredRecord> delegate : delegateMap.values()) {
8077
delegate.close(context);
8178
}
82-
83-
// Call the Commit Task and Commit Job implementations of this plugin to copy files into their final directory.
84-
// We need to do this at this stage because the OutputCommitter needs to be aware of the different partitions
85-
// that have been stored so far.
86-
delegatingGCSOutputCommitter.commitTask(context);
87-
delegatingGCSOutputCommitter.commitJob(context);
8879
}
8980

9081
}

0 commit comments

Comments (0)