Skip to content

Commit 6272ada

Browse files
authored
Add WebSocketBlockchainListener to fetch latest block without polling the blockchain network (#639)
1 parent 9742f91 commit 6272ada

File tree

9 files changed

+323
-2
lines changed

9 files changed

+323
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
88

99
- Use TEE framework version of dApp to retrieve pre/post-compute properties via SMS endpoint. (#630)
1010
- Validate authorization proof for pre/post-compute requests. (#635)
11+
- Add `WebSocketBlockchainListener` to fetch latest block without polling the blockchain network. (#639)
1112

1213
### Quality
1314

docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
services:
2+
3+
chain:
4+
image: docker-regis.iex.ec/poco-chain:1.0.0-poco-v5.5.0-voucher-v1.0.0-nethermind
5+
expose:
6+
- "8545"
7+
8+
core:
9+
image: wiremock/wiremock:3.3.1
10+
expose:
11+
- "8080"
12+
volumes:
13+
- "./src/test/resources/wiremock/mappings:/home/wiremock/mappings"

src/main/java/com/iexec/worker/Application.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.boot.SpringApplication;
2929
import org.springframework.boot.autoconfigure.SpringBootApplication;
3030
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
31+
import org.springframework.context.annotation.Profile;
3132
import org.springframework.retry.annotation.EnableRetry;
3233
import org.springframework.scheduling.annotation.EnableAsync;
3334

@@ -37,6 +38,7 @@
3738
@EnableRetry
3839
@EnableAsync
3940
@SpringBootApplication
41+
@Profile("!test")
4042
@ConfigurationPropertiesScan
4143
public class Application implements CommandLineRunner {
4244

src/main/java/com/iexec/worker/chain/Web3jService.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,13 +17,17 @@
1717
package com.iexec.worker.chain;
1818

1919
import com.iexec.commons.poco.chain.Web3jAbstractService;
20+
import com.iexec.worker.chain.event.LatestBlockEvent;
2021
import com.iexec.worker.config.ConfigServerConfigurationService;
2122
import com.iexec.worker.config.WorkerConfigurationService;
23+
import org.springframework.context.event.EventListener;
2224
import org.springframework.stereotype.Service;
2325

2426
@Service
2527
public class Web3jService extends Web3jAbstractService {
2628

29+
private long latestBlockNumber;
30+
2731
public Web3jService(ConfigServerConfigurationService configServerConfigurationService,
2832
WorkerConfigurationService workerConfService) {
2933
super(
@@ -38,4 +42,14 @@ public Web3jService(ConfigServerConfigurationService configServerConfigurationSe
3842
);
3943
}
4044

45+
@EventListener
46+
private void onLatestBlockEvent(final LatestBlockEvent event) {
47+
this.latestBlockNumber = event.getBlockNumber();
48+
}
49+
50+
@Override
51+
public long getLatestBlockNumber() {
52+
return latestBlockNumber;
53+
}
54+
4155
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.worker.chain;
18+
19+
import com.iexec.commons.poco.chain.SignerService;
20+
import com.iexec.worker.chain.event.LatestBlockEvent;
21+
import com.iexec.worker.config.ConfigServerConfigurationService;
22+
import com.iexec.worker.config.WorkerConfigurationService;
23+
import io.micrometer.core.instrument.Metrics;
24+
import io.micrometer.core.instrument.Tag;
25+
import io.reactivex.Flowable;
26+
import io.reactivex.disposables.Disposable;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.springframework.context.ApplicationEventPublisher;
29+
import org.springframework.scheduling.annotation.Scheduled;
30+
import org.springframework.stereotype.Service;
31+
import org.web3j.protocol.Web3j;
32+
import org.web3j.protocol.core.DefaultBlockParameterName;
33+
import org.web3j.protocol.core.Request;
34+
import org.web3j.protocol.core.methods.response.EthSubscribe;
35+
import org.web3j.protocol.http.HttpService;
36+
import org.web3j.protocol.websocket.WebSocketService;
37+
import org.web3j.protocol.websocket.events.NewHeadsNotification;
38+
import org.web3j.utils.Async;
39+
import org.web3j.utils.Numeric;
40+
41+
import java.io.IOException;
42+
import java.math.BigInteger;
43+
import java.net.ConnectException;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.concurrent.atomic.AtomicLong;
47+
48+
@Slf4j
49+
@Service
50+
public class WebSocketBlockchainListener {
51+
52+
static final String LATEST_BLOCK_METRIC_NAME = "iexec.chain.block.latest";
53+
static final String TX_COUNT_METRIC_NAME = "iexec.chain.wallet.tx-count";
54+
55+
private static final String SUBSCRIBE_METHOD = "eth_subscribe";
56+
private static final String UNSUBSCRIBE_METHOD = "eth_unsubscribe";
57+
58+
private final ApplicationEventPublisher applicationEventPublisher;
59+
private final String walletAddress;
60+
private final Web3j web3Client;
61+
private final WebSocketService webSocketService;
62+
private final AtomicLong lastSeenBlock;
63+
private final AtomicLong latestTxGauge;
64+
private final AtomicLong pendingTxGauge;
65+
private Disposable newHeads;
66+
67+
public WebSocketBlockchainListener(final ApplicationEventPublisher applicationEventPublisher,
68+
final ConfigServerConfigurationService configServerConfigurationService,
69+
final SignerService signerService,
70+
final WorkerConfigurationService workerConfigurationService) {
71+
this.applicationEventPublisher = applicationEventPublisher;
72+
this.walletAddress = signerService.getAddress();
73+
final String nodeUrl = !workerConfigurationService.getOverrideBlockchainNodeAddress().isEmpty() ?
74+
workerConfigurationService.getOverrideBlockchainNodeAddress() :
75+
configServerConfigurationService.getChainNodeUrl();
76+
final String wsUrl = nodeUrl.replace("http", "ws");
77+
this.webSocketService = new WebSocketService(wsUrl, false);
78+
this.web3Client = Web3j.build(new HttpService(nodeUrl),
79+
configServerConfigurationService.getBlockTime().toMillis(), Async.defaultExecutorService());
80+
lastSeenBlock = Metrics.gauge(LATEST_BLOCK_METRIC_NAME, new AtomicLong(0));
81+
latestTxGauge = Metrics.gauge(TX_COUNT_METRIC_NAME, List.of(Tag.of("block", "latest")), new AtomicLong(0));
82+
pendingTxGauge = Metrics.gauge(TX_COUNT_METRIC_NAME, List.of(Tag.of("block", "pending")), new AtomicLong(0));
83+
}
84+
85+
@Scheduled(fixedRate = 5000)
86+
public void run() throws ConnectException {
87+
if (newHeads != null && !newHeads.isDisposed()) {
88+
return;
89+
}
90+
91+
log.warn("web socket disconnection detected");
92+
webSocketService.connect();
93+
94+
final Request<?, EthSubscribe> newHeadsRequest = new Request<>(
95+
SUBSCRIBE_METHOD,
96+
List.of("newHeads", Map.of()),
97+
webSocketService,
98+
EthSubscribe.class
99+
);
100+
101+
final Flowable<NewHeadsNotification> newHeadsEvents = webSocketService.subscribe(
102+
newHeadsRequest, UNSUBSCRIBE_METHOD, NewHeadsNotification.class);
103+
newHeads = newHeadsEvents.subscribe(this::processHead, this::handleError);
104+
}
105+
106+
private void processHead(final NewHeadsNotification event) throws IOException {
107+
final long blockNumber = Numeric.toBigInt(event.getParams().getResult().getNumber()).longValue();
108+
final String blockHash = event.getParams().getResult().getHash();
109+
final long blockTimestamp = Numeric.toBigInt(event.getParams().getResult().getTimestamp()).longValue();
110+
lastSeenBlock.set(blockNumber);
111+
final BigInteger pendingTxCount = web3Client.ethGetTransactionCount(walletAddress,
112+
DefaultBlockParameterName.PENDING).send().getTransactionCount();
113+
final BigInteger latestTxCount = web3Client.ethGetTransactionCount(walletAddress,
114+
DefaultBlockParameterName.LATEST).send().getTransactionCount();
115+
log.info("Transaction count [block:{}, pending:{}, latest:{}]",
116+
lastSeenBlock, pendingTxCount, latestTxCount);
117+
pendingTxGauge.set(pendingTxCount.longValue());
118+
latestTxGauge.set(latestTxCount.longValue());
119+
applicationEventPublisher.publishEvent(new LatestBlockEvent(this, blockNumber, blockHash, blockTimestamp));
120+
}
121+
122+
private void handleError(final Throwable t) {
123+
log.error("An error happened during subscription", t);
124+
}
125+
126+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.worker.chain.event;
18+
19+
import lombok.EqualsAndHashCode;
20+
import lombok.Value;
21+
import org.springframework.context.ApplicationEvent;
22+
23+
@Value
24+
@EqualsAndHashCode(callSuper = true)
25+
public class LatestBlockEvent extends ApplicationEvent {
26+
long blockNumber;
27+
String blockHash;
28+
long blockTimestamp;
29+
30+
public LatestBlockEvent(final Object source, final long blockNumber, final String blockHash, final long blockTimestamp) {
31+
super(source);
32+
this.blockNumber = blockNumber;
33+
this.blockHash = blockHash;
34+
this.blockTimestamp = blockTimestamp;
35+
}
36+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.worker;
18+
19+
import org.springframework.boot.SpringApplication;
20+
import org.springframework.boot.autoconfigure.SpringBootApplication;
21+
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
22+
23+
/**
24+
* Custom test application to avoid run method and exits in Application class
25+
*/
26+
@SpringBootApplication
27+
@ConfigurationPropertiesScan
28+
public class TestApplication {
29+
30+
public static void main(String[] args) {
31+
SpringApplication.run(TestApplication.class, args);
32+
}
33+
34+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.worker.chain;
18+
19+
import com.iexec.worker.TestApplication;
20+
import io.micrometer.core.instrument.Gauge;
21+
import io.micrometer.core.instrument.MeterRegistry;
22+
import org.junit.jupiter.api.Test;
23+
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.boot.test.context.SpringBootTest;
25+
import org.springframework.test.context.ActiveProfiles;
26+
import org.springframework.test.context.DynamicPropertyRegistry;
27+
import org.springframework.test.context.DynamicPropertySource;
28+
import org.testcontainers.containers.ComposeContainer;
29+
import org.testcontainers.containers.wait.strategy.Wait;
30+
import org.testcontainers.junit.jupiter.Container;
31+
import org.testcontainers.junit.jupiter.Testcontainers;
32+
33+
import java.io.File;
34+
import java.util.Objects;
35+
import java.util.concurrent.TimeUnit;
36+
37+
import static com.iexec.worker.chain.WebSocketBlockchainListener.LATEST_BLOCK_METRIC_NAME;
38+
import static com.iexec.worker.chain.WebSocketBlockchainListener.TX_COUNT_METRIC_NAME;
39+
import static org.assertj.core.api.Assertions.assertThat;
40+
import static org.awaitility.Awaitility.await;
41+
42+
@Testcontainers
43+
@SpringBootTest(classes = TestApplication.class)
44+
@ActiveProfiles("test")
45+
class WebSocketBlockchainListenerTests {
46+
47+
private static final String CHAIN_SVC_NAME = "chain";
48+
private static final int CHAIN_SVC_PORT = 8545;
49+
private static final String CORE_SVC_NAME = "core";
50+
private static final int CORE_SVC_PORT = 8080;
51+
52+
@Container
53+
static ComposeContainer environment = new ComposeContainer(new File("docker-compose.yml"))
54+
.withExposedService(CHAIN_SVC_NAME, CHAIN_SVC_PORT, Wait.forListeningPort())
55+
.withExposedService(CORE_SVC_NAME, CORE_SVC_PORT, Wait.forListeningPort())
56+
.withPull(true);
57+
58+
@DynamicPropertySource
59+
static void registerProperties(DynamicPropertyRegistry registry) {
60+
registry.add("core.protocol", () -> "http");
61+
registry.add("core.host", () -> environment.getServiceHost(CORE_SVC_NAME, CORE_SVC_PORT));
62+
registry.add("core.port", () -> environment.getServicePort(CORE_SVC_NAME, CORE_SVC_PORT));
63+
registry.add("core.pool-address", () -> "0x1");
64+
registry.add("worker.override-blockchain-node-address", () -> getServiceUrl(
65+
environment.getServiceHost(CHAIN_SVC_NAME, CHAIN_SVC_PORT),
66+
environment.getServicePort(CHAIN_SVC_NAME, CHAIN_SVC_PORT)));
67+
}
68+
69+
@Autowired
70+
private MeterRegistry meterRegistry;
71+
72+
@Autowired
73+
private Web3jService web3jService;
74+
75+
private static String getServiceUrl(String serviceHost, int servicePort) {
76+
return "http://" + serviceHost + ":" + servicePort;
77+
}
78+
79+
@Test
80+
void shouldConnect() {
81+
await().atMost(10L, TimeUnit.SECONDS)
82+
.until(() -> Objects.requireNonNull(meterRegistry.find(LATEST_BLOCK_METRIC_NAME).gauge()).value() != 0.0);
83+
assertThat(meterRegistry.find(TX_COUNT_METRIC_NAME).tag("block", "latest").gauge())
84+
.isNotNull()
85+
.extracting(Gauge::value)
86+
.isEqualTo(0.0);
87+
assertThat(meterRegistry.find(TX_COUNT_METRIC_NAME).tag("block", "pending").gauge())
88+
.isNotNull()
89+
.extracting(Gauge::value)
90+
.isEqualTo(0.0);
91+
final Long latestBlockNumber = (long) Objects.requireNonNull(meterRegistry.find(LATEST_BLOCK_METRIC_NAME).gauge()).value();
92+
assertThat(latestBlockNumber).isEqualTo(web3jService.getLatestBlockNumber());
93+
}
94+
95+
}

src/test/resources/wiremock/mappings/worker-config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
},
66
"response": {
77
"status": 200,
8-
"body": "{\"workerPoolAddress\":\"0x1\",\"schedulerPublicAddress\":\"0x2\",\"configServerUrl\":\"http://localhost:{{request.port}}\",\"resultRepositoryURL\":\"http://localhost:{{request.port}}\",\"askForReplicatePeriod\":\"5\",\"requiredWorkerVersion\":\"x.y.z\"}",
8+
"body": "{\"workerPoolAddress\":\"0x1\",\"schedulerPublicAddress\":\"0x2\",\"configServerUrl\":\"http://localhost:{{request.port}}\",\"resultRepositoryURL\":\"http://localhost:{{request.port}}\",\"askForReplicatePeriod\":\"5000\",\"requiredWorkerVersion\":\"x.y.z\"}",
99
"headers": {
1010
"Content-Type": "application/json"
1111
},

0 commit comments

Comments
 (0)