Skip to content

Commit 0afb694

Browse files
committed
merge 1.15.0 branch to master
2 parents 864d436 + ff6a328 commit 0afb694

File tree

221 files changed

+6057
-1168
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

221 files changed

+6057
-1168
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
.idea
22
.idea/*
3+
.metals
4+
.vscode
5+
.scalafmt.conf
6+
.bloop
37

48
.DS_Store
59

assembly/dss-package/pom.xml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<artifactId>dss</artifactId>
2323
<groupId>com.webank.wedatasphere.dss</groupId>
24-
<version>1.2.2</version>
24+
<version>1.15.0-SNAPSHOT</version>
2525
<relativePath>../../pom.xml</relativePath>
2626
</parent>
2727
<modelVersion>4.0.0</modelVersion>
@@ -56,10 +56,6 @@
5656
<artifactId>xstream</artifactId>
5757
<groupId>com.thoughtworks.xstream</groupId>
5858
</exclusion>
59-
<exclusion>
60-
<artifactId>servlet-api</artifactId>
61-
<groupId>javax.servlet</groupId>
62-
</exclusion>
6359
</exclusions>
6460
</dependency>
6561
<dependency>

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<artifactId>dss</artifactId>
2424
<groupId>com.webank.wedatasphere.dss</groupId>
25-
<version>1.2.2</version>
25+
<version>1.15.0-SNAPSHOT</version>
2626
<relativePath>../pom.xml</relativePath>
2727
</parent>
2828
<packaging>pom</packaging>

dss-appconn/appconns/dss-datachecker-appconn/pom.xml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<artifactId>dss</artifactId>
2323
<groupId>com.webank.wedatasphere.dss</groupId>
24-
<version>1.2.2</version>
24+
<version>1.15.0-SNAPSHOT</version>
2525
<relativePath>../../../pom.xml</relativePath>
2626
</parent>
2727
<modelVersion>4.0.0</modelVersion>
@@ -54,7 +54,7 @@
5454
<dependency>
5555
<groupId>com.webank.wedatasphere.dss</groupId>
5656
<artifactId>dss-origin-sso-integration-standard</artifactId>
57-
<version>1.2.2</version>
57+
<version>1.15.0-SNAPSHOT</version>
5858
<exclusions>
5959
<exclusion>
6060
<groupId>org.apache.linkis</groupId>
@@ -101,7 +101,6 @@
101101
<dependency>
102102
<groupId>com.alibaba</groupId>
103103
<artifactId>druid</artifactId>
104-
<version>1.0.28</version>
105104
</dependency>
106105
<dependency>
107106
<groupId>com.webank.wedatasphere.dss</groupId>

dss-appconn/appconns/dss-datachecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/datachecker/DataChecker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class DataChecker {
4545

4646
public final static String QUALITIS_CHECK = "qualitis.check";
4747
public final static String QUALITIS_SWITCH = "job.eventchecker.qualitis.switch";
48+
public final static String DENY_VIEW_SWITCH = "deny.view.switch";
4849
public final static String QUALITIS_CHECK_DEFAULT = "qualitis.check.default";
4950
public final static String EXPAND_SECOND_PARTITION = "hourly.secondary.partition";
5051

@@ -60,7 +61,7 @@ public class DataChecker {
6061
public DataChecker(Properties p, DataCheckerExecutionAction action) {
6162
this.p = p;
6263
dataCheckerAction = action;
63-
maxWaitTime = Long.valueOf(p.getProperty(DataChecker.WAIT_TIME, "1")) * 3600 * 1000;
64+
maxWaitTime = (long) (Double.valueOf(p.getProperty(DataChecker.WAIT_TIME, "1")) * 3600 * 1000);
6465
//test over time
6566
// maxWaitTime = Long.valueOf(p.getProperty(DataChecker.WAIT_TIME, "1")) * 120 * 1000;
6667
queryFrequency = Integer.valueOf(p.getProperty(DataChecker.QUERY_FREQUENCY, "60000"));

dss-appconn/appconns/dss-datachecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/datachecker/connector/DataCheckerDao.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
105105
// action.getExecutionRequestRefContext().appendLog("Database table partition info : " + dataCheckerInfo);
106106
}
107107
log.info("(DataChecker info) database table partition info : " + dataCheckerInfo);
108-
long waitTime = Long.valueOf(props.getProperty(DataChecker.WAIT_TIME, "1")) * 3600 * 1000;
108+
long waitTime = (long) (Double.valueOf(props.getProperty(DataChecker.WAIT_TIME, "1")) * 3600 * 1000);
109109
int queryFrequency = Integer.valueOf(props.getProperty(DataChecker.QUERY_FREQUENCY, "60000"));
110110
// String timeScape = props.getProperty(DataChecker.TIME_SCAPE, "NULL");
111111
log.info("(DataChecker info) wait time : " + waitTime);
@@ -197,7 +197,9 @@ private boolean getDataCheckResult(Map<String, String> proObjectMap,
197197
}
198198
log.info("start to check hive meta");
199199
proObjectMap.put(DataChecker.SOURCE_TYPE, HIVE_SOURCE_TYPE);
200-
normalCheck= getJobTotalCount(dataObject, jobConn, log) > 0;
200+
boolean denyView = Boolean.parseBoolean(
201+
props.getProperty(DataChecker.DENY_VIEW_SWITCH, "false"));
202+
normalCheck= getJobTotalCount(dataObject, jobConn, log,denyView) > 0;
201203
if (null != action.getExecutionRequestRefContext()){
202204
action.getExecutionRequestRefContext().appendLog(dataObjectStr+" check hive meta end,check result:"+normalCheck);
203205
}
@@ -386,14 +388,20 @@ private CheckDataObject parseDataObject(String dataObjectStr) {
386388
/**
387389
* 查hive 元数据库
388390
*/
389-
private long getJobTotalCount(CheckDataObject dataObject, Connection conn, Logger log) {
391+
private long getJobTotalCount(CheckDataObject dataObject, Connection conn, Logger log,boolean denyView) {
390392
log.info("-------------------------------------- search hive/spark/mr data ");
391393
log.info("-------------------------------------- dataObject: " + dataObject);
392394
try (PreparedStatement pstmt = getJobStatement(conn, dataObject)) {
393395
ResultSet rs = pstmt.executeQuery();
394396
// long ret = rs.last() ? rs.getRow() : 0;
395397
long ret = 0L;
396398
while (rs.next()) {
399+
String tableType = rs.getString("TBL_TYPE");
400+
//如果禁用视图,则检查视图,有则报错
401+
if (denyView && "VIRTUAL_VIEW".equals(tableType)) {
402+
log.error("Virtual view table is not allowed to use DataChecker. check object:{}", dataObject);
403+
throw new DSSRuntimeException("Virtual view table is not allowed to use DataChecker. check object:{}" + dataObject);
404+
}
397405
ret ++;
398406
}
399407
log.info("-------------------------------------- hive/spark/mr data result:"+ret);
@@ -573,7 +581,7 @@ private Map<String, String> fetchMaskCode(CheckDataObject dataObject, Logger log
573581
Response response = HttpUtils.httpClientHandleBase(maskUrl, requestBody, dataMap);
574582
handleResponse(response, resultMap, log);
575583
} catch (IOException e) {
576-
log.error("fetch data from BDP MASK failed ");
584+
log.error("fetch data from BDP MASK failed ",e);
577585
resultMap.put("maskStatus", "noPrepare");
578586
} catch (MaskCheckNotExistException e) {
579587
String errorMessage = "fetch data from BDP MASK failed" +

dss-appconn/appconns/dss-dolphinscheduler-appconn/pom.xml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<artifactId>dss</artifactId>
88
<groupId>com.webank.wedatasphere.dss</groupId>
9-
<version>1.2.2</version>
9+
<version>1.15.0-SNAPSHOT</version>
1010
<relativePath>../../../pom.xml</relativePath>
1111
</parent>
1212
<modelVersion>4.0.0</modelVersion>
@@ -72,11 +72,6 @@
7272
<version>${dss.version}</version>
7373
<scope>provided</scope>
7474
</dependency>
75-
<dependency>
76-
<groupId>org.apache.linkis</groupId>
77-
<artifactId>linkis-storage-script-dev-server</artifactId>
78-
<version>1.0.3</version>
79-
</dependency>
8075
<dependency>
8176
<groupId>com.google.guava</groupId>
8277
<artifactId>guava</artifactId>

dss-appconn/appconns/dss-eventchecker-appconn/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<artifactId>dss</artifactId>
2323
<groupId>com.webank.wedatasphere.dss</groupId>
24-
<version>1.2.2</version>
24+
<version>1.15.0-SNAPSHOT</version>
2525
<relativePath>../../../pom.xml</relativePath>
2626
</parent>
2727
<modelVersion>4.0.0</modelVersion>
@@ -86,7 +86,6 @@
8686
<dependency>
8787
<groupId>com.alibaba</groupId>
8888
<artifactId>druid</artifactId>
89-
<version>1.0.28</version>
9089
</dependency>
9190

9291
<dependency>

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/HttpEventCheckSender.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.gson.Gson;
2020
import com.google.gson.GsonBuilder;
2121
import com.google.gson.JsonSyntaxException;
22+
import com.webank.wedatasphere.dss.appconn.eventchecker.entity.EventChecker;
2223
import com.webank.wedatasphere.dss.appconn.eventchecker.entity.HttpMsgSendRequest;
2324
import com.webank.wedatasphere.dss.appconn.eventchecker.entity.HttpMsgSendResponse;
2425
import com.webank.wedatasphere.dss.appconn.eventchecker.utils.EventCheckerHttpUtils;
@@ -42,7 +43,10 @@ public HttpEventCheckSender(Properties props) {
4243
@Override
4344
public boolean sendMsg(int jobId, Properties props, Logger log) {
4445
boolean result = false;
45-
String url = props.getProperty(HTTP_EVENT_KGAS_SEND_URL);
46+
String url=getSendURL(props);
47+
if (url == null) {
48+
url = props.getProperty(HTTP_EVENT_KGAS_SEND_URL);
49+
}
4650
String key = props.getProperty(HTTP_EVENT_SIGN_KEY);
4751
String timestamp = String.valueOf(System.currentTimeMillis());
4852
String sign = EventCheckerHttpUtils.calculateSign(key, timestamp);
@@ -94,4 +98,25 @@ public boolean sendMsg(int jobId, Properties props, Logger log) {
9498

9599
return result;
96100
}
101+
102+
103+
/**
104+
* 根据CHANNEL_TYPE来从配置文件中匹配URL
105+
* @param propskey
106+
* @return
107+
*/
108+
private String getSendURL(Properties propskey) {
109+
String channelType = propskey.getProperty(EventChecker.CHANNEL_TYPE);
110+
111+
String filteredValue = null;
112+
for (String key : propskey.stringPropertyNames()) {
113+
String value = propskey.getProperty(key);
114+
if (channelType != null && key.toLowerCase().contains(channelType.toLowerCase()) && key.endsWith("send.url")) {
115+
filteredValue = value;
116+
break;
117+
}
118+
}
119+
120+
return filteredValue;
121+
}
97122
}

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/HttpEventcheckerReceiver.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import com.google.gson.Gson;
55
import com.google.gson.GsonBuilder;
6+
import com.webank.wedatasphere.dss.appconn.eventchecker.entity.EventChecker;
67
import com.webank.wedatasphere.dss.appconn.eventchecker.entity.HttpMsgReceiveRequest;
78
import com.webank.wedatasphere.dss.appconn.eventchecker.entity.HttpMsgReceiveResponse;
89
import com.webank.wedatasphere.dss.appconn.eventchecker.utils.EventCheckerHttpUtils;
@@ -30,7 +31,10 @@ public HttpEventcheckerReceiver(Properties props) {
3031

3132
@Override
3233
public String[] getMsg(int jobId,Properties props, Logger log, String... params) {
33-
String url=props.getProperty(HTTP_EVENT_KGAS_RECEIVE_URL);
34+
String url=getReceiveURL(props);
35+
if(url == null) {
36+
url = props.getProperty(HTTP_EVENT_KGAS_RECEIVE_URL);
37+
}
3438
String key=props.getProperty(HTTP_EVENT_SIGN_KEY);
3539
String timestamp=String.valueOf( System.currentTimeMillis());
3640
String sign = EventCheckerHttpUtils.calculateSign(key,timestamp);
@@ -47,6 +51,8 @@ public String[] getMsg(int jobId,Properties props, Logger log, String... params)
4751
String[] consumedMsgInfo = null;
4852
String responseBody = null;
4953
String messageJson = gson.toJson(message);
54+
String requestStr = EventCheckerHttpUtils.requestToString(url, "POST", header, null, messageJson);
55+
log.error("receive failed,request:{}", requestStr);
5056
try (Response response = EventCheckerHttpUtils.post(url, header, null, messageJson)) {
5157
HttpMsgReceiveResponse msgReceiveResponse;
5258
try {
@@ -88,8 +94,6 @@ public String[] getMsg(int jobId,Properties props, Logger log, String... params)
8894
);
8995
consumedMsgInfo = null;
9096
} else {
91-
String requestStr = EventCheckerHttpUtils.requestToString(url, "POST", header, null, messageJson);
92-
log.error("receive failed,request:{}", requestStr);
9397
log.error("receive failed,response:{}", responseBody);
9498
String errorMsg = "信号接收失败。详情:"+responseBody;
9599
throw new RuntimeException(errorMsg);
@@ -101,5 +105,27 @@ public String[] getMsg(int jobId,Properties props, Logger log, String... params)
101105
}
102106
}
103107

108+
/**
109+
* 根据CHANNEL_TYPE来从配置文件中匹配URL
110+
* @param propskey
111+
* @return
112+
*/
113+
private String getReceiveURL(Properties propskey) {
114+
String channelType = propskey.getProperty(EventChecker.CHANNEL_TYPE);
115+
116+
String filteredValue = null;
117+
for (String key : propskey.stringPropertyNames()) {
118+
String value = propskey.getProperty(key);
119+
120+
if (channelType != null && key.toLowerCase().contains(channelType.toLowerCase()) && key.endsWith("receive.url")) {
121+
filteredValue = value;
122+
break;
123+
}
124+
}
125+
126+
return filteredValue;
127+
}
128+
129+
104130

105131
}

0 commit comments

Comments
 (0)