Skip to content

Commit a465e73

Browse files
authored
Merge pull request #558 from longbridgeapp/dev-1.2.0
Sender/Receiver add log
2 parents 0fd37f6 + be7639a commit a465e73

File tree

7 files changed

+196
-16
lines changed

7 files changed

+196
-16
lines changed

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/entity/EventChecker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.webank.wedatasphere.dss.appconn.eventchecker.service.EventCheckerService;
2121
import com.webank.wedatasphere.dss.appconn.eventchecker.cs.CSEventReceiverHelper;
2222
import com.webank.wedatasphere.dss.appconn.eventchecker.execution.EventCheckerExecutionAction;
23+
import com.webank.wedatasphere.dss.appconn.eventchecker.utils.Utils;
2324
import com.webank.wedatasphere.dss.standard.app.development.listener.common.RefExecutionState;
2425
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.log4j.Logger;
@@ -107,7 +108,7 @@ public void run() {
107108
if (p.containsKey(MSG) && StringUtils.isNotEmpty(p.getProperty(MSG)) && p.getProperty(MSG).length() > 250) {
108109
throw new RuntimeException("parameter " + MSG + " length less than 250 !");
109110
}
110-
success = wbDao.sendMsg(execId, p, logger);
111+
success = wbDao.sendMsg(execId, p, logger, backAction);
111112
if (success) {
112113
backAction.setState(RefExecutionState.Success);
113114

@@ -122,6 +123,7 @@ public void run() {
122123
throw new RuntimeException("Please input correct parameter of msg.type, Select RECEIVE Or SEND.");
123124
}
124125
}catch (Exception ex){
126+
Utils.log(backAction,ex);
125127
backAction.setState(RefExecutionState.Failed);
126128
throw ex;
127129
}
@@ -145,7 +147,7 @@ public boolean receiveMsg(){
145147
if (StringUtils.isNotEmpty(userTime)) {
146148
p.put(USER_TIME, userTime);
147149
}
148-
success = wbDao.reciveMsg(execId, p, logger);
150+
success = wbDao.reciveMsg(execId, p, logger, backAction);
149151
if (success) {
150152
backAction.saveKeyAndValue(getJobSaveKeyAndValue());
151153
backAction.setState(RefExecutionState.Success);

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/AbstractEventCheckReceiver.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.webank.wedatasphere.dss.appconn.eventchecker.entity.EventChecker;
2020

21+
import com.webank.wedatasphere.dss.appconn.eventchecker.execution.EventCheckerExecutionAction;
22+
import com.webank.wedatasphere.dss.appconn.eventchecker.utils.Utils;
2123
import org.apache.commons.lang3.StringUtils;
2224
import org.apache.commons.lang3.time.DateFormatUtils;
2325
import org.apache.log4j.Logger;
@@ -33,7 +35,7 @@ public class AbstractEventCheckReceiver extends AbstractEventCheck{
3335
/**
3436
* Fill the result into the source
3537
*/
36-
String setConsumedMsg(Properties props, Logger log, String[] consumedMsgInfo){
38+
String setConsumedMsg(Properties props, Logger log, String[] consumedMsgInfo,EventCheckerExecutionAction backAction){
3739
String vNewMsgID = "";
3840
try {
3941
if(consumedMsgInfo!=null && consumedMsgInfo.length == 4){
@@ -50,6 +52,7 @@ String setConsumedMsg(Properties props, Logger log, String[] consumedMsgInfo){
5052
+ ", messageBody: " + vMsg);
5153
}
5254
}catch (Exception e) {
55+
Utils.log(backAction,e);
5356
log.error("Error set consumed message failed {} setConsumedMsg failed" + e);
5457
return vNewMsgID;
5558
}
@@ -59,12 +62,12 @@ String setConsumedMsg(Properties props, Logger log, String[] consumedMsgInfo){
5962
/**
6063
* Update consumption status
6164
*/
62-
boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consumedMsgInfo,String lastMsgId){
65+
boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consumedMsgInfo, String lastMsgId, EventCheckerExecutionAction backAction){
6366
boolean result = false;
6467
String vNewMsgID = "-1";
6568
PreparedStatement updatePstmt = null;
6669
Connection msgConn = null;
67-
vNewMsgID = setConsumedMsg(props,log,consumedMsgInfo);
70+
vNewMsgID = setConsumedMsg(props,log,consumedMsgInfo,backAction);
6871
try {
6972
if(StringUtils.isNotEmpty(vNewMsgID) && StringUtils.isNotBlank(vNewMsgID) && !"-1".equals(vNewMsgID)){
7073
msgConn = getEventCheckerConnection(props,log);
@@ -93,6 +96,7 @@ boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consum
9396
result = false;
9497
}
9598
}catch (SQLException e){
99+
Utils.log(backAction,e);
96100
log.error("Error update Msg Offset" + e);
97101
return false;
98102
}finally {
@@ -105,7 +109,7 @@ boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consum
105109
/**
106110
* get consumption progress
107111
*/
108-
String getOffset(int jobId, Properties props, Logger log){
112+
String getOffset(int jobId, Properties props, Logger log, EventCheckerExecutionAction backAction){
109113
String sqlForReadMsgID = "SELECT msg_id FROM event_status WHERE receiver=? AND topic=? AND msg_name=?";
110114
PreparedStatement pstmtForGetID = null;
111115
Connection msgConn = null;
@@ -121,6 +125,7 @@ String getOffset(int jobId, Properties props, Logger log){
121125
rs = pstmtForGetID.executeQuery();
122126
lastMsgId = rs.last()==true ? rs.getString("msg_id"):"0";
123127
} catch (SQLException e) {
128+
Utils.log(backAction,e);
124129
throw new RuntimeException("get Offset failed " + e);
125130
}finally {
126131
closeQueryStmt(pstmtForGetID,log);
@@ -134,7 +139,7 @@ String getOffset(int jobId, Properties props, Logger log){
134139
/**
135140
* Consistent entrance to consumer message
136141
*/
137-
String[] getMsg(Properties props, Logger log,String ... params){
142+
String[] getMsg(Properties props, Logger log, EventCheckerExecutionAction backAction,String ... params){
138143
String sqlForReadTMsg = "SELECT * FROM event_queue WHERE topic=? AND msg_name=? AND send_time >=? AND send_time <=? AND msg_id >? ORDER BY msg_id ASC LIMIT 1";
139144
PreparedStatement pstmt = null;
140145
Connection msgConn = null;
@@ -160,6 +165,7 @@ String[] getMsg(Properties props, Logger log,String ... params){
160165
}
161166
}
162167
} catch (SQLException e) {
168+
Utils.log(backAction,e);
163169
throw new RuntimeException("EventChecker failed to receive message" + e);
164170
} finally {
165171
closeQueryStmt(pstmt, log);

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/DefaultEventcheckReceiver.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919

2020

21+
import com.webank.wedatasphere.dss.appconn.eventchecker.execution.EventCheckerExecutionAction;
22+
import com.webank.wedatasphere.dss.appconn.eventchecker.utils.Utils;
2123
import org.apache.commons.lang3.time.DateFormatUtils;
2224
import org.apache.log4j.Logger;
2325

@@ -28,13 +30,16 @@
2830
import java.util.Properties;
2931

3032
public class DefaultEventcheckReceiver extends AbstractEventCheckReceiver {
33+
34+
EventCheckerExecutionAction backAction;
3135
String todayStartTime;
3236
String todayEndTime;
3337
String allStartTime;
3438
String allEndTime;
3539
String nowStartTime;
3640

37-
public DefaultEventcheckReceiver(Properties props) {
41+
public DefaultEventcheckReceiver(Properties props, EventCheckerExecutionAction backAction) {
42+
this.backAction = backAction;
3843
initECParams(props);
3944
initReceiverTimes();
4045
}
@@ -51,18 +56,19 @@ private void initReceiverTimes(){
5156
public boolean reciveMsg(int jobId, Properties props, Logger log) {
5257
boolean result = false;
5358
try{
54-
String lastMsgId = getOffset(jobId,props,log);
59+
String lastMsgId = getOffset(jobId,props,log,backAction);
5560
String[] executeType = createExecuteType(jobId,props,log,lastMsgId);
5661
if(executeType!=null && executeType.length ==3){
57-
String[] consumedMsgInfo = getMsg(props, log,executeType);
62+
String[] consumedMsgInfo = getMsg(props, log,backAction,executeType);
5863
if(consumedMsgInfo!=null && consumedMsgInfo.length == 4){
59-
result = updateMsgOffset(jobId,props,log,consumedMsgInfo,lastMsgId);
64+
result = updateMsgOffset(jobId,props,log,consumedMsgInfo,lastMsgId,backAction);
6065
}
6166
}else{
6267
log.error("executeType error {} " + executeType.toString());
6368
return result;
6469
}
6570
}catch (Exception e){
71+
Utils.log(backAction,e);
6672
log.error("EventChecker failed to receive the message {}" + e);
6773
return result;
6874
}
@@ -104,6 +110,7 @@ private String[] createExecuteType(int jobId, Properties props, Logger log,Strin
104110
}
105111
}
106112
}catch(Exception e){
113+
Utils.log(backAction,e);
107114
log.error("create executeType failed {}" + e);
108115
}
109116
return executeType;
@@ -117,6 +124,7 @@ private void waitForTime(Logger log,Long waitTime){
117124
try {
118125
targetWaitTime = fmt.parse(formatWaitForTime);
119126
} catch (ParseException e) {
127+
Utils.log(backAction,e);
120128
log.error("parse date failed {}" + e);
121129
}
122130

@@ -129,6 +137,7 @@ private void waitForTime(Logger log,Long waitTime){
129137
try {
130138
Thread.sleep(wt);
131139
} catch (InterruptedException e) {
140+
Utils.log(backAction,e);
132141
throw new RuntimeException("EventChecker throws an exception during the waiting time {}"+e);
133142
}
134143
}else{

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/EventCheckSender.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.webank.wedatasphere.dss.appconn.eventchecker.service;
1818

19+
import com.webank.wedatasphere.dss.appconn.eventchecker.execution.EventCheckerExecutionAction;
20+
import com.webank.wedatasphere.dss.appconn.eventchecker.utils.Utils;
1921
import org.apache.commons.lang3.time.DateFormatUtils;
2022
import org.apache.log4j.Logger;
2123

@@ -27,7 +29,10 @@
2729

2830
public class EventCheckSender extends AbstractEventCheck {
2931

30-
public EventCheckSender(Properties props) {
32+
EventCheckerExecutionAction backAction;
33+
34+
public EventCheckSender(Properties props,EventCheckerExecutionAction backAction) {
35+
this.backAction = backAction;
3136
initECParams(props);
3237
}
3338

@@ -57,6 +62,7 @@ public boolean sendMsg(int jobId, Properties props, Logger log) {
5762
log.error("Send msg failed for update database!");
5863
}
5964
} catch (SQLException e) {
65+
Utils.log(backAction,e);
6066
throw new RuntimeException("Send EventChecker msg failed!" + e);
6167
} finally {
6268
closeQueryStmt(pstmt, log);

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/EventCheckerService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.webank.wedatasphere.dss.appconn.eventchecker.service;
1818

19+
import com.webank.wedatasphere.dss.appconn.eventchecker.execution.EventCheckerExecutionAction;
1920
import org.apache.log4j.Logger;
2021

2122
import java.util.Properties;
@@ -34,9 +35,9 @@ public static EventCheckerService getInstance() {
3435
return instance;
3536
}
3637

37-
public boolean sendMsg(int jobId, Properties props, Logger log) {
38+
public boolean sendMsg(int jobId, Properties props, Logger log, EventCheckerExecutionAction backAction) {
3839
if(props!=null){
39-
return new EventCheckSender(props).sendMsg(jobId,props,log);
40+
return new EventCheckSender(props,backAction).sendMsg(jobId,props,log);
4041
}else{
4142
log.error("create EventCheckSender failed {}");
4243
return false;
@@ -50,9 +51,9 @@ public boolean sendMsg(int jobId, Properties props, Logger log) {
5051
* query manner, and the target message is repeatedly queried within a time period
5152
* when the set target is not exceeded.
5253
*/
53-
public boolean reciveMsg(int jobId, Properties props, Logger log) {
54+
public boolean reciveMsg(int jobId, Properties props, Logger log,EventCheckerExecutionAction backAction) {
5455
if(props!=null){
55-
return new DefaultEventcheckReceiver(props).reciveMsg(jobId,props,log);
56+
return new DefaultEventcheckReceiver(props,backAction).reciveMsg(jobId,props,log);
5657
}else{
5758
log.error("create EventCheckSender failed {}");
5859
return false;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package com.webank.wedatasphere.dss.appconn.eventchecker.utils;
2+
3+
import java.io.ByteArrayOutputStream;
4+
import java.io.OutputStream;
5+
import java.io.PrintStream;
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
9+
10+
/**
11+
* Author: duhanmin
12+
* Description:
13+
* Date: 2022/4/2 11:24
14+
*/
15+
public class ExceptionUtils {
16+
private static final char TAB = ' ';
17+
private static final char CR = '\r';
18+
private static final char LF = '\n';
19+
private static final String SPACE = " ";
20+
private static final String EMPTY = "";
21+
22+
/**
23+
* 堆栈转为单行完整字符串
24+
*
25+
* @param throwable 异常对象
26+
* @param limit 限制最大长度
27+
* @return 堆栈转为的字符串
28+
*/
29+
public static String stacktraceToOneLineString(Throwable throwable, int limit) {
30+
Map<Character, String> replaceCharToStrMap = new HashMap<>();
31+
replaceCharToStrMap.put(CR, SPACE);
32+
replaceCharToStrMap.put(LF, SPACE);
33+
replaceCharToStrMap.put(TAB, SPACE);
34+
return stacktraceToString(throwable, limit, replaceCharToStrMap);
35+
}
36+
37+
38+
/**
39+
* 堆栈转为完整字符串
40+
*
41+
* @param throwable 异常对象
42+
* @param limit 限制最大长度
43+
* @param replaceCharToStrMap 替换字符为指定字符串
44+
* @return 堆栈转为的字符串
45+
*/
46+
private static String stacktraceToString(Throwable throwable, int limit, Map<Character, String> replaceCharToStrMap) {
47+
final OutputStream baos = new ByteArrayOutputStream();
48+
throwable.printStackTrace(new PrintStream(baos));
49+
String exceptionStr = baos.toString();
50+
int length = exceptionStr.length();
51+
if (limit > 0 && limit < length) {
52+
length = limit;
53+
}
54+
55+
if (!replaceCharToStrMap.isEmpty()) {
56+
final StringBuilder sb = new StringBuilder();
57+
char c;
58+
String value;
59+
for (int i = 0; i < length; i++) {
60+
c = exceptionStr.charAt(i);
61+
value = replaceCharToStrMap.get(c);
62+
if (null != value) {
63+
sb.append(value);
64+
} else {
65+
sb.append(c);
66+
}
67+
}
68+
return sb.toString();
69+
} else {
70+
return sub(exceptionStr,0, limit);
71+
}
72+
}
73+
74+
/**
75+
* 改进JDK subString<br>
76+
* index从0开始计算,最后一个字符为-1<br>
77+
* 如果from和to位置一样,返回 "" <br>
78+
* 如果from或to为负数,则按照length从后向前数位置,如果绝对值大于字符串长度,则from归到0,to归到length<br>
79+
* 如果经过修正的index中from大于to,则互换from和to example: <br>
80+
* abcdefgh 2 3 =》 c <br>
81+
* abcdefgh 2 -3 =》 cde <br>
82+
*
83+
* @param str String
84+
* @param fromIndex 开始的index(包括)
85+
* @param toIndex 结束的index(不包括)
86+
* @return 字串
87+
*/
88+
private static String sub(CharSequence str, int fromIndex, int toIndex) {
89+
if (isEmpty(str)) {
90+
return str(str);
91+
}
92+
int len = str.length();
93+
94+
if (fromIndex < 0) {
95+
fromIndex = len + fromIndex;
96+
if (fromIndex < 0) {
97+
fromIndex = 0;
98+
}
99+
} else if (fromIndex > len) {
100+
fromIndex = len;
101+
}
102+
103+
if (toIndex < 0) {
104+
toIndex = len + toIndex;
105+
if (toIndex < 0) {
106+
toIndex = len;
107+
}
108+
} else if (toIndex > len) {
109+
toIndex = len;
110+
}
111+
112+
if (toIndex < fromIndex) {
113+
int tmp = fromIndex;
114+
fromIndex = toIndex;
115+
toIndex = tmp;
116+
}
117+
118+
if (fromIndex == toIndex) {
119+
return EMPTY;
120+
}
121+
122+
return str.toString().substring(fromIndex, toIndex);
123+
}
124+
125+
/**
126+
* {@link CharSequence} 转为字符串,null安全
127+
*
128+
* @param cs {@link CharSequence}
129+
* @return 字符串
130+
*/
131+
private static String str(CharSequence cs) {
132+
return null == cs ? null : cs.toString();
133+
}
134+
135+
/**
136+
* 字符串是否为空,空的定义如下:<br>
137+
* 1、为null <br>
138+
* 2、为""<br>
139+
*
140+
* @param str 被检测的字符串
141+
* @return 是否为空
142+
*/
143+
private static boolean isEmpty(CharSequence str) {
144+
return str == null || str.length() == 0;
145+
}
146+
}

0 commit comments

Comments
 (0)