Commit fab2e7c

[FLINK-25509][connector/kafka] Add new table option scan.record.evaluator.class for Kafka tables to stop source based on the deserialized record

1 parent 9540f87

4 files changed (+56, -6 lines)
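In short, the new option lets a Kafka SQL table name a class that decides, per deserialized record, when the source should stop. A minimal usage sketch (the table, topic, and evaluator class names are hypothetical, assuming the option as defined in this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // 'scan.record.evaluator.class' must name a class implementing
    // RecordEvaluator<RowData>; com.example.EndOfStreamEvaluator is a placeholder.
    tEnv.executeSql(
            "CREATE TABLE orders (id BIGINT, payload STRING) WITH (\n"
                    + " 'connector' = 'kafka',\n"
                    + " 'topic' = 'orders',\n"
                    + " 'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + " 'format' = 'json',\n"
                    + " 'scan.record.evaluator.class' = 'com.example.EndOfStreamEvaluator')");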

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java

Lines changed: 6 additions & 0 deletions

@@ -194,6 +194,12 @@ public class KafkaConnectorOptions {
                             + "The value 0 disables the partition discovery."
                             + "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka.");

+    public static final ConfigOption<String> SCAN_RECORD_EVALUATOR_CLASS =
+            ConfigOptions.key("scan.record.evaluator.class")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Record evaluator to decide the end of the Kafka stream.");
+
     // --------------------------------------------------------------------------------------------
     // Sink specific options
     // --------------------------------------------------------------------------------------------
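The option's value is expected to name an implementation of RecordEvaluator<RowData>, which the factory change below loads reflectively. A hedged sketch of such an evaluator, assuming the RecordEvaluator interface from flink-connector-base with its single isEndOfStream method; the class name and marker logic are illustrative:

    import org.apache.flink.connector.base.source.reader.RecordEvaluator;
    import org.apache.flink.table.data.RowData;

    /** Hypothetical evaluator: ends the stream when a marker row arrives. */
    public class EndOfStreamEvaluator implements RecordEvaluator<RowData> {
        @Override
        public boolean isEndOfStream(RowData record) {
            // Treat a row whose first column (a non-null STRING field) equals
            // "END" as the end-of-stream marker; purely illustrative logic.
            return "END".equals(record.getString(0).toString());
        }
    }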

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java

Lines changed: 19 additions & 2 deletions

@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka.table;

 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
@@ -171,6 +173,8 @@ public class KafkaDynamicSource

     protected final String tableIdentifier;

+    @Nullable protected final RecordEvaluator<RowData> recordEvaluator;
+
     public KafkaDynamicSource(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -188,7 +192,8 @@ public KafkaDynamicSource(
             Map<KafkaTopicPartition, Long> specificBoundedOffsets,
             long boundedTimestampMillis,
             boolean upsertMode,
-            String tableIdentifier) {
+            String tableIdentifier,
+            @Nullable RecordEvaluator<RowData> recordEvaluator) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -228,6 +233,7 @@ public KafkaDynamicSource(
         this.boundedTimestampMillis = boundedTimestampMillis;
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
+        this.recordEvaluator = recordEvaluator;
     }

     @Override
@@ -344,7 +350,8 @@ public DynamicTableSource copy() {
                         specificBoundedOffsets,
                         boundedTimestampMillis,
                         upsertMode,
-                        tableIdentifier);
+                        tableIdentifier,
+                        recordEvaluator);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -486,6 +493,10 @@ protected KafkaSource<RowData> createKafkaSource(
                 break;
         }

+        if (recordEvaluator != null) {
+            kafkaSourceBuilder.setEofRecordEvaluator(recordEvaluator);
+        }
+
         kafkaSourceBuilder
                 .setProperties(properties)
                 .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
@@ -568,6 +579,12 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
         return format.createRuntimeDecoder(context, physicalFormatDataType);
     }

+    @VisibleForTesting
+    @Nullable
+    public RecordEvaluator<RowData> getRecordEvaluator() {
+        return recordEvaluator;
+    }
+
     // --------------------------------------------------------------------------------------------
     // Metadata handling
     // --------------------------------------------------------------------------------------------
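For comparison, the table source simply forwards the evaluator to the same hook on the DataStream-API builder (kafkaSourceBuilder.setEofRecordEvaluator above). A minimal sketch using a String-typed source; servers and topic are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.source.reader.RecordEvaluator;
    import org.apache.flink.connector.kafka.source.KafkaSource;

    // Stop reading each split once a marker record is seen.
    RecordEvaluator<String> eof = record -> "END".equals(record);
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("orders")
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setEofRecordEvaluator(eof)
                    .build();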

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java

Lines changed: 29 additions & 3 deletions

@@ -26,6 +26,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
@@ -74,6 +75,7 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_RECORD_EVALUATOR_CLASS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -144,6 +146,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
         options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
         options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        options.add(SCAN_RECORD_EVALUATOR_CLASS);
         options.add(SINK_PARTITIONER);
         options.add(SINK_PARALLELISM);
         options.add(DELIVERY_GUARANTEE);
@@ -166,6 +169,7 @@ public Set<ConfigOption<?>> forwardOptions() {
                         SCAN_STARTUP_SPECIFIC_OFFSETS,
                         SCAN_TOPIC_PARTITION_DISCOVERY,
                         SCAN_STARTUP_TIMESTAMP_MILLIS,
+                        SCAN_RECORD_EVALUATOR_CLASS,
                         SINK_PARTITIONER,
                         SINK_PARALLELISM,
                         TRANSACTIONAL_ID_PREFIX)
@@ -215,6 +219,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {

         final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

+        final RecordEvaluator<RowData> recordEvaluator;
+        try {
+            recordEvaluator =
+                    loadRecordEvaluator(
+                            tableOptions.getOptional(SCAN_RECORD_EVALUATOR_CLASS).orElse(null));
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Failed to load the RecordEvaluator class.", e);
+        }
+
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -231,7 +244,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 boundedOptions.boundedMode,
                 boundedOptions.specificOffsets,
                 boundedOptions.boundedTimestampMillis,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                recordEvaluator);
     }

     @Override
@@ -377,6 +391,16 @@ private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig table
         return tableOptions.get(DELIVERY_GUARANTEE);
     }

+    private RecordEvaluator<RowData> loadRecordEvaluator(String recordEvaluatorClassName)
+            throws Exception {
+        if (recordEvaluatorClassName == null) {
+            return null;
+        }
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        Class<?> recordEvaluatorClass = classLoader.loadClass(recordEvaluatorClassName);
+        return (RecordEvaluator<RowData>) recordEvaluatorClass.newInstance();
+    }
+
     // --------------------------------------------------------------------------------------------

     protected KafkaDynamicSource createKafkaTableSource(
@@ -395,7 +419,8 @@ protected KafkaDynamicSource createKafkaTableSource(
             BoundedMode boundedMode,
             Map<KafkaTopicPartition, Long> specificEndOffsets,
             long endTimestampMillis,
-            String tableIdentifier) {
+            String tableIdentifier,
+            @Nullable RecordEvaluator<RowData> recordEvaluator) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -413,7 +438,8 @@ protected KafkaDynamicSource createKafkaTableSource(
                 specificEndOffsets,
                 endTimestampMillis,
                 false,
-                tableIdentifier);
+                tableIdentifier,
+                recordEvaluator);
     }

     protected KafkaDynamicSink createKafkaTableSink(
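Note that loadRecordEvaluator instantiates the configured class via Class#newInstance on the thread context class loader, so the evaluator must be on the job's classpath and must have a public no-arg constructor; any failure is wrapped in the IllegalArgumentException thrown from createDynamicTableSource.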

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java

Lines changed: 2 additions & 1 deletion

@@ -165,7 +165,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 boundedOptions.specificOffsets,
                 boundedOptions.boundedTimestampMillis,
                 true,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                null);
     }

     @Override
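The upsert factory passes null for the new constructor parameter, so scan.record.evaluator.class is not wired up for upsert-kafka tables in this change.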
