1616
1717package io .cdap .plugin .gcp .gcs .sink ;
1818
19+ import org .apache .hadoop .fs .FSDataInputStream ;
20+ import org .apache .hadoop .fs .FileStatus ;
21+ import org .apache .hadoop .fs .FileSystem ;
22+ import org .apache .hadoop .fs .Path ;
1923import org .apache .hadoop .mapreduce .JobContext ;
24+ import org .apache .hadoop .mapreduce .JobID ;
2025import org .apache .hadoop .mapreduce .JobStatus ;
2126import org .apache .hadoop .mapreduce .OutputCommitter ;
2227import org .apache .hadoop .mapreduce .OutputFormat ;
2328import org .apache .hadoop .mapreduce .TaskAttemptContext ;
29+ import org .apache .hadoop .mapreduce .lib .output .FileOutputCommitter ;
2430import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
2531
32+ import java .io .DataOutputStream ;
33+ import java .io .EOFException ;
2634import java .io .IOException ;
27- import java .util .HashMap ;
28- import java .util .Map ;
35+ import java .util .HashSet ;
36+ import java .util .Set ;
37+ import javax .annotation .Nullable ;
2938
3039/**
3140 * Output Committer which creates and delegates operations to other GCS Output Committer instances.
32- *
41+ * <p>
3342 * Delegated instances are created based on a supplied Output Format and Destination Table Names.
3443 */
3544public class DelegatingGCSOutputCommitter extends OutputCommitter {
36- private final Map <String , OutputCommitter > committerMap ;
3745
38- public DelegatingGCSOutputCommitter () {
39- committerMap = new HashMap <>();
46+ private final TaskAttemptContext taskAttemptContext ;
47+ private boolean firstTable = true ;
48+ private static final String PARTITIONS_FILE_SUFFIX = "_partitions.txt" ;
49+
50+ public DelegatingGCSOutputCommitter (TaskAttemptContext taskAttemptContext ) {
51+ this .taskAttemptContext = taskAttemptContext ;
4052 }
4153
4254 /**
4355 * Add a new GCSOutputCommitter based on a supplied Output Format and Table Name.
44- *
56+ * <p>
4557 * This GCS Output Committer gets initialized when created.
4658 */
4759 @ SuppressWarnings ("rawtypes" )
4860 public void addGCSOutputCommitterFromOutputFormat (OutputFormat outputFormat ,
49- TaskAttemptContext context ,
5061 String tableName ) throws IOException , InterruptedException {
5162 //Set output directory
52- context .getConfiguration ().set (FileOutputFormat .OUTDIR ,
53- DelegatingGCSOutputUtils .buildOutputPath (context .getConfiguration (), tableName ));
63+ taskAttemptContext .getConfiguration ().set (FileOutputFormat .OUTDIR ,
64+ DelegatingGCSOutputUtils .buildOutputPath (
65+ taskAttemptContext .getConfiguration (), tableName ));
5466
5567 //Wrap output committer into the GCS Output Committer.
56- GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter (outputFormat .getOutputCommitter (context ));
68+ GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter (outputFormat .getOutputCommitter (taskAttemptContext ));
5769
58- //Initialize the new GCS Output Committer and add it to the Committer Map
59- gcsOutputCommitter .setupJob ( context );
60- gcsOutputCommitter . setupTask ( context );
61- committerMap . put ( tableName , gcsOutputCommitter ) ;
70+ gcsOutputCommitter . setupJob ( taskAttemptContext );
71+ gcsOutputCommitter .setupTask ( taskAttemptContext );
72+ writePartitionFile ( taskAttemptContext . getConfiguration (). get ( FileOutputFormat . OUTDIR ), taskAttemptContext );
73+ firstTable = false ;
6274 }
6375
6476 @ Override
6577 public void setupJob (JobContext jobContext ) throws IOException {
66- //no-op
78+ Path outputPath = new Path (jobContext .getConfiguration ().get (DelegatingGCSOutputFormat .OUTPUT_PATH_BASE_DIR ));
79+ FileSystem fs = outputPath .getFileSystem (jobContext .getConfiguration ());
80+ Path tempPath = new Path (outputPath , getPendingDirPath (jobContext .getJobID ()));
81+ fs .mkdirs (tempPath );
6782 }
6883
6984 @ Override
@@ -73,39 +88,40 @@ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException
7388
7489 @ Override
7590 public boolean needsTaskCommit (TaskAttemptContext taskAttemptContext ) throws IOException {
76- if (committerMap .isEmpty ()) {
77- return false ;
78- }
79-
80- boolean needsTaskCommit = true ;
81-
82- for (OutputCommitter committer : committerMap .values ()) {
83- needsTaskCommit = needsTaskCommit && committer .needsTaskCommit (taskAttemptContext );
84- }
85-
86- return needsTaskCommit ;
91+ return true ;
8792 }
8893
8994 @ Override
9095 public void commitTask (TaskAttemptContext taskAttemptContext ) throws IOException {
91- for (OutputCommitter committer : committerMap .values ()) {
96+ for (String output : getOutputPaths (taskAttemptContext )) {
97+ FileOutputCommitter committer = new FileOutputCommitter (new Path (output ), taskAttemptContext );
9298 committer .commitTask (taskAttemptContext );
9399 }
94100 }
95101
96102 @ Override
97103 public void commitJob (JobContext jobContext ) throws IOException {
98- for (OutputCommitter committer : committerMap .values ()) {
104+ for (String output : getOutputPaths (jobContext )) {
105+ FileOutputCommitter committer = new FileOutputCommitter (new Path (output ), taskAttemptContext );
99106 committer .commitJob (jobContext );
100107 }
108+ cleanupJob (jobContext );
109+ }
110+
111+ @ Override
112+ public void cleanupJob (JobContext jobContext ) throws IOException {
113+ Path outputPath = new Path (jobContext .getConfiguration ().get (DelegatingGCSOutputFormat .OUTPUT_PATH_BASE_DIR ));
114+ FileSystem fs = outputPath .getFileSystem (jobContext .getConfiguration ());
115+ // delete the temporary directory that has partition information in text files.
116+ fs .delete (new Path (outputPath , getPendingDirPath (jobContext .getJobID ())), true );
101117 }
102118
103119 @ Override
104120 public void abortTask (TaskAttemptContext taskAttemptContext ) throws IOException {
105121 IOException ioe = null ;
106-
107- for (OutputCommitter committer : committerMap .values ()) {
122+ for (String output : getOutputPaths (taskAttemptContext )) {
108123 try {
124+ FileOutputCommitter committer = new FileOutputCommitter (new Path (output ), taskAttemptContext );
109125 committer .abortTask (taskAttemptContext );
110126 } catch (IOException e ) {
111127 if (ioe == null ) {
@@ -124,21 +140,108 @@ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException
124140 @ Override
125141 public void abortJob (JobContext jobContext , JobStatus .State state ) throws IOException {
126142 IOException ioe = null ;
127-
128- for (OutputCommitter committer : committerMap .values ()) {
129- try {
143+ try {
144+ for (String output : getOutputPaths (jobContext )) {
145+ taskAttemptContext .getConfiguration ().set (FileOutputFormat .OUTDIR , output );
146+ FileOutputCommitter committer = new FileOutputCommitter (new Path (output ), taskAttemptContext );
130147 committer .abortJob (jobContext , state );
131- } catch (IOException e ) {
132- if (ioe == null ) {
133- ioe = e ;
134- } else {
135- ioe .addSuppressed (e );
136- }
137148 }
149+ } catch (IOException e ) {
150+ if (ioe == null ) {
151+ ioe = e ;
152+ } else {
153+ ioe .addSuppressed (e );
154+ }
155+ } finally {
156+ cleanupJob (jobContext );
138157 }
139-
140158 if (ioe != null ) {
141159 throw ioe ;
142160 }
143161 }
162+
163+ // return path lists based on JobContext configuration.
164+ private Set <String > getOutputPaths (JobContext jobContext ) throws IOException {
165+ Path outputPath = new Path (jobContext .getConfiguration ().get (DelegatingGCSOutputFormat .OUTPUT_PATH_BASE_DIR ));
166+ FileSystem fs = outputPath .getFileSystem (jobContext .getConfiguration ());
167+ return getOutputPathsFromTempPartitionFile (outputPath , fs , null , jobContext .getJobID ());
168+ }
169+
170+ private Set <String > getOutputPaths (TaskAttemptContext taskAttemptContext ) throws IOException {
171+ Path outputPath = new Path (
172+ taskAttemptContext .getConfiguration ().get (DelegatingGCSOutputFormat .OUTPUT_PATH_BASE_DIR ));
173+ FileSystem fs = outputPath .getFileSystem (taskAttemptContext .getConfiguration ());
174+ return getOutputPathsFromTempPartitionFile (outputPath , fs ,
175+ taskAttemptContext .getTaskAttemptID ().getTaskID ().toString (),
176+ taskAttemptContext .getJobID ());
177+ }
178+
179+ /**
180+ * This method will return the full path up to path suffix after reading from partitions.txt file
181+ * If method is getting called from task context, it will return paths from single file, otherwise all paths
182+ *
183+ * @param baseOutputPath
184+ * @param fs
185+ * @param taskId
186+ * @param jobID
187+ * @return
188+ * @throws IOException
189+ */
190+ private Set <String > getOutputPathsFromTempPartitionFile (Path baseOutputPath , FileSystem fs , @ Nullable String taskId ,
191+ JobID jobID ) throws IOException {
192+ Set <String > outputPaths = new HashSet <>();
193+ Path tempPath = taskId == null ? new Path (baseOutputPath , getPendingDirPath (jobID ))
194+ : new Path (baseOutputPath , String .format ("%s/%s%s" , getPendingDirPath (jobID ), taskId ,
195+ PARTITIONS_FILE_SUFFIX ));
196+
197+ if (!fs .exists (tempPath )) {
198+ return outputPaths ;
199+ }
200+
201+ for (FileStatus status : fs .listStatus (tempPath )) {
202+ if (status .getPath ().getName ().endsWith (PARTITIONS_FILE_SUFFIX )) {
203+ try (FSDataInputStream dis = fs .open (status .getPath ())) {
204+ while (true ) {
205+ try {
206+ outputPaths .add (dis .readUTF ());
207+ } catch (EOFException e ) {
208+ break ;
209+ }
210+ }
211+ }
212+ }
213+ }
214+ return outputPaths ;
215+ }
216+
217+ /**
218+ * This method will create a _temporary_{jobID} directory in base directory path and will create a file with name
219+ * {taskid}_partitions.txt which will store the full path upto path suffix. e.g. gs://basepath/tablename/path_suffix
220+ *
221+ * @param path Split file path upto split field name
222+ * @param context
223+ * @throws IOException
224+ */
225+ private void writePartitionFile (String path , TaskAttemptContext context ) throws IOException {
226+ Path outputPath = new Path (context .getConfiguration ().get (DelegatingGCSOutputFormat .OUTPUT_PATH_BASE_DIR ));
227+ Path tempPath = new Path (outputPath , getPendingDirPath (context .getJobID ()));
228+ FileSystem fs = tempPath .getFileSystem (context .getConfiguration ());
229+ String taskId = context .getTaskAttemptID ().getTaskID ().toString ();
230+ Path taskPartitionFile = new Path (tempPath , String .format ("%s%s" , taskId , PARTITIONS_FILE_SUFFIX ));
231+ if (!fs .exists (taskPartitionFile )) {
232+ fs .createNewFile (taskPartitionFile );
233+ } else if (firstTable ) {
234+ fs .create (taskPartitionFile , true );
235+ }
236+ try (DataOutputStream out = fs .append (taskPartitionFile )) {
237+ out .writeUTF (path );
238+ }
239+ }
240+
241+ // This will create a directory with name _temporary_{jobId} to write the partition files
242+ // Job ID added as a suffix, so that multiple pipelines can write to same path in parallel.
243+ private String getPendingDirPath (JobID jobId ) {
244+ return String .format ("%s_%s" , FileOutputCommitter .PENDING_DIR_NAME , jobId );
245+ }
246+
144247}
0 commit comments