
Commit 91b048d

[FLINK-34554] Adding strategies to table API
1 parent 2d4f402 commit 91b048d

9 files changed: +253 -25 lines changed


flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java

Lines changed: 51 additions & 1 deletion
@@ -19,6 +19,15 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -27,22 +36,28 @@
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */
 @Internal
-class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
+class DynamicKafkaRecordSerializationSchema
+        implements KafkaRecordSerializationSchema<RowData>,
+                KafkaDatasetFacetProvider,
+                TypeDatasetFacetProvider {
 
     private final Set<String> topics;
     private final Pattern topicPattern;
@@ -170,6 +185,41 @@ public void open(
         valueSerialization.open(context);
     }
 
+    @Override
+    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+        if (topics != null) {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofTopics(new ArrayList<>(topics))));
+        }
+        if (topicPattern != null) {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofPattern(topicPattern)));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
+        if (this.valueSerialization instanceof ResultTypeQueryable) {
+            return Optional.of(
+                    new DefaultTypeDatasetFacet(
+                            ((ResultTypeQueryable<?>) this.valueSerialization).getProducedType()));
+        } else {
+            // gets type information from serialize method signature
+            TypeToken serializationSchemaType = TypeToken.of(valueSerialization.getClass());
+            Class parameterType =
+                    serializationSchemaType
+                            .resolveType(SerializationSchema.class.getTypeParameters()[0])
+                            .getRawType();
+            if (parameterType != Object.class) {
+                return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+            }
+        }
+        return Optional.empty();
+    }
+
     private String getTargetTopic(RowData element) {
         if (topics != null && topics.size() == 1) {
             // If topics is a singleton list, we only return the provided topic.

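The fallback branch of getTypeDatasetFacet derives the element type from the schema's generic signature when the schema is not ResultTypeQueryable. Below is a minimal, standalone sketch of that Guava TypeToken resolution; the StringSchema class is a hypothetical stand-in and not part of this commit.

import org.apache.flink.api.common.serialization.SerializationSchema;

import com.google.common.reflect.TypeToken;

import java.nio.charset.StandardCharsets;

public class TypeResolutionSketch {

    /** Hypothetical schema whose element type is only visible in its generic signature. */
    static class StringSchema implements SerializationSchema<String> {
        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        // Resolve the type argument bound to SerializationSchema<T> for the concrete class.
        Class<?> elementType =
                TypeToken.of(StringSchema.class)
                        .resolveType(SerializationSchema.class.getTypeParameters()[0])
                        .getRawType();
        // Prints "class java.lang.String"; a raw SerializationSchema would resolve to Object,
        // which is why the production code only emits a facet when the type is not Object.
        System.out.println(elementType);
    }
}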
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java

Lines changed: 25 additions & 0 deletions
@@ -25,6 +25,7 @@
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
@@ -272,6 +273,30 @@ public class KafkaConnectorOptions {
                                     + DeliveryGuarantee.EXACTLY_ONCE
                                     + " this value is used a prefix for the identifier of all opened Kafka transactions.");
 
+    /**
+     * The strategy to name transactions. Naming strategy has implications on the resource
+     * consumption on the broker because each unique transaction name requires the broker to keep
+     * some metadata in memory for 7 days.
+     *
+     * <p>All naming strategies use the format {@code transactionalIdPrefix-subtask-offset} where
+     * offset is calculated differently.
+     */
+    public static final ConfigOption<TransactionNamingStrategy> TRANSACTION_NAMING_STRATEGY =
+            ConfigOptions.key("sink.transaction-naming-strategy")
+                    .enumType(TransactionNamingStrategy.class)
+                    .defaultValue(TransactionNamingStrategy.DEFAULT)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Advanced option to influence how transactions are named.")
+                                    .linebreak()
+                                    .text(
+                                            "INCREMENTING is the strategy used in flink-kafka-connector 3.X (DEFAULT). It wastes memory of the Kafka broker but works with older Kafka broker versions (Kafka 2.X).")
+                                    .linebreak()
+                                    .text(
+                                            "POOLING is a new strategy introduced in flink-kafka-connector 4.X. It is more resource-friendly than INCREMENTING but requires Kafka 3.0+. Switching to this strategy requires a checkpoint taken with flink-kafka-connector 4.X or a snapshot taken with earlier versions.")
+                                    .build());
+
 // --------------------------------------------------------------------------------------------
 // Enums
 // --------------------------------------------------------------------------------------------

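The new option is set per table like any other connector option. Below is a sketch of a sink table that opts into POOLING, assuming exactly-once delivery (the strategy is only relevant when transactions are used); the schema, topic, and broker address are illustrative placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TransactionNamingStrategyDdlSketch {

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Illustrative sink table; only the last option is introduced by this commit.
        env.executeSql(
                "CREATE TABLE orders_sink (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'sink.delivery-guarantee' = 'exactly-once',\n"
                        + "  'sink.transactional-id-prefix' = 'orders-sink',\n"
                        + "  'sink.transaction-naming-strategy' = 'POOLING'\n"
                        + ")");
    }
}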
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java

Lines changed: 9 additions & 2 deletions
@@ -27,6 +27,7 @@
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.DataTypes;
@@ -139,6 +140,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     /** Parallelism of the physical Kafka producer. * */
     protected final @Nullable Integer parallelism;
 
+    private final TransactionNamingStrategy transactionNamingStrategy;
+
     public KafkaDynamicSink(
             DataType consumedDataType,
             DataType physicalDataType,
@@ -155,7 +158,8 @@ public KafkaDynamicSink(
             boolean upsertMode,
             SinkBufferFlushMode flushMode,
             @Nullable Integer parallelism,
-            @Nullable String transactionalIdPrefix) {
+            @Nullable String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         // Format attributes
         this.consumedDataType =
                 checkNotNull(consumedDataType, "Consumed data type must not be null.");
@@ -168,6 +172,7 @@ public KafkaDynamicSink(
         this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
         this.keyPrefix = keyPrefix;
         this.transactionalIdPrefix = transactionalIdPrefix;
+        this.transactionNamingStrategy = transactionNamingStrategy;
         // Mutable attributes
         this.metadataKeys = Collections.emptyList();
         // Kafka-specific attributes
@@ -222,6 +227,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                                         hasMetadata(),
                                         getMetadataPositions(physicalChildren),
                                         upsertMode))
+                        .setTransactionNamingStrategy(transactionNamingStrategy)
                        .build();
         if (flushMode.isEnabled() && upsertMode) {
             return new DataStreamSinkProvider() {
@@ -292,7 +298,8 @@ public DynamicTableSink copy() {
                         upsertMode,
                         flushMode,
                         parallelism,
-                        transactionalIdPrefix);
+                        transactionalIdPrefix,
+                        transactionNamingStrategy);
         copy.metadataKeys = metadataKeys;
         return copy;
     }

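getSinkRuntimeProvider now forwards the configured strategy to the KafkaSink builder via setTransactionNamingStrategy. For comparison, a sketch of the equivalent DataStream-level wiring; the broker address, prefix, and topic are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;

public class PoolingStrategySinkSketch {

    public static void main(String[] args) {
        // Same builder call chain the table sink drives internally.
        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder broker
                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        .setTransactionalIdPrefix("orders-sink") // placeholder prefix
                        .setTransactionNamingStrategy(TransactionNamingStrategy.POOLING)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("orders") // placeholder topic
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .build();
        // The sink would then be attached to a DataStream via stream.sinkTo(sink).
    }
}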
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java

Lines changed: 11 additions & 4 deletions
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
@@ -84,6 +85,7 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -154,6 +156,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
         options.add(SCAN_PARALLELISM);
+        options.add(TRANSACTION_NAMING_STRATEGY);
         return options;
     }
 
@@ -171,7 +174,8 @@ public Set<ConfigOption<?>> forwardOptions() {
                         SCAN_PARALLELISM,
                         SINK_PARTITIONER,
                         SINK_PARALLELISM,
-                        TRANSACTIONAL_ID_PREFIX)
+                        TRANSACTIONAL_ID_PREFIX,
+                        TRANSACTION_NAMING_STRATEGY)
                 .collect(Collectors.toSet());
     }
 
@@ -290,7 +294,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                 deliveryGuarantee,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                tableOptions.get(TRANSACTION_NAMING_STRATEGY));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -438,7 +443,8 @@ protected KafkaDynamicSink createKafkaTableSink(
             KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             Integer parallelism,
-            @Nullable String transactionalIdPrefix) {
+            @Nullable String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -455,6 +461,7 @@ protected KafkaDynamicSink createKafkaTableSink(
                 false,
                 SinkBufferFlushMode.DISABLED,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                transactionNamingStrategy);
     }
 }

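The factory simply reads the value with tableOptions.get(TRANSACTION_NAMING_STRATEGY) and hands it to KafkaDynamicSink. A small sketch of that enum lookup against a plain Configuration, assuming the string value 'POOLING' as input.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY;

public class OptionLookupSketch {

    public static void main(String[] args) {
        // Stand-in for the table options the factory receives from the DDL.
        Configuration tableOptions = new Configuration();
        tableOptions.setString("sink.transaction-naming-strategy", "POOLING");

        // Same lookup createDynamicTableSink performs before constructing KafkaDynamicSink.
        TransactionNamingStrategy strategy = tableOptions.get(TRANSACTION_NAMING_STRATEGY);
        System.out.println(strategy); // POOLING
    }
}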
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java

Lines changed: 6 additions & 2 deletions
@@ -69,6 +69,7 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -117,12 +118,14 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(DELIVERY_GUARANTEE);
         options.add(TRANSACTIONAL_ID_PREFIX);
         options.add(SCAN_PARALLELISM);
+        options.add(TRANSACTION_NAMING_STRATEGY);
         return options;
     }
 
     @Override
     public Set<ConfigOption<?>> forwardOptions() {
-        return Stream.of(DELIVERY_GUARANTEE, TRANSACTIONAL_ID_PREFIX).collect(Collectors.toSet());
+        return Stream.of(DELIVERY_GUARANTEE, TRANSACTIONAL_ID_PREFIX, TRANSACTION_NAMING_STRATEGY)
+                .collect(Collectors.toSet());
     }
 
     @Override
@@ -227,7 +230,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 true,
                 flushMode,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                tableOptions.get(TRANSACTION_NAMING_STRATEGY));
     }
 
     private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {

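The upsert-kafka factory accepts and forwards the same option, so an upsert sink can opt into POOLING as well. A sketch with placeholder schema, topic, and broker address follows.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaStrategyDdlSketch {

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // upsert-kafka requires a primary key plus key/value formats; values are placeholders.
        env.executeSql(
                "CREATE TABLE orders_upsert (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE,\n"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'upsert-kafka',\n"
                        + "  'topic' = 'orders-upsert',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'key.format' = 'json',\n"
                        + "  'value.format' = 'json',\n"
                        + "  'sink.delivery-guarantee' = 'exactly-once',\n"
                        + "  'sink.transactional-id-prefix' = 'orders-upsert',\n"
                        + "  'sink.transaction-naming-strategy' = 'POOLING'\n"
                        + ")");
    }
}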
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java

Lines changed: 2 additions & 1 deletion
@@ -514,7 +514,8 @@ public void checkMigration(
                 .get()
                 .asInstanceOf(InstanceOfAssertFactories.THROWABLE)
                 .rootCause()
-                .hasMessageContaining("Attempted to switch back to INCREMENTING");
+                .hasMessageContaining(
+                        "Attempted to switch the transaction naming strategy back to INCREMENTING");
     }
 }
