
Commit 3738dd2

#803 Ensure Hadoop file stream opens the file only if it is actually used.
1 parent 1954ab8 commit 3738dd2

File tree: 2 files changed, +28 -9 lines changed


spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala

Lines changed: 21 additions & 8 deletions
@@ -23,27 +23,31 @@ import org.apache.hadoop.fs.ContentSummary
 import za.co.absa.cobrix.cobol.reader.common.Constants
 
 /**
-  * This class provides methods for streaming bytes from an Hadoop file.
+  * This class provides methods for streaming bytes from a Hadoop file.
   *
   * It is stateful, which means that it stores the offset until which the file has been consumed.
   *
-  * Instances of this class are not reusable, i.e. once the file is fully read it can neither be reopened nor can other
+  * Instances of this class are not reusable, i.e., once the file is fully read, it can neither be reopened nor can another
   * file be consumed.
   *
-  * @param filePath String contained the fully qualified path to the file.
-  * @param fileSystem Underlying FileSystem point of access.
-  * @throws IllegalArgumentException in case the file is not found in the underlying file system.
+  * @param filePath String containing the fully qualified path to the file.
+  * @param fileSystem Underlying Hadoop file system.
+  * @throws IllegalArgumentException if the file is not found in the underlying file system.
   */
 class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {
 
   private val logger = Logger.getLogger(FileStreamer.this.getClass)
 
   private var byteIndex = startOffset
 
-  // Use a buffer to read the data from Hadoop in big chunks
-  private var bufferedStream = new BufferedFSDataInputStream(new Path(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+  // This ensures that the file is never opened if the stream is never used. This serves two purposes:
+  // - Safety: ensures that unused streams are closed.
+  // - Performance: prevents time being spent on opening unused files.
+  // Note: Since we are working with a network file system, opening a file is a very expensive operation.
+  private var wasOpened = false
+  private var bufferedStream: BufferedFSDataInputStream = _
 
-  private val fileSize = getHadoopFileSize(new Path(filePath))
+  private lazy val fileSize = getHadoopFileSize(new Path(filePath))
 
   override def inputFileName: String = filePath

@@ -66,6 +70,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
    * @return An array containing the requested bytes, or fewer bytes if end of stream is reached, or empty array if no more data
    */
   override def next(numberOfBytes: Int): Array[Byte] = {
+    ensureOpened()
     val actualBytesToRead = if (maximumBytes > 0) {
       Math.min(maximumBytes - byteIndex + startOffset, numberOfBytes).toInt
     } else {

@@ -106,6 +111,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
   override def close(): Unit = {
     if (bufferedStream != null && !bufferedStream.isClosed) {
+      wasOpened = true
       bufferedStream.close()
       bufferedStream = null
     }

@@ -114,6 +120,13 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
   override def copyStream(): SimpleStream = {
     new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
   }
+
+  private def ensureOpened(): Unit = {
+    if (!wasOpened) {
+      bufferedStream = new BufferedFSDataInputStream(new Path(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+      wasOpened = true
+    }
+  }
 
   private def getHadoopFileSize(hadoopPath: Path): Long = {
     val cSummary: ContentSummary = fileSystem.getContentSummary(hadoopPath)
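The heart of the change is a manual lazy-open pattern: constructing a FileStreamer no longer touches the file system, and the expensive open is deferred to ensureOpened(), which next() invokes on first use. Below is a minimal, self-contained sketch of the same pattern in plain Scala; the LazyStream name is hypothetical, and java.io.FileInputStream stands in for the Hadoop stream.

    import java.io.{File, FileInputStream}

    // Illustration only: a plain-Scala analogue of FileStreamer's deferred open.
    class LazyStream(path: String) {
      private var wasOpened = false
      private var in: FileInputStream = _  // not opened in the constructor

      def next(numberOfBytes: Int): Array[Byte] = {
        ensureOpened()  // the first call pays the cost of opening the file
        val buffer = new Array[Byte](numberOfBytes)
        val bytesRead = in.read(buffer)
        if (bytesRead <= 0) Array.empty[Byte] else buffer.take(bytesRead)
      }

      def close(): Unit = {
        if (in != null) {
          wasOpened = true  // instances are not reusable: never reopen after close
          in.close()
          in = null
        }
      }

      private def ensureOpened(): Unit = {
        if (!wasOpened) {
          in = new FileInputStream(new File(path))  // may throw FileNotFoundException
          wasOpened = true
        }
      }
    }

With this shape, new LazyStream("/no/such/file") succeeds and the failure only surfaces on the first next() call. Note the commit applies the same deferral to fileSize using Scala's built-in lazy val, while bufferedStream needs the manual wasOpened flag because close() must also be able to mark the stream as spent.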

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala

Lines changed: 7 additions & 1 deletion
@@ -44,10 +44,16 @@ class FileStreamerSpec extends AnyFlatSpec with BeforeAndAfter with Matchers {
 
   it should "throw if file does not exist" in {
     assertThrows[FileNotFoundException] {
-      new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+      val stream = new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+      stream.size
     }
   }
 
+  it should "not throw if the stream is never used, even if the file does not exist" in {
+    noException should be thrownBy {
+      new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+    }
+  }
   it should "return array of same length than expected number of bytes if enough data" in {
     val batchLength = 8
     val iterations = 10
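The updated test forces evaluation via stream.size (now backed by the lazy fileSize), while the new test uses ScalaTest's noException should be thrownBy matcher to assert that construction alone is safe. A sketch of the same pair of assertions, assuming the hypothetical LazyStream from the previous example:

    import java.io.FileNotFoundException
    import org.scalatest.flatspec.AnyFlatSpec
    import org.scalatest.matchers.should.Matchers

    class LazyStreamSpec extends AnyFlatSpec with Matchers {
      it should "defer the open, and thus the failure, until first use" in {
        // Construction alone never opens the file, so it must not throw.
        noException should be thrownBy new LazyStream("/no/such/file")

        // The first read performs the open and surfaces the missing file.
        assertThrows[FileNotFoundException] {
          new LazyStream("/no/such/file").next(1)
        }
      }
    }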
