
Commit 9540f87

[FLINK-25509][connector/kafka] Add tests for stopping source based on de-serialized records
1 parent 6b053cf commit 9540f87

File tree: 4 files changed (+249 −31 lines)
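For context, the feature these tests exercise lets a KafkaSource stop consuming a split once a de-serialized record matches a user-provided RecordEvaluator. A minimal usage sketch, inferred from the builder calls in the tests below; the broker address, topic, group id, and deserializer here are placeholders and not part of this commit:

    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")              // placeholder brokers
                    .setTopics("input-topic")                           // placeholder topic
                    .setGroupId("eof-evaluator-demo")                   // placeholder group id
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // keep the stopping offsets open so the evaluator alone decides when to stop
                    .setBounded(new NoStoppingOffsetsInitializer())
                    .setValueOnlyDeserializer(new SimpleStringSchema()) // placeholder deserializer
                    // a split is marked finished once a de-serialized record matches this predicate;
                    // per the expectations in the tests below, the matching record is not emitted
                    .setEofRecordEvaluator(record -> record.contains("END"))
                    .build();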

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java

Lines changed: 11 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
@@ -191,6 +192,16 @@ public void testSettingCustomKafkaSubscriber() {
                         "Cannot use partitions for consumption because a ExampleCustomSubscriber is already set for consumption.");
     }
 
+    @Test
+    public void testSettingRecordEvaluator() {
+        assertThat(getBasicBuilder().build().getEofRecordEvaluator()).isNull();
+
+        RecordEvaluator<String> recordEvaluator = s -> s.contains("test");
+        final KafkaSource<String> kafkaSource =
+                getBasicBuilder().setEofRecordEvaluator(recordEvaluator).build();
+        assertThat(kafkaSource.getEofRecordEvaluator()).isEqualTo(recordEvaluator);
+    }
+
     private KafkaSourceBuilder<String> getBasicBuilder() {
         return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java

Lines changed: 67 additions & 19 deletions
@@ -27,6 +27,7 @@
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContextFactory;
@@ -80,6 +81,7 @@
 
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION;
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC;
+import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unite test class for {@link KafkaSource}. */
@@ -167,6 +169,45 @@ public void testBasicRead(boolean enableObjectReuse) throws Exception {
         executeAndVerify(env, stream);
     }
 
+    @Test
+    public void testEndWithRecordEvaluator() throws Throwable {
+        KafkaSource<PartitionAndValue> source =
+                KafkaSource.<PartitionAndValue>builder()
+                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                        .setGroupId("testEndWithRecordEvaluator")
+                        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+                        .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(new NoStoppingOffsetsInitializer())
+                        .setEofRecordEvaluator(
+                                pav -> {
+                                    String tp = pav.tp;
+                                    int expectedValue =
+                                            Integer.parseInt(tp.substring(tp.lastIndexOf('-')));
+                                    return pav.value != expectedValue;
+                                })
+                        .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStream<PartitionAndValue> stream =
+                env.fromSource(
+                        source, WatermarkStrategy.noWatermarks(), "testEndWithRecordEvaluator");
+
+        Map<String, List<Integer>> resultPerPartition =
+                executeAndGetResultPerPartition(env, stream);
+        resultPerPartition.forEach(
+                (tp, values) -> {
+                    int expectedValue = Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1));
+                    assertThat(values.size()).isEqualTo(1);
+                    assertThat((int) values.get(0))
+                            .as(
+                                    String.format(
+                                            "The value for partition %s should be only %d",
+                                            tp, expectedValue))
+                            .isEqualTo(expectedValue);
+                });
+    }
+
     @Test
     public void testValueOnlyDeserializer() throws Exception {
         KafkaSource<Integer> source =
@@ -201,9 +242,7 @@ public void testValueOnlyDeserializer() throws Exception {
         for (int partition = 0;
                 partition < KafkaSourceTestEnv.NUM_PARTITIONS;
                 partition++) {
-            for (int value = partition;
-                    value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
-                    value++) {
+            for (int value = partition; value < NUM_RECORDS_PER_PARTITION; value++) {
                 expectedSum += value;
             }
         }
@@ -481,6 +520,30 @@ public void processElement(StreamRecord<PartitionAndValue> element) {
 
     private void executeAndVerify(
             StreamExecutionEnvironment env, DataStream<PartitionAndValue> stream) throws Exception {
+        Map<String, List<Integer>> resultPerPartition =
+                executeAndGetResultPerPartition(env, stream);
+
+        // Expected elements from partition P should be an integer sequence from P to
+        // NUM_RECORDS_PER_PARTITION.
+        resultPerPartition.forEach(
+                (tp, values) -> {
+                    int firstExpectedValue =
+                            Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1));
+                    assertThat(values.size())
+                            .isEqualTo(NUM_RECORDS_PER_PARTITION - firstExpectedValue);
+                    for (int i = 0; i < values.size(); i++) {
+                        assertThat((int) values.get(i))
+                                .as(
+                                        String.format(
+                                                "The %d-th value for partition %s should be %d",
+                                                i, tp, firstExpectedValue + i))
+                                .isEqualTo(firstExpectedValue + i);
+                    }
+                });
+    }
+
+    private Map<String, List<Integer>> executeAndGetResultPerPartition(
+            StreamExecutionEnvironment env, DataStream<PartitionAndValue> stream) throws Exception {
         stream.addSink(
                 new RichSinkFunction<PartitionAndValue>() {
                     @Override
@@ -501,22 +564,7 @@ public void invoke(PartitionAndValue value, Context context) {
                 resultPerPartition
                         .computeIfAbsent(partitionAndValue.tp, ignored -> new ArrayList<>())
                         .add(partitionAndValue.value));
-
-        // Expected elements from partition P should be an integer sequence from P to
-        // NUM_RECORDS_PER_PARTITION.
-        resultPerPartition.forEach(
-                (tp, values) -> {
-                    int firstExpectedValue =
-                            Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1));
-                    for (int i = 0; i < values.size(); i++) {
-                        assertThat((int) values.get(i))
-                                .as(
-                                        String.format(
-                                                "The %d-th value for partition %s should be %d",
-                                                i, tp, firstExpectedValue + i))
-                                .isEqualTo(firstExpectedValue + i);
-                    }
-                });
+        return resultPerPartition;
     }
 
     private static class OnEventWatermarkGenerator

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java

Lines changed: 141 additions & 12 deletions
@@ -23,6 +23,7 @@
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
@@ -56,6 +57,8 @@
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -70,6 +73,8 @@
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER;
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITTED_OFFSET_METRIC_GAUGE;
@@ -80,6 +85,9 @@
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.PARTITION_GROUP;
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.TOPIC_GROUP;
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
+import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
+import static org.apache.flink.core.io.InputStatus.NOTHING_AVAILABLE;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.never;
@@ -144,7 +152,7 @@ void testCommitOffsetsWithoutAliveFetchers() throws Exception {
         InputStatus status;
         do {
             status = reader.pollNext(output);
-        } while (status != InputStatus.NOTHING_AVAILABLE);
+        } while (status != NOTHING_AVAILABLE);
         pollUntil(
                 reader,
                 output,
@@ -296,6 +304,7 @@ void testDisableOffsetCommit() throws Exception {
                         new TestingReaderContext(),
                         (ignore) -> {},
                         properties,
+                        null,
                         null)) {
             reader.addSplits(
                     getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED));
@@ -517,7 +526,8 @@ public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception {
                         new TestingReaderContext(),
                         (ignore) -> {},
                         new Properties(),
-                        rackIdSupplier)) {
+                        rackIdSupplier,
+                        null)) {
             // Do nothing here
         }
 
@@ -530,13 +540,15 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Excep
         Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");
 
         try (KafkaSourceReader<Integer> reader =
-                (KafkaSourceReader<Integer>)
-                        createReader(
-                                Boundedness.CONTINUOUS_UNBOUNDED,
-                                new TestingReaderContext(),
-                                (ignore) -> {},
-                                new Properties(),
-                                rackIdSupplier)) {
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {
+                                },
+                                new Properties(),
+                                rackIdSupplier,
+                                null)) {
             reader.addSplits(
                     Collections.singletonList(
                             new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L)));
@@ -545,6 +557,111 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Excep
         verify(rackIdSupplier).get();
     }
 
+    @Test
+    public void testReadingWithRecordEvaluatorAndAllSplitsFinished() throws Exception {
+        final int readRecordNumPerSplit = 9;
+        final int readSplits = 2;
+        final Set<String> finishedSplits = new HashSet<>();
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "groupId",
+                                new TestingReaderContext(),
+                                finishedSplits::addAll,
+                                r -> (r % NUM_RECORDS_PER_SPLIT) == readRecordNumPerSplit)) {
+            List<KafkaPartitionSplit> splits = new ArrayList<>();
+            List<Integer> excepted = new ArrayList<>();
+            for (int i = 0; i < readSplits; i++) {
+                splits.add(
+                        new KafkaPartitionSplit(
+                                new TopicPartition(TOPIC, i), 0, Integer.MAX_VALUE));
+                excepted.addAll(
+                        IntStream.range(
+                                        i * NUM_RECORDS_PER_SPLIT,
+                                        (i + 1) * NUM_RECORDS_PER_SPLIT - 1)
+                                .boxed()
+                                .collect(Collectors.toList()));
+            }
+
+            reader.addSplits(splits);
+            reader.notifyNoMoreSplits();
+
+            TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+            pollUntil(
+                    reader,
+                    output,
+                    () -> finishedSplits.size() == splits.size(),
+                    "The reader cannot get the excepted result before timeout.");
+            InputStatus status = reader.pollNext(output);
+            assertThat(output.getEmittedRecords().size())
+                    .isEqualTo(readRecordNumPerSplit * readSplits);
+            assertThat(finishedSplits)
+                    .containsExactly(
+                            splits.stream()
+                                    .map(s -> s.getTopicPartition().toString())
+                                    .toArray(String[]::new));
+            assertThat(output.getEmittedRecords())
+                    .containsExactlyInAnyOrder(excepted.toArray(new Integer[0]));
+            assertThat(status).isEqualTo(END_OF_INPUT);
+        }
+    }
+
+    @Test
+    public void testReadingWithRecordEvaluatorAndSomeSplitsFinished() throws Exception {
+        final int finishPartitionIndex = 1;
+        final int readRecordNumInFinishedSplit = 7;
+        final int readSplits = 2;
+        final Set<String> finishedSplits = new HashSet<>();
+
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "groupId",
+                                new TestingReaderContext(),
+                                finishedSplits::addAll,
+                                r ->
+                                        r
+                                                == (finishPartitionIndex * NUM_RECORDS_PER_PARTITION
+                                                        + readRecordNumInFinishedSplit))) {
+            List<KafkaPartitionSplit> splits = new ArrayList<>();
+            List<Integer> excepted = new ArrayList<>();
+            for (int i = 0; i < readSplits; i++) {
+                splits.add(
+                        new KafkaPartitionSplit(
+                                new TopicPartition(TOPIC, i), 0, Integer.MAX_VALUE));
+                excepted.addAll(
+                        IntStream.range(
+                                        i * NUM_RECORDS_PER_SPLIT,
+                                        i * NUM_RECORDS_PER_SPLIT
+                                                + (i == finishPartitionIndex
+                                                        ? readRecordNumInFinishedSplit
+                                                        : NUM_RECORDS_PER_SPLIT))
+                                .boxed()
+                                .collect(Collectors.toList()));
+            }
+
+            reader.addSplits(splits);
+            reader.notifyNoMoreSplits();
+
+            TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+            pollUntil(
+                    reader,
+                    output,
+                    () -> output.getEmittedRecords().size() == excepted.size(),
+                    "The reader cannot get the excepted result before timeout.");
+            assertThat(finishedSplits)
+                    .containsExactly(new TopicPartition(TOPIC, finishPartitionIndex).toString());
+            assertThat(output.getEmittedRecords())
+                    .containsExactlyInAnyOrder(excepted.toArray(new Integer[0]));
+
+            InputStatus status = reader.pollNext(output);
+            assertThat(output.getEmittedRecords().size()).isEqualTo(excepted.size());
+            assertThat(status).isEqualTo(NOTHING_AVAILABLE);
+        }
+    }
+
     // ------------------------------------------
 
     @Override
@@ -599,17 +716,28 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
             SourceReaderContext context,
             Consumer<Collection<String>> splitFinishedHook)
             throws Exception {
+        return createReader(boundedness, groupId, context, splitFinishedHook, null);
+    }
+
+    private SourceReader<Integer, KafkaPartitionSplit> createReader(
+            Boundedness boundedness,
+            String groupId,
+            SourceReaderContext context,
+            Consumer<Collection<String>> splitFinishedHook,
+            @Nullable RecordEvaluator<Integer> recordEvaluator)
+            throws Exception {
         Properties properties = new Properties();
         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        return createReader(boundedness, context, splitFinishedHook, properties, null);
+        return createReader(boundedness, context, splitFinishedHook, properties, null, recordEvaluator);
     }
 
     private SourceReader<Integer, KafkaPartitionSplit> createReader(
            Boundedness boundedness,
            SourceReaderContext context,
            Consumer<Collection<String>> splitFinishedHook,
            Properties props,
-           SerializableSupplier<String> rackIdSupplier)
+           SerializableSupplier<String> rackIdSupplier,
+           @Nullable RecordEvaluator<Integer> recordEvaluator)
            throws Exception {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
@@ -622,7 +750,8 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
                                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                 KafkaSourceTestEnv.brokerConnectionStrings)
                         .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-                        .setProperties(props);
+                        .setProperties(props)
+                        .setEofRecordEvaluator(recordEvaluator);
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }
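A rough reading of the two new reader tests above: the evaluator finishes a split as soon as a record satisfies the predicate, and that triggering record is excluded from the expected output. Assuming the test base's NUM_RECORDS_PER_SPLIT is 10 (that constant lives in SourceReaderTestBase, not in this diff, so treat it as an assumption), the counts asserted in testReadingWithRecordEvaluatorAndAllSplitsFinished work out as follows:

    // Back-of-the-envelope check, not part of the commit.
    int numRecordsPerSplit = 10;   // assumption: SourceReaderTestBase#NUM_RECORDS_PER_SPLIT
    int readRecordNumPerSplit = 9; // from the test: evaluator fires on r % numRecordsPerSplit == 9
    int readSplits = 2;

    // The triggering record (the last one of each split) is dropped, so each split emits 9 records.
    int expectedEmitted = readRecordNumPerSplit * readSplits; // 18, matching the test's assertion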
