diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java index 7ba0c06dd..d9026cb5b 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java @@ -92,6 +92,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog { protected final String pwd; protected final String baseUrl; protected final String defaultUrl; + protected final String extraUrlParam; public AbstractJdbcCatalog( ClassLoader userClassLoader, @@ -99,7 +100,8 @@ public AbstractJdbcCatalog( String defaultDatabase, String username, String pwd, - String baseUrl) { + String baseUrl, + String extraUrlParam) { super(catalogName, defaultDatabase); checkNotNull(userClassLoader); @@ -112,8 +114,13 @@ public AbstractJdbcCatalog( this.userClassLoader = userClassLoader; this.username = username; this.pwd = pwd; + this.extraUrlParam = extraUrlParam; this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; - this.defaultUrl = this.baseUrl + defaultDatabase; + this.defaultUrl = initDefaultUrl(this.baseUrl, defaultDatabase, extraUrlParam); + } + + protected String initDefaultUrl(String baseUrl, String defaultDatabase, String extraUrlParam) { + return baseUrl + defaultDatabase + extraUrlParam; } @Override @@ -246,7 +253,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) } String databaseName = tablePath.getDatabaseName(); - String dbUrl = baseUrl + databaseName; + String dbUrl = defaultUrl; try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { DatabaseMetaData metaData = conn.getMetaData(); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java index 8bab2be9c..3fb8e5cf1 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java @@ -74,11 +74,28 @@ public JdbcCatalog( String username, String pwd, String baseUrl) { - super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + this(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, ""); + } + + public JdbcCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl, + String extraUrlParam) { + super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, extraUrlParam); internal = JdbcCatalogUtils.createCatalog( - userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + userClassLoader, + catalogName, + defaultDatabase, + username, + pwd, + baseUrl, + extraUrlParam); } // ------ databases ----- diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java index 84ac0abdb..d371022fd 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java @@ -48,18 +48,31 @@ public static AbstractJdbcCatalog createCatalog( String defaultDatabase, String username, String pwd, - String baseUrl) { + String baseUrl, + String extraUrlParam) { JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, userClassLoader); if (dialect instanceof PostgresDialect) { return new PostgresCatalog( - userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + userClassLoader, + catalogName, + defaultDatabase, + username, + pwd, + baseUrl, + extraUrlParam); } else if (dialect instanceof CrateDBDialect) { return new CrateDBCatalog( userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); } else if (dialect instanceof MySqlDialect) { return new MySqlCatalog( - userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + userClassLoader, + catalogName, + defaultDatabase, + username, + pwd, + baseUrl, + extraUrlParam); } else { throw new UnsupportedOperationException( String.format("Catalog for '%s' is not supported yet.", dialect)); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java index 9677744d2..4170dc555 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java @@ -32,6 +32,7 @@ import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.BASE_URL; import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.EXTRA_URL_PARAM; import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.PASSWORD; import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.USERNAME; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; @@ -60,6 +61,7 @@ public Set> requiredOptions() { public Set> optionalOptions() { final Set> options = new HashSet<>(); options.add(PROPERTY_VERSION); + options.add(EXTRA_URL_PARAM); return options; } @@ -75,6 +77,7 @@ public Catalog createCatalog(Context context) { helper.getOptions().get(DEFAULT_DATABASE), helper.getOptions().get(USERNAME), helper.getOptions().get(PASSWORD), - helper.getOptions().get(BASE_URL)); + helper.getOptions().get(BASE_URL), + helper.getOptions().get(EXTRA_URL_PARAM)); } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java index 0cdde6264..ef724d7a8 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java @@ -43,6 +43,8 @@ public class JdbcCatalogFactoryOptions { public static final ConfigOption BASE_URL = ConfigOptions.key("base-url").stringType().noDefaultValue(); + public static final ConfigOption EXTRA_URL_PARAM = + ConfigOptions.key("extra-url-param").stringType().defaultValue(""); private JdbcCatalogFactoryOptions() {} } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java index b54aa23d8..653a77e96 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java @@ -67,7 +67,18 @@ public MySqlCatalog( String username, String pwd, String baseUrl) { - super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + this(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, ""); + } + + public MySqlCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl, + String extraUrlParam) { + super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, extraUrlParam); String driverVersion = Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null."); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java index 7ece9b645..89a88d93a 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java @@ -86,7 +86,27 @@ public PostgresCatalog( username, pwd, baseUrl, - new PostgresTypeMapper()); + new PostgresTypeMapper(), + ""); + } + + public PostgresCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl, + String extraUrlParam) { + this( + userClassLoader, + catalogName, + defaultDatabase, + username, + pwd, + baseUrl, + new PostgresTypeMapper(), + extraUrlParam); } protected PostgresCatalog( @@ -97,7 +117,27 @@ protected PostgresCatalog( String pwd, String baseUrl, JdbcDialectTypeMapper dialectTypeMapper) { - super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + this( + userClassLoader, + catalogName, + defaultDatabase, + username, + pwd, + baseUrl, + dialectTypeMapper, + ""); + } + + protected PostgresCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl, + JdbcDialectTypeMapper dialectTypeMapper, + String extraUrlParam) { + super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, extraUrlParam); this.dialectTypeMapper = dialectTypeMapper; } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalogTestBase.java index 2bf29b3ff..3c92946cf 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalogTestBase.java @@ -26,6 +26,7 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -532,4 +533,34 @@ void testGroupByInsert() throws Exception { assertThat(results) .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, 2L, -1L))); } + + @Test + void testExtraUrlParam() throws Exception { + String extraUrlParam = "?characterEncoding=utf8"; + String catalogName = "mysql_catalog_extra_param"; + String baseJdbcUrl = + getMetadata() + .getJdbcUrl() + .substring(0, getMetadata().getJdbcUrl().lastIndexOf("/")); + Catalog catalog2 = + new MySqlCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + TEST_DB, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseJdbcUrl, + extraUrlParam); + + tEnv.registerCatalog(catalogName, catalog2); + tEnv.useCatalog(catalogName); + + Catalog mysqlCatalog = tEnv.getCatalog(catalogName).get(); + + CatalogBaseTable table = + mysqlCatalog.getTable(new ObjectPath(TEST_DB, TABLE_ALL_TYPES.getTableName())); + + String url = table.getOptions().get("url"); + assertThat(url).isEqualTo(baseJdbcUrl + "/" + TEST_DB + extraUrlParam); + } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalogITCase.java index 29c4c884f..71f218d95 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalogITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalogITCase.java @@ -20,6 +20,9 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; @@ -193,4 +196,36 @@ void testSerialTypes() { + "9223372036854775807, " + "9223372036854775807]]"); } + + @Test + void testExtraUrlParam() throws Exception { + String extraUrlParam = "?socketTimeout=180"; + String catalogName = "postgre_catalog_extra_param"; + String baseJdbcUrl = + getMetadata() + .getJdbcUrl() + .substring(0, getMetadata().getJdbcUrl().lastIndexOf("/")); + Catalog catalog2 = + new PostgresCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + PostgresCatalog.DEFAULT_DATABASE, + TEST_USERNAME, + TEST_PWD, + baseUrl, + extraUrlParam); + + tEnv.registerCatalog(catalogName, catalog2); + tEnv.useCatalog(catalogName); + + Catalog postgresCatalog = tEnv.getCatalog(catalogName).get(); + + CatalogBaseTable table = + postgresCatalog.getTable( + new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE)); + + String url = table.getOptions().get("url"); + assertThat(url) + .isEqualTo(baseJdbcUrl + "/" + PostgresCatalog.DEFAULT_DATABASE + extraUrlParam); + } }