Skip to content

Commit d7570c7

Browse files
yanghuaiGitdujie
andauthored
[Feature-#1944][OceanBase]support MysqlMode and Oracle Mode (#1945)
Co-authored-by: dujie <[email protected]>
1 parent d39ce22 commit d7570c7

File tree

14 files changed

+545
-13
lines changed

14 files changed

+545
-13
lines changed

chunjun-connectors/chunjun-connector-oceanbase/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@
4848
<artifactId>chunjun-connector-jdbc-base</artifactId>
4949
<version>${project.version}</version>
5050
</dependency>
51+
52+
<dependency>
53+
<groupId>com.dtstack.chunjun</groupId>
54+
<artifactId>chunjun-connector-oracle</artifactId>
55+
<version>${project.version}</version>
56+
</dependency>
57+
<dependency>
58+
<groupId>com.dtstack.chunjun</groupId>
59+
<artifactId>chunjun-connector-mysql</artifactId>
60+
<version>${project.version}</version>
61+
</dependency>
62+
5163
</dependencies>
5264
<build>
5365
<plugins>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.dtstack.chunjun.connector.oceanbase.config;
2+
3+
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
4+
5+
public class OceanBaseConf extends JdbcConfig {
6+
7+
private String oceanBaseMode = OceanBaseMode.MYSQL.name();
8+
9+
public String getOceanBaseMode() {
10+
return oceanBaseMode;
11+
}
12+
13+
public void setOceanBaseMode(String oceanBaseMode) {
14+
this.oceanBaseMode = oceanBaseMode;
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.dtstack.chunjun.connector.oceanbase.config;
2+
3+
public enum OceanBaseMode {
4+
MYSQL,
5+
ORACLE
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.dtstack.chunjun.connector.oceanbase.converter;
2+
3+
import com.dtstack.chunjun.config.CommonConfig;
4+
import com.dtstack.chunjun.connector.oracle.converter.BlobType;
5+
import com.dtstack.chunjun.connector.oracle.converter.ClobType;
6+
import com.dtstack.chunjun.connector.oracle.converter.ConvertUtil;
7+
import com.dtstack.chunjun.connector.oracle.converter.OracleSyncConverter;
8+
import com.dtstack.chunjun.converter.IDeserializationConverter;
9+
import com.dtstack.chunjun.element.column.BytesColumn;
10+
import com.dtstack.chunjun.element.column.StringColumn;
11+
import com.dtstack.chunjun.element.column.TimestampColumn;
12+
import com.dtstack.chunjun.element.column.ZonedTimestampColumn;
13+
14+
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
15+
import org.apache.flink.table.types.logical.LogicalType;
16+
import org.apache.flink.table.types.logical.RowType;
17+
import org.apache.flink.table.types.logical.TimestampType;
18+
import org.apache.flink.table.types.logical.ZonedTimestampType;
19+
20+
import com.oceanbase.jdbc.Blob;
21+
import com.oceanbase.jdbc.Clob;
22+
import com.oceanbase.jdbc.extend.datatype.DataTypeUtilities;
23+
import com.oceanbase.jdbc.extend.datatype.TIMESTAMPLTZ;
24+
import com.oceanbase.jdbc.extend.datatype.TIMESTAMPTZ;
25+
26+
import java.sql.Timestamp;
27+
import java.util.TimeZone;
28+
29+
public class OceanbaseOracleSyncConverter extends OracleSyncConverter {
30+
31+
public OceanbaseOracleSyncConverter(RowType rowType, CommonConfig commonConfig) {
32+
super(rowType, commonConfig);
33+
}
34+
35+
@Override
36+
protected IDeserializationConverter createInternalConverter(LogicalType type) {
37+
switch (type.getTypeRoot()) {
38+
case VARCHAR:
39+
if (type instanceof ClobType) {
40+
return val -> {
41+
Clob clob = (Clob) val;
42+
return new StringColumn(ConvertUtil.convertClob(clob));
43+
};
44+
}
45+
return val -> new StringColumn(val.toString());
46+
case VARBINARY:
47+
return val -> {
48+
if (type instanceof BlobType) {
49+
Blob blob = (Blob) val;
50+
byte[] bytes = blob.getBytes(1, (int) blob.length());
51+
return new BytesColumn(bytes);
52+
} else {
53+
return new BytesColumn((byte[]) val);
54+
}
55+
};
56+
case TIMESTAMP_WITHOUT_TIME_ZONE:
57+
final int precision = ((TimestampType) type).getPrecision();
58+
if (precision == 6) {
59+
return val -> new TimestampColumn((Timestamp) val, 0); // java.sql.Timestamp
60+
}
61+
case TIMESTAMP_WITH_TIME_ZONE:
62+
if (type instanceof ZonedTimestampType) {
63+
final int zonedPrecision = ((ZonedTimestampType) type).getPrecision();
64+
return val -> {
65+
TIMESTAMPTZ timestamptz = (TIMESTAMPTZ) val;
66+
Timestamp timestamp = timestamptz.timestampValue();
67+
return new ZonedTimestampColumn(timestamp, zonedPrecision);
68+
};
69+
}
70+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
71+
if (type instanceof LocalZonedTimestampType) {
72+
final int localPrecision = ((LocalZonedTimestampType) type).getPrecision();
73+
return val -> {
74+
TIMESTAMPLTZ timestamptz = (TIMESTAMPLTZ) val;
75+
// 重写处理12个字节情况,TIMESTAMPLTZ#toTimestamp
76+
byte[] bytes = timestamptz.toBytes(); // 获取字节码
77+
TimeZone timeZone;
78+
if (bytes.length >= 14) {
79+
// 字节数组长度足够,尝试提取时区信息
80+
String tzStr =
81+
DataTypeUtilities.toTimezoneStr(
82+
bytes[12], bytes[13], "GMT", true);
83+
timeZone = TimeZone.getTimeZone(tzStr);
84+
} else {
85+
// 字节数组长度不足,使用默认时区
86+
timeZone = TimeZone.getDefault();
87+
}
88+
Timestamp timestamp =
89+
new Timestamp(DataTypeUtilities.getOriginTime(bytes, timeZone));
90+
timestamp.setNanos(DataTypeUtilities.getNanos(bytes, 7));
91+
return new ZonedTimestampColumn(timestamp, timeZone, localPrecision);
92+
};
93+
}
94+
}
95+
return super.createInternalConverter(type);
96+
}
97+
}

chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public static DataType apply(TypeConfig type) {
5050
case "DECIMAL":
5151
case "DECIMAL UNSIGNED":
5252
case "NUMERIC":
53+
case "NUMBER":
5354
return DataTypes.DECIMAL(38, 18);
5455
case "DOUBLE":
5556
case "DOUBLE UNSIGNED":
@@ -81,6 +82,7 @@ public static DataType apply(TypeConfig type) {
8182
case "LONGTEXT":
8283
case "ENUM":
8384
case "SET":
85+
case "VARCHAR2":
8486
return DataTypes.STRING();
8587
default:
8688
throw new UnsupportedTypeException(type);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.dtstack.chunjun.connector.oceanbase.dialect;
2+
3+
import com.dtstack.chunjun.connector.mysql.converter.MysqlRawTypeConverter;
4+
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
5+
import com.dtstack.chunjun.converter.RawTypeMapper;
6+
7+
import java.util.Optional;
8+
9+
public class OceanbaseMysqlModeDialect extends MysqlDialect {
10+
OceanbaseDialect oceanbaseDialect = new OceanbaseDialect();
11+
12+
@Override
13+
public String dialectName() {
14+
return oceanbaseDialect.dialectName();
15+
}
16+
17+
@Override
18+
public boolean canHandle(String url) {
19+
return oceanbaseDialect.canHandle(url);
20+
}
21+
22+
@Override
23+
public RawTypeMapper getRawTypeConverter() {
24+
return MysqlRawTypeConverter::apply;
25+
}
26+
27+
@Override
28+
public Optional<String> defaultDriverName() {
29+
return oceanbaseDialect.defaultDriverName();
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.dtstack.chunjun.connector.oceanbase.dialect;
2+
3+
import com.dtstack.chunjun.config.CommonConfig;
4+
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
5+
import com.dtstack.chunjun.connector.oceanbase.converter.OceanbaseOracleSyncConverter;
6+
import com.dtstack.chunjun.connector.oracle.converter.OracleRawTypeConverter;
7+
import com.dtstack.chunjun.connector.oracle.dialect.OracleDialect;
8+
import com.dtstack.chunjun.converter.AbstractRowConverter;
9+
import com.dtstack.chunjun.converter.RawTypeMapper;
10+
11+
import org.apache.flink.table.types.logical.LogicalType;
12+
import org.apache.flink.table.types.logical.RowType;
13+
14+
import io.vertx.core.json.JsonArray;
15+
16+
import java.sql.ResultSet;
17+
import java.util.Optional;
18+
19+
public class OceanbaseOracleModeDialect extends OracleDialect {
20+
OceanbaseDialect oceanbaseDialect = new OceanbaseDialect();
21+
22+
@Override
23+
public String dialectName() {
24+
return oceanbaseDialect.dialectName();
25+
}
26+
27+
@Override
28+
public boolean canHandle(String url) {
29+
return oceanbaseDialect.canHandle(url);
30+
}
31+
32+
@Override
33+
public RawTypeMapper getRawTypeConverter() {
34+
return OracleRawTypeConverter::apply;
35+
}
36+
37+
@Override
38+
public Optional<String> defaultDriverName() {
39+
return oceanbaseDialect.defaultDriverName();
40+
}
41+
42+
@Override
43+
public AbstractRowConverter<ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType>
44+
getColumnConverter(RowType rowType, CommonConfig commonConfig) {
45+
return new OceanbaseOracleSyncConverter(rowType, commonConfig);
46+
}
47+
}

chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,31 @@
1818
package com.dtstack.chunjun.connector.oceanbase.sink;
1919

2020
import com.dtstack.chunjun.config.SyncConfig;
21+
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
2122
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
22-
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect;
23+
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf;
24+
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode;
25+
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
26+
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect;
2327

2428
public class OceanbaseSinkFactory extends JdbcSinkFactory {
29+
30+
private OceanBaseConf oceanBaseConf;
31+
2532
public OceanbaseSinkFactory(SyncConfig syncConfig) {
26-
super(syncConfig, new OceanbaseDialect());
33+
super(syncConfig, new OceanbaseMysqlModeDialect());
34+
this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig;
35+
if (oceanBaseConf != null) {
36+
OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
37+
// 若是for oracle模式
38+
if (mode == OceanBaseMode.ORACLE) {
39+
this.jdbcDialect = new OceanbaseOracleModeDialect();
40+
}
41+
}
42+
}
43+
44+
@Override
45+
protected Class<? extends JdbcConfig> getConfClass() {
46+
return OceanBaseConf.class;
2747
}
2848
}

chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,76 @@
1818
package com.dtstack.chunjun.connector.oceanbase.source;
1919

2020
import com.dtstack.chunjun.config.SyncConfig;
21+
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
2122
import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
22-
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect;
23+
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf;
24+
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode;
25+
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
26+
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect;
2327

2428
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2529

2630
import org.apache.commons.lang3.StringUtils;
2731

32+
import java.util.Properties;
33+
2834
public class OceanbaseSourceFactory extends JdbcSourceFactory {
35+
// 默认是Mysql流式拉取
36+
private static final int DEFAULT_FETCH_SIZE = Integer.MIN_VALUE;
37+
private static final String ORACLE_JDBC_READ_TIMEOUT = "oracle.jdbc.ReadTimeout";
38+
private static final String ORACLE_NET_CONNECT_TIMEOUT = "oracle.net.CONNECT_TIMEOUT";
39+
40+
private OceanBaseConf oceanBaseConf;
41+
2942
public OceanbaseSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) {
30-
super(syncConfig, env, new OceanbaseDialect());
31-
if (jdbcConfig.isPolling()
32-
&& StringUtils.isEmpty(jdbcConfig.getStartLocation())
33-
&& jdbcConfig.getFetchSize() == 0) {
34-
jdbcConfig.setFetchSize(1000);
43+
super(syncConfig, env, new OceanbaseMysqlModeDialect()); // 默认为mysql方言
44+
this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig;
45+
if (oceanBaseConf != null) {
46+
OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
47+
// 若是for oracle模式
48+
if (mode == OceanBaseMode.ORACLE) {
49+
// 设置for oracle方言
50+
this.jdbcDialect = new OceanbaseOracleModeDialect();
51+
Properties properties = jdbcConfig.getProperties();
52+
if (properties == null) {
53+
properties = new Properties();
54+
}
55+
if (jdbcConfig.getConnectTimeOut() != 0) {
56+
// queryTimeOut单位是秒 需要转换成毫秒
57+
properties.putIfAbsent(
58+
ORACLE_JDBC_READ_TIMEOUT,
59+
String.valueOf(jdbcConfig.getQueryTimeOut() * 1000));
60+
properties.putIfAbsent(
61+
ORACLE_NET_CONNECT_TIMEOUT,
62+
String.valueOf(jdbcConfig.getQueryTimeOut() * 3 * 1000));
63+
jdbcConfig.setProperties(properties);
64+
}
65+
} else {
66+
// 其他情况:for mysql模式 初始化
67+
// 避免result.next阻塞
68+
if (jdbcConfig.isPolling()
69+
&& StringUtils.isEmpty(jdbcConfig.getStartLocation())
70+
&& jdbcConfig.getFetchSize() == 0) {
71+
jdbcConfig.setFetchSize(1000);
72+
}
73+
}
74+
}
75+
}
76+
77+
@Override
78+
protected Class<? extends JdbcConfig> getConfClass() {
79+
return OceanBaseConf.class;
80+
}
81+
82+
@Override
83+
protected int getDefaultFetchSize() {
84+
if (oceanBaseConf != null) {
85+
OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
86+
// 处理for oracle情况
87+
if (mode == OceanBaseMode.ORACLE) {
88+
return super.getDefaultFetchSize();
89+
}
3590
}
91+
return DEFAULT_FETCH_SIZE;
3692
}
3793
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.dtstack.chunjun.connector.oceanbase.table;
2+
3+
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
4+
import com.dtstack.chunjun.connector.mysql.table.MysqlDynamicTableFactory;
5+
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
6+
7+
public class MysqlDynamicTableFactoryProxy extends MysqlDynamicTableFactory {
8+
@Override
9+
protected JdbcDialect getDialect() {
10+
return new OceanbaseMysqlModeDialect();
11+
}
12+
}

0 commit comments

Comments
 (0)