Skip to content

Commit ef37f9a

Browse files
ostronautHeartSaVioR
authored andcommitted
[SPARK-50568][SS][TESTS] Fix KafkaMicroBatchSourceSuite to cover KafkaOffsetReaderConsumer
### What changes were proposed in this pull request? This was missed when the default value of configuration is changed via the following. - #38306 KafkaMicroBatchSourceSuite consists of set of different suites, where KafkaMicroBatchSourceSuiteBase based suite is defined. There are 4 implementations of this abstract class for now: 1. `KafkaMicroBatchV1SourceSuite` - V1 source that supposes to use `KafkaOffsetReaderConsumer` as `KafkaOffsetReader`. 2. `KafkaMicroBatchV2SourceSuite` - V2 source that supposes to use `KafkaOffsetReaderConsumer` as `KafkaOffsetReader`. 3. `KafkaMicroBatchV1SourceWithAdminSuite` - V1 source that uses `KafkaOffsetReaderAdmin` as `KafkaOffsetReader`. 4. `KafkaMicroBatchV2SourceWithAdminSuite` - V2 source that uses `KafkaOffsetReaderAdmin` as `KafkaOffsetReader`. But `KafkaMicroBatchV1SourceSuite` and `KafkaMicroBatchV2SourceSuite` are still running based on `KafkaOffsetReaderAdmin`, as `USE_DEPRECATED_KAFKA_OFFSET_FETCHING` is `false` be default. By switching it to `true` in `beforeAll`, we make sure that corresponding `KafkaOffsetReader` is in use. ### Why are the changes needed? To improve unit tests coverage for `KafkaOffsetReaderConsumer` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit Tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49164 from ostronaut/hotifx/KafkaMicroBatchSourceSuite-cover-KafkaOffsetReaderConsumer. Authored-by: Dima <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent a6f82c6 commit ef37f9a

File tree

1 file changed

+31
-17
lines changed

1 file changed

+31
-17
lines changed

connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,22 +1591,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
15911591
}
15921592
}
15931593

1594-
1595-
class KafkaMicroBatchV1SourceWithAdminSuite extends KafkaMicroBatchV1SourceSuite {
1596-
override def beforeAll(): Unit = {
1597-
super.beforeAll()
1598-
spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false")
1599-
}
1600-
}
1601-
1602-
class KafkaMicroBatchV2SourceWithAdminSuite extends KafkaMicroBatchV2SourceSuite {
1603-
override def beforeAll(): Unit = {
1604-
super.beforeAll()
1605-
spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false")
1606-
}
1607-
}
1608-
1609-
class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
1594+
abstract class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
16101595
override def beforeAll(): Unit = {
16111596
super.beforeAll()
16121597
spark.conf.set(
@@ -1637,7 +1622,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
16371622
}
16381623
}
16391624

1640-
class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
1625+
abstract class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
16411626

16421627
test("V2 Source is used by default") {
16431628
val topic = newTopic()
@@ -1870,6 +1855,35 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
18701855
}
18711856
}
18721857

1858+
class KafkaMicroBatchV1SourceWithAdminSuite extends KafkaMicroBatchV1SourceSuite {
1859+
override def beforeAll(): Unit = {
1860+
super.beforeAll()
1861+
spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false")
1862+
}
1863+
}
1864+
1865+
class KafkaMicroBatchV1SourceWithConsumerSuite extends KafkaMicroBatchV1SourceSuite {
1866+
override def beforeAll(): Unit = {
1867+
super.beforeAll()
1868+
spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true")
1869+
}
1870+
}
1871+
1872+
class KafkaMicroBatchV2SourceWithAdminSuite extends KafkaMicroBatchV2SourceSuite {
1873+
override def beforeAll(): Unit = {
1874+
super.beforeAll()
1875+
spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false")
1876+
}
1877+
}
1878+
1879+
class KafkaMicroBatchV2SourceWithConsumerSuite extends KafkaMicroBatchV2SourceSuite {
1880+
override def beforeAll(): Unit = {
1881+
super.beforeAll()
1882+
spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true")
1883+
}
1884+
}
1885+
1886+
18731887
abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
18741888

18751889
import testImplicits._

0 commit comments

Comments
 (0)