@@ -41,9 +41,9 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
4141 private var byteIndex = startOffset
4242
4343 // Use a buffer to read the data from Hadoop in big chunks
44- private var bufferedStream = new BufferedFSDataInputStream (getHadoopPath (filePath), fileSystem, startOffset, Constants .defaultStreamBufferInMB, maximumBytes)
44+ private var bufferedStream = new BufferedFSDataInputStream (new Path (filePath), fileSystem, startOffset, Constants .defaultStreamBufferInMB, maximumBytes)
4545
46- private val fileSize = getHadoopFileSize(getHadoopPath (filePath))
46+ private val fileSize = getHadoopFileSize(new Path (filePath))
4747
4848 override def inputFileName : String = filePath
4949
@@ -54,16 +54,16 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
5454 override def offset : Long = byteIndex
5555
5656 /**
57- * Retrieves a given number of bytes.
57+ * Retrieves a given number of bytes from the file stream .
5858 *
5959 * One of three situations is possible:
6060 *
61- * 1. There's enough data to be read, thus, the resulting array's length will be exactly '' numberOfBytes'' .
62- * 2. There's not enough data but at least some, thus, the resulting array's length will be the number of available bytes .
63- * 3. The end of the file was already reached, in which case the resulting array will be empty.
61+ * 1. There's enough data to be read, thus, the resulting array's length will be exactly ` numberOfBytes` .
62+ * 2. There's not enough data but at least some bytes are available , thus, the resulting array's length will be less than requested .
63+ * 3. The end of the file was already reached or the stream is closed , in which case the resulting array will be empty.
6464 *
65- * @param numberOfBytes
66- * @return
65+ * @param numberOfBytes The number of bytes to read from the stream
66+ * @return An array containing the requested bytes, or fewer bytes if end of stream is reached, or empty array if no more data
6767 */
6868 override def next (numberOfBytes : Int ): Array [Byte ] = {
6969 val actualBytesToRead = if (maximumBytes > 0 ) {
@@ -115,19 +115,6 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
115115 new FileStreamer (filePath, fileSystem, startOffset, maximumBytes)
116116 }
117117
118- /**
119- * Gets a Hadoop [[Path ]] (HDFS, S3, DBFS, etc) to the file.
120- *
121- * Throws IllegalArgumentException in case the file does not exist.
122- */
123- private def getHadoopPath (path : String ) = {
124- val hadoopPath = new Path (path)
125- if (! fileSystem.exists(hadoopPath)) {
126- throw new IllegalArgumentException (s " File does not exist: $path" )
127- }
128- hadoopPath
129- }
130-
131118 private def getHadoopFileSize (hadoopPath : Path ): Long = {
132119 val cSummary : ContentSummary = fileSystem.getContentSummary(hadoopPath)
133120 cSummary.getLength
0 commit comments