Skip to content

Commit 7e2f9c3

Browse files
author
gituser
committed
Merge branch '1.8_release_3.10.x' into 1.8_release_4.0.x
2 parents 3739c1c + 10d5a4c commit 7e2f9c3

File tree

5 files changed

+140
-30
lines changed

5 files changed

+140
-30
lines changed

flinkx-phoenix/flinkx-phoenix-core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,11 @@
2323
</exclusion>
2424
</exclusions>
2525
</dependency>
26+
27+
<dependency>
28+
<groupId>org.codehaus.janino</groupId>
29+
<artifactId>janino</artifactId>
30+
<version>3.1.1</version>
31+
</dependency>
2632
</dependencies>
2733
</project>

flinkx-phoenix/flinkx-phoenix-core/src/main/java/com/dtstack/flinkx/phoenix/util/PhoenixUtil.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,23 @@
1717
*/
1818
package com.dtstack.flinkx.phoenix.util;
1919

20+
import com.dtstack.flinkx.reader.MetaColumn;
2021
import com.dtstack.flinkx.util.ClassUtil;
2122
import com.dtstack.flinkx.util.TelnetUtil;
23+
import org.codehaus.commons.compiler.CompileException;
24+
import org.codehaus.janino.ClassBodyEvaluator;
2225

26+
import java.io.IOException;
27+
import java.io.StringReader;
2328
import java.sql.Connection;
2429
import java.sql.DriverManager;
30+
import java.sql.ResultSet;
31+
import java.sql.ResultSetMetaData;
2532
import java.sql.SQLException;
33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
2637

2738
/**
2839
* Date: 2020/02/28
@@ -31,21 +42,58 @@
3142
* @author tudou
3243
*/
3344
public class PhoenixUtil {
34-
public static Connection getConnectionInternal(String url, String username, String password) throws SQLException {
45+
46+
public interface IPhoenixConn {
47+
default Connection getConn(String url, String userName, String password) throws SQLException {
48+
throw new RuntimeException("this method must be override");
49+
}
50+
51+
default Connection getConn(String url) throws SQLException {
52+
throw new RuntimeException("this method must be override");
53+
}
54+
}
55+
56+
public static Connection getConnectionInternal(String url, String username, String password, ClassLoader parentClassLoader) throws SQLException, IOException, CompileException, IllegalAccessException, InstantiationException {
3557
Connection dbConn;
36-
synchronized (ClassUtil.LOCK_STR){
58+
synchronized (ClassUtil.LOCK_STR) {
3759
DriverManager.setLoginTimeout(10);
3860

3961
// telnet
4062
TelnetUtil.telnet(url);
41-
63+
ClassBodyEvaluator cbe = new ClassBodyEvaluator();
64+
cbe.setParentClassLoader(parentClassLoader);
65+
cbe.setDefaultImports("java.sql.Connection", "java.sql.DriverManager", "java.sql.SQLException");
66+
cbe.setImplementedInterfaces(new Class[]{IPhoenixConn.class});
4267
if (username == null) {
43-
dbConn = DriverManager.getConnection(url);
68+
StringReader sr = new StringReader("public Connection getConn(String url) throws SQLException { return DriverManager.getConnection(url); }");
69+
IPhoenixConn iPhoenixConn = (IPhoenixConn) cbe.createInstance(sr);
70+
dbConn = iPhoenixConn.getConn(url);
4471
} else {
45-
dbConn = DriverManager.getConnection(url, username, password);
72+
StringReader sr = new StringReader("public Connection getConn(String url, String userName, String password) throws SQLException { return DriverManager.getConnection(url, userName, password); }");
73+
IPhoenixConn iPhoenixConn = (IPhoenixConn) cbe.createInstance(sr);
74+
dbConn = iPhoenixConn.getConn(url, username, password);
4675
}
4776
}
4877

4978
return dbConn;
5079
}
80+
81+
public static List<String> analyzeTable(ResultSet rs, List<MetaColumn> metaColumns) throws SQLException {
82+
List<String> ret = new ArrayList<>(metaColumns.size());
83+
ResultSetMetaData rd = rs.getMetaData();
84+
85+
Map<String,String> nameTypeMap = new HashMap<>((rd.getColumnCount() << 2) / 3);
86+
for(int i = 0; i < rd.getColumnCount(); ++i) {
87+
nameTypeMap.put(rd.getColumnName(i+1),rd.getColumnTypeName(i+1));
88+
}
89+
90+
for (MetaColumn metaColumn : metaColumns) {
91+
if(metaColumn.getValue() != null){
92+
ret.add("string");
93+
} else {
94+
ret.add(nameTypeMap.get(metaColumn.getName()));
95+
}
96+
}
97+
return ret;
98+
}
5199
}

flinkx-phoenix/flinkx-phoenix-reader/src/main/java/com/dtstack/flinkx/phoenix/format/PhoenixInputFormat.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,25 @@
1919

2020
import com.dtstack.flinkx.phoenix.util.PhoenixUtil;
2121
import com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat;
22-
import com.dtstack.flinkx.rdb.util.DbUtil;
2322
import com.dtstack.flinkx.reader.MetaColumn;
2423
import com.dtstack.flinkx.util.ClassUtil;
2524
import com.dtstack.flinkx.util.DateUtil;
25+
import com.dtstack.flinkx.util.ReflectionUtils;
26+
import com.google.common.collect.Lists;
2627
import org.apache.commons.collections.CollectionUtils;
28+
import org.apache.commons.io.FilenameUtils;
2729
import org.apache.commons.lang3.StringUtils;
2830
import org.apache.flink.core.io.InputSplit;
31+
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
2932
import org.apache.flink.types.Row;
33+
import sun.misc.URLClassPath;
3034

3135
import java.io.IOException;
32-
import java.sql.SQLException;
36+
import java.lang.reflect.Field;
37+
import java.net.URL;
38+
import java.net.URLClassLoader;
3339
import java.util.ArrayList;
40+
import java.util.List;
3441

3542
import static com.dtstack.flinkx.rdb.util.DbUtil.clobToString;
3643

@@ -41,27 +48,48 @@
4148
*/
4249
public class PhoenixInputFormat extends JdbcInputFormat {
4350

51+
private static final String PHOENIX_READER_PREFIX = "flinkx-phoenix-reader";
52+
4453
@Override
4554
public void openInternal(InputSplit inputSplit) throws IOException {
4655
try {
4756
LOG.info(inputSplit.toString());
4857

49-
ClassUtil.forName(driverName, getClass().getClassLoader());
58+
Field declaredField = ReflectionUtils.getDeclaredField(getClass().getClassLoader(), "ucp");
59+
declaredField.setAccessible(true);
60+
URLClassPath urlClassPath = (URLClassPath) declaredField.get(getClass().getClassLoader());
61+
declaredField.setAccessible(false);
62+
63+
List<URL> needJar = Lists.newArrayList();
64+
for (URL url : urlClassPath.getURLs()) {
65+
String urlFileName = FilenameUtils.getName(url.getPath());
66+
if (urlFileName.startsWith(PHOENIX_READER_PREFIX)) {
67+
needJar.add(url);
68+
}
69+
}
5070

51-
if (incrementConfig.isIncrement() && incrementConfig.isUseMaxFunc()){
71+
ClassLoader parentClassLoader = getClass().getClassLoader();
72+
String[] alwaysParentFirstPatterns = new String[2];
73+
alwaysParentFirstPatterns[0] = "org.apache.flink";
74+
alwaysParentFirstPatterns[1] = "com.dtstack.flinkx";
75+
URLClassLoader childFirstClassLoader = FlinkUserCodeClassLoaders.childFirst(needJar.toArray(new URL[0]), parentClassLoader, alwaysParentFirstPatterns);
76+
77+
ClassUtil.forName(driverName, childFirstClassLoader);
78+
79+
if (incrementConfig.isIncrement() && incrementConfig.isUseMaxFunc()) {
5280
getMaxValue(inputSplit);
5381
}
5482

5583
initMetric(inputSplit);
5684

57-
if(!canReadData(inputSplit)){
85+
if (!canReadData(inputSplit)) {
5886
LOG.warn("Not read data when the start location are equal to end location");
5987

6088
hasNext = false;
6189
return;
6290
}
6391

64-
dbConn = PhoenixUtil.getConnectionInternal(dbUrl, username, password);
92+
dbConn = PhoenixUtil.getConnectionInternal(dbUrl, username, password, childFirstClassLoader);
6593

6694
// 部分驱动需要关闭事务自动提交,fetchSize参数才会起作用
6795
dbConn.setAutoCommit(false);
@@ -76,22 +104,22 @@ public void openInternal(InputSplit inputSplit) throws IOException {
76104
columnCount = resultSet.getMetaData().getColumnCount();
77105

78106
boolean splitWithRowCol = numPartitions > 1 && StringUtils.isNotEmpty(splitKey) && splitKey.contains("(");
79-
if(splitWithRowCol){
80-
columnCount = columnCount-1;
107+
if (splitWithRowCol) {
108+
columnCount = columnCount - 1;
81109
}
82110

83111
hasNext = resultSet.next();
84112

85-
if (StringUtils.isEmpty(customSql)){
86-
descColumnTypeList = DbUtil.analyzeTable(dbUrl, username, password,databaseInterface,table,metaColumns);
113+
if (StringUtils.isEmpty(customSql)) {
114+
descColumnTypeList = PhoenixUtil.analyzeTable(resultSet, metaColumns);
87115
} else {
88116
descColumnTypeList = new ArrayList<>();
89117
for (MetaColumn metaColumn : metaColumns) {
90118
descColumnTypeList.add(metaColumn.getName());
91119
}
92120
}
93121

94-
} catch (SQLException se) {
122+
} catch (Exception se) {
95123
throw new IllegalArgumentException("open() failed. " + se.getMessage(), se);
96124
}
97125

@@ -108,15 +136,15 @@ public Row nextRecordInternal(Row row) throws IOException {
108136
try {
109137
for (int pos = 0; pos < row.getArity(); pos++) {
110138
Object obj = resultSet.getObject(pos + 1);
111-
if(obj != null) {
112-
if(CollectionUtils.isNotEmpty(descColumnTypeList)) {
139+
if (obj != null) {
140+
if (CollectionUtils.isNotEmpty(descColumnTypeList)) {
113141
String columnType = descColumnTypeList.get(pos);
114-
if("year".equalsIgnoreCase(columnType)) {
142+
if ("year".equalsIgnoreCase(columnType)) {
115143
java.util.Date date = (java.util.Date) obj;
116144
obj = DateUtil.dateToYearString(date);
117-
} else if("tinyint".equalsIgnoreCase(columnType)
118-
|| "bit".equalsIgnoreCase(columnType)) {
119-
if(obj instanceof Boolean) {
145+
} else if ("tinyint".equalsIgnoreCase(columnType)
146+
|| "bit".equalsIgnoreCase(columnType)) {
147+
if (obj instanceof Boolean) {
120148
obj = ((Boolean) obj ? 1 : 0);
121149
}
122150
}
@@ -127,9 +155,8 @@ public Row nextRecordInternal(Row row) throws IOException {
127155
row.setField(pos, obj);
128156
}
129157
return super.nextRecordInternal(row);
130-
}catch (Exception e) {
158+
} catch (Exception e) {
131159
throw new IOException("Couldn't read data - " + e.getMessage(), e);
132160
}
133161
}
134-
135162
}

flinkx-phoenix/flinkx-phoenix-writer/src/main/java/com/dtstack/flinkx/phoenix/format/PhoenixOutputFormat.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,55 @@
2121
import com.dtstack.flinkx.phoenix.util.PhoenixUtil;
2222
import com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormat;
2323
import com.dtstack.flinkx.util.ClassUtil;
24+
import com.dtstack.flinkx.util.ReflectionUtils;
25+
import com.google.common.collect.Lists;
2426
import org.apache.commons.collections.CollectionUtils;
27+
import org.apache.commons.io.FilenameUtils;
28+
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
2529
import org.slf4j.Logger;
2630
import org.slf4j.LoggerFactory;
31+
import sun.misc.URLClassPath;
2732

28-
import java.sql.SQLException;
33+
import java.lang.reflect.Field;
34+
import java.net.URL;
35+
import java.net.URLClassLoader;
36+
import java.util.List;
2937

3038
/**
3139
* Company: www.dtstack.com
3240
*
3341
* @author wuhui
3442
*/
3543
public class PhoenixOutputFormat extends JdbcOutputFormat {
44+
3645
private static final Logger LOG = LoggerFactory.getLogger(PhoenixOutputFormat.class);
3746

47+
private static final String PHOENIX_WRITER_PREFIX = "flinkx-phoenix-writer";
48+
3849
@Override
3950
protected void openInternal(int taskNumber, int numTasks){
4051
try {
41-
ClassUtil.forName(driverName, getClass().getClassLoader());
42-
dbConn = PhoenixUtil.getConnectionInternal(dbUrl, username, password);
52+
Field declaredField = ReflectionUtils.getDeclaredField(getClass().getClassLoader(), "ucp");
53+
declaredField.setAccessible(true);
54+
URLClassPath urlClassPath = (URLClassPath) declaredField.get(getClass().getClassLoader());
55+
declaredField.setAccessible(false);
56+
57+
List<URL> needJar = Lists.newArrayList();
58+
for(URL url : urlClassPath.getURLs()){
59+
String urlFileName = FilenameUtils.getName(url.getPath());
60+
if(urlFileName.startsWith(PHOENIX_WRITER_PREFIX)){
61+
needJar.add(url);
62+
}
63+
}
64+
65+
ClassLoader parentClassLoader = getClass().getClassLoader();
66+
String[] alwaysParentFirstPatterns = new String[2];
67+
alwaysParentFirstPatterns[0] = "org.apache.flink";
68+
alwaysParentFirstPatterns[1] = "com.dtstack.flinkx";
69+
URLClassLoader childFirstClassLoader = FlinkUserCodeClassLoaders.childFirst(needJar.toArray(new URL[0]), parentClassLoader, alwaysParentFirstPatterns);
70+
71+
ClassUtil.forName(driverName, childFirstClassLoader);
72+
dbConn = PhoenixUtil.getConnectionInternal(dbUrl, username, password, childFirstClassLoader);
4373

4474
if (restoreConfig.isRestore()){
4575
dbConn.setAutoCommit(false);
@@ -66,9 +96,8 @@ protected void openInternal(int taskNumber, int numTasks){
6696
readyCheckpoint = false;
6797

6898
LOG.info("subTask[{}}] wait finished", taskNumber);
69-
} catch (SQLException sqe) {
99+
} catch (Exception sqe) {
70100
throw new IllegalArgumentException("open() failed.", sqe);
71101
}
72102
}
73-
74103
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<module>flinkx-ftp</module>
3838
<module>flinkx-odps</module>
3939
<module>flinkx-hbase</module>
40-
<!-- <module>flinkx-phoenix</module>-->
40+
<module>flinkx-phoenix</module>
4141
<module>flinkx-phoenix5</module>
4242
<module>flinkx-carbondata</module>
4343
<module>flinkx-kudu</module>

0 commit comments

Comments
 (0)