
Commit f42b640

[FLINK-25509][connector/kafka] Add tests for Kafka tables to stop source based on de-serialized records
1 parent: fab2e7c

5 files changed: +163, -20 lines

5 files changed

+163
-20
lines changed
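The tests added here exercise the `RecordEvaluator` hook from FLINK-25509: the Kafka source hands each de-serialized record to the evaluator and stops consuming a split once `isEndOfStream` returns true. A minimal DataStream-level sketch of that wiring, assuming the builder exposes the evaluator via `setEofRecordEvaluator` (that method name is not shown in this diff); the table connector surfaces the same hook through the `'scan.record.evaluator.class'` option tested below:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class StopOnRecordExample {

    public static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("record-evaluator-demo")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Assumed builder method for FLINK-25509: the evaluator receives each
                // de-serialized record and returns true once the split should stop.
                .setEofRecordEvaluator(record -> "End".equals(record))
                .build();
    }
}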

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java

Lines changed: 0 additions & 1 deletion
@@ -41,7 +41,6 @@
 
 import javax.annotation.Nullable;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java

Lines changed: 10 additions & 10 deletions
@@ -540,15 +540,14 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Excep
         Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");
 
         try (KafkaSourceReader<Integer> reader =
-                (KafkaSourceReader<Integer>)
-                        createReader(
-                                Boundedness.CONTINUOUS_UNBOUNDED,
-                                new TestingReaderContext(),
-                                (ignore) -> {
-                                },
-                                new Properties(),
-                                rackIdSupplier,
-                                null)) {
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {},
+                                new Properties(),
+                                rackIdSupplier,
+                                null)) {
             reader.addSplits(
                     Collections.singletonList(
                             new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L)));
@@ -728,7 +727,8 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
             throws Exception {
         Properties properties = new Properties();
         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        return createReader(boundedness, context, splitFinishedHook, properties, null, recordEvaluator);
+        return createReader(
+                boundedness, context, splitFinishedHook, properties, null, recordEvaluator);
     }
 
     private SourceReader<Integer, KafkaPartitionSplit> createReader(

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java

Lines changed: 60 additions & 8 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
@@ -47,6 +48,7 @@
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRecordEvaluator;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
@@ -91,6 +93,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -212,7 +215,8 @@ public void testTableSource() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
         ScanTableSource.ScanRuntimeProvider provider =
@@ -254,7 +258,8 @@ public void testTableSourceWithPattern() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.EARLIEST,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
@@ -295,7 +300,8 @@ public void testTableSourceWithKeyValue() {
                         KAFKA_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
-                        0);
+                        0,
+                        null);
 
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
     }
@@ -346,7 +352,8 @@ public void testTableSourceWithKeyValueAndMetadata() {
                         KAFKA_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
-                        0);
+                        0,
+                        null);
         expectedKafkaSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedKafkaSource.metadataKeys = Collections.singletonList("timestamp");
 
@@ -397,6 +404,47 @@ public void testTableSourceSetOffsetResetWithException() {
                         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, errorStrategy));
     }
 
+    @Test
+    public void testTableSourceSetRecordEvaluator() {
+        Map<String, String> tableOptions = getBasicSourceOptions();
+        tableOptions.put("scan.record.evaluator.class", MockRecordEvaluator.class.getName());
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, tableOptions);
+        final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
+
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        KAFKA_SOURCE_PROPERTIES,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0,
+                        new MockRecordEvaluator());
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+        assertThat(
+                        Objects.requireNonNull(actualKafkaSource.getRecordEvaluator())
                                .isEndOfStream(null))
+                .isTrue();
+
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertKafkaSource(provider);
+    }
+
     private void testSetOffsetResetForStartFromGroupOffsets(String value) {
         final Map<String, String> modifiedOptions =
                 getModifiedOptions(
@@ -1128,7 +1176,8 @@ public void testDiscoverPartitionByDefault() {
                         props,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1166,7 +1215,8 @@ public void testDisableDiscoverPartition() {
                         props,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1189,7 +1239,8 @@ private static KafkaDynamicSource createExpectedScanSource(
             Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
-            long startupTimestampMillis) {
+            long startupTimestampMillis,
+            @Nullable RecordEvaluator<RowData> recordEvaluator) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -1207,7 +1258,8 @@ private static KafkaDynamicSource createExpectedScanSource(
                 Collections.emptyMap(),
                 0,
                 false,
-                FactoryMocks.IDENTIFIER.asSummaryString());
+                FactoryMocks.IDENTIFIER.asSummaryString(),
+                recordEvaluator);
     }
 
     private static KafkaDynamicSink createExpectedSink(
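The new testTableSourceSetRecordEvaluator case only passes a class name through the 'scan.record.evaluator.class' option and then expects getRecordEvaluator() to return a working instance; the resolution logic lives in the connector and is not part of this diff. A hedged sketch of the kind of reflective instantiation this implies (helper name and error handling are hypothetical):

import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.table.data.RowData;

final class RecordEvaluatorResolver {

    /** Hypothetical helper: turn the 'scan.record.evaluator.class' value into an instance. */
    @SuppressWarnings("unchecked")
    static RecordEvaluator<RowData> resolve(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            if (!RecordEvaluator.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement " + RecordEvaluator.class.getName());
            }
            // The table option implies a public no-argument constructor.
            return (RecordEvaluator<RowData>) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    private RecordEvaluatorResolver() {}
}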

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java

Lines changed: 91 additions & 0 deletions
@@ -29,6 +29,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRecordEvaluator;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -45,6 +46,8 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -78,6 +81,7 @@
 /** Basic IT cases for the Kafka table source and sink. */
 @RunWith(Parameterized.class)
 public class KafkaTableITCase extends KafkaTableTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTableITCase.class);
 
     private static final String JSON_FORMAT = "json";
     private static final String AVRO_FORMAT = "avro";
@@ -1099,6 +1103,93 @@ public void testStartFromGroupOffsetsNone() {
                 .satisfies(FlinkAssertions.anyCauseMatches(NoOffsetForPartitionException.class));
     }
 
+    @Test
+    public void testKafkaSourceWithRecordEvaluator() throws Throwable {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "recordEvaluator_" + format + "_" + UUID.randomUUID();
+        TableResult tableResult = null;
+        try {
+            createTestTopic(topic, 3, 1);
+
+            // ---------- Produce an event time stream into Kafka -------------------
+            String groupId = getStandardProps().getProperty("group.id");
+            String bootstraps = getBootstrapServers();
+            tEnv.getConfig().set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
+
+            final String createTable =
+                    String.format(
+                            "CREATE TABLE kafka (\n"
+                                    + " `partition_id` INT,\n"
+                                    + " `value` STRING\n"
+                                    + ") WITH (\n"
+                                    + " 'connector' = 'kafka',\n"
+                                    + " 'topic' = '%s',\n"
+                                    + " 'properties.bootstrap.servers' = '%s',\n"
+                                    + " 'properties.group.id' = '%s',\n"
+                                    + " 'scan.startup.mode' = 'earliest-offset',\n"
+                                    + " 'sink.partitioner' = '%s',\n"
+                                    + " 'format' = '%s',\n"
+                                    + " 'scan.record.evaluator.class' = '%s'\n"
+                                    + ")",
+                            topic,
+                            bootstraps,
+                            groupId,
+                            TestPartitioner.class.getName(),
+                            format,
+                            MockRecordEvaluator.class.getName());
+            tEnv.executeSql(createTable);
+
+            env.setParallelism(1);
+            String initialValues =
+                    "INSERT INTO kafka\n"
+                            + "VALUES\n"
+                            + " (0, 'test0'),\n"
+                            + " (1, 'test1'),\n"
+                            + " (2, 'test2'),\n"
+                            + " (3, 'End'),\n"
+                            + " (4, 'End'),\n"
+                            + " (5, 'End'),\n"
+                            + " (6, 'should not send'),\n"
+                            + " (7, 'should not send'),\n"
+                            + " (8, 'should not send')\n";
+            tEnv.executeSql(initialValues).await();
+
+            // ---------- Consume stream from Kafka -------------------
+            String sinkName = "MySink";
+            String createSink =
+                    "CREATE TABLE "
+                            + sinkName
+                            + "(\n"
+                            + " `partition` INT,\n"
+                            + " `value` STRING\n"
+                            + ") WITH (\n"
+                            + " 'connector' = 'values'\n"
+                            + ")";
+            tEnv.executeSql(createSink);
+
+            tableResult = tEnv.executeSql("INSERT INTO " + sinkName + " SELECT * FROM kafka");
+            List<String> expected = Arrays.asList("+I[0, test0]", "+I[1, test1]", "+I[2, test2]");
+            KafkaTableTestUtils.waitingExpectedResults(sinkName, expected, Duration.ofSeconds(15));
+
+            // insert some records and make sure that these records will not be sent
+            String insertValues =
+                    "INSERT INTO kafka\n"
+                            + " VALUES\n"
+                            + " (9, 'Insert new values. This should not be sent.'),\n"
+                            + " (10, 'Insert new values. This should not be sent.'),\n"
+                            + " (11, 'Insert new values. This should not be sent.')\n";
+            tEnv.executeSql(insertValues).await();
+            KafkaTableTestUtils.waitingExpectedResults(sinkName, expected, Duration.ofSeconds(15));
+        } finally {
+            // ------------- cleanup -------------------
+            if (tableResult != null) {
+                tableResult.getJobClient().ifPresent(JobClient::cancel);
+            }
+            deleteTestTopic(topic);
+        }
+    }
+
     private List<String> appendNewData(
             String topic, String tableName, String groupId, int targetNum) throws Exception {
         waitUtil(
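MockRecordEvaluator itself is a test utility that does not appear in this commit. A plausible sketch, consistent with the assertions above (the factory test expects isEndOfStream(null) to be true, and the IT case expects values of 'End' to terminate each partition), though the real class may differ:

package org.apache.flink.streaming.connectors.kafka.testutils;

import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.table.data.RowData;

/** Sketch of an evaluator that ends the stream on a missing row or a value of "End". */
public class MockRecordEvaluator implements RecordEvaluator<RowData> {

    @Override
    public boolean isEndOfStream(RowData record) {
        // The factory test asserts that isEndOfStream(null) is true.
        if (record == null) {
            return true;
        }
        // The IT case schema is (`partition_id` INT, `value` STRING), so the value is field 1.
        return record.getString(1).toString().contains("End");
    }
}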

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java

Lines changed: 2 additions & 1 deletion
@@ -791,7 +791,8 @@ private KafkaDynamicSource createExpectedScanSource(
                 Collections.emptyMap(),
                 0,
                 true,
-                FactoryMocks.IDENTIFIER.asSummaryString());
+                FactoryMocks.IDENTIFIER.asSummaryString(),
+                null);
     }
 
     private static KafkaDynamicSink createExpectedSink(
