Skip to content

Commit 0640dd6

Browse files
committed
feat(event): optimize the event service
1 parent 1f0aa38 commit 0640dd6

26 files changed

+1818
-16
lines changed

common/src/main/java/org/tron/common/logsfilter/EventPluginConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ public class EventPluginConfig {
1515
public static final String SOLIDITY_EVENT_NAME = "solidityevent";
1616
public static final String SOLIDITY_LOG_NAME = "soliditylog";
1717

18+
@Getter
19+
@Setter
20+
private int version;
21+
22+
@Getter
23+
@Setter
24+
private long startSyncBlockNum;
25+
1826
@Getter
1927
@Setter
2028
private String pluginPath;

common/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public BlockLogTrigger() {
3434
@Override
3535
public String toString() {
3636
return new StringBuilder().append("triggerName: ").append(getTriggerName())
37-
.append("timestamp: ")
37+
.append(", timestamp: ")
3838
.append(timeStamp)
3939
.append(", blockNumber: ")
4040
.append(blockNumber)

common/src/main/java/org/tron/common/logsfilter/trigger/SolidityTrigger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public SolidityTrigger() {
1616
@Override
1717
public String toString() {
1818
return new StringBuilder().append("triggerName: ").append(getTriggerName())
19-
.append("timestamp: ")
19+
.append(", timestamp: ")
2020
.append(timeStamp)
2121
.append(", latestSolidifiedBlockNumber: ")
2222
.append(latestSolidifiedBlockNumber).toString();

common/src/main/java/org/tron/core/Constant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,8 @@ public class Constant {
280280

281281
public static final String NATIVE_QUEUE_SEND_LENGTH = "event.subscribe.native.sendqueuelength";
282282

283+
public static final String EVENT_SUBSCRIBE_VERSION = "event.subscribe.version";
284+
public static final String EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM = "event.subscribe.startSyncBlockNum";
283285
public static final String EVENT_SUBSCRIBE_PATH = "event.subscribe.path";
284286
public static final String EVENT_SUBSCRIBE_SERVER = "event.subscribe.server";
285287
public static final String EVENT_SUBSCRIBE_DB_CONFIG = "event.subscribe.dbconfig";

framework/src/main/java/org/tron/common/application/ApplicationImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,17 @@
1010
import org.tron.core.db.Manager;
1111
import org.tron.core.metrics.MetricsUtil;
1212
import org.tron.core.net.TronNetService;
13+
import org.tron.core.services.event.EventService;
1314

1415
@Slf4j(topic = "app")
1516
@Component
1617
public class ApplicationImpl implements Application {
1718

1819
private ServiceContainer services;
1920

21+
@Autowired
22+
private EventService eventService;
23+
2024
@Autowired
2125
private TronNetService tronNetService;
2226

@@ -56,6 +60,7 @@ public void initServices(CommonParameter parameter) {
5660
public void startup() {
5761
this.initServices(Args.getInstance());
5862
this.startServices();
63+
eventService.init();
5964
if ((!Args.getInstance().isSolidityNode()) && (!Args.getInstance().isP2pDisable())) {
6065
tronNetService.start();
6166
}
@@ -66,6 +71,7 @@ public void startup() {
6671
@Override
6772
public void shutdown() {
6873
this.shutdownServices();
74+
eventService.close();
6975
consensusService.stop();
7076
if (!Args.getInstance().isSolidityNode() && (!Args.getInstance().p2pDisable)) {
7177
tronNetService.close();

framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.tron.common.logsfilter.trigger.SolidityTrigger;
2525
import org.tron.common.logsfilter.trigger.TransactionLogTrigger;
2626
import org.tron.common.logsfilter.trigger.Trigger;
27+
import org.tron.common.utils.JsonUtil;
2728

2829
@Slf4j
2930
public class EventPluginLoader {
@@ -42,6 +43,10 @@ public class EventPluginLoader {
4243

4344
private List<TriggerConfig> triggerConfigList;
4445

46+
private int version = 0;
47+
48+
private long startSyncBlockNum = 0;
49+
4550
private boolean blockLogTriggerEnable = false;
4651

4752
private boolean blockLogTriggerSolidified = false;
@@ -219,6 +224,10 @@ public boolean start(EventPluginConfig config) {
219224
return false;
220225
}
221226

227+
this.version = config.getVersion();
228+
229+
this.startSyncBlockNum = config.getStartSyncBlockNum();
230+
222231
this.triggerConfigList = config.getTriggerConfigList();
223232

224233
useNativeQueue = config.isUseNativeQueue();
@@ -358,6 +367,14 @@ public void postSolidityTrigger(SolidityTrigger trigger) {
358367
}
359368
}
360369

370+
public synchronized int getVersion() {
371+
return version;
372+
}
373+
374+
public synchronized long getStartSyncBlockNum() {
375+
return startSyncBlockNum;
376+
}
377+
361378
public synchronized boolean isBlockLogTriggerEnable() {
362379
return blockLogTriggerEnable;
363380
}

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,6 +1323,15 @@ private static EventPluginConfig getEventPluginConfig(
13231323
final com.typesafe.config.Config config) {
13241324
EventPluginConfig eventPluginConfig = new EventPluginConfig();
13251325

1326+
if (config.hasPath(Constant.EVENT_SUBSCRIBE_VERSION)) {
1327+
eventPluginConfig.setVersion(config.getInt(Constant.EVENT_SUBSCRIBE_VERSION));
1328+
}
1329+
1330+
if (config.hasPath(Constant.EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM)) {
1331+
eventPluginConfig.setStartSyncBlockNum(config
1332+
.getLong(Constant.EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM));
1333+
}
1334+
13261335
boolean useNativeQueue = false;
13271336
int bindPort = 0;
13281337
int sendQueueLength = 0;

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ public class Manager {
237237
Collections.synchronizedList(Lists.newArrayList());
238238
// the capacity is equal to Integer.MAX_VALUE default
239239
private BlockingQueue<TransactionCapsule> rePushTransactions;
240+
@Getter
240241
private BlockingQueue<TriggerCapsule> triggerCapsuleQueue;
241242
// log filter
242243
private boolean isRunFilterProcessThread = true;
@@ -1100,7 +1101,9 @@ private void switchFork(BlockCapsule newHead)
11001101
while (!getDynamicPropertiesStore()
11011102
.getLatestBlockHeaderHash()
11021103
.equals(binaryTree.getValue().peekLast().getParentHash())) {
1103-
reOrgContractTrigger();
1104+
if (EventPluginLoader.getInstance().getVersion() == 0) {
1105+
reOrgContractTrigger();
1106+
}
11041107
reOrgLogsFilter();
11051108
eraseBlock();
11061109
}
@@ -1362,11 +1365,26 @@ public void pushBlock(final BlockCapsule block)
13621365
}
13631366

13641367
void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
1368+
// post block and logs for jsonrpc
13651369
try {
1370+
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
1371+
postBlockFilter(block, false);
1372+
postLogsFilter(block, false, false);
1373+
}
1374+
1375+
if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
1376+
postSolidityFilter(oldSolid, newSolid);
1377+
}
1378+
1379+
if (EventPluginLoader.getInstance().getVersion() != 0) {
1380+
lastUsedSolidityNum = newSolid;
1381+
return;
1382+
}
1383+
13661384
// if event subscribe is enabled, post block trigger to queue
13671385
postBlockTrigger(block);
13681386
// if event subscribe is enabled, post solidity trigger to queue
1369-
postSolidityTrigger(oldSolid, newSolid);
1387+
postSolidityTrigger(newSolid);
13701388
} catch (Exception e) {
13711389
logger.error("Block trigger failed. head: {}, oldSolid: {}, newSolid: {}",
13721390
block.getNum(), oldSolid, newSolid, e);
@@ -1506,7 +1524,8 @@ public TransactionInfo processTransaction(final TransactionCapsule trxCap, Block
15061524

15071525
// if event subscribe is enabled, post contract triggers to queue
15081526
// only trigger when process block
1509-
if (Objects.nonNull(blockCap) && !blockCap.isMerkleRootEmpty()) {
1527+
if (Objects.nonNull(blockCap) && !blockCap.isMerkleRootEmpty()
1528+
&& EventPluginLoader.getInstance().getVersion() == 0) {
15101529
String blockHash = blockCap.getBlockId().toString();
15111530
postContractTrigger(trace, false, blockHash);
15121531
}
@@ -2083,7 +2102,7 @@ private void postSolidityFilter(final long oldSolidNum, final long latestSolidif
20832102
}
20842103
}
20852104

2086-
private void postSolidityTrigger(final long oldSolidNum, final long latestSolidifiedBlockNumber) {
2105+
private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
20872106
if (eventPluginLoaded && EventPluginLoader.getInstance().isSolidityLogTriggerEnable()) {
20882107
for (Long i : Args.getSolidityContractLogTriggerMap().keySet()) {
20892108
postSolidityLogContractTrigger(i, latestSolidifiedBlockNumber);
@@ -2109,10 +2128,6 @@ private void postSolidityTrigger(final long oldSolidNum, final long latestSolidi
21092128
}
21102129
}
21112130
}
2112-
2113-
if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
2114-
postSolidityFilter(oldSolidNum, latestSolidifiedBlockNumber);
2115-
}
21162131
lastUsedSolidityNum = latestSolidifiedBlockNumber;
21172132
}
21182133

@@ -2224,12 +2239,6 @@ private void postLogsFilter(final BlockCapsule blockCapsule, boolean solidified,
22242239
}
22252240

22262241
void postBlockTrigger(final BlockCapsule blockCapsule) {
2227-
// post block and logs for jsonrpc
2228-
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
2229-
postBlockFilter(blockCapsule, false);
2230-
postLogsFilter(blockCapsule, false, false);
2231-
}
2232-
22332242
// process block trigger
22342243
long solidityBlkNum = getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
22352244
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()) {
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package org.tron.core.services.event;
2+
3+
import com.google.common.collect.Lists;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
import lombok.Getter;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.tron.core.capsule.BlockCapsule;
11+
import org.tron.core.services.event.bo.BlockEvent;
12+
import org.tron.core.services.event.exception.EventException;
13+
14+
@Slf4j(topic = "event")
15+
public class BlockEventCache {
16+
@Getter
17+
private static volatile long solidNum;
18+
19+
@Getter
20+
private static volatile BlockEvent head;
21+
22+
@Getter
23+
private static volatile BlockCapsule.BlockId solidId;
24+
25+
private static Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>();
26+
27+
private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>();
28+
29+
public static BlockEvent getBlockEvent(BlockCapsule.BlockId blockId) {
30+
return blockEventMap.get(blockId);
31+
}
32+
33+
public static void init(BlockCapsule.BlockId blockId) {
34+
blockEventMap.clear();
35+
numMap.clear();
36+
solidNum = blockId.getNum();
37+
head = new BlockEvent(blockId);
38+
solidId = blockId;
39+
List<BlockEvent> list = new ArrayList<>();
40+
list.add(head);
41+
numMap.put(blockId.getNum(), list);
42+
blockEventMap.put(blockId, head);
43+
}
44+
45+
public static void add(BlockEvent blockEvent) throws EventException {
46+
logger.info("Add block event, {}", blockEvent.getBlockId().getString(),
47+
blockEvent.getParentId().getString());
48+
if (blockEventMap.get(blockEvent.getParentId()) == null) {
49+
throw new EventException("unlink BlockEvent, "
50+
+ blockEvent.getBlockId().getString() + ", "
51+
+ blockEvent.getParentId().getString());
52+
}
53+
54+
long num = blockEvent.getBlockId().getNum();
55+
List<BlockEvent> list = numMap.get(num);
56+
if (list == null) {
57+
list = new ArrayList<>();
58+
numMap.put(num, list);
59+
}
60+
list.add(blockEvent);
61+
62+
blockEventMap.put(blockEvent.getBlockId(), blockEvent);
63+
64+
if (num > head.getBlockId().getNum()) {
65+
head = blockEvent;
66+
}
67+
68+
if (blockEvent.getSolidId().getNum() > solidId.getNum()) {
69+
solidId = blockEvent.getSolidId();
70+
}
71+
}
72+
73+
public static void remove(BlockCapsule.BlockId solidId) {
74+
logger.info("Remove solidId {}, solidNum {}, {}, {}",
75+
solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
76+
numMap.forEach((k, v) -> {
77+
if (k < solidId.getNum()) {
78+
v.forEach(value -> blockEventMap.remove(value.getBlockId()));
79+
numMap.remove(k);
80+
}
81+
});
82+
solidNum = solidId.getNum();
83+
}
84+
85+
public static List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) {
86+
List<BlockEvent> blockEvents = new ArrayList<>();
87+
BlockCapsule.BlockId tmp = solidId;
88+
while (tmp.getNum() > solidNum) {
89+
BlockEvent blockEvent = blockEventMap.get(tmp);
90+
blockEvents.add(blockEvent);
91+
tmp = blockEvent.getParentId();
92+
}
93+
94+
return Lists.reverse(blockEvents);
95+
}
96+
}

0 commit comments

Comments
 (0)