Commit b47e7d6: Fix compile errors
Parent: 79573fe

File tree: 4 files changed (+38, -7 lines)


flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java

Lines changed: 9 additions & 2 deletions
@@ -29,6 +29,7 @@
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumStateSerializer;
@@ -42,6 +43,8 @@
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -87,6 +90,7 @@ public class DynamicKafkaSource<T>
     private final OffsetsInitializer stoppingOffsetsInitializer;
     private final Properties properties;
     private final Boundedness boundedness;
+    @Nullable private final RecordEvaluator<T> eofRecordEvaluator;
 
     DynamicKafkaSource(
             KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -95,14 +99,16 @@ public class DynamicKafkaSource<T>
             OffsetsInitializer startingOffsetsInitializer,
             OffsetsInitializer stoppingOffsetsInitializer,
             Properties properties,
-            Boundedness boundedness) {
+            Boundedness boundedness,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator) {
         this.kafkaStreamSubscriber = kafkaStreamSubscriber;
         this.deserializationSchema = deserializationSchema;
         this.properties = properties;
         this.kafkaMetadataService = kafkaMetadataService;
         this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
         this.boundedness = boundedness;
+        this.eofRecordEvaluator = eofRecordEvaluator;
     }
 
     /**
@@ -134,7 +140,8 @@ public Boundedness getBoundedness() {
     @Override
     public SourceReader<T, DynamicKafkaSourceSplit> createReader(
             SourceReaderContext readerContext) {
-        return new DynamicKafkaSourceReader<>(readerContext, deserializationSchema, properties);
+        return new DynamicKafkaSourceReader<>(
+                readerContext, deserializationSchema, properties, eofRecordEvaluator);
     }
 
     /**
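
For context, the new constructor argument comes from flink-connector-base: RecordEvaluator<T> is a functional interface whose single method, isEndOfStream(T record), lets a source treat a split as finished once a deserialized record satisfies some condition (a record-based EOF). A minimal sketch of an implementation; the "EOS" sentinel is purely illustrative, not something defined by this commit:

    import org.apache.flink.connector.base.source.reader.RecordEvaluator;

    // Ends each split when a sentinel record is seen. The "EOS" sentinel is an
    // assumption for illustration only.
    public class SentinelEofEvaluator implements RecordEvaluator<String> {
        @Override
        public boolean isEndOfStream(String record) {
            // Returning true tells the reader to stop consuming this split.
            return "EOS".equals(record);
        }
    }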

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java

Lines changed: 16 additions & 1 deletion
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
@@ -52,6 +53,7 @@ public class DynamicKafkaSourceBuilder<T> {
     private OffsetsInitializer stoppingOffsetsInitializer;
     private Boundedness boundedness;
     private final Properties props;
+    private RecordEvaluator<T> eofRecordEvaluator;
 
     DynamicKafkaSourceBuilder() {
         this.kafkaStreamSubscriber = null;
@@ -140,6 +142,18 @@ public DynamicKafkaSourceBuilder<T> setDeserializer(
         return this;
     }
 
+    /**
+     * Set the {@link RecordEvaluator}.
+     *
+     * @param eofRecordEvaluator the {@link RecordEvaluator}.
+     * @return the builder.
+     */
+    public DynamicKafkaSourceBuilder<T> setEofRecordEvaluator(
+            RecordEvaluator<T> eofRecordEvaluator) {
+        this.eofRecordEvaluator = eofRecordEvaluator;
+        return this;
+    }
+
     /**
      * Set the starting offsets of the stream. This will be applied to all clusters.
      *
@@ -217,7 +231,8 @@ public DynamicKafkaSource<T> build() {
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
                 props,
-                boundedness);
+                boundedness,
+                eofRecordEvaluator);
     }
 
     // Below are utility methods, code and structure are mostly copied over from KafkaSourceBuilder
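
With setEofRecordEvaluator in place, a job can wire an evaluator through the builder. A hedged usage sketch: setStreamIds and setKafkaMetadataService are assumed from the existing DynamicKafkaSourceBuilder API, the stream id and metadataService variable are placeholders, and the lambda relies on RecordEvaluator being a functional interface:

    DynamicKafkaSource<String> source =
            DynamicKafkaSource.<String>builder()
                    .setStreamIds(Collections.singleton("my-stream")) // placeholder stream id
                    .setKafkaMetadataService(metadataService)         // placeholder service
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    .setEofRecordEvaluator(record -> "EOS".equals(record)) // stop at a sentinel
                    .build();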

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java

Lines changed: 11 additions & 3 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
@@ -54,6 +55,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -95,11 +98,13 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
     private boolean isActivelyConsumingSplits;
     private boolean isNoMoreSplits;
     private AtomicBoolean restartingReaders;
+    @Nullable private final RecordEvaluator<T> eofRecordEvaluator;
 
     public DynamicKafkaSourceReader(
             SourceReaderContext readerContext,
             KafkaRecordDeserializationSchema<T> deserializationSchema,
-            Properties properties) {
+            Properties properties,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator) {
         this.readerContext = readerContext;
         this.clusterReaderMap = new TreeMap<>();
         this.deserializationSchema = deserializationSchema;
@@ -116,6 +121,7 @@ public DynamicKafkaSourceReader(
         this.isActivelyConsumingSplits = false;
         this.restartingReaders = new AtomicBoolean();
         this.clustersProperties = new HashMap<>();
+        this.eofRecordEvaluator = eofRecordEvaluator;
     }
 
     /**
@@ -448,7 +454,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                     }
                 });
 
-        KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+        KafkaRecordEmitter<T> recordEmitter =
+                new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator);
         return new KafkaSourceReader<>(
                 elementsQueue,
                 new KafkaSourceFetcherManager(
@@ -463,7 +470,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                 recordEmitter,
                 toConfiguration(readerSpecificProperties),
                 readerContext,
-                kafkaSourceReaderMetrics);
+                kafkaSourceReaderMetrics,
+                eofRecordEvaluator);
     }
 
     /**
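
Note that the reader forwards the evaluator to both the per-cluster KafkaRecordEmitter and the KafkaSourceReader it builds, and the field is @Nullable, so callers that pass nothing keep the previous run-forever behavior. Conceptually, the emit path gains a check of this shape (hypothetical names, not the actual KafkaRecordEmitter internals):

    // Illustration only; not the real KafkaRecordEmitter code.
    if (eofRecordEvaluator != null && eofRecordEvaluator.isEndOfStream(decodedRecord)) {
        // Mark the split as finished instead of emitting further records.
    }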

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java

Lines changed: 2 additions & 1 deletion
@@ -271,7 +271,8 @@ private DynamicKafkaSourceReader<Integer> createReaderWithoutStart(
         return new DynamicKafkaSourceReader<>(
                 context,
                 KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class),
-                properties);
+                properties,
+                null);
     }
 
     private SourceReader<Integer, DynamicKafkaSourceSplit> startReader(
