
Commit 9d9c23d

#795 Add support for indexes when processing raw records via RDDs.
1 parent 89520b8 commit 9d9c23d

9 files changed (+119 −40 lines)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala

Lines changed: 5 additions & 2 deletions
@@ -30,9 +30,12 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
 abstract class CobolProcessorBase extends CobolProcessor with Serializable

 object CobolProcessorBase {
-  def getRecordExtractor(readerParameters: ReaderParameters, copybookContents: String, inputStream: SimpleStream): RawRecordExtractor = {
+  def getRecordExtractor(readerParameters: ReaderParameters, copybookContents: String, inputStream: SimpleStream, headerStreamOpt: Option[SimpleStream]): RawRecordExtractor = {
     val dataStream = inputStream.copyStream()
-    val headerStream = inputStream.copyStream()
+    val headerStream = headerStreamOpt match {
+      case Some(stream) => stream
+      case None => inputStream.copyStream()
+    }

     val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
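
Note: the new headerStreamOpt parameter lets a caller whose data stream starts mid-file (for example, at a sparse index offset) supply a separate stream that begins at offset 0, so file headers can still be read. A minimal sketch, assuming the caller already has ReaderParameters and two SimpleStream instances (the helper name below is illustrative, not from this commit):

  // dataStream is positioned at the split's start offset;
  // headerStream always starts at the beginning of the file.
  def extractorForSplit(readerParameters: ReaderParameters,
                        copybookContents: String,
                        dataStream: SimpleStream,
                        headerStream: SimpleStream) =
    CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, dataStream, Some(headerStream))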

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ class CobolProcessorInPlace(readerParameters: ReaderParameters,
   override def process(inputStream: SimpleStream,
                        outputStream: OutputStream)
                       (rawRecordProcessor: RawRecordProcessor): Long = {
-    val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream)
+    val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None)

     val dataStream = inputStream.copyStream()
     try {

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdw.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ class CobolProcessorToRdw(readerParameters: ReaderParameters,
   override def process(inputStream: SimpleStream,
                        outputStream: OutputStream)
                       (rawRecordProcessor: RawRecordProcessor): Long = {
-    val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream)
+    val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None)

     StreamProcessor.processStreamToRdw(copybook,
       options,

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBaseSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -33,7 +33,7 @@ class CobolProcessorBaseSuite extends AnyWordSpec {
     "work for an fixed-record-length files" in {
       val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))

-      val ext = CobolProcessorBase.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), copybook, stream)
+      val ext = CobolProcessorBase.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), copybook, stream, None)

       assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor])

@@ -49,7 +49,7 @@ class CobolProcessorBaseSuite extends AnyWordSpec {
       val ext = CobolProcessorBase.getRecordExtractor(ReaderParameters(
         recordFormat = RecordFormat.VariableLength,
         isText = true
-      ), copybook, stream)
+      ), copybook, stream, None)

       assert(ext.isInstanceOf[TextFullRecordExtractor])
     }
@@ -61,7 +61,7 @@ class CobolProcessorBaseSuite extends AnyWordSpec {
       CobolProcessorBase.getRecordExtractor(ReaderParameters(
         recordFormat = RecordFormat.VariableLength,
         isRecordSequence = true
-      ), copybook, stream)
+      ), copybook, stream, None)
     }

     assert(ex.getMessage.contains("Cannot create a record extractor for the given reader parameters."))

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala

Lines changed: 46 additions & 9 deletions
@@ -22,9 +22,15 @@ import org.apache.spark.sql.SparkSession
 import org.slf4j.LoggerFactory
 import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorBase
 import za.co.absa.cobrix.cobol.processor.{CobolProcessingStrategy, CobolProcessor, SerializableRawRecordProcessor}
-import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters}
-import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
+import za.co.absa.cobrix.cobol.reader.common.Constants
+import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_GENERATE_RECORD_ID
+import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
+import za.co.absa.cobrix.spark.cobol.reader.VarLenReader
+import za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder
+import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
 import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
+import za.co.absa.cobrix.spark.cobol.source.{CobolRelation, DefaultSource, SerializableConfiguration}
 import za.co.absa.cobrix.spark.cobol.utils.FileUtils

 import java.io.BufferedOutputStream
@@ -45,6 +51,7 @@ trait SparkCobolProcessor {
 }

 object SparkCobolProcessor {
+  @transient
   private val log = LoggerFactory.getLogger(this.getClass)

   class SparkCobolProcessorBuilder(implicit spark: SparkSession) {
@@ -197,15 +204,45 @@ object SparkCobolProcessor {
                            options: Map[String, String],
                            sconf: SerializableConfiguration)(implicit spark: SparkSession): RDD[Array[Byte]] = {

-    val cobolParameters = CobolParametersParser.parse(new Parameters(options))
-    val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
+    val varLenOptions = options + (PARAM_GENERATE_RECORD_ID -> "true")

-    spark.sparkContext.parallelize(listOfFiles).flatMap { inputFile =>
-      val hadoopConfig = sconf.value
-      val inputFs = new Path(inputFile).getFileSystem(hadoopConfig)
-      val ifs = new FileStreamer(inputFile, inputFs)
+    val cobolParameters: CobolParameters = CobolParametersParser.parse(new Parameters(varLenOptions))
+      .copy(sourcePaths = listOfFiles, copybookContent = Option(copybookContents))

-      CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, ifs)
+    val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
+    val cobolReader = DefaultSource.createVariableLengthReader(cobolParameters, spark)
+    val allowIndexes = readerParameters.isIndexGenerationNeeded
+
+    cobolReader match {
+      case reader: VarLenReader if reader.isIndexGenerationNeeded && allowIndexes =>
+        val orderedFiles = CobolRelation.getListFilesWithOrder(listOfFiles, spark.sqlContext, isRecursiveRetrieval = false)
+        val filesMap = orderedFiles.map(fileWithOrder => (fileWithOrder.order, fileWithOrder.filePath)).toMap
+        val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext)(LocalityParameters(improveLocality = false, optimizeAllocation = false))
+
+        indexes.flatMap(indexEntry => {
+          val filePathName = filesMap(indexEntry.fileId)
+          val path = new Path(filePathName)
+          val fileSystem = path.getFileSystem(sconf.value)
+          val fileName = path.getName
+          val numOfBytes = if (indexEntry.offsetTo > 0L) indexEntry.offsetTo - indexEntry.offsetFrom else 0L
+          val numOfBytesMsg = if (numOfBytes > 0) s"${numOfBytes / Constants.megabyte} MB" else "until the end"
+
+          log.info(s"Going to process offsets ${indexEntry.offsetFrom}...${indexEntry.offsetTo} ($numOfBytesMsg) of $fileName")
+          val dataStream = new FileStreamer(filePathName, fileSystem, indexEntry.offsetFrom, numOfBytes)
+          val headerStream = new FileStreamer(filePathName, fileSystem)
+
+          CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, dataStream, Some(headerStream))
+        })
+
+      case _ =>
+        spark.sparkContext.parallelize(listOfFiles).flatMap { inputFile =>
+          val hadoopConfig = sconf.value
+          log.info(s"Going to process data from $inputFile")
+          val inputFs = new Path(inputFile).getFileSystem(hadoopConfig)
+          val ifs = new FileStreamer(inputFile, inputFs)
+
+          CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, ifs, None)
+        }
     }
   }
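
How the new path is reached from user code, as a sketch based on the options exercised by the test suite further down (copybook and inputPath are placeholders): with indexes enabled, toRDD() builds a sparse index per file and turns each index entry into its own FileStreamer-backed extractor, so raw records are read in parallel per split rather than per file.

  val rdd = SparkCobolProcessor.builder
    .withCopybookContents(copybook)          // copybook text describing the record layout
    .option("enable_indexes", "true")        // allow the index-based path above
    .option("input_split_records", "2")      // roughly 2 records per index entry
    .toRDD(inputPath)                        // RDD[Array[Byte]] of raw records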

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala

Lines changed: 20 additions & 18 deletions
@@ -71,7 +71,7 @@ class CobolRelation(sourceDirs: Seq[String],
   with Serializable
   with TableScan {

-  private val filesList = getListFilesWithOrder(sourceDirs)
+  private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval)

   private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext)(localityParams)

@@ -94,23 +94,6 @@ class CobolRelation(sourceDirs: Seq[String],
     }
   }

-  /**
-    * Retrieves a list containing the files contained in the directory to be processed attached to numbers which serve
-    * as their order.
-    *
-    * The List contains [[za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder]] instances.
-    */
-  private def getListFilesWithOrder(sourceDirs: Seq[String]): Array[FileWithOrder] = {
-    val allFiles = sourceDirs.flatMap(sourceDir => {
-      FileUtils
-        .getFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, isRecursiveRetrieval)
-    }).toArray
-
-    allFiles
-      .zipWithIndex
-      .map(file => FileWithOrder(file._1, file._2))
-  }
-
   /**
    * Checks if the recursive file retrieval flag is set
    */
@@ -127,4 +110,23 @@ class CobolRelation(sourceDirs: Seq[String],
       }
     })
   }
+}
+
+object CobolRelation {
+  /**
+    * Retrieves a list containing the files contained in the directory to be processed attached to numbers which serve
+    * as their order.
+    *
+    * The List contains [[za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder]] instances.
+    */
+  def getListFilesWithOrder(sourceDirs: Seq[String], sqlContext: SQLContext, isRecursiveRetrieval: Boolean): Array[FileWithOrder] = {
+    val allFiles = sourceDirs.flatMap(sourceDir => {
+      FileUtils
+        .getFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, isRecursiveRetrieval)
+    }).toArray
+
+    allFiles
+      .zipWithIndex
+      .map(file => FileWithOrder(file._1, file._2))
+  }
 }
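
With getListFilesWithOrder moved to the companion object, callers outside the relation (such as SparkCobolProcessor above) can reuse it. A short sketch; the directory path is a placeholder:

  // Assigns a stable numeric order to every file under the given directories;
  // SparkCobolProcessor uses that order to map SparseIndexEntry.fileId back to a path.
  val orderedFiles: Array[FileWithOrder] =
    CobolRelation.getListFilesWithOrder(Seq("/data/ebcdic"), spark.sqlContext, isRecursiveRetrieval = false)
  val filesMap = orderedFiles.map(f => (f.order, f.filePath)).toMap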

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 10 additions & 4 deletions
@@ -21,6 +21,7 @@ import org.apache.hadoop.io.{BytesWritable, NullWritable}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
 import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
@@ -41,6 +42,7 @@ class DefaultSource
   with DataSourceRegister
   with ReaderFactory
   with Logging {
+  import DefaultSource._

   override def shortName(): String = SHORT_NAME

@@ -124,13 +126,17 @@ class DefaultSource

   //TODO fix with the correct implementation once the correct Reader hierarchy is put in place.
   override def buildReader(spark: SparkSession, parameters: Map[String, String]): FixedLenReader = null
+}
+
+object DefaultSource {
+  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

   /**
    * Builds one of two Readers, depending on the parameters.
    *
    * This method will probably be removed once the correct hierarchy for [[FixedLenReader]] is put in place.
    */
-  private def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters): Reader = {
+  def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters): Reader = {
     val reader = if (cobolParameters.isText && cobolParameters.variableLengthParams.isEmpty) {
       createTextReader(cobolParameters, spark)
     } else if (cobolParameters.variableLengthParams.isEmpty) {
@@ -148,7 +154,7 @@ class DefaultSource
   /**
    * Creates a Reader that knows how to consume text Cobol records.
    */
-  private def createTextReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
+  def createTextReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
     val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
     new FixedLenTextReader(copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
@@ -158,7 +164,7 @@ class DefaultSource
   /**
    * Creates a Reader that knows how to consume fixed-length Cobol records.
    */
-  private def createFixedLengthReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
+  def createFixedLengthReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {

     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
     val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
@@ -171,7 +177,7 @@ class DefaultSource
    *
    * The variable-length reading process is approached as if reading from a stream.
    */
-  private def createVariableLengthReader(parameters: CobolParameters, spark: SparkSession): VarLenReader = {
+  def createVariableLengthReader(parameters: CobolParameters, spark: SparkSession): VarLenReader = {


     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
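
Moving the reader factory methods into a companion object makes them callable without a DefaultSource instance. A sketch of the call SparkCobolProcessor now relies on (cobolParameters is assumed to come from CobolParametersParser.parse):

  // Builds a variable-length reader; its isIndexGenerationNeeded flag decides
  // whether the index-based RDD path can be taken for the given options.
  val varLenReader: VarLenReader = DefaultSource.createVariableLengthReader(cobolParameters, spark)
  val useIndexes = varLenReader.isIndexGenerationNeeded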

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ import scala.collection.mutable.ArrayBuffer
   *
   * In a nutshell, ideally, there will be as many partitions as are there are indexes.
   */
-private[source] object IndexBuilder extends Logging {
+private[cobol] object IndexBuilder extends Logging {
  def buildIndex(filesList: Array[FileWithOrder],
                 cobolReader: Reader,
                 sqlContext: SQLContext)
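
Widening the visibility from private[source] to private[cobol] is what lets SparkCobolProcessor, which lives one package up, build indexes directly, as in this sketch taken from the usage above:

  val indexes: RDD[SparseIndexEntry] =
    IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext)(
      LocalityParameters(improveLocality = false, optimizeAllocation = false))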

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala

Lines changed: 32 additions & 1 deletion
@@ -149,7 +149,7 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
       }
     }

-    "convert input format into an RDD" in {
+    "convert input format into an RDD without indexes" in {
       val expected = """-13, -14, -15"""
       withTempDirectory("spark_cobol_processor") { tempDir =>
         val binData = Array(0xF1, 0xF2, 0xF3, 0xF1).map(_.toByte)
@@ -161,6 +161,7 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar

         val rdd = SparkCobolProcessor.builder
           .withCopybookContents(copybook)
+          .option("enable_indexes", "false")
           .toRDD(inputPath)

         val count = rdd.count()
@@ -176,4 +177,34 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
         assert(actual == expected)
       }
     }
+
+    "convert input format into an RDD with index" in {
+      val expected = """-10, -11, -12, -13, -14, -15"""
+      withTempDirectory("spark_cobol_processor") { tempDir =>
+        val binData = Array(0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF1).map(_.toByte)
+
+        val inputPath = new Path(tempDir, "input.dat").toString
+        val outputPath = new Path(tempDir, "output").toString
+
+        writeBinaryFile(inputPath, binData)
+
+        val rdd = SparkCobolProcessor.builder
+          .withCopybookContents(copybook)
+          .option("enable_indexes", "true")
+          .option("input_split_records", "2")
+          .toRDD(inputPath)
+
+        val count = rdd.count()
+
+        assert(count == 7)
+
+        val actual = rdd
+          .map(row => row.mkString)
+          .distinct
+          .sortBy(x => x)
+          .collect().mkString(", ")
+
+        assert(actual == expected)
+      }
+    }
   }
