@@ -25,6 +25,8 @@ import org.scalatest.{BeforeAndAfterEach, FlatSpec}
 import za.co.absa.commons.io.TempDirectory
 import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReaderProps._
 
+import java.nio.file.{Files, Paths}
+
 class TestKafkaStreamReader extends FlatSpec with BeforeAndAfterEach with MockitoSugar {
 
   private val validTopic = "test-topic"
@@ -97,19 +99,43 @@ class TestKafkaStreamReader extends FlatSpec with BeforeAndAfterEach with MockitoSugar {
     verify(dataStreamReader, never()).options(validExtraConfs)
   }
 
-  it should "set offsets to earliest if no checkpoint location exists" in {
+  it should "set offsets to earliest if checkpoint location does not exist" in {
     val sparkContext = getMockedSparkContext(stopped = false)
     val dataStreamReader = getMockedDataStreamReader
     val sparkSession = getConfiguredMockedSparkSession(sparkContext, dataStreamReader)
-
     val nonExistent = tempDir.path.resolve("non-existent")
+
     val reader = new KafkaStreamReader(validTopic, validBrokers, nonExistent.toUri.getPath, Map())
     reader.read(sparkSession)
 
     verify(dataStreamReader).option(WORD_STARTING_OFFSETS, STARTING_OFFSETS_EARLIEST)
   }
 
-  it should "set offsets to user-defined property if no checkpoint location exists" in {
+  it should "set offsets to earliest if checkpoint location is empty" in {
+    val sparkContext = getMockedSparkContext(stopped = false)
+    val dataStreamReader = getMockedDataStreamReader
+    val sparkSession = getConfiguredMockedSparkSession(sparkContext, dataStreamReader)
+    Files.createDirectories(Paths.get(s"$tempDirPath/empty1/empty2/empty3"))
+
+    val reader = new KafkaStreamReader(validTopic, validBrokers, tempDirPath, Map())
+    reader.read(sparkSession)
+
+    verify(dataStreamReader).option(WORD_STARTING_OFFSETS, STARTING_OFFSETS_EARLIEST)
+  }
+
+  it should "not set offsets to earliest if a checkpoint location exists and is not empty" in {
+    val sparkContext = getMockedSparkContext(stopped = false)
+    val dataStreamReader = getMockedDataStreamReader
+    val sparkSession = getConfiguredMockedSparkSession(sparkContext, dataStreamReader)
+    Files.createFile(Paths.get(s"$tempDirPath/anyFile"))
+
+    val reader = new KafkaStreamReader(validTopic, validBrokers, tempDirPath, Map())
+    reader.read(sparkSession)
+
+    verify(dataStreamReader, never()).option(WORD_STARTING_OFFSETS, STARTING_OFFSETS_EARLIEST)
+  }
+
+  it should "always set offsets to user-defined property e.g. if checkpoint location does not exist" in {
     val sparkContext = getMockedSparkContext(stopped = false)
     val dataStreamReader = getMockedDataStreamReader
     val sparkSession = getConfiguredMockedSparkSession(sparkContext, dataStreamReader)
@@ -121,15 +147,17 @@ class TestKafkaStreamReader extends FlatSpec with BeforeAndAfterEach with MockitoSugar {
     verify(dataStreamReader).options(Map(WORD_STARTING_OFFSETS -> "latest"))
   }
 
-  it should "not set offsets if a checkpoint location exists" in {
+  it should "always set offsets to user-defined property e.g. if checkpoint location exists" in {
     val sparkContext = getMockedSparkContext(stopped = false)
     val dataStreamReader = getMockedDataStreamReader
     val sparkSession = getConfiguredMockedSparkSession(sparkContext, dataStreamReader)
 
-    val reader = new KafkaStreamReader(validTopic, validBrokers, tempDirPath, Map())
+    Files.createFile(Paths.get(s"$tempDirPath/anyFile"))
+
+    val reader = new KafkaStreamReader(validTopic, validBrokers, tempDirPath, Map(WORD_STARTING_OFFSETS -> "latest"))
     reader.read(sparkSession)
 
-    verify(dataStreamReader, never()).option(WORD_STARTING_OFFSETS, STARTING_OFFSETS_EARLIEST)
+    verify(dataStreamReader).options(Map(WORD_STARTING_OFFSETS -> "latest"))
   }
 
   private def getMockedSparkContext(stopped: Boolean): SparkContext = {
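
Taken together, the tests above describe when the reader falls back to the earliest offsets: only when the checkpoint location is missing or contains no files (a tree of empty directories counts as empty), and a user-supplied startingOffsets value is always applied. The sketch below illustrates that decision only; it is not the actual KafkaStreamReader implementation, the object and method names are made up, and it assumes WORD_STARTING_OFFSETS is the literal "startingOffsets" option key of the Spark Kafka source.

import java.io.File

// Illustrative sketch, not the production code: mirrors the behaviour the tests pin down.
object StartingOffsetsSketch {

  // True if the directory contains at least one regular file at any depth;
  // empty nested sub-directories alone do not count as checkpoint state.
  private def containsFiles(dir: File): Boolean = {
    val children = Option(dir.listFiles()).getOrElse(Array.empty[File])
    children.exists(child => child.isFile || containsFiles(child))
  }

  // A user-supplied "startingOffsets" is always honoured; otherwise fall back to
  // "earliest" when the checkpoint location is missing or holds no files.
  def resolveStartingOffsets(checkpointLocation: String,
                             extraConfs: Map[String, String]): Option[String] =
    extraConfs.get("startingOffsets").orElse {
      val checkpointDir = new File(checkpointLocation)
      if (checkpointDir.exists() && containsFiles(checkpointDir)) None
      else Some("earliest")
    }
}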