Commit 85f9424

#777 Implement the usage of binary encoders for the spark-cobol writer.

Parent: 05390ad

4 files changed: +114 −9 lines changed
README.md

Lines changed: 3 additions & 2 deletions

````diff
@@ -1684,11 +1684,12 @@ The writer is still in its early stages and has several limitations:
        05 FIELD_1 PIC X(1).
        05 FIELD_2 PIC X(5).
   ```
-- Only `PIC X(n)` fields are supported; numeric types are not.
+- Supported types:
+  - `PIC X(n)` alphanumeric.
+  - `PIC S9(n)` numeric (integral and decimal) with `COMP`, `COMP-3`, `COMP-4`, `COMP-9` (little-endian).
 - Only fixed record length output is supported (`record_format = F`).
 - `REDEFINES` and `OCCURS` are not supported.
 - Only the core EBCDIC encoder is supported; specific EBCDIC code pages are not yet available.
-- Save mode `append` is not supported; only `overwrite` is.
 - Partitioning by DataFrame fields is not supported.

 ### Implementation details
````
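With numeric `COMP` support in place, a writer invocation can now mix alphanumeric and binary fields. The following is a minimal sketch based only on the options exercised by the test suite in this commit (`format("cobol")`, `copybook_contents`, overwrite mode); the field names and output path are illustrative, not taken from the repository:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("spark-cobol-writer-example").getOrCreate()
import spark.implicits._

// Hypothetical copybook mixing an alphanumeric field with a signed binary one,
// per the supported-types list above.
val copybook =
  """      01 RECORD.
    |         05 NAME   PIC X(5).
    |         05 AMOUNT PIC S9(4) COMP.
    |""".stripMargin

Seq(("ALICE", 1250), ("BOB", -75))
  .toDF("NAME", "AMOUNT")
  .write
  .format("cobol")
  .mode(SaveMode.Overwrite)
  .option("copybook_contents", copybook)
  .save("/tmp/ebcdic_out") // illustrative path
```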

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/BinaryUtils.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -97,7 +97,7 @@ object BinaryUtils {

   def getBytesCount(compression: Option[Usage], precision: Int, isSigned: Boolean, isExplicitDecimalPt: Boolean, isSignSeparate: Boolean): Int = {
     import Constants._
-    val isRealSigned = if (isSignSeparate) false else isSigned
+
     val bytes = compression match {
       case Some(comp) if comp == COMP4() || comp == COMP5() || comp == COMP9() => // || comp == binary2()
         // if native binary follow IBM guide to digit binary length
```
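Since the function still compiles without it, the removed `isRealSigned` value was unused: a dead-code cleanup. For orientation, the new `getBinaryEncoder` (next file) calls this function with `isExplicitDecimalPt = false` and `isSignSeparate = false`. A sketch of such calls follows; the byte widths noted in the comments are inferred from the expected output of the new writer test at the bottom of this commit, not from this function's body:

```scala
import za.co.absa.cobrix.cobol.parser.ast.datatype.{COMP4, COMP9}
import za.co.absa.cobrix.cobol.parser.decoders.BinaryUtils

// PIC S9(2)V9(2) BINARY: 4 digits of precision; the test expects a
// 2-byte (halfword) big-endian field for this picture.
val halfword = BinaryUtils.getBytesCount(Some(COMP4()), precision = 4, isSigned = true,
  isExplicitDecimalPt = false, isSignSeparate = false)

// PIC S9(6) COMP-9: 6 digits of precision; the test expects a
// 4-byte little-endian field for this picture.
val fullword = BinaryUtils.getBytesCount(Some(COMP9()), precision = 6, isSigned = true,
  isExplicitDecimalPt = false, isSignSeparate = false)
```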

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/EncoderSelector.scala

Lines changed: 36 additions & 6 deletions

```diff
@@ -16,7 +16,8 @@

 package za.co.absa.cobrix.cobol.parser.encoding

-import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP3, COMP3U, CobolType, Decimal, Integral}
+import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP3, COMP3U, COMP4, COMP9, CobolType, Decimal, Integral, Usage}
+import za.co.absa.cobrix.cobol.parser.decoders.BinaryUtils
 import za.co.absa.cobrix.cobol.parser.encoding.codepage.{CodePage, CodePageCommon}

 import java.nio.charset.{Charset, StandardCharsets}
@@ -29,16 +30,24 @@ object EncoderSelector {
                  ebcdicCodePage: CodePage = new CodePageCommon,
                  asciiCharset: Charset = StandardCharsets.US_ASCII): Option[Encoder] = {
     dataType match {
-      case alphaNumeric: AlphaNumeric if alphaNumeric.compact.isEmpty =>
+      case alphaNumeric: AlphaNumeric if alphaNumeric.compact.isEmpty =>
         getStringEncoder(alphaNumeric.enc.getOrElse(EBCDIC), ebcdicCodePage, asciiCharset, alphaNumeric.length)
-      case integralComp3: Integral if integralComp3.compact.exists(_.isInstanceOf[COMP3]) =>
+      case integralComp3: Integral if integralComp3.compact.exists(_.isInstanceOf[COMP3]) =>
         Option(getBdcEncoder(integralComp3.precision, 0, 0, integralComp3.signPosition.isDefined, mandatorySignNibble = true))
-      case integralComp3: Integral if integralComp3.compact.exists(_.isInstanceOf[COMP3U]) =>
+      case integralComp3: Integral if integralComp3.compact.exists(_.isInstanceOf[COMP3U]) =>
         Option(getBdcEncoder(integralComp3.precision, 0, 0, integralComp3.signPosition.isDefined, mandatorySignNibble = false))
-      case decimalComp3: Decimal if decimalComp3.compact.exists(_.isInstanceOf[COMP3]) =>
+      case decimalComp3: Decimal if decimalComp3.compact.exists(_.isInstanceOf[COMP3]) =>
         Option(getBdcEncoder(decimalComp3.precision, decimalComp3.scale, decimalComp3.scaleFactor, decimalComp3.signPosition.isDefined, mandatorySignNibble = true))
-      case decimalComp3: Decimal if decimalComp3.compact.exists(_.isInstanceOf[COMP3U]) =>
+      case decimalComp3: Decimal if decimalComp3.compact.exists(_.isInstanceOf[COMP3U]) =>
         Option(getBdcEncoder(decimalComp3.precision, decimalComp3.scale, decimalComp3.scaleFactor, decimalComp3.signPosition.isDefined, mandatorySignNibble = false))
+      case integralBinary: Integral if integralBinary.compact.exists(_.isInstanceOf[COMP4]) =>
+        Option(getBinaryEncoder(integralBinary.compact, integralBinary.precision, 0, 0, integralBinary.signPosition.isDefined, isBigEndian = true))
+      case integralBinary: Integral if integralBinary.compact.exists(_.isInstanceOf[COMP9]) =>
+        Option(getBinaryEncoder(integralBinary.compact, integralBinary.precision, 0, 0, integralBinary.signPosition.isDefined, isBigEndian = false))
+      case decimalBinary: Decimal if decimalBinary.compact.exists(_.isInstanceOf[COMP4]) =>
+        Option(getBinaryEncoder(decimalBinary.compact, decimalBinary.precision, decimalBinary.scale, decimalBinary.scaleFactor, decimalBinary.signPosition.isDefined, isBigEndian = true))
+      case decimalBinary: Decimal if decimalBinary.compact.exists(_.isInstanceOf[COMP9]) =>
+        Option(getBinaryEncoder(decimalBinary.compact, decimalBinary.precision, decimalBinary.scale, decimalBinary.scaleFactor, decimalBinary.signPosition.isDefined, isBigEndian = false))
       case _ =>
         None
     }
@@ -88,6 +97,27 @@ object EncoderSelector {
       buf
     }

+  def getBinaryEncoder(compression: Option[Usage],
+                       precision: Int,
+                       scale: Int,
+                       scaleFactor: Int,
+                       isSigned: Boolean,
+                       isBigEndian: Boolean): Encoder = {
+    val numBytes = BinaryUtils.getBytesCount(compression, precision, isSigned, isExplicitDecimalPt = false, isSignSeparate = false)
+    (a: Any) => {
+      val number = a match {
+        case null => null
+        case d: java.math.BigDecimal => d
+        case n: java.math.BigInteger => new java.math.BigDecimal(n)
+        case n: Byte => new java.math.BigDecimal(n)
+        case n: Int => new java.math.BigDecimal(n)
+        case n: Long => new java.math.BigDecimal(n)
+        case x => new java.math.BigDecimal(x.toString)
+      }
+      BinaryEncoders.encodeBinaryNumber(number, isSigned, numBytes, isBigEndian, precision, scale, scaleFactor)
+    }
+  }
+
   def getBdcEncoder(precision: Int,
                     scale: Int,
                     scaleFactor: Int,
```
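`BinaryEncoders.encodeBinaryNumber` itself is not part of this diff. As a mental model only, a fixed-width two's-complement encoder consistent with the byte patterns asserted in the test below might look like the following sketch; the name, the HALF_UP rounding, and the omission of `scaleFactor` handling are assumptions, not the library's actual behavior:

```scala
import java.math.{RoundingMode, BigDecimal => JBigDecimal}

// Sketch only: not the actual za.co.absa.cobrix BinaryEncoders implementation.
def encodeBinaryNumberSketch(value: JBigDecimal,
                             isSigned: Boolean,
                             numBytes: Int,
                             isBigEndian: Boolean,
                             scale: Int): Array[Byte] = {
  // Remove the implied decimal point: 10.12 with scale 2 becomes the unscaled 1012.
  val unscaled = value.movePointRight(scale).setScale(0, RoundingMode.HALF_UP).longValueExact()
  val bits = numBytes * 8
  val min: BigInt = if (isSigned) -(BigInt(1) << (bits - 1)) else BigInt(0)
  val max: BigInt = if (isSigned) (BigInt(1) << (bits - 1)) - 1 else (BigInt(1) << bits) - 1
  if (BigInt(unscaled) < min || BigInt(unscaled) > max) {
    // Out-of-range values encode as zeros, matching the test's expectation
    // for -20 written into the unsigned PIC 9(2)V9(2) COMP-9 field.
    new Array[Byte](numBytes)
  } else {
    // Emit the low `numBytes` bytes of the two's-complement value in the
    // requested byte order (big-endian for COMP/COMP-4, little-endian for COMP-9).
    Array.tabulate(numBytes) { i =>
      val shift = if (isBigEndian) (numBytes - 1 - i) * 8 else i * 8
      ((unscaled >> shift) & 0xFF).toByte
    }
  }
}

// e.g. 10.12, unsigned, 2 bytes, little-endian -> F4 03, as in the test below.
```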

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 74 additions & 0 deletions

```diff
@@ -183,6 +183,80 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
       }
     }

+    "write data frames with COMP fields" in {
+      withTempDirectory("cobol_writer1") { tempDir =>
+        val df = List(
+          (1, 100.5, new java.math.BigDecimal(10.23), 1, 10050, new java.math.BigDecimal(10.12)),
+          (2, 800.4, new java.math.BigDecimal(30), 2, 80040, new java.math.BigDecimal(30)),
+          (3, 22.33, new java.math.BigDecimal(-20), 3, -2233, new java.math.BigDecimal(-20))
+        ).toDF("A", "B", "C", "D", "E", "F")
+
+        val path = new Path(tempDir, "writer1")
+
+        val copybookContentsWithBinFields =
+          """ 01 RECORD.
+                05 A PIC S9(1) COMP.
+                05 B PIC 9(4)V9(2) COMP-4.
+                05 C PIC S9(2)V9(2) BINARY.
+                05 D PIC 9(1) COMP-9.
+                05 E PIC S9(6) COMP-9.
+                05 F PIC 9(2)V9(2) COMP-9.
+          """
+
+        df.coalesce(1)
+          .orderBy("A")
+          .write
+          .format("cobol")
+          .mode(SaveMode.Overwrite)
+          .option("copybook_contents", copybookContentsWithBinFields)
+          .save(path.toString)
+
+        val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+
+        assert(fs.exists(path), "Output directory should exist")
+        val files = fs.listStatus(path)
+          .filter(_.getPath.getName.startsWith("part-"))
+
+        assert(files.nonEmpty, "Output directory should contain part files")
+
+        val partFile = files.head.getPath
+        val data = fs.open(partFile)
+        val bytes = new Array[Byte](files.head.getLen.toInt)
+        data.readFully(bytes)
+        data.close()
+
+        // Expected binary data for the sample test data
+        val expected = Array(
+          0x00, 0x01,             // 1 (short, big-endian)
+          0x00, 0x00, 0x27, 0x42, // 100.5 -> 10050 (int, big-endian)
+          0x03, 0xFF,             // 10.23 -> 1023 (short, big-endian)
+          0x01,                   // 1 (byte)
+          0x42, 0x27, 0x00, 0x00, // 10050 (int, little-endian)
+          0xF4, 0x03,             // 10.12 -> 1012 (short, little-endian)
+
+          0x00, 0x02,             // 2 (short, big-endian)
+          0x00, 0x01, 0x38, 0xA8, // 800.4 -> 80040 (int, big-endian)
+          0x0B, 0xB8,             // 30 -> 3000 (short, big-endian)
+          0x02,                   // 2 (byte)
+          0xA8, 0x38, 0x01, 0x00, // 80040 (int, little-endian)
+          0xB8, 0x0B,             // 30 -> 3000 (short, little-endian)
+
+          0x00, 0x03,             // 3 (short, big-endian)
+          0x00, 0x00, 0x08, 0xB9, // 22.33 -> 2233 (int, big-endian)
+          0xF8, 0x30,             // -20 -> -2000 (short, big-endian)
+          0x03,                   // 3 (byte)
+          0x47, 0xF7, 0xFF, 0xFF, // -2233 (int, little-endian)
+          0x00, 0x00              // null -> zeros, because -20 cannot fit the unsigned type
+        ).map(_.toByte)
+
+        if (!bytes.sameElements(expected)) {
+          println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}")
+          println(s"Actual bytes:   ${bytes.map("%02X" format _).mkString(" ")}")
+
+          assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding")
+        }
+      }
+    }

     "write should successfully append" in {
       withTempDirectory("cobol_writer3") { tempDir =>
```
