diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 7ca60be5bf2..98fa652b684 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -27,6 +27,7 @@ import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.network.SSLMode; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlConnectorConfig; @@ -242,12 +243,26 @@ private static Map querySystemVariables( return variables; } + static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) { + try { + return mode == null ? null : SSLMode.valueOf(mode.name()); + } catch (IllegalArgumentException e) { + LOG.error("Invalid SecureConnectionMode provided {}", mode.name(), e); + } + return null; + } + public static BinlogOffset findBinlogOffset( long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig(); BinaryLogClient client = new BinaryLogClient( config.hostname(), config.port(), config.username(), config.password()); + SSLMode sslMode = sslModeFor(config.sslMode()); + if (sslMode != null) { + client.setSSLMode(sslMode); + } + if (mySqlSourceConfig.getServerIdRange() != null) { client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java index eeae681ea7b..c8913fdd168 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java @@ -21,13 +21,19 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import com.github.shyiko.mysql.binlog.network.SSLMode; import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.connector.mysql.MySqlConnectorConfig; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.time.Duration; import java.time.ZoneId; import java.util.Properties; +import java.util.stream.Stream; /** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils}. */ class DebeziumUtilsTest { @@ -83,6 +89,26 @@ private MySqlSourceConfig getConfig(Properties jdbcProperties) { .createConfig(0); } + @ParameterizedTest + @MethodSource("sslModeProvider") + void testSslModeConversion(MySqlConnectorConfig.SecureConnectionMode input, SSLMode expected) { + SSLMode actual = DebeziumUtils.sslModeFor(input); + Assertions.assertThat(actual).isEqualTo(expected); + } + + static Stream sslModeProvider() { + return Stream.of( + Arguments.of(MySqlConnectorConfig.SecureConnectionMode.DISABLED, SSLMode.DISABLED), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.PREFERRED, SSLMode.PREFERRED), + Arguments.of(MySqlConnectorConfig.SecureConnectionMode.REQUIRED, SSLMode.REQUIRED), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA, SSLMode.VERIFY_CA), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.VERIFY_IDENTITY, + SSLMode.VERIFY_IDENTITY)); + } + private void assertJdbcUrl(String expected, String actual) { // Compare after splitting to avoid the orderless jdbc parameters in jdbc url at Java 11 String[] expectedParam = expected.split("&"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java new file mode 100644 index 00000000000..da5657e339e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** IT Tests for {@link MySqlSource}. */ +@Timeout(value = 20, unit = TimeUnit.SECONDS) +class MySqlSourceSSLConnectionITCase extends MySqlSourceTestBase { + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); + + private final List initialData = List.of( + "{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14}", + "{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1}", + "{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8}", + "{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75}", + "{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875}", + "{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0}", + "{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3}", + "{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1}", + "{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2}" + ); + + @Test + void testSetupMysqlSourceWithSSL() throws Exception { + inventoryDatabase.createAndInitialize(); + + // Enable SSL requirement on the MySQL side, all future connections must use SSL + inventoryDatabase.enableSSLForUser(); + + Properties jdbcConfig = new Properties(); + jdbcConfig.setProperty("useSSL", "true"); + jdbcConfig.setProperty("requireSSL", "true"); + jdbcConfig.setProperty("verifyServerCertificate", "false"); + + Properties debeziumConfig = new Properties(); + debeziumConfig.setProperty("database.ssl.mode", "required"); + debeziumConfig.setProperty("database.ssl.trustServerCertificate", "true"); + + Instant startTime = Instant.now().minusSeconds(100); + + MySqlSource mySqlSource = + MySqlSource.builder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + ".products") + .username(inventoryDatabase.getUsername()) + .password(inventoryDatabase.getPassword()) + .jdbcProperties(jdbcConfig) + .debeziumProperties(debeziumConfig) + .serverId("5401-5404") + .deserializer(new JsonDebeziumDeserializationSchema()) + .serverTimeZone("UTC") + .includeSchemaChanges(true) // output the schema changes as well + .startupOptions(StartupOptions.timestamp(startTime.toEpochMilli())) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // enable checkpoint + env.enableCheckpointing(3000); + // set the source parallelism to 4 + DataStreamSource source = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource") + .setParallelism(4); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try (CloseableIterator iterator = source.executeAndCollect()) { + List rows = new ArrayList<>(); + int expectedSize = initialData.size(); + long timeoutSeconds = 30; + + while (rows.size() < expectedSize) { + // Wrap the blocking hasNext() call in a CompletableFuture with timeout + CompletableFuture hasNextFuture = CompletableFuture.supplyAsync( + iterator::hasNext, executor); + + try { + Boolean hasNext = hasNextFuture.get(timeoutSeconds, TimeUnit.SECONDS); + if (hasNext) { + String next = iterator.next(); + rows.add(next); + } else { + // No more data available + break; + } + } catch (java.util.concurrent.TimeoutException e) { + throw new TimeoutException(("Timeout while waiting for records, application" + + " is likely unable to process data from MySQL over SSL")); + } catch (ExecutionException e) { + throw new RuntimeException("Error while checking for next element", e.getCause()); + } + } + + Assertions.assertThat(rows) + .withFailMessage("should read all initial records") + .hasSize(expectedSize); + } finally { + executor.shutdownNow(); + env.close(); + } + + } + + +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java index a749bca5329..dc425e1f657 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java @@ -162,6 +162,26 @@ public Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection(container.getJdbcUrl(databaseName), username, password); } + /** + * Enable SSL requirement for the database user. This should be called after createAndInitialize() + * to enforce SSL connections for the user. + */ + public void enableSSLForUser() { + try { + try (Connection connection = + DriverManager.getConnection( + container.getJdbcUrl(), username, password); + Statement statement = connection.createStatement()) { + String alterUserSQL = String.format( + "ALTER USER '%s'@'%%' REQUIRE SSL;", username); + statement.execute(alterUserSQL); + statement.execute("FLUSH PRIVILEGES;"); + } + } catch (final Exception e) { + throw new IllegalStateException("Failed to enable SSL for user: " + username, e); + } + } + private String convertSQL(final String sql) { return sql.replace("$DBNAME$", databaseName); }