@@ -80,39 +80,62 @@ boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consum
8080 boolean result = false ;
8181 String vNewMsgID = "-1" ;
8282 PreparedStatement updatePstmt = null ;
83+ PreparedStatement pstmtForGetID = null ;
8384 Connection msgConn = null ;
8485 vNewMsgID = setConsumedMsg (props ,log ,consumedMsgInfo );
8586 try {
8687 if (StringUtils .isNotEmpty (vNewMsgID ) && StringUtils .isNotBlank (vNewMsgID ) && !"-1" .equals (vNewMsgID )){
8788 msgConn = getEventCheckerConnection (props ,log );
8889 if (msgConn == null ) return false ;
89- int vProcessID = jobId ;
90- String vReceiveTime = DateFormatUtils .format (new Date (), "yyyy-MM-dd HH:mm:ss" );;
91- String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END" ;
92- log .info ("last message offset {} is:" + lastMsgId );
93- updatePstmt = msgConn .prepareCall (sqlForUpdateMsg );
94- updatePstmt .setString (1 , receiver );
95- updatePstmt .setString (2 , topic );
96- updatePstmt .setString (3 , msgName );
97- updatePstmt .setString (4 , vReceiveTime );
98- updatePstmt .setString (5 , vNewMsgID );
99- int updaters = updatePstmt .executeUpdate ();
100- log .info ("updateMsgOffset successful {} update result is:" + updaters );
101- if (updaters != 0 ){
102- log .info ("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID );
103- //return true after update success
104- result = true ;
90+ msgConn .setAutoCommit (false );
91+ String sqlForReadMsgID = "SELECT msg_id FROM event_status WHERE receiver=? AND topic=? AND msg_name=? for update" ;
92+ pstmtForGetID = msgConn .prepareCall (sqlForReadMsgID );
93+ pstmtForGetID .setString (1 , receiver );
94+ pstmtForGetID .setString (2 , topic );
95+ pstmtForGetID .setString (3 , msgName );
96+ ResultSet rs = pstmtForGetID .executeQuery ();
97+ String nowLastMsgId = rs .last ()==true ? rs .getString ("msg_id" ):"0" ;
98+ log .info ("receive message successfully , Now check to see if the latest offset has changed ,nowLastMsgId is {} " + nowLastMsgId );
99+ if ("0" .equals (nowLastMsgId ) || nowLastMsgId .equals (lastMsgId )){
100+
101+ int vProcessID = jobId ;
102+ String vReceiveTime = DateFormatUtils .format (new Date (), "yyyy-MM-dd HH:mm:ss" );;
103+ String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END" ;
104+ log .info ("last message offset {} is:" + lastMsgId );
105+ updatePstmt = msgConn .prepareCall (sqlForUpdateMsg );
106+ updatePstmt .setString (1 , receiver );
107+ updatePstmt .setString (2 , topic );
108+ updatePstmt .setString (3 , msgName );
109+ updatePstmt .setString (4 , vReceiveTime );
110+ updatePstmt .setString (5 , vNewMsgID );
111+ int updaters = updatePstmt .executeUpdate ();
112+ log .info ("updateMsgOffset successful {} update result is:" + updaters );
113+ if (updaters != 0 ){
114+ log .info ("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID );
115+ //return true after update success
116+ result = true ;
117+ }else {
118+ log .info ("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID );
119+ result = false ;
120+ }
105121 }else {
106- log .info ("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID );
122+ log .info ("the latest offset has changed , Keep waiting for the signal" );
107123 result = false ;
108124 }
125+ msgConn .commit ();
109126 }else {
110127 result = false ;
111128 }
112129 }catch (SQLException e ){
113130 log .error ("Error update Msg Offset" + e );
131+ try {
132+ msgConn .rollback ();
133+ } catch (SQLException ex ) {
134+ log .error ("transaction rollback failed " + e );
135+ }
114136 return false ;
115137 }finally {
138+ closeQueryStmt (pstmtForGetID , log );
116139 closeQueryStmt (updatePstmt , log );
117140 closeConnection (msgConn , log );
118141 }
0 commit comments