Skip to content

Commit 97fcca6

Browse files
authored
fix: use less RPC calls to listen on the blockchain (#647)
1 parent 979b032 commit 97fcca6

File tree

3 files changed

+98
-129
lines changed

3 files changed

+98
-129
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.worker.chain;
18+
19+
import com.iexec.commons.poco.chain.SignerService;
20+
import com.iexec.worker.chain.event.LatestBlockEvent;
21+
import com.iexec.worker.config.ConfigServerConfigurationService;
22+
import com.iexec.worker.config.WorkerConfigurationService;
23+
import io.micrometer.core.instrument.Metrics;
24+
import io.micrometer.core.instrument.Tag;
25+
import lombok.extern.slf4j.Slf4j;
26+
import org.springframework.context.ApplicationEventPublisher;
27+
import org.springframework.scheduling.annotation.Scheduled;
28+
import org.springframework.stereotype.Service;
29+
import org.web3j.protocol.Web3j;
30+
import org.web3j.protocol.core.DefaultBlockParameterName;
31+
import org.web3j.protocol.core.methods.response.EthBlock;
32+
import org.web3j.protocol.http.HttpService;
33+
import org.web3j.utils.Async;
34+
import org.web3j.utils.Numeric;
35+
36+
import java.io.IOException;
37+
import java.math.BigInteger;
38+
import java.util.List;
39+
import java.util.concurrent.atomic.AtomicLong;
40+
41+
@Slf4j
42+
@Service
43+
public class BlockchainListener {
44+
45+
static final String LATEST_BLOCK_METRIC_NAME = "iexec.chain.block.latest";
46+
static final String TX_COUNT_METRIC_NAME = "iexec.chain.wallet.tx-count";
47+
48+
private final ApplicationEventPublisher applicationEventPublisher;
49+
private final String walletAddress;
50+
private final Web3j web3Client;
51+
private final AtomicLong lastSeenBlock;
52+
private final AtomicLong latestTxGauge;
53+
private final AtomicLong pendingTxGauge;
54+
55+
public BlockchainListener(final ApplicationEventPublisher applicationEventPublisher,
56+
final ConfigServerConfigurationService configServerConfigurationService,
57+
final SignerService signerService,
58+
final WorkerConfigurationService workerConfigurationService) {
59+
this.applicationEventPublisher = applicationEventPublisher;
60+
this.walletAddress = signerService.getAddress();
61+
final String nodeUrl = !workerConfigurationService.getOverrideBlockchainNodeAddress().isEmpty() ?
62+
workerConfigurationService.getOverrideBlockchainNodeAddress() :
63+
configServerConfigurationService.getChainNodeUrl();
64+
this.web3Client = Web3j.build(new HttpService(nodeUrl),
65+
configServerConfigurationService.getBlockTime().toMillis(), Async.defaultExecutorService());
66+
lastSeenBlock = Metrics.gauge(LATEST_BLOCK_METRIC_NAME, new AtomicLong(0));
67+
latestTxGauge = Metrics.gauge(TX_COUNT_METRIC_NAME, List.of(Tag.of("block", "latest")), new AtomicLong(0));
68+
pendingTxGauge = Metrics.gauge(TX_COUNT_METRIC_NAME, List.of(Tag.of("block", "pending")), new AtomicLong(0));
69+
}
70+
71+
@Scheduled(fixedRate = 5000)
72+
public void run() throws IOException {
73+
try {
74+
final EthBlock.Block latestBlock = web3Client.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false).send().getBlock();
75+
final long blockNumber = Numeric.toBigInt(latestBlock.getNumberRaw()).longValue();
76+
final String blockHash = latestBlock.getHash();
77+
final long blockTimestamp = Numeric.toBigInt(latestBlock.getTimestampRaw()).longValue();
78+
lastSeenBlock.set(blockNumber);
79+
final BigInteger pendingTxCount = web3Client.ethGetTransactionCount(walletAddress,
80+
DefaultBlockParameterName.PENDING).send().getTransactionCount();
81+
if (pendingTxCount.longValue() > pendingTxGauge.get() || pendingTxGauge.get() != latestTxGauge.get()) {
82+
final BigInteger latestTxCount = web3Client.ethGetTransactionCount(walletAddress,
83+
DefaultBlockParameterName.LATEST).send().getTransactionCount();
84+
pendingTxGauge.set(pendingTxCount.longValue());
85+
latestTxGauge.set(latestTxCount.longValue());
86+
}
87+
log.info("Transaction count [block:{}, pending:{}, latest:{}]",
88+
lastSeenBlock, pendingTxGauge.get(), latestTxGauge.get());
89+
applicationEventPublisher.publishEvent(new LatestBlockEvent(this, blockNumber, blockHash, blockTimestamp));
90+
} catch (Exception e) {
91+
log.error("An error happened while fetching data on-chain", e);
92+
}
93+
}
94+
95+
}

src/main/java/com/iexec/worker/chain/WebSocketBlockchainListener.java

Lines changed: 0 additions & 126 deletions
This file was deleted.

src/test/java/com/iexec/worker/chain/WebSocketBlockchainListenerTests.java renamed to src/test/java/com/iexec/worker/chain/BlockchainListenerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@
3434
import java.util.Objects;
3535
import java.util.concurrent.TimeUnit;
3636

37-
import static com.iexec.worker.chain.WebSocketBlockchainListener.LATEST_BLOCK_METRIC_NAME;
38-
import static com.iexec.worker.chain.WebSocketBlockchainListener.TX_COUNT_METRIC_NAME;
37+
import static com.iexec.worker.chain.BlockchainListener.LATEST_BLOCK_METRIC_NAME;
38+
import static com.iexec.worker.chain.BlockchainListener.TX_COUNT_METRIC_NAME;
3939
import static org.assertj.core.api.Assertions.assertThat;
4040
import static org.awaitility.Awaitility.await;
4141

4242
@Testcontainers
4343
@SpringBootTest(classes = TestApplication.class)
4444
@ActiveProfiles("test")
45-
class WebSocketBlockchainListenerTests {
45+
class BlockchainListenerTests {
4646

4747
private static final String CHAIN_SVC_NAME = "chain";
4848
private static final int CHAIN_SVC_PORT = 8545;

0 commit comments

Comments
 (0)