Skip to content

Commit 68fa900

Browse files
Paddy0523FlechazoW
authored andcommitted
[feat-4826][inceptor] Fix the lookup table driver type error
1 parent ae25ae4 commit 68fa900

File tree

1 file changed

+70
-1
lines changed
  • flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/util

1 file changed

+70
-1
lines changed

flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/util/InceptorDbUtil.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,13 @@
1919
package com.dtstack.flinkx.connector.inceptor.util;
2020

2121
import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf;
22+
import com.dtstack.flinkx.connector.inceptor.dialect.InceptorDialect;
23+
import com.dtstack.flinkx.connector.inceptor.dialect.InceptorHdfsDialect;
24+
import com.dtstack.flinkx.connector.inceptor.dialect.InceptorHyperbaseDialect;
25+
import com.dtstack.flinkx.connector.inceptor.dialect.InceptorSearchDialect;
26+
import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf;
2227
import com.dtstack.flinkx.security.KerberosUtil;
28+
import com.dtstack.flinkx.throwable.FlinkxRuntimeException;
2329
import com.dtstack.flinkx.util.ExceptionUtil;
2430
import com.dtstack.flinkx.util.FileSystemUtil;
2531
import com.dtstack.flinkx.util.RetryUtil;
@@ -35,7 +41,10 @@
3541
import java.security.PrivilegedAction;
3642
import java.sql.Connection;
3743
import java.sql.DriverManager;
44+
import java.sql.ResultSet;
3845
import java.sql.SQLException;
46+
import java.util.Calendar;
47+
import java.util.TimeZone;
3948
import java.util.concurrent.locks.ReentrantLock;
4049

4150
/**
@@ -62,6 +71,19 @@ public final class InceptorDbUtil {
6271
public static final String INCEPTOR_TRANSACTION_COMMIT = "COMMIT";
6372
public static final String INCEPTOR_TRANSACTION_ROLLBACK = "ROLLBACK";
6473

74+
public static final String INCEPTOR_HYPERBASE_STORAGE_HANDLER = "HyperdriveStorageHandler";
75+
public static final String INCEPTOR_HBASE_STORAGE_HANDLER = "HBaseStorageHandler";
76+
public static final String INCEPTOR_SEARCH_STORAGE_HANDLER = "ElasticSearchStorageHandler";
77+
78+
public static final String INCEPROE_SHOW_DESCRIBE_FORMAT = "describe formatted %s";
79+
80+
public static final ThreadLocal<TimeZone> LOCAL_TIMEZONE =
81+
new ThreadLocal<TimeZone>() {
82+
protected TimeZone initialValue() {
83+
return Calendar.getInstance().getTimeZone();
84+
}
85+
};
86+
6587
private InceptorDbUtil() {}
6688

6789
public static Connection getConnection(
@@ -92,7 +114,7 @@ private static Connection getConnectionWithKerberos(
92114

93115
String principal =
94116
KerberosUtil.getPrincipal(connectionInfo.getHadoopConfig(), keytabFileName);
95-
KerberosUtil.loadKrb5Conf(connectionInfo.getHadoopConfig(), distributedCache, jobId);
117+
KerberosUtil.loadKrb5Conf(connectionInfo.getHadoopConfig(), distributedCache);
96118

97119
Configuration conf =
98120
FileSystemUtil.getConfiguration(connectionInfo.getHadoopConfig(), null);
@@ -162,4 +184,51 @@ public static Connection connect(InceptorConf connectionInfo) {
162184
lock.unlock();
163185
}
164186
}
187+
188+
public static InceptorDialect getDialectWithDriverType(JdbcConf jdbcConf) {
189+
String storageHandler = getTableStorageHandler(jdbcConf);
190+
storageHandler = storageHandler.substring(storageHandler.lastIndexOf(".") + 1);
191+
switch (storageHandler) {
192+
case INCEPTOR_HYPERBASE_STORAGE_HANDLER:
193+
case INCEPTOR_HBASE_STORAGE_HANDLER:
194+
return new InceptorHyperbaseDialect();
195+
case INCEPTOR_SEARCH_STORAGE_HANDLER:
196+
return new InceptorSearchDialect();
197+
default:
198+
return new InceptorHdfsDialect();
199+
}
200+
}
201+
202+
public static String getTableStorageHandler(JdbcConf jdbcConf) {
203+
Connection connection = getConnection((InceptorConf) jdbcConf, null, null);
204+
String schema = jdbcConf.getSchema();
205+
String table = jdbcConf.getTable();
206+
207+
String storageHandler = "";
208+
try {
209+
connection.createStatement().execute(String.format("use %s", schema));
210+
String sql = String.format(INCEPROE_SHOW_DESCRIBE_FORMAT, table);
211+
ResultSet resultSet = connection.createStatement().executeQuery(sql);
212+
while (resultSet.next()) {
213+
if (resultSet.getString(1).trim().equalsIgnoreCase("storage_handler")) {
214+
storageHandler = resultSet.getString(2);
215+
break;
216+
}
217+
}
218+
return storageHandler;
219+
} catch (SQLException e) {
220+
throw new FlinkxRuntimeException(
221+
String.format("failed to get table[%s] storage handler", table), e);
222+
} finally {
223+
try {
224+
connection.close();
225+
} catch (SQLException e) {
226+
e.printStackTrace();
227+
} finally {
228+
if (connection != null) {
229+
connection = null;
230+
}
231+
}
232+
}
233+
}
165234
}

0 commit comments

Comments
 (0)