Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,16 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
protected final String pwd;
protected final String baseUrl;
protected final String defaultUrl;
protected final String extraUrlParam;

public AbstractJdbcCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl) {
String baseUrl,
String extraUrlParam) {
super(catalogName, defaultDatabase);

checkNotNull(userClassLoader);
Expand All @@ -112,8 +114,13 @@ public AbstractJdbcCatalog(
this.userClassLoader = userClassLoader;
this.username = username;
this.pwd = pwd;
this.extraUrlParam = extraUrlParam;
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = this.baseUrl + defaultDatabase;
this.defaultUrl = initDefaultUrl(this.baseUrl, defaultDatabase, extraUrlParam);
}

protected String initDefaultUrl(String baseUrl, String defaultDatabase, String extraUrlParam) {
return baseUrl + defaultDatabase + extraUrlParam;
}

@Override
Expand Down Expand Up @@ -246,7 +253,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
}

String databaseName = tablePath.getDatabaseName();
String dbUrl = baseUrl + databaseName;
String dbUrl = defaultUrl;

try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,28 @@ public JdbcCatalog(
String username,
String pwd,
String baseUrl) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
this(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, "");
}

public JdbcCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl,
String extraUrlParam) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, extraUrlParam);

internal =
JdbcCatalogUtils.createCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
extraUrlParam);
}

// ------ databases -----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,31 @@ public static AbstractJdbcCatalog createCatalog(
String defaultDatabase,
String username,
String pwd,
String baseUrl) {
String baseUrl,
String extraUrlParam) {
JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, userClassLoader);

if (dialect instanceof PostgresDialect) {
return new PostgresCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
extraUrlParam);
} else if (dialect instanceof CrateDBDialect) {
return new CrateDBCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
} else if (dialect instanceof MySqlDialect) {
return new MySqlCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
extraUrlParam);
} else {
throw new UnsupportedOperationException(
String.format("Catalog for '%s' is not supported yet.", dialect));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.BASE_URL;
import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.DEFAULT_DATABASE;
import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.EXTRA_URL_PARAM;
import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.PASSWORD;
import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
Expand Down Expand Up @@ -60,6 +61,7 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(PROPERTY_VERSION);
options.add(EXTRA_URL_PARAM);
return options;
}

Expand All @@ -75,6 +77,7 @@ public Catalog createCatalog(Context context) {
helper.getOptions().get(DEFAULT_DATABASE),
helper.getOptions().get(USERNAME),
helper.getOptions().get(PASSWORD),
helper.getOptions().get(BASE_URL));
helper.getOptions().get(BASE_URL),
helper.getOptions().get(EXTRA_URL_PARAM));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class JdbcCatalogFactoryOptions {

public static final ConfigOption<String> BASE_URL =
ConfigOptions.key("base-url").stringType().noDefaultValue();
public static final ConfigOption<String> EXTRA_URL_PARAM =
ConfigOptions.key("extra-url-param").stringType().defaultValue("");

private JdbcCatalogFactoryOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,18 @@ public MySqlCatalog(
String username,
String pwd,
String baseUrl) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
this(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, "");
}

public MySqlCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl,
String extraUrlParam) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, extraUrlParam);

String driverVersion =
Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,27 @@ public PostgresCatalog(
username,
pwd,
baseUrl,
new PostgresTypeMapper());
new PostgresTypeMapper(),
"");
}

public PostgresCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl,
String extraUrlParam) {
this(
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
new PostgresTypeMapper(),
extraUrlParam);
}

protected PostgresCatalog(
Expand All @@ -97,7 +117,27 @@ protected PostgresCatalog(
String pwd,
String baseUrl,
JdbcDialectTypeMapper dialectTypeMapper) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
this(
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
dialectTypeMapper,
"");
}

protected PostgresCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl,
JdbcDialectTypeMapper dialectTypeMapper,
String extraUrlParam) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl, extraUrlParam);
this.dialectTypeMapper = dialectTypeMapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
Expand Down Expand Up @@ -532,4 +533,34 @@ void testGroupByInsert() throws Exception {
assertThat(results)
.isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, 2L, -1L)));
}

@Test
void testExtraUrlParam() throws Exception {
String extraUrlParam = "?characterEncoding=utf8";
String catalogName = "mysql_catalog_extra_param";
String baseJdbcUrl =
getMetadata()
.getJdbcUrl()
.substring(0, getMetadata().getJdbcUrl().lastIndexOf("/"));
Catalog catalog2 =
new MySqlCatalog(
Thread.currentThread().getContextClassLoader(),
TEST_CATALOG_NAME,
TEST_DB,
getMetadata().getUsername(),
getMetadata().getPassword(),
baseJdbcUrl,
extraUrlParam);

tEnv.registerCatalog(catalogName, catalog2);
tEnv.useCatalog(catalogName);

Catalog mysqlCatalog = tEnv.getCatalog(catalogName).get();

CatalogBaseTable table =
mysqlCatalog.getTable(new ObjectPath(TEST_DB, TABLE_ALL_TYPES.getTableName()));

String url = table.getOptions().get("url");
assertThat(url).isEqualTo(baseJdbcUrl + "/" + TEST_DB + extraUrlParam);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;

Expand Down Expand Up @@ -193,4 +196,36 @@ void testSerialTypes() {
+ "9223372036854775807, "
+ "9223372036854775807]]");
}

@Test
void testExtraUrlParam() throws Exception {
String extraUrlParam = "?socketTimeout=180";
String catalogName = "postgre_catalog_extra_param";
String baseJdbcUrl =
getMetadata()
.getJdbcUrl()
.substring(0, getMetadata().getJdbcUrl().lastIndexOf("/"));
Catalog catalog2 =
new PostgresCatalog(
Thread.currentThread().getContextClassLoader(),
TEST_CATALOG_NAME,
PostgresCatalog.DEFAULT_DATABASE,
TEST_USERNAME,
TEST_PWD,
baseUrl,
extraUrlParam);

tEnv.registerCatalog(catalogName, catalog2);
tEnv.useCatalog(catalogName);

Catalog postgresCatalog = tEnv.getCatalog(catalogName).get();

CatalogBaseTable table =
postgresCatalog.getTable(
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE));

String url = table.getOptions().get("url");
assertThat(url)
.isEqualTo(baseJdbcUrl + "/" + PostgresCatalog.DEFAULT_DATABASE + extraUrlParam);
}
}