Skip to content

Commit 89b53e6

Browse files
Merge pull request #6205 from halibobo1205/480/adapt-TronError-for-thread-pools
feat(system.exit): adapt TronError for ExecutorService
2 parents 1168426 + ca28553 commit 89b53e6

File tree

7 files changed

+50
-11
lines changed

7 files changed

+50
-11
lines changed

chainbase/src/main/java/org/tron/core/service/RewardViCalService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void init() {
8282
// checkpoint is flushed to db, we can start rewardViCalService immediately
8383
lastBlockNumber = Long.MAX_VALUE;
8484
}
85-
es.scheduleWithFixedDelay(this::maybeRun, 0, 3, TimeUnit.SECONDS);
85+
ExecutorServiceManager.scheduleWithFixedDelay(es, this::maybeRun, 0, 3, TimeUnit.SECONDS);
8686
}
8787

8888
private boolean enableNewRewardAlgorithm() {

common/src/main/java/org/tron/common/es/ExecutorServiceManager.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import java.util.concurrent.BlockingQueue;
55
import java.util.concurrent.ExecutorService;
66
import java.util.concurrent.Executors;
7+
import java.util.concurrent.Future;
78
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.ScheduledFuture;
810
import java.util.concurrent.ThreadPoolExecutor;
911
import java.util.concurrent.TimeUnit;
1012
import lombok.extern.slf4j.Slf4j;
13+
import org.tron.common.exit.ExitManager;
1114

1215
@Slf4j(topic = "common-executor")
1316
public class ExecutorServiceManager {
@@ -80,4 +83,30 @@ public static void shutdownAndAwaitTermination(ExecutorService pool, String name
8083
}
8184
logger.info("Pool {} shutdown done", name);
8285
}
86+
87+
public static Future<?> submit(ExecutorService es, Runnable task) {
88+
return es.submit(() -> {
89+
try {
90+
task.run();
91+
} catch (Throwable e) {
92+
ExitManager.findTronError(e).ifPresent(ExitManager::logAndExit);
93+
throw e;
94+
}
95+
});
96+
}
97+
98+
public static ScheduledFuture<?> scheduleWithFixedDelay(ScheduledExecutorService es,
99+
Runnable command,
100+
long initialDelay,
101+
long delay,
102+
TimeUnit unit) {
103+
return es.scheduleWithFixedDelay(() -> {
104+
try {
105+
command.run();
106+
} catch (Throwable e) {
107+
ExitManager.findTronError(e).ifPresent(ExitManager::logAndExit);
108+
throw e;
109+
}
110+
}, initialDelay, delay, unit);
111+
}
83112
}

common/src/main/java/org/tron/common/exit/ExitManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static Optional<TronError> findTronError(Throwable e) {
4444
return Optional.empty();
4545
}
4646

47-
private static void logAndExit(TronError exit) {
47+
public static void logAndExit(TronError exit) {
4848
final int code = exit.getErrCode().getCode();
4949
logger.error("Shutting down with code: {}.", exit.getErrCode(), exit);
5050
Thread exitThread = exitThreadFactory.newThread(() -> System.exit(code));

consensus/src/main/java/org/tron/consensus/dpos/DposTask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.springframework.stereotype.Component;
1212
import org.springframework.util.ObjectUtils;
1313
import org.tron.common.es.ExecutorServiceManager;
14+
import org.tron.common.exit.ExitManager;
1415
import org.tron.common.parameter.CommonParameter;
1516
import org.tron.common.utils.ByteArray;
1617
import org.tron.common.utils.Sha256Hash;
@@ -68,10 +69,13 @@ public void init() {
6869
Thread.currentThread().interrupt();
6970
} catch (Throwable throwable) {
7071
logger.error("Produce block error.", throwable);
72+
ExitManager.findTronError(throwable).ifPresent(e -> {
73+
throw e;
74+
});
7175
}
7276
}
7377
};
74-
produceExecutor.submit(runnable);
78+
ExecutorServiceManager.submit(produceExecutor, runnable);
7579
logger.info("DPoS task started.");
7680
}
7781

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.tron.common.args.GenesisBlock;
5454
import org.tron.common.bloom.Bloom;
5555
import org.tron.common.es.ExecutorServiceManager;
56+
import org.tron.common.exit.ExitManager;
5657
import org.tron.common.logsfilter.EventPluginLoader;
5758
import org.tron.common.logsfilter.FilterQuery;
5859
import org.tron.common.logsfilter.capsule.BlockFilterCapsule;
@@ -293,6 +294,9 @@ public class Manager {
293294
Metrics.counterInc(MetricKeys.Counter.TXS, 1,
294295
MetricLabels.Counter.TXS_FAIL, MetricLabels.Counter.TXS_FAIL_ERROR);
295296
}
297+
ExitManager.findTronError(ex).ifPresent(e -> {
298+
throw e;
299+
});
296300
} finally {
297301
if (tx != null && getRePushTransactions().remove(tx)) {
298302
Metrics.gaugeInc(MetricKeys.Gauge.MANAGER_QUEUE, -1,
@@ -550,18 +554,18 @@ public void init() {
550554
validateSignService = ExecutorServiceManager
551555
.newFixedThreadPool(validateSignName, Args.getInstance().getValidateSignThreadNum());
552556
rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true);
553-
rePushEs.submit(rePushLoop);
557+
ExecutorServiceManager.submit(rePushEs, rePushLoop);
554558
// add contract event listener for subscribing
555559
if (Args.getInstance().isEventSubscribe()) {
556560
startEventSubscribing();
557561
triggerEs = ExecutorServiceManager.newSingleThreadExecutor(triggerEsName, true);
558-
triggerEs.submit(triggerCapsuleProcessLoop);
562+
ExecutorServiceManager.submit(triggerEs, triggerCapsuleProcessLoop);
559563
}
560564

561565
// start json rpc filter process
562566
if (CommonParameter.getInstance().isJsonRpcFilterEnabled()) {
563567
filterEs = ExecutorServiceManager.newSingleThreadExecutor(filterEsName);
564-
filterEs.submit(filterProcessLoop);
568+
ExecutorServiceManager.submit(filterEs, filterProcessLoop);
565569
}
566570

567571
//initStoreFactory

framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
8383
dropSmartContractCount++;
8484
}
8585
} else {
86-
trxHandlePool.submit(() -> handleTransaction(peer, new TransactionMessage(trx)));
86+
ExecutorServiceManager.submit(
87+
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
8788
}
8889
}
8990

@@ -109,11 +110,12 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep
109110
}
110111

111112
private void handleSmartContract() {
112-
smartContractExecutor.scheduleWithFixedDelay(() -> {
113+
ExecutorServiceManager.scheduleWithFixedDelay(smartContractExecutor, () -> {
113114
try {
114115
while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) {
115116
TrxEvent event = smartContractQueue.take();
116-
trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg()));
117+
ExecutorServiceManager.submit(
118+
trxHandlePool, () -> handleTransaction(event.getPeer(), event.getMsg()));
117119
}
118120
} catch (InterruptedException e) {
119121
logger.warn("Handle smart server interrupted");

framework/src/main/java/org/tron/core/net/service/sync/SyncService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class SyncService {
7070
private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum();
7171

7272
public void init() {
73-
fetchExecutor.scheduleWithFixedDelay(() -> {
73+
ExecutorServiceManager.scheduleWithFixedDelay(fetchExecutor, () -> {
7474
try {
7575
if (fetchFlag) {
7676
fetchFlag = false;
@@ -81,7 +81,7 @@ public void init() {
8181
}
8282
}, 10, 1, TimeUnit.SECONDS);
8383

84-
blockHandleExecutor.scheduleWithFixedDelay(() -> {
84+
ExecutorServiceManager.scheduleWithFixedDelay(blockHandleExecutor, () -> {
8585
try {
8686
if (handleFlag) {
8787
handleFlag = false;

0 commit comments

Comments
 (0)