Skip to content

Commit 9ed0227

Browse files
authored
Merge pull request #7 from WeBankFinTech/dev-0.8.0
Dev 0.8.0
2 parents 78806f2 + 4127a9a commit 9ed0227

File tree

9 files changed

+53
-30
lines changed

9 files changed

+53
-30
lines changed

bin/checkServices.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ MICRO_SERVICE_PORT=$3
3131

3232
local_host="`hostname --fqdn`"
3333

34-
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
34+
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
3535

3636
function isLocal(){
3737
if [ "$1" == "127.0.0.1" ];then

bin/install.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ source ${DISTRIBUTION}
104104
isSuccess "load config"
105105

106106
local_host="`hostname --fqdn`"
107-
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
107+
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
108108

109109
function isLocal(){
110110
if [ "$1" == "127.0.0.1" ];then

bin/start-all.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ fi
4343
}
4444
local_host="`hostname --fqdn`"
4545

46-
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
46+
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
4747

4848
function isLocal(){
4949
if [ "$1" == "127.0.0.1" ];then

bin/stop-all.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export DISTRIBUTION=${DISTRIBUTION:-"${CONF_DIR}/config.sh"}
3434
source ${DISTRIBUTION}
3535

3636
local_host="`hostname --fqdn`"
37-
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
37+
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
3838

3939
function isSuccess(){
4040
if [ $? -ne 0 ]; then

docs/en_US/ch2/DSS Quick Installation Guide.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ dss_port="8088"
103103
linkis_url="http://127.0.0.1:9001"
104104
105105
# dss ip address
106-
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
106+
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
107107
```
108108

109109
The environment is ready, click me to enter ****[4. Installation and use](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/en_US/ch2/DSS%20Quick%20Installation%20Guide.md#four-installation-and-use)**
@@ -219,7 +219,7 @@ dss_port="8088"
219219
linkis_url="http://127.0.0.1:9001"
220220
221221
# dss ip address
222-
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
222+
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
223223
```
224224

225225
The environment is ready, click me to enter **[Four Installation and use](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/en_US/ch2/DSS%20Quick%20Installation%20Guide.md#four-installation-and-use)**

docs/zh_CN/ch2/DSS快速安装使用文档.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ dss_port="8088"
132132
linkis_url="http://127.0.0.1:9001"
133133
134134
# dss ip address
135-
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
135+
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
136136
```
137137

138138
环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8)
@@ -243,7 +243,7 @@ dss_port="8088"
243243
linkis_url="http://127.0.0.1:9001"
244244
245245
# dss ip address
246-
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
246+
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
247247
```
248248

249249
环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8)
@@ -365,7 +365,7 @@ dss_port="8088"
365365
linkis_url="http://127.0.0.1:9001"
366366
367367
# dss ip address
368-
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
368+
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
369369
```
370370

371371
环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8)

dss-azkaban-scheduler-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/scheduler/azkaban/service/AzkabanProjectService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ public Project createProject(Project project, Session session) throws AppJointEr
8080
params.add(new BasicNameValuePair("name", project.getName()));
8181
params.add(new BasicNameValuePair("description", project.getDescription()));
8282
HttpPost httpPost = new HttpPost(projectUrl);
83-
httpPost.addHeader(HTTP.CONTENT_ENCODING, "UTF-8");
83+
httpPost.addHeader(HTTP.CONTENT_ENCODING, HTTP.IDENTITY_CODING);
8484
CookieStore cookieStore = new BasicCookieStore();
8585
cookieStore.addCookie(session.getCookies()[0]);
86-
HttpEntity entity = EntityBuilder.create().setContentEncoding("UTF-8").
87-
setContentType(ContentType.create("application/x-www-form-urlencoded", Consts.UTF_8))
86+
HttpEntity entity = EntityBuilder.create()
87+
.setContentType(ContentType.create("application/x-www-form-urlencoded", Consts.UTF_8))
8888
.setParameters(params).build();
8989
httpPost.setEntity(entity);
9090
CloseableHttpClient httpClient = null;

eventchecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/service/AbstractEventCheckReceiver.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,39 +80,62 @@ boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consum
8080
boolean result = false;
8181
String vNewMsgID = "-1";
8282
PreparedStatement updatePstmt = null;
83+
PreparedStatement pstmtForGetID = null;
8384
Connection msgConn = null;
8485
vNewMsgID = setConsumedMsg(props,log,consumedMsgInfo);
8586
try {
8687
if(StringUtils.isNotEmpty(vNewMsgID) && StringUtils.isNotBlank(vNewMsgID) && !"-1".equals(vNewMsgID)){
8788
msgConn = getEventCheckerConnection(props,log);
8889
if(msgConn == null) return false;
89-
int vProcessID = jobId;
90-
String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");;
91-
String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END";
92-
log.info("last message offset {} is:" + lastMsgId);
93-
updatePstmt = msgConn.prepareCall(sqlForUpdateMsg);
94-
updatePstmt.setString(1, receiver);
95-
updatePstmt.setString(2, topic);
96-
updatePstmt.setString(3, msgName);
97-
updatePstmt.setString(4, vReceiveTime);
98-
updatePstmt.setString(5, vNewMsgID);
99-
int updaters = updatePstmt.executeUpdate();
100-
log.info("updateMsgOffset successful {} update result is:" + updaters);
101-
if(updaters != 0){
102-
log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID);
103-
//return true after update success
104-
result = true;
90+
msgConn.setAutoCommit(false);
91+
String sqlForReadMsgID = "SELECT msg_id FROM event_status WHERE receiver=? AND topic=? AND msg_name=? for update";
92+
pstmtForGetID = msgConn.prepareCall(sqlForReadMsgID);
93+
pstmtForGetID.setString(1, receiver);
94+
pstmtForGetID.setString(2, topic);
95+
pstmtForGetID.setString(3, msgName);
96+
ResultSet rs = pstmtForGetID.executeQuery();
97+
String nowLastMsgId = rs.last()==true ? rs.getString("msg_id"):"0";
98+
log.info("receive message successfully , Now check to see if the latest offset has changed ,nowLastMsgId is {} " + nowLastMsgId);
99+
if("0".equals(nowLastMsgId) || nowLastMsgId.equals(lastMsgId)){
100+
101+
int vProcessID = jobId;
102+
String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");;
103+
String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END";
104+
log.info("last message offset {} is:" + lastMsgId);
105+
updatePstmt = msgConn.prepareCall(sqlForUpdateMsg);
106+
updatePstmt.setString(1, receiver);
107+
updatePstmt.setString(2, topic);
108+
updatePstmt.setString(3, msgName);
109+
updatePstmt.setString(4, vReceiveTime);
110+
updatePstmt.setString(5, vNewMsgID);
111+
int updaters = updatePstmt.executeUpdate();
112+
log.info("updateMsgOffset successful {} update result is:" + updaters);
113+
if(updaters != 0){
114+
log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID);
115+
//return true after update success
116+
result = true;
117+
}else{
118+
log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID);
119+
result = false;
120+
}
105121
}else{
106-
log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID);
122+
log.info("the latest offset has changed , Keep waiting for the signal");
107123
result = false;
108124
}
125+
msgConn.commit();
109126
}else{
110127
result = false;
111128
}
112129
}catch (SQLException e){
113130
log.error("Error update Msg Offset" + e);
131+
try {
132+
msgConn.rollback();
133+
} catch (SQLException ex) {
134+
log.error("transaction rollback failed " + e);
135+
}
114136
return false;
115137
}finally {
138+
closeQueryStmt(pstmtForGetID, log);
116139
closeQueryStmt(updatePstmt, log);
117140
closeConnection(msgConn, log);
118141
}

web/config.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ dss_web_port="8088"
55
linkis_gateway_url="http://localhost:9001"
66

77
#dss nginx ip
8-
dss_nginx_ip=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
8+
dss_nginx_ip=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')

0 commit comments

Comments
 (0)