Skip to content

Commit f2eb609

Browse files
committed
[feat-858][doris] Doris Connector add sql mode.
1 parent 6071537 commit f2eb609

24 files changed

+937
-273
lines changed

chunjun-connectors/chunjun-connector-doris/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@
4242
<artifactId>jackson-databind</artifactId>
4343
<version>2.9.10.1</version>
4444
</dependency>
45+
46+
<dependency>
47+
<groupId>com.dtstack.chunjun</groupId>
48+
<artifactId>chunjun-connector-mysql</artifactId>
49+
<version>${project.version}</version>
50+
</dependency>
4551
</dependencies>
4652

4753
<build>
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.doris;
20+
21+
import org.apache.commons.lang3.StringUtils;
22+
23+
import java.util.Collections;
24+
import java.util.List;
25+
26+
public class DorisUtil {
27+
28+
private static final String JDBC_QUERY_PORT = "9030";
29+
30+
private static final String JDBC_TEMPLATE = "jdbc:mysql://%s:%s?useSSL=false";
31+
32+
/**
33+
* split fe and get fe ip. like: http://172.16.21.193:8030
34+
*
35+
* @param fe fe
36+
* @return ip of fe.
37+
*/
38+
private static String splitFe(String fe) {
39+
String[] split = fe.split("://");
40+
for (String s : split) {
41+
String[] items = s.split(":");
42+
if (items.length == 2) {
43+
return items[0];
44+
}
45+
}
46+
throw new RuntimeException("Get fe ip from fe uri: " + fe + " failed.");
47+
}
48+
49+
public static String getJdbcUrlFromFe(List<String> feNodes, String url) {
50+
if (StringUtils.isEmpty(url)) {
51+
Collections.shuffle(feNodes);
52+
String fe = feNodes.get(0);
53+
String feIp = splitFe(fe);
54+
return String.format(JDBC_TEMPLATE, feIp, JDBC_QUERY_PORT);
55+
}
56+
return url;
57+
}
58+
59+
public interface Accept<T> {
60+
void accept(T t) throws Exception;
61+
}
62+
63+
public interface Action {
64+
void action() throws Exception;
65+
}
66+
67+
public static <T> void doRetry(
68+
Accept<T> accept, Action action, T t, int retryTimes, long sleepTimeOut)
69+
throws Exception {
70+
for (int i = 0; i < retryTimes; i++) {
71+
try {
72+
accept.accept(t);
73+
return;
74+
} catch (Exception exception) {
75+
boolean retry = needRetry(exception);
76+
if (i + 1 == retryTimes || !retry) {
77+
throw exception;
78+
}
79+
80+
try {
81+
Thread.sleep(sleepTimeOut);
82+
} catch (InterruptedException interruptedException) {
83+
// ignore
84+
}
85+
action.action();
86+
}
87+
}
88+
}
89+
90+
private static boolean needRetry(Exception exception) {
91+
if (exception != null) {
92+
String errorMessage = exception.getMessage();
93+
return StringUtils.isNotEmpty(errorMessage) && errorMessage.contains("err=-235");
94+
}
95+
return false;
96+
}
97+
}

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRowConvert.java renamed to chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpRowConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@
3535
* @author xuchao
3636
* @date 2021-11-21
3737
*/
38-
public class DorisRowConvert
38+
public class DorisHttpRowConverter
3939
extends AbstractRowConverter<RowData, RowData, StringJoiner, LogicalType> {
4040

4141
private static final long serialVersionUID = 2L;
4242

4343
private static final String NULL_VALUE = "\\N";
4444

45-
public DorisRowConvert(RowType rowType) {
45+
public DorisHttpRowConverter(RowType rowType) {
4646
super(rowType);
4747
for (int i = 0; i < rowType.getFieldCount(); i++) {
4848
toInternalConverters.add(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.doris.converter;
20+
21+
import com.dtstack.chunjun.connector.jdbc.converter.JdbcRowConverter;
22+
23+
import org.apache.flink.table.types.logical.RowType;
24+
25+
public class DorisJdbcRowConverter extends JdbcRowConverter {
26+
public DorisJdbcRowConverter(RowType rowType) {
27+
super(rowType);
28+
}
29+
}

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRowConverter.java

Lines changed: 0 additions & 91 deletions
This file was deleted.

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRowTypeConvert.java

Lines changed: 0 additions & 47 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.doris.converter;
20+
21+
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
22+
23+
import org.apache.flink.table.api.DataTypes;
24+
import org.apache.flink.table.types.DataType;
25+
26+
import java.util.Locale;
27+
28+
/**
29+
* @author xuchao
30+
* @date 2021-11-21
31+
*/
32+
public class DorisRowTypeConverter {
33+
34+
public static DataType apply(String type) {
35+
switch (type.toUpperCase(Locale.ENGLISH)) {
36+
case "BOOLEAN":
37+
case "BIT":
38+
return DataTypes.BOOLEAN();
39+
case "TINYINT":
40+
return DataTypes.TINYINT();
41+
case "SMALLINT":
42+
case "MEDIUMINT":
43+
case "INT":
44+
case "INTEGER":
45+
case "INT24":
46+
return DataTypes.INT();
47+
case "BIGINT":
48+
return DataTypes.BIGINT();
49+
case "REAL":
50+
case "FLOAT":
51+
return DataTypes.FLOAT();
52+
case "DECIMAL":
53+
case "NUMERIC":
54+
case "DECIMALV2":
55+
return DataTypes.DECIMAL(38, 18);
56+
case "DOUBLE":
57+
return DataTypes.DOUBLE();
58+
case "CHAR":
59+
case "VARCHAR":
60+
case "STRING":
61+
case "JSON":
62+
case "TINYTEXT":
63+
case "TEXT":
64+
case "MEDIUMTEXT":
65+
case "LONGTEXT":
66+
case "ENUM":
67+
case "SET":
68+
return DataTypes.STRING();
69+
case "DATE":
70+
return DataTypes.DATE();
71+
case "TIME":
72+
return DataTypes.TIME();
73+
case "YEAR":
74+
return DataTypes.INTERVAL(DataTypes.YEAR());
75+
case "TIMESTAMP":
76+
case "DATETIME":
77+
return DataTypes.TIMESTAMP(0);
78+
case "TINYBLOB":
79+
case "BLOB":
80+
case "MEDIUMBLOB":
81+
case "LONGBLOB":
82+
case "BINARY":
83+
case "VARBINARY":
84+
// BYTES 底层调用的是VARBINARY最大长度
85+
case "GEOMETRY":
86+
return DataTypes.BYTES();
87+
case "NULL_TYPE":
88+
case "NULL":
89+
return DataTypes.NULL();
90+
case "HLL":
91+
default:
92+
throw new UnsupportedTypeException(type);
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)