
Commit 6b053cf

[FLINK-25509][connector/kafka] KafkaSource supports setting an eofRecordEvaluator to stop the source based on de-serialized records
1 parent 301021f

4 files changed: +85 additions, −7 deletions

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java

Lines changed: 14 additions & 2 deletions
@@ -31,6 +31,7 @@
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
@@ -102,11 +103,13 @@ public class KafkaSource<OUT>
     private final Properties props;
     // Client rackId callback
     private final SerializableSupplier<String> rackIdSupplier;
+    @Nullable private RecordEvaluator<OUT> eofRecordEvaluator;

     KafkaSource(
             KafkaSubscriber subscriber,
             OffsetsInitializer startingOffsetsInitializer,
             @Nullable OffsetsInitializer stoppingOffsetsInitializer,
+            @Nullable RecordEvaluator<OUT> eofRecordEvaluator,
             Boundedness boundedness,
             KafkaRecordDeserializationSchema<OUT> deserializationSchema,
             Properties props,
@@ -118,6 +121,7 @@ public class KafkaSource<OUT>
         this.deserializationSchema = deserializationSchema;
         this.props = props;
         this.rackIdSupplier = rackIdSupplier;
+        this.eofRecordEvaluator = eofRecordEvaluator;
     }

     /**
@@ -171,7 +175,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                         Optional.ofNullable(rackIdSupplier)
                                 .map(Supplier::get)
                                 .orElse(null));
-        KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+        KafkaRecordEmitter<OUT> recordEmitter =
+                new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator);

         return new KafkaSourceReader<>(
                 elementsQueue,
@@ -180,7 +185,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                 recordEmitter,
                 toConfiguration(props),
                 readerContext,
-                kafkaSourceReaderMetrics);
+                kafkaSourceReaderMetrics,
+                eofRecordEvaluator);
     }

     @Internal
@@ -251,4 +257,10 @@ KafkaSubscriber getKafkaSubscriber() {
     OffsetsInitializer getStoppingOffsetsInitializer() {
         return stoppingOffsetsInitializer;
     }
+
+    @VisibleForTesting
+    @Nullable
+    RecordEvaluator<OUT> getEofRecordEvaluator() {
+        return eofRecordEvaluator;
+    }
 }
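
Note: the new field and constructor parameter use the RecordEvaluator interface from flink-connector-base. As a point of reference, here is a minimal sketch of that contract, inferred from the isEndOfStream(...) call made in KafkaRecordEmitter below; the authoritative definition lives in org.apache.flink.connector.base.source.reader and may differ in annotations:

    // Minimal sketch, not the authoritative definition: a serializable,
    // single-method callback evaluated against each de-serialized record.
    public interface RecordEvaluator<T> extends Serializable {
        /** Returns true if the given record marks the end of its split. */
        boolean isEndOfStream(T record);
    }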

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java

Lines changed: 23 additions & 0 deletions
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
@@ -104,6 +105,7 @@ public class KafkaSourceBuilder<OUT> {
     protected Properties props;
     // Client rackId supplier
     private SerializableSupplier<String> rackIdSupplier;
+    private RecordEvaluator<OUT> eofRecordEvaluator;

     KafkaSourceBuilder() {
         this.subscriber = null;
@@ -350,6 +352,26 @@ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
         return this;
     }

+    /**
+     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource.
+     *
+     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
+     * whether the corresponding split has reached end of stream. If a record is matched by the
+     * evaluator, the source would not emit this record as well as the following records in the same
+     * split.
+     *
+     * <p>Note that the evaluator works jointly with the stopping offsets specified by the {@link
+     * #setBounded(OffsetsInitializer)} or the {@link #setUnbounded(OffsetsInitializer)}. The source
+     * stops consuming from a split when any of these conditions is met.
+     *
+     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
+        this.eofRecordEvaluator = eofRecordEvaluator;
+        return this;
+    }
+
     /**
      * Sets the client id prefix of this KafkaSource.
      *
@@ -436,6 +458,7 @@ public KafkaSource<OUT> build() {
                 subscriber,
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
+                eofRecordEvaluator,
                 boundedness,
                 deserializationSchema,
                 props,
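
For illustration, a hedged sketch of how the new builder option might be used; the bootstrap servers, topic name, and the "END" sentinel below are placeholders, not part of this commit:

    // Build a KafkaSource that treats a record with payload "END" as the end
    // of its split; per the Javadoc above, that record and later records from
    // the same split are not emitted.
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("input-topic")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setEofRecordEvaluator(record -> "END".equals(record))
                    .build();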

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java

Lines changed: 35 additions & 3 deletions
@@ -21,24 +21,36 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
 import org.apache.flink.util.Collector;

 import org.apache.kafka.clients.consumer.ConsumerRecord;

+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;

 /** The {@link RecordEmitter} implementation for {@link KafkaSourceReader}. */
 @Internal
 public class KafkaRecordEmitter<T>
         implements RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> {

     private final KafkaRecordDeserializationSchema<T> deserializationSchema;
-    private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();
+    private final SourceOutputWrapper<T> sourceOutputWrapper;
+    @Nullable private final RecordEvaluator<T> eofRecordEvaluator;
+    private final Set<String> finishedSplits;

-    public KafkaRecordEmitter(KafkaRecordDeserializationSchema<T> deserializationSchema) {
+    public KafkaRecordEmitter(
+            KafkaRecordDeserializationSchema<T> deserializationSchema,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator) {
         this.deserializationSchema = deserializationSchema;
+        this.sourceOutputWrapper = new SourceOutputWrapper<>(eofRecordEvaluator);
+        this.eofRecordEvaluator = eofRecordEvaluator;
+        this.finishedSplits = new HashSet<>();
     }

     @Override
@@ -51,20 +63,35 @@ public void emitRecord(
             sourceOutputWrapper.setSourceOutput(output);
             sourceOutputWrapper.setTimestamp(consumerRecord.timestamp());
             deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);
-            splitState.setCurrentOffset(consumerRecord.offset() + 1);
+
+            if (sourceOutputWrapper.isEofRecord()) {
+                finishedSplits.add(splitState.splitId());
+            }
+            if (eofRecordEvaluator == null || !finishedSplits.contains(splitState.splitId())) {
+                splitState.setCurrentOffset(consumerRecord.offset() + 1);
+            }
         } catch (Exception e) {
             throw new IOException("Failed to deserialize consumer record due to", e);
         }
     }

     private static class SourceOutputWrapper<T> implements Collector<T> {
+        @Nullable private final RecordEvaluator<T> eofRecordEvaluator;

         private SourceOutput<T> sourceOutput;
         private long timestamp;
+        private boolean isEofRecord = false;
+
+        public SourceOutputWrapper(@Nullable RecordEvaluator<T> eofRecordEvaluator) {
+            this.eofRecordEvaluator = eofRecordEvaluator;
+        }

         @Override
         public void collect(T record) {
             sourceOutput.collect(record, timestamp);
+            if (eofRecordEvaluator != null) {
+                isEofRecord = eofRecordEvaluator.isEndOfStream(record);
+            }
         }

         @Override
@@ -77,5 +104,10 @@ private void setSourceOutput(SourceOutput<T> sourceOutput) {
         private void setTimestamp(long timestamp) {
             this.timestamp = timestamp;
         }
+
+        /** Whether the previous sent record is an eof record. */
+        public boolean isEofRecord() {
+            return isEofRecord;
+        }
     }
 }
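
Since the evaluator runs inside SourceOutputWrapper#collect, the stop condition sees the de-serialized payload rather than raw Kafka bytes or offsets. A small hedged example; the sentinel value is hypothetical:

    // Once a payload equals "END_OF_STREAM", KafkaRecordEmitter adds the split
    // to finishedSplits and stops advancing that split's current offset.
    RecordEvaluator<String> evaluator = payload -> "END_OF_STREAM".equals(payload);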

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java

Lines changed: 13 additions & 2 deletions
@@ -23,6 +23,7 @@
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
@@ -38,6 +39,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
+
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -68,8 +72,15 @@ public KafkaSourceReader(
                     recordEmitter,
             Configuration config,
             SourceReaderContext context,
-            KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
-        super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, context);
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            @Nullable RecordEvaluator<T> recordEvaluator) {
+        super(
+                elementsQueue,
+                kafkaSourceFetcherManager,
+                recordEmitter,
+                recordEvaluator,
+                config,
+                context);
         this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
         this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
