
Commit 05390ad

#777 Add support for appending and for the standard output file naming convention.

1 parent: 9d70cad

File tree

3 files changed (+39, -16 lines)


spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,7 @@ class DefaultSource
8484
fs.delete(outputPath, true)
8585
}
8686
case SaveMode.Append =>
87-
if (fs.exists(outputPath)) {
88-
throw new IllegalArgumentException(
89-
s"Save mode '$mode' is not supported by the 'spark-cobol' data source at the moment. " +
90-
"Please use 'Overwrite' mode to write data to a file or folder."
91-
)
92-
}
87+
// In append mode, no action is needed. Tasks will write to different files.
9388
case SaveMode.ErrorIfExists =>
9489
if (fs.exists(outputPath)) {
9590
throw new IllegalArgumentException(
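With this change, `SaveMode.Append` no longer throws; each write lays down new part files next to the existing ones. A minimal usage sketch, mirroring the updated test further down (`df`, `copybookContents`, and the output path are placeholders, not part of this diff):

import org.apache.spark.sql.SaveMode

// Minimal append sketch; df, copybookContents and "/tmp/out" are placeholders.
df.write
  .format("cobol")
  .mode(SaveMode.Append)
  .option("copybook_contents", copybookContents)
  .save("/tmp/out")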

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RawBinaryOutputFormat.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import org.apache.hadoop.fs.Path
2020
import org.apache.hadoop.mapreduce._
2121
import org.apache.hadoop.io.{BytesWritable, NullWritable}
2222
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
23+
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath
2324

2425
import java.io.DataOutputStream
26+
import java.util.UUID
2527

2628
/**
2729
* A custom implementation of `FileOutputFormat` that outputs raw binary data for fixed record length
@@ -39,6 +41,26 @@ import java.io.DataOutputStream
3941
*/
4042

4143
class RawBinaryOutputFormat extends FileOutputFormat[NullWritable, BytesWritable] {
44+
private val uniqueUuid = UUID.randomUUID().toString
45+
46+
override def checkOutputSpecs(job: JobContext): Unit = {
47+
val outDir = getOutputPath(job)
48+
if (outDir == null) throw new IllegalStateException("Output directory not set.")
49+
}
50+
51+
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
52+
val conf = context.getConfiguration
53+
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
54+
val idFilePart = if (uniqueWriteJobId == null) uniqueUuid else uniqueWriteJobId
55+
val taskAttemptID = context.getTaskAttemptID
56+
val taskId = f"${taskAttemptID.getTaskID.getId}%05d"
57+
val attemptId = f"c${taskAttemptID.getId}%03d"
58+
59+
val filename = s"part-$taskId-$idFilePart-$attemptId$extension"
60+
val outputPath = getOutputPath(context)
61+
new Path(outputPath, filename)
62+
}
63+
4264
override def getRecordWriter(context: TaskAttemptContext): RecordWriter[NullWritable, BytesWritable] = {
4365
val extension = context.getConfiguration.get("cobol.writer.output.extension", ".dat")
4466
val path: Path = getDefaultWorkFile(context, extension)
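The generated names follow Spark's standard part-file convention, part-<task id>-<job UUID>-c<attempt><extension>. A tiny illustrative sketch of the formatting (the task, attempt, and UUID values below are made up):

// Illustrative only: shows the shape of names getDefaultWorkFile produces.
val taskId    = f"${7}%05d"  // "00007"
val attemptId = f"c${0}%03d" // "c000"
val jobId     = "1b9e7c1e-5c2a-4d7f-9c3b-0a1d2e3f4a5b" // writeJobUUID, or a random fallback
val filename  = s"part-$taskId-$jobId-$attemptId.dat"
// => "part-00007-1b9e7c1e-5c2a-4d7f-9c3b-0a1d2e3f4a5b-c000.dat"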

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
6464
val expected = Array[Byte](
6565
0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
6666
0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
67-
0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_
67+
0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_
6868
)
6969

7070
if (!bytes.sameElements(expected)) {
@@ -114,7 +114,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
114114
val expected = Array[Byte](
115115
0xC1.toByte, 0x00.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
116116
0xC2.toByte, 0x00.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
117-
0xC3.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte // C,Last_
117+
0xC3.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte // C,Last_
118118
)
119119

120120
if (!bytes.sameElements(expected)) {
@@ -184,7 +184,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
184184
}
185185

186186

187-
"write should fail with save mode append and the path exists" in {
187+
"write should successfully append" in {
188188
withTempDirectory("cobol_writer3") { tempDir =>
189189
val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
190190

@@ -196,13 +196,19 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
196196
.option("copybook_contents", copybookContents)
197197
.save(path.toString)
198198

199-
assertThrows[IllegalArgumentException] {
200-
df.write
201-
.format("cobol")
202-
.mode(SaveMode.Append)
203-
.option("copybook_contents", copybookContents)
204-
.save(path.toString)
205-
}
199+
df.write
200+
.format("cobol")
201+
.mode(SaveMode.Append)
202+
.option("copybook_contents", copybookContents)
203+
.save(path.toString)
204+
205+
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
206+
207+
assert(fs.exists(path), "Output directory should exist")
208+
val files = fs.listStatus(path)
209+
.filter(_.getPath.getName.startsWith("part-"))
210+
211+
assert(files.length > 1)
206212
}
207213
}
208214
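Since each write now produces uniquely named part files, appended records are read back together with the original ones. A quick read-back sketch, assuming the same `path` and `copybookContents` as the test above (this snippet is not part of the diff):

// Hypothetical read-back; path and copybookContents come from the test context.
val readBack = spark.read
  .format("cobol")
  .option("copybook_contents", copybookContents)
  .load(path.toString)

assert(readBack.count() == 6) // two appended batches of three records each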
