Skip to content

Commit f6a077a

Browse files
authored
[FLINK-36780] Kafka source disable partition discovery unexpectedly (#136)
1 parent 59baacc commit f6a077a

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -474,10 +474,9 @@ private void parseAndSetRequiredProperties() {
474474
true);
475475

476476
// If the source is bounded, do not run periodic partition discovery.
477-
maybeOverride(
478-
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
479-
"-1",
480-
boundedness == Boundedness.BOUNDED);
477+
if (boundedness == Boundedness.BOUNDED) {
478+
maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", true);
479+
}
481480

482481
// If the client id prefix is not set, reuse the consumer group id as the client id prefix,
483482
// or generate a random string if consumer group id is not specified.

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,29 @@ public void testSettingInvalidCustomDeserializers(
217217
.hasMessageContaining(expectedError);
218218
}
219219

220+
@Test
221+
public void testDefaultPartitionDiscovery() {
222+
final KafkaSource<String> kafkaSource = getBasicBuilder().build();
223+
// Commit on checkpoint and auto commit should be disabled because group.id is not specified
224+
assertThat(
225+
kafkaSource
226+
.getConfiguration()
227+
.get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS))
228+
.isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue());
229+
}
230+
231+
@Test
232+
public void testPeriodPartitionDiscovery() {
233+
final KafkaSource<String> kafkaSource =
234+
getBasicBuilder().setBounded(OffsetsInitializer.latest()).build();
235+
// Commit on checkpoint and auto commit should be disabled because group.id is not specified
236+
assertThat(
237+
kafkaSource
238+
.getConfiguration()
239+
.get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS))
240+
.isEqualTo(-1L);
241+
}
242+
220243
private KafkaSourceBuilder<String> getBasicBuilder() {
221244
return new KafkaSourceBuilder<String>()
222245
.setBootstrapServers("testServer")

0 commit comments

Comments
 (0)