Skip to content

Commit 7f909f4

Browse files
authored
[FLINK-38691] [cdc connector mysql] Support for MySQL Transaction Boundary Events in Flink CDC Connector (#4170)
1 parent d26d2ce commit 7f909f4

File tree

9 files changed

+401
-5
lines changed

9 files changed

+401
-5
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public MySqlPipelineRecordEmitter(
9393
debeziumDeserializationSchema,
9494
sourceReaderMetrics,
9595
sourceConfig.isIncludeSchemaChanges(),
96-
false); // Explicitly disable heartbeat events
96+
false, // Explicitly disable heartbeat events
97+
false); // Explicitly disable transaction metadata events
9798
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
9899
this.sourceConfig = sourceConfig;
99100
this.alreadySendCreateTableTables = new HashSet<>();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ public static <T> MySqlSourceBuilder<T> builder() {
133133
deserializationSchema,
134134
sourceReaderMetrics,
135135
sourceConfig.isIncludeSchemaChanges(),
136-
sourceConfig.isIncludeHeartbeatEvents()));
136+
sourceConfig.isIncludeHeartbeatEvents(),
137+
sourceConfig.isIncludeTransactionMetadataEvents()));
137138
}
138139

139140
MySqlSource(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,13 @@ public MySqlSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges)
198198
return this;
199199
}
200200

201+
/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
202+
public MySqlSourceBuilder<T> includeTransactionMetadataEvents(
203+
boolean includeTransactionMetadataEvents) {
204+
this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents);
205+
return this;
206+
}
207+
201208
/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
202209
public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
203210
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class MySqlSourceConfig implements Serializable {
6363
private final double distributionFactorLower;
6464
private final boolean includeSchemaChanges;
6565
private final boolean includeHeartbeatEvents;
66+
private final boolean includeTransactionMetadataEvents;
6667
private final boolean scanNewlyAddedTableEnabled;
6768
private final boolean closeIdleReaders;
6869
private final Properties jdbcProperties;
@@ -101,6 +102,7 @@ public class MySqlSourceConfig implements Serializable {
101102
double distributionFactorLower,
102103
boolean includeSchemaChanges,
103104
boolean includeHeartbeatEvents,
105+
boolean includeTransactionMetadataEvents,
104106
boolean scanNewlyAddedTableEnabled,
105107
boolean closeIdleReaders,
106108
Properties dbzProperties,
@@ -131,6 +133,7 @@ public class MySqlSourceConfig implements Serializable {
131133
this.distributionFactorLower = distributionFactorLower;
132134
this.includeSchemaChanges = includeSchemaChanges;
133135
this.includeHeartbeatEvents = includeHeartbeatEvents;
136+
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
134137
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
135138
this.closeIdleReaders = closeIdleReaders;
136139
this.dbzProperties = checkNotNull(dbzProperties);
@@ -234,6 +237,10 @@ public boolean isIncludeHeartbeatEvents() {
234237
return includeHeartbeatEvents;
235238
}
236239

240+
public boolean isIncludeTransactionMetadataEvents() {
241+
return includeTransactionMetadataEvents;
242+
}
243+
237244
public boolean isScanNewlyAddedTableEnabled() {
238245
return scanNewlyAddedTableEnabled;
239246
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
2424
import org.apache.flink.table.catalog.ObjectPath;
2525

26+
import io.debezium.config.CommonConnectorConfig;
27+
2628
import java.io.Serializable;
2729
import java.time.Duration;
2830
import java.time.ZoneId;
@@ -64,6 +66,7 @@ public class MySqlSourceConfigFactory implements Serializable {
6466
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
6567
private boolean includeSchemaChanges = false;
6668
private boolean includeHeartbeatEvents = false;
69+
private boolean includeTransactionMetadataEvents = false;
6770
private boolean scanNewlyAddedTableEnabled = false;
6871
private boolean closeIdleReaders = false;
6972
private Properties jdbcProperties;
@@ -242,6 +245,13 @@ public MySqlSourceConfigFactory includeHeartbeatEvents(boolean includeHeartbeatE
242245
return this;
243246
}
244247

248+
/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
249+
public MySqlSourceConfigFactory includeTransactionMetadataEvents(
250+
boolean includeTransactionMetadataEvents) {
251+
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
252+
return this;
253+
}
254+
245255
/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
246256
public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
247257
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
@@ -366,6 +376,10 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
366376
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
367377
// only DataStream API program need to emit the schema record, the Table API need not
368378
props.setProperty("include.schema.changes", String.valueOf(true));
379+
// enable transaction metadata if includeTransactionMetadataEvents is true
380+
props.setProperty(
381+
CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA.name(),
382+
String.valueOf(includeTransactionMetadataEvents));
369383
// disable the offset flush totally
370384
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
371385
// disable tombstones
@@ -420,6 +434,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
420434
distributionFactorLower,
421435
includeSchemaChanges,
422436
includeHeartbeatEvents,
437+
includeTransactionMetadataEvents,
423438
scanNewlyAddedTableEnabled,
424439
closeIdleReaders,
425440
props,

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,20 @@ public class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecords, T, My
5454
private final MySqlSourceReaderMetrics sourceReaderMetrics;
5555
private final boolean includeSchemaChanges;
5656
private final boolean includeHeartbeatEvents;
57+
private final boolean includeTransactionMetadataEvents;
5758
private final OutputCollector<T> outputCollector;
5859

5960
public MySqlRecordEmitter(
6061
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
6162
MySqlSourceReaderMetrics sourceReaderMetrics,
6263
boolean includeSchemaChanges,
63-
boolean includeHeartbeatEvents) {
64+
boolean includeHeartbeatEvents,
65+
boolean includeTransactionMetadataEvents) {
6466
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
6567
this.sourceReaderMetrics = sourceReaderMetrics;
6668
this.includeSchemaChanges = includeSchemaChanges;
6769
this.includeHeartbeatEvents = includeHeartbeatEvents;
70+
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
6871
this.outputCollector = new OutputCollector<>();
6972
}
7073

@@ -108,6 +111,11 @@ protected void processElement(
108111
if (includeHeartbeatEvents) {
109112
emitElement(element, output);
110113
}
114+
} else if (RecordUtils.isTransactionMetadataEvent(element)) {
115+
updateStartingOffsetForSplit(splitState, element);
116+
if (includeTransactionMetadataEvents) {
117+
emitElement(element, output);
118+
}
111119
} else {
112120
// unknown element
113121
LOG.info("Meet unknown element {}, just skip.", element);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ private RecordUtils() {}
7373
"io.debezium.connector.mysql.SchemaChangeKey";
7474
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
7575
"io.debezium.connector.common.Heartbeat";
76+
public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME =
77+
"io.debezium.connector.common.TransactionMetadataKey";
7678
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
7779

7880
/** Converts a {@link ResultSet} row to an array of Objects. */
@@ -339,6 +341,18 @@ public static boolean isHeartbeatEvent(SourceRecord record) {
339341
&& SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
340342
}
341343

344+
/**
345+
* Check whether the given source record is a transaction metadata event (BEGIN or END).
346+
*
347+
* <p>Transaction events are emitted by Debezium to mark transaction boundaries when
348+
* provide.transaction.metadata is enabled.
349+
*/
350+
public static boolean isTransactionMetadataEvent(SourceRecord record) {
351+
Schema keySchema = record.keySchema();
352+
return keySchema != null
353+
&& SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
354+
}
355+
342356
/**
343357
* Return the finished snapshot split information.
344358
*

0 commit comments

Comments
 (0)