From 50cf4e89a2c04c11f04a287dcd6688842df5f417 Mon Sep 17 00:00:00 2001 From: John <179044447+watsonjo737@users.noreply.github.com> Date: Tue, 14 Oct 2025 14:02:59 +0100 Subject: [PATCH 1/5] [FLINK-38522][cdc connector mysql] use ssl for BinaryLogClient when searching for binlog offset for starting mysql cdc from timestamp --- .../mysql/debezium/DebeziumUtils.java | 19 ++++++++++++++ .../mysql/debezium/DebeziumUtilsTest.java | 26 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 7ca60be5bf2..b4b4f8a317e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -27,6 +27,7 @@ import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.network.SSLMode; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlConnectorConfig; @@ -242,12 +243,30 @@ private static Map querySystemVariables( return variables; } + static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) { + switch (mode) { + case DISABLED: + return SSLMode.DISABLED; + case PREFERRED: + return SSLMode.PREFERRED; + case REQUIRED: + return SSLMode.REQUIRED; + case VERIFY_CA: + return SSLMode.VERIFY_CA; + case VERIFY_IDENTITY: + return SSLMode.VERIFY_IDENTITY; + } + return null; + } + public static BinlogOffset findBinlogOffset( long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig(); BinaryLogClient client = new BinaryLogClient( config.hostname(), config.port(), config.username(), config.password()); + client.setSSLMode(sslModeFor(config.sslMode())); + if (mySqlSourceConfig.getServerIdRange() != null) { client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java index eeae681ea7b..c8913fdd168 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java @@ -21,13 +21,19 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import com.github.shyiko.mysql.binlog.network.SSLMode; import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.connector.mysql.MySqlConnectorConfig; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.time.Duration; import java.time.ZoneId; import java.util.Properties; +import java.util.stream.Stream; /** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils}. */ class DebeziumUtilsTest { @@ -83,6 +89,26 @@ private MySqlSourceConfig getConfig(Properties jdbcProperties) { .createConfig(0); } + @ParameterizedTest + @MethodSource("sslModeProvider") + void testSslModeConversion(MySqlConnectorConfig.SecureConnectionMode input, SSLMode expected) { + SSLMode actual = DebeziumUtils.sslModeFor(input); + Assertions.assertThat(actual).isEqualTo(expected); + } + + static Stream sslModeProvider() { + return Stream.of( + Arguments.of(MySqlConnectorConfig.SecureConnectionMode.DISABLED, SSLMode.DISABLED), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.PREFERRED, SSLMode.PREFERRED), + Arguments.of(MySqlConnectorConfig.SecureConnectionMode.REQUIRED, SSLMode.REQUIRED), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA, SSLMode.VERIFY_CA), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.VERIFY_IDENTITY, + SSLMode.VERIFY_IDENTITY)); + } + private void assertJdbcUrl(String expected, String actual) { // Compare after splitting to avoid the orderless jdbc parameters in jdbc url at Java 11 String[] expectedParam = expected.split("&"); From 111681cea08c03ac343b3b4398e9cfa98e1ae603 Mon Sep 17 00:00:00 2001 From: John <179044447+watsonjo737@users.noreply.github.com> Date: Tue, 21 Oct 2025 18:58:42 +0100 Subject: [PATCH 2/5] [FLINK-38522][cdc connector mysql]: accommodate changes from PR review --- .../connectors/mysql/debezium/DebeziumUtils.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index b4b4f8a317e..90efdd73901 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -244,17 +244,10 @@ private static Map querySystemVariables( } static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) { - switch (mode) { - case DISABLED: - return SSLMode.DISABLED; - case PREFERRED: - return SSLMode.PREFERRED; - case REQUIRED: - return SSLMode.REQUIRED; - case VERIFY_CA: - return SSLMode.VERIFY_CA; - case VERIFY_IDENTITY: - return SSLMode.VERIFY_IDENTITY; + try { + return mode == null ? null : SSLMode.valueOf(mode.name()); + } catch (IllegalArgumentException e) { + LOG.error("Invalid SecureConnectionMode provided %s", mode.name(), e); } return null; } From 22e99b71bd83945d087a25334c7ebd3b37782928 Mon Sep 17 00:00:00 2001 From: John <179044447+watsonjo737@users.noreply.github.com> Date: Wed, 22 Oct 2025 10:40:03 +0100 Subject: [PATCH 3/5] [FLINK-38522][cdc connector mysql]: accommodate changes from PR review for ssl mode --- .../flink/cdc/connectors/mysql/debezium/DebeziumUtils.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 90efdd73901..98fa652b684 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -247,7 +247,7 @@ static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) { try { return mode == null ? null : SSLMode.valueOf(mode.name()); } catch (IllegalArgumentException e) { - LOG.error("Invalid SecureConnectionMode provided %s", mode.name(), e); + LOG.error("Invalid SecureConnectionMode provided {}", mode.name(), e); } return null; } @@ -258,7 +258,10 @@ public static BinlogOffset findBinlogOffset( BinaryLogClient client = new BinaryLogClient( config.hostname(), config.port(), config.username(), config.password()); - client.setSSLMode(sslModeFor(config.sslMode())); + SSLMode sslMode = sslModeFor(config.sslMode()); + if (sslMode != null) { + client.setSSLMode(sslMode); + } if (mySqlSourceConfig.getServerIdRange() != null) { client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId()); From 8aedf5f90091f034f2fe2dc55883b738bfd4fd81 Mon Sep 17 00:00:00 2001 From: John <179044447+watsonjo737@users.noreply.github.com> Date: Wed, 22 Oct 2025 13:51:00 +0100 Subject: [PATCH 4/5] [FLINK-38522][cdc connector mysql]: add IT which shows how to configure ssl and tests to ensure ssl config is honored --- .../MySqlSourceSSLConnectionITCase.java | 116 ++++++++++++++++++ .../mysql/testutils/UniqueDatabase.java | 20 +++ 2 files changed, 136 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java new file mode 100644 index 00000000000..efa2a310388 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.CloseableIterator; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** IT Tests for {@link MySqlSource}. */ +@Timeout(value = 300, unit = TimeUnit.SECONDS) +class MySqlSourceSSLConnectionITCase extends MySqlSourceTestBase { + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); + + private final List initialData = List.of( + "{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14}", + "{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1}", + "{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8}", + "{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75}", + "{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875}", + "{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0}", + "{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3}", + "{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1}", + "{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2}" + ); + + @Test + void testSetupMysqlSourceWithSSL() throws Exception { + inventoryDatabase.createAndInitialize(); + + // Enable SSL requirement on the MySQL side, all future connections must use SSL + inventoryDatabase.enableSSLForUser(); + + Properties jdbcConfig = new Properties(); + jdbcConfig.setProperty("useSSL", "true"); + jdbcConfig.setProperty("requireSSL", "true"); + jdbcConfig.setProperty("verifyServerCertificate", "false"); + + Properties debeziumConfig = new Properties(); + debeziumConfig.setProperty("database.ssl.mode", "required"); + debeziumConfig.setProperty("database.ssl.trustServerCertificate", "true"); + + Instant startTime = Instant.now().minusSeconds(100); + + MySqlSource mySqlSource = + MySqlSource.builder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + ".products") + .username(inventoryDatabase.getUsername()) + .password(inventoryDatabase.getPassword()) + .jdbcProperties(jdbcConfig) + .debeziumProperties(debeziumConfig) + .serverId("5401-5404") + .deserializer(new JsonDebeziumDeserializationSchema()) + .serverTimeZone("UTC") + .includeSchemaChanges(true) // output the schema changes as well + .startupOptions(StartupOptions.timestamp(startTime.toEpochMilli())) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // enable checkpoint + env.enableCheckpointing(3000); + // set the source parallelism to 4 + DataStreamSource source = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource") + .setParallelism(4); + + try (CloseableIterator iterator = source.executeAndCollect()) { + List rows = new ArrayList<>(); + int size = initialData.size(); + while (size > 0 && iterator.hasNext()) { + String next = iterator.next(); + rows.add(next); + size--; + } + Assertions.assertThat(rows) + .withFailMessage("should read all initial records") + .hasSize(initialData.size()); + env.close(); + } + + } + + +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java index a749bca5329..dc425e1f657 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java @@ -162,6 +162,26 @@ public Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection(container.getJdbcUrl(databaseName), username, password); } + /** + * Enable SSL requirement for the database user. This should be called after createAndInitialize() + * to enforce SSL connections for the user. + */ + public void enableSSLForUser() { + try { + try (Connection connection = + DriverManager.getConnection( + container.getJdbcUrl(), username, password); + Statement statement = connection.createStatement()) { + String alterUserSQL = String.format( + "ALTER USER '%s'@'%%' REQUIRE SSL;", username); + statement.execute(alterUserSQL); + statement.execute("FLUSH PRIVILEGES;"); + } + } catch (final Exception e) { + throw new IllegalStateException("Failed to enable SSL for user: " + username, e); + } + } + private String convertSQL(final String sql) { return sql.replace("$DBNAME$", databaseName); } From 6615f911f697687e6ec9103f0a6bcd52e7ce508d Mon Sep 17 00:00:00 2001 From: John <179044447+watsonjo737@users.noreply.github.com> Date: Wed, 22 Oct 2025 14:18:53 +0100 Subject: [PATCH 5/5] [FLINK-38522][cdc connector mysql]: ensure new IT Test fails when SSL is not possible --- .../MySqlSourceSSLConnectionITCase.java | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java index efa2a310388..da5657e339e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java @@ -23,7 +23,6 @@ import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.data.RowData; import org.apache.flink.util.CloseableIterator; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -33,10 +32,15 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** IT Tests for {@link MySqlSource}. */ -@Timeout(value = 300, unit = TimeUnit.SECONDS) +@Timeout(value = 20, unit = TimeUnit.SECONDS) class MySqlSourceSSLConnectionITCase extends MySqlSourceTestBase { private final UniqueDatabase inventoryDatabase = @@ -96,17 +100,39 @@ void testSetupMysqlSourceWithSSL() throws Exception { DataStreamSource source = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource") .setParallelism(4); + ExecutorService executor = Executors.newSingleThreadExecutor(); try (CloseableIterator iterator = source.executeAndCollect()) { List rows = new ArrayList<>(); - int size = initialData.size(); - while (size > 0 && iterator.hasNext()) { - String next = iterator.next(); - rows.add(next); - size--; + int expectedSize = initialData.size(); + long timeoutSeconds = 30; + + while (rows.size() < expectedSize) { + // Wrap the blocking hasNext() call in a CompletableFuture with timeout + CompletableFuture hasNextFuture = CompletableFuture.supplyAsync( + iterator::hasNext, executor); + + try { + Boolean hasNext = hasNextFuture.get(timeoutSeconds, TimeUnit.SECONDS); + if (hasNext) { + String next = iterator.next(); + rows.add(next); + } else { + // No more data available + break; + } + } catch (java.util.concurrent.TimeoutException e) { + throw new TimeoutException(("Timeout while waiting for records, application" + + " is likely unable to process data from MySQL over SSL")); + } catch (ExecutionException e) { + throw new RuntimeException("Error while checking for next element", e.getCause()); + } } + Assertions.assertThat(rows) .withFailMessage("should read all initial records") - .hasSize(initialData.size()); + .hasSize(expectedSize); + } finally { + executor.shutdownNow(); env.close(); }