diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 0dd9dbf02..bd3d24bbb 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -19,6 +19,6 @@
-
+
\ No newline at end of file
diff --git a/docs/data/jdbc.yml b/docs/data/jdbc.yml
index 8fb41cc3d..50beae5bc 100644
--- a/docs/data/jdbc.yml
+++ b/docs/data/jdbc.yml
@@ -16,7 +16,8 @@
# limitations under the License.
################################################################################
-version: 3.1.0-SNAPSHOT
+version: 3.2.0
+flink_compatibility: [1.18, 1.19]
variants:
- maven: flink-connector-jdbc
sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/$full_version/flink-connector-jdbc-$full_version.jar
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 7ba0c06dd..ee3f98dbd 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -107,13 +107,19 @@ public AbstractJdbcCatalog(
checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
- JdbcCatalogUtils.validateJdbcUrl(baseUrl);
-
+ if (!baseUrl.toLowerCase().contains("oracle")) {
+ JdbcCatalogUtils.validateJdbcUrl(baseUrl);
+ this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+ this.defaultUrl = this.baseUrl + defaultDatabase;
+ } else {
+ this.baseUrl = baseUrl;
+ this.defaultUrl = this.baseUrl;
+ }
this.userClassLoader = userClassLoader;
this.username = username;
this.pwd = pwd;
- this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
- this.defaultUrl = this.baseUrl + defaultDatabase;
+
+
}
@Override
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index 09d4d924b..a1ef0d452 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -22,6 +22,8 @@
import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect;
import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog;
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
+import org.apache.flink.connector.jdbc.databases.oracle.catalog.OracleCatalog;
+import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
@@ -61,6 +63,9 @@ public static AbstractJdbcCatalog createCatalog(
} else if (dialect instanceof MySqlDialect) {
return new MySqlCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
+ } else if (dialect instanceof OracleDialect) {
+ return new OracleCatalog(
+ userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
} else {
throw new UnsupportedOperationException(
String.format("Catalog for '%s' is not supported yet.", dialect));
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java
new file mode 100644
index 000000000..da3e0ec00
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java
@@ -0,0 +1,217 @@
+package org.apache.flink.connector.jdbc.databases.oracle.catalog;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog;
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.*;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+/**
+ * Catalog for Oracle: lists databases (schemas/users) and tables, and derives Flink table schemas from JDBC metadata.
+ */
+public class OracleCatalog extends AbstractJdbcCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OracleCatalog.class);
+
+ public static final String DEFAULT_SCHEMA = "TEST_SCHEMA";
+
+
+ public static final String IDENTIFIER = "jdbc";
+ private static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver";
+ private OracleTypeMapper dialectTypeMapper;
+ private static final Set<String> builtinDatabases = new HashSet<String>() {
+ {
+ add("SCOTT");
+ add("ANONYMOUS");
+ add("XS$NULL");
+ add("DIP");
+ add("SPATIAL_WFS_ADMIN_USR");
+ add("SPATIAL_CSW_ADMIN_USR");
+ add("APEX_PUBLIC_USER");
+ add("ORACLE_OCM");
+ add("MDDATA");
+ }
+ };
+
+ public OracleCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+ super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
+ String driverVersion = Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
+ String databaseVersion = Preconditions.checkNotNull(getDatabaseVersion(), "Database version must not be null.");
+ LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+ this.dialectTypeMapper = new OracleTypeMapper(databaseVersion, driverVersion);
+ }
+
+ private String getDatabaseVersion() {
+ try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+ return conn.getMetaData().getDatabaseProductVersion();
+ } catch (Exception e) {
+ throw new CatalogException( String.format("Failed in getting Oracle version by %s.", defaultUrl), e);
+ }
+ }
+
+ private String getDriverVersion() {
+ try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+ String driverVersion = conn.getMetaData().getDriverVersion();
+ Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+ Matcher matcher = regexp.matcher(driverVersion);
+ return matcher.find() ? matcher.group(0) : null;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed in getting Oracle driver version by %s.", defaultUrl), e);
+
+ }
+ }
+
+ @Override
+ public List listDatabases() throws CatalogException {
+ return extractColumnValuesBySQL(this.defaultUrl,
+ "select username from sys.dba_users " +
+ "where DEFAULT_TABLESPACE <> 'SYSTEM' and DEFAULT_TABLESPACE <> 'SYSAUX' " +
+ " order by username",
+ 1,
+ dbName -> !builtinDatabases.contains(dbName));
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(!StringUtils.isBlank(databaseName), "Database name must not be blank");
+ if (listDatabases().contains(databaseName)) {
+ return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+ } else {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+
+ @Override
+ public List listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+ if (!databaseExists(databaseName)) { // note: databaseName here is the Oracle instance/user (schema) name
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ List listDatabases = listDatabases().stream().map(username -> "'" + username + "'")
+ .collect(Collectors.toList());
+ return extractColumnValuesBySQL(this.defaultUrl,
+ "SELECT OWNER||'.'||TABLE_NAME AS schemaTableName FROM sys.all_tables WHERE OWNER IN (" + String.join(",", listDatabases) + ")"+
+ " ORDER BY OWNER,TABLE_NAME", 1, null, null);
+ }
+
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+
+ String databaseName = tablePath.getDatabaseName();
+ String dbUrl = baseUrl;
+ try(Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ Optional primaryKey = getPrimaryKey(metaData, databaseName, getSchemaName(tablePath), getTableName(tablePath));
+ String statement = String.format("SELECT * FROM %s ", getSchemaTableName(tablePath)) ;
+ PreparedStatement ps = conn.prepareStatement(statement);
+ ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+ String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+ DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+
+ for (int i = 1; i<=resultSetMetaData.getColumnCount(); i++) {
+ columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+ types[i - 1] = fromJDBCType(tablePath, resultSetMetaData,i);
+ if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+ types[i-1] = types[i-1].notNull();
+ }
+ }
+
+ Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+ primaryKey.ifPresent( pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+ Schema tableSchema = schemaBuilder.build();
+ Map props = new HashMap<>();
+ props.put(FactoryUtil.CONNECTOR.key(), IDENTIFIER);
+ props.put("username" , username);
+ props.put("password", pwd);
+ props.put("table-name", getSchemaTableName(tablePath));
+ props.put("driver", ORACLE_DRIVER);
+ return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+
+ } catch (Exception ex) {
+ throw new CatalogException(String.format("Failed getting Table %s", tablePath.getFullName()), ex);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ String[] schemaTableNames = getSchemaTableName(tablePath).split("\\.");
+ return !extractColumnValuesBySQL(
+ defaultUrl, "SELECT table_name FROM sys.all_tables where OWNER = ? and table_name = ?",
+ 1, null, schemaTableNames[0], schemaTableNames[1])
+ .isEmpty();
+
+ }
+
+ protected List extractColumnValuesBySQL(String connUrl, String sql, int columnIndex, Predicate filterFunc, Object... params){
+ List columnValues = Lists.newArrayList();
+ String sql1 = "SELECT table_name,OWNER FROM sys.all_tables";
+ try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+ PreparedStatement ps = conn.prepareStatement(sql)){
+ if (Objects.nonNull(params) && params.length >0){
+ for (int i=0; i 0 && precision < DecimalType.MAX_PRECISION) {
+ return DataTypes.DECIMAL(precision, metaData.getScale(colIndex));
+ }
+ return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
+ case Types.DATE:
+ return DataTypes.DATE();
+ case Types.TIMESTAMP:
+ case Types.TIMESTAMP_WITH_TIMEZONE:
+ case OracleTypes.TIMESTAMPTZ:
+ case OracleTypes.TIMESTAMPLTZ:
+ return scale > 0 ? DataTypes.TIMESTAMP(scale) : DataTypes.TIMESTAMP();
+ case OracleTypes.INTERVALYM:
+ return DataTypes.INTERVAL(DataTypes.YEAR(), DataTypes.MONTH());
+ case OracleTypes.INTERVALDS:
+ return DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND());
+ case Types.BOOLEAN:
+ return DataTypes.BOOLEAN();
+ default:
+ final String jdbcColumnName = metaData.getColumnName(colIndex);
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support Oracle type '%s' on column '%s' in Oracle version %s, driver version %s yet.",
+ oracleType, jdbcColumnName, databaseVersion, driverVersion));
+ }
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java
index db773ede8..eccdd4b26 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java
@@ -22,11 +22,24 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleDatabase;
+import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleImages;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.OracleContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
/** Base class for Oracle testing. */
@ExtendWith(OracleDatabase.class)
-public interface OracleTestBase extends DatabaseTest {
+public interface OracleTestBase extends DatabaseTest, OracleImages {
+
+ @Container
+ OracleContainer CONTAINER =
+ new OracleContainer(ORACLE_21)
+ .withStartupTimeoutSeconds(240)
+ .withConnectTimeoutSeconds(120)
+ .usingSid();
@Override
default DatabaseMetadata getMetadata() {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java
new file mode 100644
index 000000000..cf99e6035
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java
@@ -0,0 +1,167 @@
+package org.apache.flink.connector.jdbc.databases.oracle.catalog;
+
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class OracleCatalogTest extends OracleCatalogTestBase {
+
+ @Test
+ void testGetDb_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.getDatabase("nonexistent"))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessageContaining("Database nonexistent does not exist in Catalog");
+ }
+
+ @Test
+ void testListDatabases() {
+ List actual = catalog.listDatabases();
+ assertThat(actual).contains(OracleCatalog.DEFAULT_SCHEMA);
+ }
+
+ @Test
+ void testDbExists() {
+ assertThat(catalog.databaseExists("nonexistent")).isFalse();
+
+ assertThat(catalog.databaseExists(OracleCatalog.DEFAULT_SCHEMA)).isTrue();
+ }
+
+ // ------ tables ------
+
+ @Test
+ void testListTables() throws DatabaseNotExistException {
+ List actual = catalog.listTables(OracleCatalog.DEFAULT_SCHEMA);
+
+ assertThat(actual)
+ .isEqualTo(
+ Arrays.asList(
+ "TEST_SCHEMA.PRIMITIVE_TABLE",
+ "TEST_SCHEMA.PRIMITIVE_TABLE2",
+ "TEST_SCHEMA.T1",
+ "TEST_SCHEMA.T2",
+ "TEST_SCHEMA.T3",
+ "TEST_SCHEMA.T4",
+ "TEST_SCHEMA.T5"));
+
+// actual = catalog.listTables(TEST_DB);
+//
+// assertThat(actual).isEqualTo(Arrays.asList("public.t2", "test_schema.t3"));
+ }
+
+ @Test
+ void testListTables_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.listTables(TEST_DB))
+ .isInstanceOf(DatabaseNotExistException.class);
+ }
+
+ @Test
+ void testTableExists() {
+ assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, "nonexist"))).isFalse();
+
+ assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE1)))
+ .isTrue();
+ assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE2))).isTrue();
+ assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, "TEST_SCHEMA.T3"))).isTrue();
+ }
+
+ @Test
+ void testGetTables_TableNotExistException() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ TEST_DB,
+ PostgresTablePath.toFlinkTableName(
+ TEST_SCHEMA, "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
+ }
+
+ @Test
+ void testGetTables_TableNotExistException_NoSchema() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ TEST_DB,
+ PostgresTablePath.toFlinkTableName(
+ "nonexistschema", "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
+ }
+
+ @Test
+ void testGetTables_TableNotExistException_NoDb() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ "nonexistdb",
+ PostgresTablePath.toFlinkTableName(
+ TEST_SCHEMA, "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
+ }
+
+ @Test
+ void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
+ // test postgres.public.user1
+ Schema schema = getSimpleTable().schema;
+
+ CatalogBaseTable table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TABLE1));
+ System.out.println(table.getUnresolvedSchema().toString());
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+// table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TABLE2));
+//
+// assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+ // test testdb.public.user2
+ table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TABLE2));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+// table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2"));
+//
+// assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+ // test testdb.testschema.user2
+ table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TEST_SCHEMA + ".T3"));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+ }
+
+ @Test
+ void testPrimitiveDataTypes() throws TableNotExistException {
+ CatalogBaseTable table =
+ catalog.getTable(
+ new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE_PRIMITIVE_TYPE));
+ System.out.println(table.getUnresolvedSchema().toString());
+ assertThat(table.getUnresolvedSchema()).isEqualTo(getPrimitiveTable().schema);
+ }
+
+ @Test
+ void testArrayDataTypes() throws TableNotExistException {
+ CatalogBaseTable table =
+ catalog.getTable(
+ new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE_ARRAY_TYPE));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(getArrayTable().schema);
+ }
+
+ @Test
+ public void testSerialDataTypes() throws TableNotExistException {
+ CatalogBaseTable table =
+ catalog.getTable(
+ new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE_SERIAL_TYPE));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema);
+ }
+}
\ No newline at end of file
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java
new file mode 100644
index 000000000..2e600c9ae
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java
@@ -0,0 +1,408 @@
+package org.apache.flink.connector.jdbc.databases.oracle.catalog;
+
+import org.apache.flink.connector.jdbc.databases.oracle.OracleTestBase;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
+import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleDatabase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+
+public class OracleCatalogTestBase implements JdbcITCaseBase, OracleTestBase {
+
+ public static final Logger LOG = LoggerFactory.getLogger(OracleCatalogTestBase.class);
+
+ protected static final String TEST_CATALOG_NAME = "mypg";
+ protected static final String TEST_USERNAME = CONTAINER.getUsername();
+ protected static final String TEST_PWD = CONTAINER.getPassword();
+ protected static final String TEST_DB = "test";
+ protected static final String TEST_SCHEMA = "TEST_SCHEMA";
+ protected static final String TEST_PASSWORD = "test_password";
+ protected static final String TABLE1 = TEST_SCHEMA + ".T1";
+ protected static final String TABLE2 = TEST_SCHEMA + ".T2";
+ protected static final String TABLE3 = TEST_SCHEMA + ".T3";
+ protected static final String TABLE4 = TEST_SCHEMA + ".T4";
+ protected static final String TABLE5 = TEST_SCHEMA + ".T5";
+ protected static final String TABLE_PRIMITIVE_TYPE = TEST_SCHEMA + ".PRIMITIVE_TABLE";
+ protected static final String TABLE_PRIMITIVE_TYPE2 = TEST_SCHEMA + ".PRIMITIVE_TABLE2";
+ protected static final String TABLE_ARRAY_TYPE = TEST_SCHEMA + ".array_table";
+ protected static final String TABLE_SERIAL_TYPE = TEST_SCHEMA + ".serial_table";
+
+ protected static String baseUrl;
+ protected static OracleCatalog catalog;
+
+
+ @BeforeAll
+ static void init() throws SQLException {
+ // jdbc:oracle:thin:@//localhost:50807/helowin
+ String jdbcUrl = CONTAINER.getJdbcUrl();
+ // jdbc:oracle:thin:@//localhost:50807/
+ baseUrl = jdbcUrl; // jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+ catalog =
+ new OracleCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ CONTAINER.getSid(),
+ TEST_USERNAME,
+ TEST_PWD,
+ baseUrl);
+
+ // create test database and schema
+ createSchema(TEST_SCHEMA, TEST_PASSWORD);
+
+ // create test tables
+ // table: helowin.public.t1
+ // table: helowin.public.t4
+ // table: helowin.public.t5
+ createTable(OracleTablePath.fromFlinkTableName(TABLE1), getSimpleTable().oracleSchemaSql);
+ createTable(OracleTablePath.fromFlinkTableName(TABLE4), getSimpleTable().oracleSchemaSql);
+ createTable(OracleTablePath.fromFlinkTableName(TABLE5), getSimpleTable().oracleSchemaSql);
+
+ // table: test.public.t2
+ // table: test.test_schema.t3
+ // table: helowin.public.dt
+ // table: helowin.public.dt2
+ createTable(
+ TEST_DB,
+ OracleTablePath.fromFlinkTableName(TABLE2),
+ getSimpleTable().oracleSchemaSql);
+ createTable(
+ TEST_DB, OracleTablePath.fromFlinkTableName(TABLE3), getSimpleTable().oracleSchemaSql);
+ createTable(
+ OracleTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE),
+ getPrimitiveTable().oracleSchemaSql);
+ createTable(
+ OracleTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2),
+ getPrimitiveTable("test_pk2").oracleSchemaSql);
+// createTable(
+// OracleTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE),
+// getArrayTable().oracleSchemaSql);
+// createTable(
+// OracleTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
+// getSerialTable().oracleSchemaSql);
+
+ executeSQL(
+ OracleCatalog.DEFAULT_SCHEMA,
+ String.format(
+ "insert into %s values (%s)", TABLE1, getSimpleTable().values));
+ executeSQL(
+ OracleCatalog.DEFAULT_SCHEMA,
+ String.format(
+ "insert into %s values (%s)",
+ TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values));
+// executeSQL(
+// OracleCatalog.DEFAULT_SCHEMA,
+// String.format(
+// "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values));
+// executeSQL(
+// OracleCatalog.DEFAULT_SCHEMA,
+// String.format(
+// "insert into %s values (%s)", TABLE_SERIAL_TYPE, getSerialTable().values));
+ System.out.println("success");
+ }
+
+ public static void createTable(OracleTablePath tablePath, String tableSchemaSql)
+ throws SQLException {
+ executeTableSQL(String.format("CREATE TABLE %s(%s)", tablePath.getFullPath(), tableSchemaSql));
+ }
+
+ public static void createTable(String db, OracleTablePath tablePath, String tableSchemaSql)
+ throws SQLException {
+ executeSQL(
+ db, String.format("CREATE TABLE %s(%s)", tablePath.getFullPath(), tableSchemaSql));
+ }
+
+ public static void createSchema(String schema, String userPassword) throws SQLException {
+ executeSQL(String.format("CREATE USER %s IDENTIFIED BY %s DEFAULT TABLESPACE users TEMPORARY TABLESPACE temp", schema, userPassword));
+ executeSQL(String.format("ALTER USER %s QUOTA UNLIMITED ON USERS",schema));
+ executeSQL(String.format("GRANT CONNECT, RESOURCE, CREATE TABLE,INSERT ANY TABLE TO %s",schema));
+
+ }
+
+ public static void createDatabase(String database) throws SQLException {
+ executeSQL(String.format("CREATE DATABASE %s;", database));
+ }
+
+ public static void executeSQL(String sql) throws SQLException {
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD);
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate(sql);
+ } catch (SQLException e) {
+ throw e;
+ }
+ }
+
+ public static void executeSQLQuery(String sql) throws SQLException {
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD);
+ Statement statement = conn.createStatement()) {
+ ResultSet rs = statement.executeQuery(sql);
+ while (rs.next()){
+ System.out.println(rs.toString());
+ }
+
+ } catch (SQLException e) {
+ throw e;
+ }
+ }
+
+ public static void executeTableSQL(String sql) throws SQLException {
+ // Connect to the database and execute the statement.
+ try (Connection conn = DriverManager.getConnection(
+ String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD);
+ Statement statement = conn.createStatement()) {
+// String s = "CREATE TABLE test_schema.primitive_table(int integer," +
+// " bytea bytea, short smallint, long bigint, real real, double_precision double precision, " +
+// "numeric numeric(10, 5), decimal decimal(10, 1), boolean boolean, text text, char char, " +
+// "character character(3), character_varying character varying(20), timestamp timestamp(5), " +
+// "date date,time time(0), default_numeric numeric, CONSTRAINT test_pk PRIMARY KEY (short, int))";
+//
+// String s1 = "CREATE TABLE test_schema.primitive_table(int integer, bytea blob, " +
+// "st number, lg number, rl real, docision double precision," +
+// "numeric numeric(10, 5),decal decimal(10, 1), tet clob, c1har char(3)," +
+// "character character(3), character_varying character varying(20), timestamp timestamp(5), " +
+// "date_col date,default_numeric numeric, CONSTRAINT test_pk PRIMARY KEY (int))" ;
+// statement.execute(s1);
+
+ } catch (SQLException e) {
+ // Handle exceptions from the database connection and the (best-effort) setup query.
+ e.printStackTrace();
+ }
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format("%s", baseUrl), TEST_SCHEMA, TEST_PASSWORD);
+ Statement statement = conn.createStatement()) {
+
+ statement.executeUpdate(sql);
+ String s ="insert into test_schema.t1 values (1)";
+ statement.execute(s);
+ } catch (SQLException e) {
+ throw e;
+ }
+ }
+
+ public static void executeSQL(String db, String sql) throws SQLException {
+ try {
+ Connection conn = DriverManager.getConnection(
+ String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD);
+ System.out.println("Connected to the database!");
+ // Do something with the connection
+ conn.close();
+ }catch (SQLException e) {
+ System.out.println("Error connecting to the database: " + e.getMessage());
+ }
+
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD);
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate(sql);
+ } catch (SQLException e) {
+ throw e;
+ }
+ }
+
+
+ /** Object holding schema and corresponding sql. */
+ public static class TestTable {
+ Schema schema;
+ String oracleSchemaSql;
+ String values;
+
+ public TestTable(Schema schema, String oracleSchemaSql, String values) {
+ this.schema = schema;
+ this.oracleSchemaSql = oracleSchemaSql;
+ this.values = values;
+ }
+ }
+
+ public static OracleCatalogTestBase.TestTable getSimpleTable() {
+ return new OracleCatalogTestBase.TestTable(
+ Schema.newBuilder().column("ID", DataTypes.DECIMAL(38,18)).build(), "id number", "1");
+ }
+
+ // oracle doesn't support to use the same primary key name across different tables,
+ // make the table parameterized to resolve this problem.
+ public static OracleCatalogTestBase.TestTable getPrimitiveTable() {
+ return getPrimitiveTable("TEST_PK");
+ }
+
+ // TODO: add back timestamptz and time types.
+ // Flink currently doesn't support converting time's precision, with the following error
+ // TableException: Unsupported conversion from data type 'TIME(6)' (conversion class:
+ // java.sql.Time)
+ // to type information. Only data types that originated from type information fully support a
+ // reverse conversion.
+ public static OracleCatalogTestBase.TestTable getPrimitiveTable(String primaryKeyName) {
+ return new OracleCatalogTestBase.TestTable(
+ Schema.newBuilder()
+ .column("INT_COL", DataTypes.DECIMAL(38,18).notNull())
+ .column("BYTEA_COL", DataTypes.BYTES())
+ .column("SHORT_COL", DataTypes.DECIMAL(38,18).notNull())
+ .column("LONG_COL", DataTypes.DECIMAL(38,18))
+ .column("REAL_COL", DataTypes.DECIMAL(38,18))
+ .column("DOUBLE_PRECISION_COL", DataTypes.DECIMAL(38,18))
+ .column("NUMERIC_COL", DataTypes.DECIMAL(10, 5))
+ .column("DECIMAL_COL", DataTypes.DECIMAL(10, 1))
+ .column("TEXT_COL", DataTypes.STRING())
+ .column("CHAR_COL", DataTypes.STRING())
+ .column("CHARACTER_COL", DataTypes.STRING())
+ .column("CHARACTER_VARYING_COL", DataTypes.STRING())
+ .column("TIMESTAMP_COL", DataTypes.TIMESTAMP(5))
+ // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))
+ .column("DATE_COL", DataTypes.TIMESTAMP(6))
+ .column("DEFAULT_NUMERIC", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
+ .primaryKeyNamed(primaryKeyName, "SHORT_COL", "INT_COL")
+ .build(),
+ "int_col NUMBER, "
+ + "bytea_col blob, "
+ + "short_col NUMBER, "
+ + "long_col NUMBER, "
+ + "real_col real, "
+ + "double_precision_col double precision, "
+ + "numeric_col numeric(10, 5), "
+ + "decimal_col decimal(10, 1), "
+ + "text_col clob, "
+ + "char_col char(3), "
+ + "character_col character(3), "
+ + "character_varying_col character varying(20), "
+ + "timestamp_col timestamp(5), "
+ +
+ // "timestamptz timestamptz(4), " +
+ "date_col date,"
+ + "default_numeric numeric, "
+ + "CONSTRAINT "
+ + primaryKeyName
+ + " PRIMARY KEY (short_col, int_col)",
+ "1,"
+ + "'2',"
+ + "3,"
+ + "4,"
+ + "5.5,"
+ + "6.6,"
+ + "7.7,"
+ + "8.8,"
+// + "true,"
+ + "'a',"
+ + "'b',"
+ + "'c',"
+ + "'d',"
+// + "'2016-16-22 19:10:25',"
+ + "SYSTIMESTAMP,"
+ +
+ // "'2006-06-22 19:10:25'," +
+// "'2015-12-12',"
+ "SYSDATE,"
+// + "'00:51:02.746572', "
+ + "500");
+ }
+
+ // TODO: add back timestamptz once planner supports timestamp with timezone
+ public static OracleCatalogTestBase.TestTable getArrayTable() {
+ return new OracleCatalogTestBase.TestTable(
+ Schema.newBuilder()
+ .column("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+ .column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+ .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+ .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+ .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+ .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+ .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
+ .column(
+ "numeric_arr_default",
+ DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)))
+ .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
+ .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+ .column("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+ .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+ .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+ .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+ .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5)))
+ // .field("timestamptz_arr",
+ // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
+ .column("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
+ .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0)))
+ .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+ .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+ .build(),
+ "int_arr integer[], "
+ + "bytea_arr bytea[], "
+ + "short_arr smallint[], "
+ + "long_arr bigint[], "
+ + "real_arr real[], "
+ + "double_precision_arr double precision[], "
+ + "numeric_arr numeric(10, 5)[], "
+ + "numeric_arr_default numeric[], "
+ + "decimal_arr decimal(10,2)[], "
+ + "boolean_arr boolean[], "
+ + "text_arr text[], "
+ + "char_arr char[], "
+ + "character_arr character(3)[], "
+ + "character_varying_arr character varying(20)[], "
+ + "timestamp_arr timestamp(5)[], "
+ +
+ // "timestamptz_arr timestamptz(4)[], " +
+ "date_arr date[], "
+ + "time_arr time(0)[], "
+ + "null_bytea_arr bytea[], "
+ + "null_text_arr text[]",
+ String.format(
+ "'{1,2,3}',"
+ + "'{2,3,4}',"
+ + "'{3,4,5}',"
+ + "'{4,5,6}',"
+ + "'{5.5,6.6,7.7}',"
+ + "'{6.6,7.7,8.8}',"
+ + "'{7.7,8.8,9.9}',"
+ + "'{8.8,9.9,10.10}',"
+ + "'{9.9,10.10,11.11}',"
+ + "'{true,false,true}',"
+ + "'{a,b,c}',"
+ + "'{b,c,d}',"
+ + "'{b,c,d}',"
+ + "'{b,c,d}',"
+ + "'{\"2016-06-22 19:10:25\", \"2019-06-22 19:10:25\"}',"
+ +
+ // "'{\"2006-06-22 19:10:25\", \"2009-06-22 19:10:25\"}'," +
+ "'{\"2015-01-01\", \"2020-01-01\"}',"
+ + "'{\"00:51:02.746572\", \"00:59:02.746572\"}',"
+ + "NULL,"
+ + "NULL"));
+ }
+
+ public static OracleCatalogTestBase.TestTable getSerialTable() {
+ return new OracleCatalogTestBase.TestTable(
+ Schema.newBuilder()
+ // serial fields are returned as not null by ResultSetMetaData.columnNoNulls
+ .column("f0", DataTypes.SMALLINT().notNull())
+ .column("f1", DataTypes.INT().notNull())
+ .column("f2", DataTypes.SMALLINT().notNull())
+ .column("f3", DataTypes.INT().notNull())
+ .column("f4", DataTypes.BIGINT().notNull())
+ .column("f5", DataTypes.BIGINT().notNull())
+ .build(),
+ "f0 smallserial, "
+ + "f1 serial, "
+ + "f2 serial2, "
+ + "f3 serial4, "
+ + "f4 serial8, "
+ + "f5 bigserial",
+ "32767,"
+ + "2147483647,"
+ + "32767,"
+ + "2147483647,"
+ + "9223372036854775807,"
+ + "9223372036854775807");
+ }
+
+
+}
\ No newline at end of file
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java
index 8c31b5173..4a49cd1ee 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java
@@ -20,5 +20,7 @@
/** Oracle docker images. */
public interface OracleImages {
String ORACLE_18 = "gvenzl/oracle-xe:18.4.0-slim";
+ String ORACLE_12 = "gvenzl/oracle-xe:12.4.0-slim";
+ String ORACLE_19 = "goodboy008/oracle-19.3.0-ee";
String ORACLE_21 = "gvenzl/oracle-xe:21.3.0-slim-faststart";
}
diff --git a/flink-connector-jdbc/src/test/resources/log4j2-test.properties b/flink-connector-jdbc/src/test/resources/log4j2-test.properties
index 835c2ec9a..c733b5cbd 100644
--- a/flink-connector-jdbc/src/test/resources/log4j2-test.properties
+++ b/flink-connector-jdbc/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger