@@ -78,6 +78,7 @@ public class HeartbeatMgr extends MasterDaemon {
7878 private final ExecutorService executor ;
7979 private SystemInfoService nodeMgr ;
8080 private HeartbeatFlags heartbeatFlags ;
81+ private final ExecutorService abortTxnExecutor ;
8182
8283 private static volatile AtomicReference <TMasterInfo > masterInfo = new AtomicReference <>();
8384
@@ -86,6 +87,8 @@ public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
8687 this .nodeMgr = nodeMgr ;
8788 this .executor = ThreadPoolManager .newDaemonFixedThreadPool (Config .heartbeat_mgr_threads_num ,
8889 Config .heartbeat_mgr_blocking_queue_size , "heartbeat-mgr-pool" , needRegisterMetric );
90+ this .abortTxnExecutor = ThreadPoolManager .newDaemonFixedThreadPool (1 ,
91+ Config .heartbeat_mgr_blocking_queue_size , "abort-txn-executor" , needRegisterMetric );
8992 this .heartbeatFlags = new HeartbeatFlags ();
9093 }
9194
@@ -192,18 +195,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
192195 boolean isChanged = be .handleHbResponse (hbResponse , isReplay );
193196 if (hbResponse .getStatus () == HbStatus .OK ) {
194197 long newStartTime = be .getLastStartTime ();
198+ // oldStartTime > 0 means it is not the first heartbeat
195199 if (!isReplay && Config .enable_abort_txn_by_checking_coordinator_be
196- && oldStartTime != newStartTime ) {
197- Env .getCurrentGlobalTransactionMgr ().abortTxnWhenCoordinateBeRestart (
198- be .getId (), be .getHost (), newStartTime );
200+ && oldStartTime != newStartTime && oldStartTime > 0 ) {
201+ submitAbortTxnTaskByExecutor (() -> Env .getCurrentGlobalTransactionMgr ()
202+ .abortTxnWhenCoordinateBeRestart (be .getId (), be .getHost (), newStartTime ),
203+ "restart" );
199204 }
200205 } else {
201206 // invalid all connections cached in ClientPool
202207 ClientPool .backendPool .clearPool (new TNetworkAddress (be .getHost (), be .getBePort ()));
203208 if (!isReplay && System .currentTimeMillis () - be .getLastUpdateMs ()
204- >= Config .abort_txn_after_lost_heartbeat_time_second * 1000L ) {
205- Env .getCurrentGlobalTransactionMgr ().abortTxnWhenCoordinateBeDown (
206- be .getId (), be .getHost (), 100 );
209+ >= Config .abort_txn_after_lost_heartbeat_time_second * 1000L
210+ && be .getLastUpdateMs () > 0 ) {
211+ submitAbortTxnTaskByExecutor (() -> Env .getCurrentGlobalTransactionMgr ()
212+ .abortTxnWhenCoordinateBeDown (be .getId (), be .getHost (), 100 ), "down" );
207213 }
208214 }
209215 return isChanged ;
@@ -230,6 +236,26 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
230236 return false ;
231237 }
232238
239+ private void submitAbortTxnTaskByExecutor (Runnable task , String reason ) {
240+ long start = System .currentTimeMillis ();
241+ try {
242+ abortTxnExecutor .submit (() -> {
243+ LOG .info ("start abort txn task, reason={}, start_ts={}" , reason , start );
244+ try {
245+ task .run ();
246+ long duration = System .currentTimeMillis () - start ;
247+ LOG .info ("finish abort txn task, reason={}, start_ts={}, cost_ms={}" , reason , start , duration );
248+ } catch (Exception e ) {
249+ long duration = System .currentTimeMillis () - start ;
250+ LOG .warn ("abort txn task({}) failed, start_ts={}, cost_ms={}" , reason , start , duration , e );
251+ }
252+ });
253+ } catch (Exception e ) {
254+ long duration = System .currentTimeMillis () - start ;
255+ LOG .warn ("failed to submit abort txn task({}), start_ts={}, cost_ms={}" , reason , start , duration , e );
256+ }
257+ }
258+
233259 // backend heartbeat
234260 private class BackendHeartbeatHandler implements Callable <HeartbeatResponse > {
235261 private Backend backend ;
0 commit comments