
Commit d154403

Allow S3 checkpoint folder (#225)
* file system change
* adding generic file system
* pr fixes
* undo whitespaces
1 parent 16a23c7 commit d154403

File tree: 4 files changed, +30 −31 lines


ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReader.scala

Lines changed: 10 additions & 6 deletions
@@ -15,7 +15,10 @@
 
 package za.co.absa.hyperdrive.ingestor.implementation.reader.kafka
 
+import java.net.URI
+
 import org.apache.commons.configuration2.Configuration
+import org.apache.hadoop.fs.FileSystem
 import org.apache.commons.lang3.StringUtils
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -62,6 +65,7 @@ private[reader] class KafkaStreamReader(
    * thus, if not properly configured, the issue will ONLY BE FOUND AT RUNTIME.
    */
   override def read(spark: SparkSession): DataFrame = {
+    implicit val fs: FileSystem = FileSystem.get(new URI(checkpointLocation), spark.sparkContext.hadoopConfiguration)
 
     if (spark.sparkContext.isStopped) {
       throw new IllegalStateException("SparkSession is stopped.")
@@ -75,14 +79,14 @@ private[reader] class KafkaStreamReader(
       .option(TOPIC_SUBSCRIPTION_KEY, topic)
       .option(SPARK_BROKERS_SETTING_KEY, brokers)
 
-    val streamReaderWithStartingOffsets = configureStartingOffsets(streamReader, spark.sparkContext.hadoopConfiguration)
+    val streamReaderWithStartingOffsets = configureStartingOffsets(streamReader)
     streamReaderWithStartingOffsets
       .options(extraConfs)
       .load()
   }
 
-  private def configureStartingOffsets(streamReader: DataStreamReader, configuration: org.apache.hadoop.conf.Configuration): DataStreamReader = {
-    val startingOffsets = getStartingOffsets(checkpointLocation, configuration)
+  private def configureStartingOffsets(streamReader: DataStreamReader)(implicit fileSystem: FileSystem): DataStreamReader = {
+    val startingOffsets = getStartingOffsets(checkpointLocation)
 
     startingOffsets match {
       case Some(startOffset) =>
@@ -94,9 +98,9 @@ private[reader] class KafkaStreamReader(
     }
   }
 
-  private def getStartingOffsets(checkpointLocation: String, configuration: org.apache.hadoop.conf.Configuration): Option[String] = {
-    if (FileUtils.exists(checkpointLocation, configuration) && !FileUtils.isEmpty(checkpointLocation, configuration)) {
-      Option.empty
+  private def getStartingOffsets(checkpointLocation: String)(implicit fileSystem: FileSystem): Option[String] = {
+    if (FileUtils.exists(checkpointLocation) && !FileUtils.isEmpty(checkpointLocation)) {
+      Option.empty
     }
     else {
       Option(STARTING_OFFSETS_EARLIEST)
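
The heart of the change is the first line of read: the FileSystem is now resolved from the URI of checkpointLocation rather than from the default filesystem, so an s3a:// checkpoint folder resolves to the S3 filesystem instead of HDFS. A minimal standalone sketch of the difference, assuming hadoop-aws is on the classpath; the bucket name is hypothetical:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FileSystemResolutionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // FileSystem.get(conf) ignores any path you later pass to the returned
    // filesystem and always yields the one configured as fs.defaultFS,
    // typically HDFS or the local filesystem. This is why the old code could
    // not see an S3 checkpoint folder.
    val defaultFs = FileSystem.get(conf)
    println(defaultFs.getUri)

    // FileSystem.get(uri, conf) dispatches on the URI scheme, so an s3a://
    // location resolves to S3AFileSystem (hypothetical bucket name).
    val s3Fs = FileSystem.get(new URI("s3a://some-bucket/checkpoints"), conf)
    println(s3Fs.getUri) // prints s3a://some-bucket
  }
}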

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/MetadataLogUtil.scala

Lines changed: 5 additions & 2 deletions
@@ -15,6 +15,9 @@
 
 package za.co.absa.hyperdrive.ingestor.implementation.utils
 
+import java.net.URI
+
+import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
@@ -26,8 +29,8 @@ import scala.util.{Failure, Success, Try}
 
 object MetadataLogUtil {
   def getParquetFilesNotListedInMetadataLog(spark: SparkSession, rootPath: String): Try[Set[String]] = {
-    val config = spark.sparkContext.hadoopConfiguration
-    if(FileUtils.notExists(rootPath, config) || FileUtils.isEmpty(rootPath, config)) {
+    implicit val fs: FileSystem = FileSystem.get(new URI(rootPath), spark.sparkContext.hadoopConfiguration)
+    if(FileUtils.notExists(rootPath) || FileUtils.isEmpty(rootPath)) {
       Success(Set.empty)
     } else {
       for {
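
Because the implicit fs is created inside the method from rootPath itself, the public signature of getParquetFilesNotListedInMetadataLog is unchanged and callers need no FileSystem in scope. An illustrative call site, assuming an existing spark session and a hypothetical bucket:

import scala.util.{Failure, Success}

// Illustrative only: the rootPath below is hypothetical.
MetadataLogUtil.getParquetFilesNotListedInMetadataLog(spark, "s3a://some-bucket/parquet-output") match {
  case Success(files) if files.isEmpty => println("metadata log is consistent with the parquet files")
  case Success(files) => println(s"parquet files missing from the metadata log: ${files.mkString(", ")}")
  case Failure(exception) => println(s"consistency check failed: ${exception.getMessage}")
}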

shared/src/main/scala/za/co/absa/hyperdrive/shared/utils/FileUtils.scala

Lines changed: 7 additions & 15 deletions
@@ -15,32 +15,24 @@
 
 package za.co.absa.hyperdrive.shared.utils
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 private[hyperdrive] object FileUtils {
 
-  def exists(file: String, configuration: Configuration): Boolean = {
-    val fileSystem = getFileSystem(configuration)
-    fileSystem.exists(new Path(file))
+  def exists(file: String)(implicit fs: FileSystem): Boolean = {
+    fs.exists(new Path(file))
   }
 
-  def notExists(file: String, configuration: Configuration): Boolean = !exists(file, configuration)
+  def notExists(file: String)(implicit fs: FileSystem): Boolean = !exists(file)
 
-  def isDirectory(file: String, configuration: Configuration): Boolean = {
-    val fileSystem = getFileSystem(configuration)
-    fileSystem.isDirectory(new Path(file))
+  def isDirectory(file: String)(implicit fs: FileSystem): Boolean = {
+    fs.isDirectory(new Path(file))
   }
 
-  def isNotDirectory(file: String, configuration: Configuration): Boolean = !isDirectory(file, configuration)
+  def isNotDirectory(file: String)(implicit fs: FileSystem): Boolean = !isDirectory(file)
 
-  def isEmpty(directory: String, configuration: Configuration): Boolean = {
-    val fs = getFileSystem(configuration)
+  def isEmpty(directory: String)(implicit fs: FileSystem): Boolean = {
     val path = new Path(directory)
     fs.exists(path) && !fs.listFiles(path, true).hasNext
   }
-
-  private def getFileSystem(configuration: Configuration): FileSystem = {
-    FileSystem.get(configuration)
-  }
 }
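
With the explicit Configuration parameter gone, a caller resolves one FileSystem from the path's scheme, puts it in implicit scope, and every FileUtils call picks it up. A minimal usage sketch; the object name and path are hypothetical, and since FileUtils is private[hyperdrive] the sketch assumes it lives inside that package:

package za.co.absa.hyperdrive.shared.utils

import java.net.URI

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

object FileUtilsUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("FileUtilsUsageSketch").getOrCreate()

    // Resolve the filesystem once from the path's URI; the implicit is then
    // threaded through all FileUtils calls below.
    val checkpointPath = "s3a://some-bucket/checkpoints" // hypothetical
    implicit val fs: FileSystem =
      FileSystem.get(new URI(checkpointPath), spark.sparkContext.hadoopConfiguration)

    if (FileUtils.exists(checkpointPath) && !FileUtils.isEmpty(checkpointPath)) {
      println(s"$checkpointPath exists and already contains data")
    }
  }
}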

shared/src/test/scala/za/co/absa/hyperdrive/shared/utils/TestFileUtils.scala

Lines changed: 8 additions & 8 deletions
@@ -16,18 +16,18 @@
 package za.co.absa.hyperdrive.shared.utils
 
 import java.util.UUID
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
 import za.co.absa.commons.io.TempDirectory
 import za.co.absa.commons.spark.SparkTestBase
 
 class TestFileUtils extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {
 
   behavior of "FileUtils"
-  private val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
-  private val config = spark.sparkContext.hadoopConfiguration
   private var baseDirectory: TempDirectory = _
   private var baseDirPath: String = _
+  private val dummyDirectory = TempDirectory("DummyDirectory")
+  private implicit val fs: FileSystem = FileSystem.get(dummyDirectory.path.toUri, spark.sparkContext.hadoopConfiguration)
 
   before {
     baseDirectory = TempDirectory("FileUtilsTest")
@@ -45,7 +45,7 @@ class TestFileUtils extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {
     fs.mkdirs(new Path(directory))
 
     // when, then
-    FileUtils.isEmpty(directory, config) shouldBe true
+    FileUtils.isEmpty(directory) shouldBe true
   }
 
   it should "return true if the directory only contains other directories, but no files" in {
@@ -55,7 +55,7 @@ class TestFileUtils extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {
     fs.mkdirs(new Path(subDirectory))
 
     // when, then
-    FileUtils.isEmpty(directory, config) shouldBe true
+    FileUtils.isEmpty(directory) shouldBe true
   }
 
   it should "return false if the directory is not empty" in {
@@ -66,15 +66,15 @@ class TestFileUtils extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {
     fs.create(new Path(subDirectory, "_INFO"))
 
     // when, then
-    FileUtils.isEmpty(directory, config) shouldBe false
+    FileUtils.isEmpty(directory) shouldBe false
   }
 
   it should "return false if the directory does not exist" in {
     // given
     val doesNotExist = s"$baseDirPath/${UUID.randomUUID().toString}"
 
     // when, then
-    FileUtils.isEmpty(doesNotExist, config) shouldBe false
+    FileUtils.isEmpty(doesNotExist) shouldBe false
   }
 
   it should "return false if the argument is a file" in {
@@ -83,6 +83,6 @@ class TestFileUtils extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {
     fs.create(new Path(file))
 
     // when, then
-    FileUtils.isEmpty(file, config) shouldBe false
+    FileUtils.isEmpty(file) shouldBe false
   }
 }
