Skip to content

Commit 2cde60a

Browse files
committed
#777 Add support for appending and for the standard output file naming convention.
1 parent 9d70cad commit 2cde60a

File tree

3 files changed

+36
-16
lines changed

3 files changed

+36
-16
lines changed

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,7 @@ class DefaultSource
8484
fs.delete(outputPath, true)
8585
}
8686
case SaveMode.Append =>
87-
if (fs.exists(outputPath)) {
88-
throw new IllegalArgumentException(
89-
s"Save mode '$mode' is not supported by the 'spark-cobol' data source at the moment. " +
90-
"Please use 'Overwrite' mode to write data to a file or folder."
91-
)
92-
}
87+
// In append mode, no action is needed. Tasks will write to different files.
9388
case SaveMode.ErrorIfExists =>
9489
if (fs.exists(outputPath)) {
9590
throw new IllegalArgumentException(

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RawBinaryOutputFormat.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import org.apache.hadoop.fs.Path
2020
import org.apache.hadoop.mapreduce._
2121
import org.apache.hadoop.io.{BytesWritable, NullWritable}
2222
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
23+
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath
2324

2425
import java.io.DataOutputStream
26+
import java.util.UUID
2527

2628
/**
2729
* A custom implementation of `FileOutputFormat` that outputs raw binary data for fixed record length
@@ -39,6 +41,23 @@ import java.io.DataOutputStream
3941
*/
4042

4143
class RawBinaryOutputFormat extends FileOutputFormat[NullWritable, BytesWritable] {
44+
override def checkOutputSpecs(job: JobContext): Unit = {
45+
val outDir = getOutputPath(job)
46+
if (outDir == null) throw new IllegalStateException("Output directory not set.")
47+
}
48+
49+
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
50+
val conf = context.getConfiguration
51+
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
52+
val taskAttemptID = context.getTaskAttemptID
53+
val taskId = f"${taskAttemptID.getTaskID.getId}%05d"
54+
val attemptId = f"c${taskAttemptID.getId}%03d"
55+
56+
val filename = s"part-$taskId-$uniqueWriteJobId-$attemptId$extension"
57+
val outputPath = getOutputPath(context)
58+
new Path(outputPath, filename)
59+
}
60+
4261
override def getRecordWriter(context: TaskAttemptContext): RecordWriter[NullWritable, BytesWritable] = {
4362
val extension = context.getConfiguration.get("cobol.writer.output.extension", ".dat")
4463
val path: Path = getDefaultWorkFile(context, extension)

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
6464
val expected = Array[Byte](
6565
0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
6666
0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
67-
0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_
67+
0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_
6868
)
6969

7070
if (!bytes.sameElements(expected)) {
@@ -114,7 +114,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
114114
val expected = Array[Byte](
115115
0xC1.toByte, 0x00.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
116116
0xC2.toByte, 0x00.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
117-
0xC3.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte // C,Last_
117+
0xC3.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte // C,Last_
118118
)
119119

120120
if (!bytes.sameElements(expected)) {
@@ -184,7 +184,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
184184
}
185185

186186

187-
"write should fail with save mode append and the path exists" in {
187+
"write should successfully append" in {
188188
withTempDirectory("cobol_writer3") { tempDir =>
189189
val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
190190

@@ -196,13 +196,19 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
196196
.option("copybook_contents", copybookContents)
197197
.save(path.toString)
198198

199-
assertThrows[IllegalArgumentException] {
200-
df.write
201-
.format("cobol")
202-
.mode(SaveMode.Append)
203-
.option("copybook_contents", copybookContents)
204-
.save(path.toString)
205-
}
199+
df.write
200+
.format("cobol")
201+
.mode(SaveMode.Append)
202+
.option("copybook_contents", copybookContents)
203+
.save(path.toString)
204+
205+
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
206+
207+
assert(fs.exists(path), "Output directory should exist")
208+
val files = fs.listStatus(path)
209+
.filter(_.getPath.getName.startsWith("part-"))
210+
211+
assert(files.length > 1)
206212
}
207213
}
208214

0 commit comments

Comments (0)