
Commit 5b83b1a

#803 Ensure Hadoop file stream opens the file only if it is actually used.
1 parent: fb63d6b

File tree: 6 files changed, +103 -57 lines
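For orientation before the per-file diffs: the change (implemented in FileStreamer below) defers opening the Hadoop file until the stream is actually read, so streams that are created but never consumed never touch the file system. A minimal sketch of that lazy-open pattern, using illustrative names rather than the actual Cobrix classes:

// Sketch only: a byte stream that opens its underlying file lazily.
// `openFile` stands in for the expensive Hadoop open call; all names are illustrative.
class LazyByteStream(path: String, openFile: String => java.io.InputStream) {
  private var wasOpened = false
  private var in: java.io.InputStream = _

  private def ensureOpened(): Unit = {
    if (!wasOpened) {          // open on the first real read, not at construction time
      in = openFile(path)
      wasOpened = true
    }
  }

  def next(numberOfBytes: Int): Array[Byte] = {
    ensureOpened()
    val buffer = new Array[Byte](numberOfBytes)
    val read = in.read(buffer)
    if (read <= 0) Array.emptyByteArray else buffer.take(read)
  }

  def close(): Unit = {
    wasOpened = true           // a closed stream must never reopen the file
    if (in != null) { in.close(); in = null }
  }
}

Construction becomes essentially free, which matters when the file lives on a network file system and opening it is expensive.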


cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala

Lines changed: 7 additions & 2 deletions
@@ -171,12 +171,17 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
       case None => false
     }
 
+    val recordExtractorOpt = recordExtractor(0L, dataStream, headerStream)
+    if (recordExtractorOpt.isEmpty) {
+      headerStream.close()
+    }
+
     segmentIdField match {
       case Some(field) => IndexGenerator.sparseIndexGenerator(fileNumber,
         dataStream,
         readerProperties.fileStartOffset,
         recordHeaderParser,
-        recordExtractor(0L, dataStream, headerStream),
+        recordExtractorOpt,
         inputSplitSizeRecords,
         inputSplitSizeMB,
         Some(copybook),
@@ -187,7 +192,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
         dataStream,
         readerProperties.fileStartOffset,
         recordHeaderParser,
-        recordExtractor(0L, dataStream, headerStream),
+        recordExtractorOpt,
         inputSplitSizeRecords,
         inputSplitSizeMB,
         None,
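The extractor option is now computed once and passed to both sparseIndexGenerator calls; when it is None (no custom record extractor configured), the header stream is closed immediately because nothing will ever read from it. A small sketch of that guard with illustrative types, not the exact Cobrix signatures:

// Sketch: build the optional extractor once; if it is absent, the header
// stream will never be consumed, so close it instead of leaking the handle.
trait ByteStream { def close(): Unit }
trait RecordExtractor

def extractorOrCloseHeader(headerStream: ByteStream,
                           build: => Option[RecordExtractor]): Option[RecordExtractor] = {
  val extractorOpt = build
  if (extractorOpt.isEmpty) {
    headerStream.close()
  }
  extractorOpt
}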

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala

Lines changed: 9 additions & 3 deletions
@@ -53,14 +53,19 @@ final class VarLenNestedReader(copybookContents: Seq[String],
                          headerStream: SimpleStream,
                          startingFileOffset: Long,
                          fileNumber: Int,
-                         startingRecordIndex: Long): Iterator[Row] =
+                         startingRecordIndex: Long): Iterator[Row] = {
+    val recordExtractorOpt = recordExtractor(startingRecordIndex, dataStream, headerStream)
+    if (recordExtractorOpt.isEmpty) {
+      headerStream.close()
+    }
+
     if (cobolSchema.copybook.isHierarchical) {
       new RowIterator(
         new VarLenHierarchicalIterator(cobolSchema.copybook,
           dataStream,
           getReaderProperties,
           recordHeaderParser,
-          recordExtractor(startingRecordIndex, dataStream, headerStream),
+          recordExtractorOpt,
           fileNumber,
           startingRecordIndex,
           startingFileOffset,
@@ -72,12 +77,13 @@ final class VarLenNestedReader(copybookContents: Seq[String],
           dataStream,
           getReaderProperties,
           recordHeaderParser,
-          recordExtractor(startingRecordIndex, dataStream, headerStream),
+          recordExtractorOpt,
           fileNumber,
           startingRecordIndex,
           startingFileOffset,
           cobolSchema.segmentIdPrefix,
           new RowHandler())
       )
     }
+  }
 }

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 51 additions & 43 deletions
@@ -215,56 +215,64 @@ private[cobol] object IndexBuilder extends Logging {
     readerProperties.recordExtractor.foreach { recordExtractorClass =>
       val (dataStream, headerStream, _) = getStreams(filePath, startOffset, endOffset, config)
 
-      val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)
+      try {
+        val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)
 
-      var offset = -1L
-      var record = Array[Byte]()
-
-      extractorOpt.foreach { extractor =>
-        if (extractor.hasNext) {
-          // Getting the first record, if available
-          extractor.next()
-          offset = extractor.offset // Saving offset to jump to later
+        var offset = -1L
+        var record = Array[Byte]()
 
+        extractorOpt.foreach { extractor =>
           if (extractor.hasNext) {
-            // Getting the second record, if available
-            record = extractor.next() // Saving the record to check later
-
-            dataStream.close()
-            headerStream.close()
-
-            // Getting new streams and record extractor that points directly to the second record
-            val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
-            val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)
-
-            extractorOpt2.foreach { extractor2 =>
-              if (!extractor2.hasNext) {
-                // If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
-                throw new RuntimeException(
-                  s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
-                    "Please, use 'enable_indexes = false'. " +
-                    s"File: $filePath, offset: $offset"
-                )
-              }
-
-              // Getting the second record from the extractor pointing to the second record offset at the start.
-              val expectedRecord = extractor2.next()
-
-              if (!expectedRecord.sameElements(record)) {
-                // Records should match. If they don't, the record extractor is faulty in terms of indexing support..
-                throw new RuntimeException(
-                  s"Record extractor self-check failed. The record extractor returned wrong record when started from non-zero offset. " +
-                    "Please, use 'enable_indexes = false'. " +
-                    s"File: $filePath, offset: $offset"
-                )
-              } else {
-                logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
+            // Getting the first record, if available
+            extractor.next()
+            offset = extractor.offset // Saving offset to jump to later
+
+            if (extractor.hasNext) {
+              // Getting the second record, if available
+              record = extractor.next() // Saving the record to check later
+
+              dataStream.close()
+              headerStream.close()
+
+              // Getting new streams and record extractor that points directly to the second record
+              val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
+              try {
+                val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)
+
+                extractorOpt2.foreach { extractor2 =>
+                  if (!extractor2.hasNext) {
+                    // If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
+                    throw new RuntimeException(
+                      s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
+                        "Please, use 'enable_indexes = false'. " +
+                        s"File: $filePath, offset: $offset"
+                    )
+                  }
+
+                  // Getting the second record from the extractor pointing to the second record offset at the start.
+                  val expectedRecord = extractor2.next()
+
+                  if (!expectedRecord.sameElements(record)) {
+                    // Records should match. If they don't, the record extractor is faulty in terms of indexing support.
+                    throw new RuntimeException(
+                      s"Record extractor self-check failed. The record extractor returned wrong record when started from non-zero offset. " +
+                        "Please, use 'enable_indexes = false'. " +
+                        s"File: $filePath, offset: $offset"
+                    )
+                  } else {
+                    logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
+                  }
+                }
+              } finally {
+                dataStream2.close()
+                headerStream2.close()
              }
-              dataStream2.close()
-              headerStream2.close()
            }
          }
        }
+      } finally {
+        dataStream.close()
+        headerStream.close()
      }
    }
  }
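Because the self-check can throw a RuntimeException, both pairs of streams are now closed in finally blocks, so they are released on every exit path. A generic sketch of the same pattern with illustrative names (scala.util.Using exists only on Scala 2.13+, so a plain try/finally helper is shown):

// Sketch: run a block with two closeable resources and always close both,
// even if the block or the first close() throws. Names are illustrative.
def withStreams[A <: AutoCloseable, B <: AutoCloseable, R](a: A, b: B)(body: (A, B) => R): R =
  try {
    body(a, b)
  } finally {
    try a.close()
    finally b.close()   // closed even when a.close() throws
  }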

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala

Lines changed: 3 additions & 0 deletions
@@ -18,6 +18,8 @@ package za.co.absa.cobrix.spark.cobol.source.streaming
 
 import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
 
+import java.io.IOException
+
 class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOffset: Long, bufferSizeInMegabytes: Int, maximumBytes: Long ) {
   val bytesInMegabyte: Int = 1048576
 
@@ -38,6 +40,7 @@ class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOff
   private var bufferConitainBytes = 0
   private var bytesRead = 0
 
+  @throws[IOException]
   def close(): Unit = {
     if (!isStreamClosed) {
       in.close()
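Scala does not have checked exceptions, so the new @throws[IOException] annotation does not change runtime behaviour; it documents the failure mode and adds a throws clause to the generated bytecode so Java callers can treat it as a checked exception. A toy illustration (the class name is made up):

import java.io.IOException

class Handle {
  // Compiles to `public void close() throws IOException` in the bytecode.
  @throws[IOException]
  def close(): Unit = {
    // ... release the underlying resource, possibly throwing IOException
  }
}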

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala

Lines changed: 26 additions & 8 deletions
@@ -22,28 +22,34 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
 import org.apache.hadoop.fs.ContentSummary
 import za.co.absa.cobrix.cobol.reader.common.Constants
 
+import java.io.IOException
+
 /**
-  * This class provides methods for streaming bytes from an Hadoop file.
+  * This class provides methods for streaming bytes from a Hadoop file.
   *
   * It is stateful, which means that it stores the offset until which the file has been consumed.
   *
-  * Instances of this class are not reusable, i.e. once the file is fully read it can neither be reopened nor can other
+  * Instances of this class are not reusable, i.e., once the file is fully read, it can neither be reopened nor can another
   * file be consumed.
   *
-  * @param filePath String contained the fully qualified path to the file.
-  * @param fileSystem Underlying FileSystem point of access.
-  * @throws IllegalArgumentException in case the file is not found in the underlying file system.
+  * @param filePath   String containing the fully qualified path to the file.
+  * @param fileSystem Underlying Hadoop file system.
+  * @note This class is not thread-safe and should only be accessed from a single thread
   */
 class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {
 
   private val logger = Logger.getLogger(FileStreamer.this.getClass)
 
   private var byteIndex = startOffset
 
-  // Use a buffer to read the data from Hadoop in big chunks
-  private var bufferedStream = new BufferedFSDataInputStream(new Path(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+  // This ensures that the file is never opened if the stream is never used. This serves two purposes:
+  // - Safety: ensures that unused streams are closed.
+  // - Performance: prevents time being spent on opening unused files.
+  // Note: Since we are working with a network file system, opening a file is a very expensive operation.
+  private var wasOpened = false
+  private var bufferedStream: BufferedFSDataInputStream = _
 
-  private val fileSize = getHadoopFileSize(new Path(filePath))
+  private lazy val fileSize = getHadoopFileSize(new Path(filePath))
 
   override def inputFileName: String = filePath
 
@@ -65,7 +71,9 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
    * @param numberOfBytes The number of bytes to read from the stream
    * @return An array containing the requested bytes, or fewer bytes if end of stream is reached, or empty array if no more data
    */
+  @throws[IOException]
  override def next(numberOfBytes: Int): Array[Byte] = {
+    ensureOpened()
    val actualBytesToRead = if (maximumBytes > 0) {
      Math.min(maximumBytes - byteIndex + startOffset, numberOfBytes).toInt
    } else {
@@ -104,7 +112,9 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
    }
  }
 
+  @throws[IOException]
  override def close(): Unit = {
+    wasOpened = true
    if (bufferedStream != null && !bufferedStream.isClosed) {
      bufferedStream.close()
      bufferedStream = null
@@ -115,6 +125,14 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
    new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
  }
 
+  @throws[IOException]
+  private def ensureOpened(): Unit = {
+    if (!wasOpened) {
+      bufferedStream = new BufferedFSDataInputStream(new Path(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+      wasOpened = true
+    }
+  }
+
  private def getHadoopFileSize(hadoopPath: Path): Long = {
    val cSummary: ContentSummary = fileSystem.getContentSummary(hadoopPath)
    cSummary.getLength
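With this change, constructing a FileStreamer no longer touches the file system: the Hadoop file is opened by ensureOpened() on the first next() call, the file size is computed lazily, and close() marks the stream as opened so it cannot be opened afterwards. Based only on the members visible in this diff, usage looks roughly like the following (the path and configuration are illustrative):

// Illustrative usage; constructor arguments follow the signature shown above.
val fs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
val stream = new FileStreamer("/data/example.dat", fs)  // no file system access happens here

val firstBytes = stream.next(100)  // the file is opened on this first read
// ... consume the rest of the stream ...
stream.close()                     // after close(), next() will not reopen the file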

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala

Lines changed: 7 additions & 1 deletion
@@ -44,10 +44,16 @@ class FileStreamerSpec extends AnyFlatSpec with BeforeAndAfter with Matchers {
 
   it should "throw if file does not exist" in {
     assertThrows[FileNotFoundException] {
-      new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+      val stream = new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+      stream.size
     }
   }
 
+  it should "not throw if the stream is never used, even if the file does not exist" in {
+    noException should be thrownBy {
+      new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+    }
+  }
   it should "return array of same length than expected number of bytes if enough data" in {
     val batchLength = 8
     val iterations = 10
