From cfb0e21179ba2c0bde5a5b7823e9c4c998533f14 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Tue, 4 Mar 2025 13:06:48 +0100
Subject: [PATCH] #748 Implement the ability to specify copybooks directly in JAR.
---
 README.md                                     | 40 ++++++++++---------
 pom.xml                                       |  4 +-
 .../copybook/CopybookContentLoader.scala      | 38 +++++++++---------
 .../parameters/CobolParametersValidator.scala | 14 +++++--
 .../spark/cobol/utils/FileNameUtils.scala     | 16 +++++---
 .../cobrix/spark/cobol/utils/FsType.scala     | 25 ++++++++++++
 .../src/test/resources/test/copybook.cpy      | 21 ++++++++++
 .../regression/Test11NoCopybookErrMsg.scala   | 40 ++++++++++++++++++-
 8 files changed, 149 insertions(+), 49 deletions(-)
 create mode 100644 spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FsType.scala
 create mode 100644 spark-cobol/src/test/resources/test/copybook.cpy

diff --git a/README.md b/README.md
index 647904d80..147bc1c34 100644
--- a/README.md
+++ b/README.md
@@ -15,28 +15,28 @@ Add mainframe as a source to your data engineering strategy.
 
 Among the motivations for this project, it is possible to highlight:
 
-- Lack of expertise in the Cobol ecosystem, which makes it hard to integrate mainframes into data engineering strategies
+- Lack of expertise in the Cobol ecosystem, which makes it hard to integrate mainframes into data engineering strategies.
 
-- Lack of support from the open-source community to initiatives in this field
+- Lack of support from the open-source community to initiatives in this field.
 
-- The overwhelming majority (if not all) of tools to cope with this domain are proprietary
+- The overwhelming majority (if not all) of tools to cope with this domain are proprietary.
 
-- Several institutions struggle daily to maintain their legacy mainframes, which prevents them from evolving to more modern approaches to data management
+- Several institutions struggle daily to maintain their legacy mainframes, which prevents them from evolving to more modern approaches to data management.
 
-- Mainframe data can only take part in data science activities through very expensive investments
+- Mainframe data can only take part in data science activities through very expensive investments.
 
 ## Features
 
-- Supports primitive types (although some are "Cobol compiler specific")
+- Supports primitive types (although some are "Cobol compiler specific").
 
-- Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays)
+- Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays).
 
-- Supports nested structures and arrays
+- Supports nested structures and arrays.
 
-- Supports HDFS as well as local file systems
+- Supports Hadoop (HDFS, S3, ...) as well as the local file system.
 
-- The COBOL copybooks parser doesn't have a Spark dependency and can be reused for integrating into other data processing engines
+- The COBOL copybooks parser doesn't have a Spark dependency and can be reused for integrating into other data processing engines.
 
 ## Videos
 
@@ -135,18 +135,20 @@ Code coverage will be generated on path:
 ```
 {project-root}/cobrix/{module}/target/scala-{scala_version}/jacoco/report/html
 ```
 
-### Reading Cobol binary files from HDFS/local and querying them
+### Reading Cobol binary files from Hadoop/local and querying them
 
 1. Create a Spark ```SQLContext```
 
 2. Start a ```sqlContext.read``` operation specifying ```za.co.absa.cobrix.spark.cobol.source``` as the format
 
-3. Inform the path to the copybook describing the files through ```... 
.option("copybook", "path_to_copybook_file")```. By default the copybook - is expected to be in HDFS. You can specify that a copybook is located in the local file system by adding `file://` prefix. For example, you - can specify a local file like this `.option("copybook", "file:///home/user/data/compybook.cpy")`. Alternatively, instead of providing a path - to a copybook file you can provide the contents of the copybook itself by using `.option("copybook_contents", "...copybook contents...")`. +3. Inform the path to the copybook describing the files through ```... .option("copybook", "path_to_copybook_file")```. + - By default the copybook is expected to be in the default Hadoop filesystem (HDFS, S3, etc). + - You can specify that a copybook is located in the local file system by adding `file://` prefix. + - For example, you can specify a local file like this `.option("copybook", "file:///home/user/data/copybook.cpy")`. + - Alternatively, instead of providing a path to a copybook file you can provide the contents of the copybook itself by using `.option("copybook_contents", "...copybook contents...")`. + - You can store the copybook in the JAR itself at resources section in this case use `jar://` prefix, e.g.: `.option("copybook", "jar:///copybooks/copybook.cpy")`. -4. Inform the path to the HDFS directory containing the files: ```... .load("path_to_directory_containing_the_binary_files")``` +4. Inform the path to the Hadoop directory containing the files: ```... .load("s3a://path_to_directory_containing_the_binary_files")``` 5. Inform the query you would like to run on the Cobol Dataframe @@ -208,7 +210,7 @@ val spark = SparkSession .master("local[2]") .config("duration", 2) .config("copybook", "path_to_the_copybook") - .config("path", "path_to_source_directory") // could be both, local or HDFS + .config("path", "path_to_source_directory") // could be both, local or Hadoop (s3://, hdfs://, etc) .getOrCreate() val streamingContext = new StreamingContext(spark.sparkContext, Seconds(3)) @@ -607,7 +609,7 @@ records parsing. However effective, this strategy may also suffer from excessive shuffling, since indexes may be sent to executors far from the actual data. -The latter issue is overcome by extracting the preferred locations for each index directly from HDFS, and then passing those locations to +The latter issue is overcome by extracting the preferred locations for each index directly from HDFS/S3/..., and then passing those locations to Spark during the creation of the RDD that distributes the indexes. When processing large collections, the overhead of collecting the locations is offset by the benefits of locality, thus, this feature is @@ -618,6 +620,8 @@ enabled by default, but can be disabled by the configuration below: ### Workload optimization for variable-length records parsing +This feature works only for HDFS, not for any other of Hadoop filesystems. + When dealing with variable-length records, Cobrix strives to maximize locality by identifying the preferred locations in the cluster to parse each record, i.e. the nodes where the record resides. 
diff --git a/pom.xml b/pom.xml index 037e0532c..fcb08b663 100644 --- a/pom.xml +++ b/pom.xml @@ -108,9 +108,9 @@ 1.6 - 2.12.17 + 2.12.20 2.12 - 3.2.3 + 3.4.4 3.2.14 2.4.16 15.0 diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/copybook/CopybookContentLoader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/copybook/CopybookContentLoader.scala index 072091938..3453413bf 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/copybook/CopybookContentLoader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/copybook/CopybookContentLoader.scala @@ -18,7 +18,8 @@ package za.co.absa.cobrix.spark.cobol.source.copybook import org.apache.hadoop.conf.Configuration import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters -import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, HDFSUtils} +import za.co.absa.cobrix.spark.cobol.utils.FsType.LocalFs +import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType, HDFSUtils, ResourceUtils} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} @@ -33,28 +34,27 @@ object CopybookContentLoader { (copyBookContents, copyBookPathFileName) match { case (Some(contents), _) => Seq(contents) case (None, Some(_)) => - val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(copyBookPathFileName.get) - Seq( - if (isLocalFS) { - loadCopybookFromLocalFS(copyBookFileName) - } else { - HDFSUtils.loadTextFileFromHadoop(hadoopConf, copyBookFileName) - } - ) - case (None, None) => parameters.multiCopybookPath.map( - fileName => { - val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName) - if (isLocalFS) { - loadCopybookFromLocalFS(copyBookFileName) - } else { - HDFSUtils.loadTextFileFromHadoop(hadoopConf, copyBookFileName) - } - } - ) + val copybookContent = loadCopybook(copyBookPathFileName.get, hadoopConf) + Seq(copybookContent) + case (None, None) => + parameters.multiCopybookPath.map(fileName => loadCopybook(fileName, hadoopConf)) + } + } + + private def loadCopybook(pathToCopybook: String, hadoopConf: Configuration): String = { + val (fsType, copyBookFileName) = FileNameUtils.getCopyBookFileName(pathToCopybook) + fsType match { + case FsType.LocalFs => loadCopybookFromLocalFS(copyBookFileName) + case FsType.JarFs => loadCopybookFromJarResources(copyBookFileName) + case FsType.HadoopFs => HDFSUtils.loadTextFileFromHadoop(hadoopConf, copyBookFileName) } } private def loadCopybookFromLocalFS(copyBookLocalPath: String): String = { Files.readAllLines(Paths.get(copyBookLocalPath), StandardCharsets.ISO_8859_1).toArray.mkString("\n") } + + private def loadCopybookFromJarResources(copyBookJarPath: String): String = { + ResourceUtils.readResourceAsString(copyBookJarPath) + } } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala index aa121e3a9..8941b9230 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala @@ -18,13 +18,13 @@ package za.co.absa.cobrix.spark.cobol.source.parameters import java.io.FileNotFoundException import java.nio.file.{Files, Paths} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf 
import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser._ -import za.co.absa.cobrix.spark.cobol.utils.FileNameUtils +import za.co.absa.cobrix.spark.cobol.utils.ResourceUtils.getClass +import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType} /** * This class provides methods for checking the Spark job options after parsed. @@ -66,8 +66,8 @@ object CobolParametersValidator { } def validatePath(fileName: String): Unit = { - val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName) - if (isLocalFS) { + val (fsType, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName) + if (fsType == FsType.LocalFs) { if (!Files.exists(Paths.get(copyBookFileName))) { throw new FileNotFoundException(s"Copybook not found at $copyBookFileName") } @@ -77,6 +77,12 @@ object CobolParametersValidator { if (!Files.isReadable(Paths.get(copyBookFileName))) { throw new IllegalArgumentException(s"The copybook path '$copyBookFileName' is not readable.") } + } else if (fsType == FsType.JarFs) { + if (getClass.getResourceAsStream(copyBookFileName) == null) { + if (!Files.exists(Paths.get(copyBookFileName))) { + throw new FileNotFoundException(s"Copybook not found at the JAR resource path: $copyBookFileName") + } + } } else { val fs = new Path(fileName).getFileSystem(hadoopConf) if (!fs.exists(new Path(copyBookFileName))) { diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileNameUtils.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileNameUtils.scala index 268208b73..15083c58a 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileNameUtils.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileNameUtils.scala @@ -18,14 +18,20 @@ package za.co.absa.cobrix.spark.cobol.utils object FileNameUtils { private val LOCALFS_PREFIX = "file://" + private val JAR_PREFIX = "jar://" - def getCopyBookFileName(fileNameURI: String):(Boolean, String) = { + def getCopyBookFileName(fileNameURI: String): (FsType, String) = { val isLocalFS = fileNameURI.toLowerCase.startsWith(LOCALFS_PREFIX) - val copyBookFileName = if (isLocalFS) - fileNameURI.drop(LOCALFS_PREFIX.length) + val isJar = fileNameURI.toLowerCase.startsWith(JAR_PREFIX) + if (isLocalFS) + (FsType.LocalFs, fileNameURI.drop(LOCALFS_PREFIX.length)) + else if (isJar) { + val fileCandidate = fileNameURI.drop(JAR_PREFIX.length) + val filePath = if (fileCandidate.startsWith("/")) fileCandidate else s"/$fileCandidate" + (FsType.JarFs, filePath) + } else - fileNameURI - (isLocalFS, copyBookFileName) + (FsType.HadoopFs, fileNameURI) } } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FsType.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FsType.scala new file mode 100644 index 000000000..c7ca621ea --- /dev/null +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FsType.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.utils
+
+sealed trait FsType
+
+object FsType {
+  case object LocalFs extends FsType
+  case object JarFs extends FsType
+  case object HadoopFs extends FsType
+}
diff --git a/spark-cobol/src/test/resources/test/copybook.cpy b/spark-cobol/src/test/resources/test/copybook.cpy
new file mode 100644
index 000000000..73608dbbc
--- /dev/null
+++ b/spark-cobol/src/test/resources/test/copybook.cpy
@@ -0,0 +1,21 @@
+      ****************************************************************************
+      *                                                                          *
+      * Copyright 2018 ABSA Group Limited                                        *
+      *                                                                          *
+      * Licensed under the Apache License, Version 2.0 (the "License");          *
+      * you may not use this file except in compliance with the License.         *
+      * You may obtain a copy of the License at                                  *
+      *                                                                          *
+      *     http://www.apache.org/licenses/LICENSE-2.0                           *
+      *                                                                          *
+      * Unless required by applicable law or agreed to in writing, software      *
+      * distributed under the License is distributed on an "AS IS" BASIS,        *
+      * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
+      * See the License for the specific language governing permissions and      *
+      * limitations under the License.                                           *
+      *                                                                          *
+      ****************************************************************************
+
+        01  R.
+            03 A        PIC X(1).
+            03 B        PIC X(2).
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test11NoCopybookErrMsg.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test11NoCopybookErrMsg.scala
index 4cc653a0f..add67ff36 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test11NoCopybookErrMsg.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test11NoCopybookErrMsg.scala
@@ -17,12 +17,13 @@
 package za.co.absa.cobrix.spark.cobol.source.regression
 
 import java.nio.file.Paths
-
 import org.scalatest.funsuite.AnyFunSuite
 import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
 
+import java.io.FileNotFoundException
+
 class Test11NoCopybookErrMsg extends AnyFunSuite with SparkTestBase with BinaryFileFixture {
 
   private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
@@ -50,6 +51,32 @@ class Test11NoCopybookErrMsg extends AnyFunSuite with SparkTestBase with BinaryF
     }
   }
 
+  test("Test a file loads normally when the copybook is a JAR resource") {
+    withTempBinFile("bin_file2", ".dat", binFileContents) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook", "jar:///test/copybook.cpy")
+        .option("schema_retention_policy", "collapse_root")
+        .load(tmpFileName)
+
+      assert(df.count == 1)
+    }
+  }
+
+  test("Test a file loads normally when the copybook is a JAR resource specified without a leading slash") {
+    withTempBinFile("bin_file2", ".dat", binFileContents) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook", "jar://test/copybook.cpy")
+        .option("schema_retention_policy", "collapse_root")
+        .load(tmpFileName)
+
+      assert(df.count == 1)
+    }
+  }
+
   test("Test the error message logged when no copybook is provided") {
     val ex = intercept[IllegalStateException] {
       spark
@@ -132,4 +159,15 @@
     }
   }
 
+  test("Test the error message when the copybook is not in the JAR resources") {
+    val ex = 
intercept[FileNotFoundException] { + spark + .read + .format("cobol") + .option("copybook", "jar://test/copybook_non_existent.cpy") + .load("/tmp/doesnt/matter") + } + + assert(ex.getMessage == "Copybook not found at the JAR resource path: /test/copybook_non_existent.cpy") + } }
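For reference, a small sketch (not part of the patch) of how the new prefix resolution in `FileNameUtils.getCopyBookFileName` is expected to behave, based on the implementation above; `CopybookContentLoader.loadCopybook` then dispatches on the returned `FsType`:

```scala
import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType}

object PrefixResolutionSketch extends App {
  // file:// -> local filesystem, prefix stripped
  assert(FileNameUtils.getCopyBookFileName("file:///opt/copybooks/book.cpy") == (FsType.LocalFs, "/opt/copybooks/book.cpy"))

  // jar:// -> class-path resource; a leading '/' is prepended when missing
  assert(FileNameUtils.getCopyBookFileName("jar://copybooks/book.cpy") == (FsType.JarFs, "/copybooks/book.cpy"))
  assert(FileNameUtils.getCopyBookFileName("jar:///copybooks/book.cpy") == (FsType.JarFs, "/copybooks/book.cpy"))

  // anything else -> default Hadoop filesystem, URI passed through unchanged
  assert(FileNameUtils.getCopyBookFileName("hdfs:///data/book.cpy") == (FsType.HadoopFs, "hdfs:///data/book.cpy"))
}
```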