Skip to content

Commit 2c40fb4

Browse files
authored
Add WebSocketBlockchainListener to fetch latest block without polling the blockchain network (#747)
1 parent 0bf846b commit 2c40fb4

File tree

12 files changed

+305
-25
lines changed

12 files changed

+305
-25
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
44

55
## [[NEXT]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/vNEXT) 2025
66

7+
### New Features
8+
9+
- Add WebSocketBlockchainListener to fetch latest block without polling the blockchain network. (#747)
10+
711
### Bug Fixes
812

913
- Remove TaskResultUploadTimeout detector, covered by FinalDeadlineTaskDetector. (#732)

docker-compose.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
services:
2+
3+
chain:
4+
image: docker-regis.iex.ec/poco-chain:1.0.0-poco-v5.5.0-voucher-v1.0.0-nethermind
5+
expose:
6+
- "8545"
7+
8+
config-server:
9+
image: wiremock/wiremock:3.3.1
10+
expose:
11+
- "8080"
12+
volumes:
13+
- "./src/test/resources/wiremock/mappings:/home/wiremock/mappings"
14+
15+
mongo:
16+
image: library/mongo:7.0.15-jammy
17+
expose:
18+
- "27017"

iexec-task-api/src/main/java/com/iexec/core/metric/PlatformMetric.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class PlatformMetric {
3535
int aliveComputingGpu;
3636
int aliveRegisteredGpu;
3737
LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;
38+
LatestBlockMetric latestBlockMetric;
3839
long dealEventsCount;
3940
long dealsCount;
4041
long replayDealsCount;
@@ -66,4 +67,7 @@ public class PlatformMetric {
6667
@JsonPOJOBuilder(withPrefix = "")
6768
public static class PlatformMetricBuilder {
6869
}
70+
71+
public record LatestBlockMetric(long blockNumber, String blockHash, long blockTimestamp) {
72+
}
6973
}

src/main/java/com/iexec/core/chain/BlockchainConnectionHealthIndicator.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2023-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import com.iexec.core.chain.event.ChainConnectedEvent;
2020
import com.iexec.core.chain.event.ChainDisconnectedEvent;
21+
import com.iexec.core.chain.event.LatestBlockEvent;
2122
import lombok.Getter;
2223
import lombok.extern.slf4j.Slf4j;
2324
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,6 +32,7 @@
3132

3233
import java.time.Clock;
3334
import java.time.Duration;
35+
import java.time.Instant;
3436
import java.time.LocalDateTime;
3537
import java.util.concurrent.Executors;
3638
import java.util.concurrent.ScheduledExecutorService;
@@ -41,7 +43,6 @@
4143
public class BlockchainConnectionHealthIndicator implements HealthIndicator {
4244

4345
private final ApplicationEventPublisher applicationEventPublisher;
44-
private final Web3jService web3jService;
4546
/**
4647
* Interval between 2 requests onto the chain.
4748
*/
@@ -57,6 +58,8 @@ public class BlockchainConnectionHealthIndicator implements HealthIndicator {
5758
private int consecutiveFailures = 1;
5859
@Getter
5960
private LocalDateTime firstFailure = null;
61+
private long latestBlockTimestamp;
62+
private long latestBlockNumber;
6063

6164
/**
6265
* Required for test purposes.
@@ -66,11 +69,9 @@ public class BlockchainConnectionHealthIndicator implements HealthIndicator {
6669
@Autowired
6770
public BlockchainConnectionHealthIndicator(
6871
ApplicationEventPublisher applicationEventPublisher,
69-
Web3jService web3jService,
7072
ChainConfig chainConfig) {
7173
this(
7274
applicationEventPublisher,
73-
web3jService,
7475
chainConfig,
7576
Executors.newSingleThreadScheduledExecutor(),
7677
Clock.systemDefaultZone()
@@ -79,12 +80,10 @@ public BlockchainConnectionHealthIndicator(
7980

8081
BlockchainConnectionHealthIndicator(
8182
ApplicationEventPublisher applicationEventPublisher,
82-
Web3jService web3jService,
8383
ChainConfig chainConfig,
8484
ScheduledExecutorService monitoringExecutor,
8585
Clock clock) {
8686
this.applicationEventPublisher = applicationEventPublisher;
87-
this.web3jService = web3jService;
8887
this.pollingInterval = chainConfig.getBlockTime();
8988
this.monitoringExecutor = monitoringExecutor;
9089
this.clock = clock;
@@ -107,12 +106,13 @@ void scheduleMonitoring() {
107106
* blockchain node is restored.
108107
*/
109108
void checkConnection() {
110-
final long latestBlockNumber = web3jService.getLatestBlockNumber();
111109
log.debug("Latest on-chain block number [block:{}]", latestBlockNumber);
112-
if (latestBlockNumber == 0) {
110+
final Instant now = Instant.now();
111+
final Instant threshold = Instant.ofEpochSecond(latestBlockTimestamp).plusSeconds(pollingInterval.toSeconds());
112+
if (now.isAfter(threshold)) {
113113
connectionFailed();
114114
} else {
115-
connectionSucceeded(latestBlockNumber);
115+
connectionSucceeded();
116116
}
117117
}
118118

@@ -145,7 +145,7 @@ private void connectionFailed() {
145145
* <li>If {@link Status#OUT_OF_SERVICE}, publish a {@link ChainConnectedEvent} event
146146
* </ul>
147147
*/
148-
private void connectionSucceeded(long latestBlockNumber) {
148+
private void connectionSucceeded() {
149149
if (isOutOfService()) {
150150
log.info("Blockchain connection is now restored after a period of unavailability." +
151151
" [block:{}, unavailabilityPeriod:{}]",
@@ -190,4 +190,10 @@ public boolean isOutOfService() {
190190
public boolean isUp() {
191191
return health().getStatus() == Status.UP;
192192
}
193+
194+
@EventListener
195+
private void onLatestBlockEvent(final LatestBlockEvent event) {
196+
latestBlockNumber = event.getBlockNumber();
197+
latestBlockTimestamp = event.getBlockTimestamp();
198+
}
193199
}

src/main/java/com/iexec/core/chain/Web3jService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package com.iexec.core.chain;
1818

1919
import com.iexec.commons.poco.chain.Web3jAbstractService;
20+
import com.iexec.core.chain.event.LatestBlockEvent;
21+
import org.springframework.context.event.EventListener;
2022
import org.springframework.stereotype.Service;
2123

2224
@Service
2325
public class Web3jService extends Web3jAbstractService {
2426

27+
private long latestBlockNumber;
28+
2529
public Web3jService(ChainConfig chainConfig) {
2630
super(
2731
chainConfig.getChainId(),
@@ -33,4 +37,14 @@ public Web3jService(ChainConfig chainConfig) {
3337
);
3438
}
3539

40+
@EventListener
41+
private void onLatestBlockEvent(final LatestBlockEvent event) {
42+
this.latestBlockNumber = event.getBlockNumber();
43+
}
44+
45+
@Override
46+
public long getLatestBlockNumber() {
47+
return latestBlockNumber;
48+
}
49+
3650
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.core.chain;
18+
19+
import com.iexec.core.chain.event.LatestBlockEvent;
20+
import io.micrometer.core.instrument.Metrics;
21+
import io.reactivex.Flowable;
22+
import io.reactivex.disposables.Disposable;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.springframework.context.ApplicationEventPublisher;
25+
import org.springframework.scheduling.annotation.Scheduled;
26+
import org.springframework.stereotype.Service;
27+
import org.web3j.protocol.core.Request;
28+
import org.web3j.protocol.core.methods.response.EthSubscribe;
29+
import org.web3j.protocol.websocket.WebSocketService;
30+
import org.web3j.protocol.websocket.events.NewHeadsNotification;
31+
import org.web3j.utils.Numeric;
32+
33+
import java.net.ConnectException;
34+
import java.time.Instant;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.atomic.AtomicLong;
38+
39+
@Slf4j
40+
@Service
41+
public class WebSocketBlockchainListener {
42+
static final String LATEST_BLOCK_METRIC_NAME = "iexec.chain.block.latest";
43+
44+
private static final String SUBSCRIBE_METHOD = "eth_subscribe";
45+
private static final String UNSUBSCRIBE_METHOD = "eth_unsubscribe";
46+
47+
private final ApplicationEventPublisher applicationEventPublisher;
48+
private final WebSocketService webSocketService;
49+
50+
private final AtomicLong lastSeenBlock;
51+
private Disposable newHeads;
52+
53+
public WebSocketBlockchainListener(final ApplicationEventPublisher applicationEventPublisher,
54+
final ChainConfig chainConfig) {
55+
this.applicationEventPublisher = applicationEventPublisher;
56+
final String wsUrl = chainConfig.getNodeAddress().replace("http", "ws");
57+
this.webSocketService = new WebSocketService(wsUrl, false);
58+
lastSeenBlock = Metrics.gauge(LATEST_BLOCK_METRIC_NAME, new AtomicLong(0));
59+
}
60+
61+
@Scheduled(fixedRate = 5000)
62+
private void run() throws ConnectException {
63+
if (newHeads != null && !newHeads.isDisposed()) {
64+
return;
65+
}
66+
67+
log.warn("web socket disconnection detected");
68+
webSocketService.connect();
69+
70+
final Request<?, EthSubscribe> newHeadsRequest = new Request<>(
71+
SUBSCRIBE_METHOD,
72+
List.of("newHeads", Map.of()),
73+
webSocketService,
74+
EthSubscribe.class
75+
);
76+
77+
final Flowable<NewHeadsNotification> newHeadsEvents = webSocketService.subscribe(
78+
newHeadsRequest, UNSUBSCRIBE_METHOD, NewHeadsNotification.class);
79+
newHeads = newHeadsEvents.subscribe(this::processHead, this::handleError);
80+
}
81+
82+
private void processHead(final NewHeadsNotification event) {
83+
final long blockNumber = Numeric.toBigInt(event.getParams().getResult().getNumber()).longValue();
84+
final String blockHash = event.getParams().getResult().getHash();
85+
final long blockTimestamp = Numeric.toBigInt(event.getParams().getResult().getTimestamp()).longValue();
86+
final Instant blockTimestampInstant = Instant.ofEpochSecond(blockTimestamp);
87+
log.info("Last seen block [number:{}, hash:{}, timestamp:{}, instant:{}]",
88+
blockNumber, blockHash, blockTimestamp, blockTimestampInstant);
89+
lastSeenBlock.set(blockNumber);
90+
applicationEventPublisher.publishEvent(new LatestBlockEvent(this, blockNumber, blockHash, blockTimestamp));
91+
}
92+
93+
private void handleError(final Throwable t) {
94+
log.error("An error happened during subscription", t);
95+
}
96+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.core.chain.event;
18+
19+
import lombok.EqualsAndHashCode;
20+
import lombok.Value;
21+
import org.springframework.context.ApplicationEvent;
22+
23+
@Value
24+
@EqualsAndHashCode(callSuper = true)
25+
public class LatestBlockEvent extends ApplicationEvent {
26+
long blockNumber;
27+
String blockHash;
28+
long blockTimestamp;
29+
30+
public LatestBlockEvent(final Object source, final long blockNumber, final String blockHash, final long blockTimestamp) {
31+
super(source);
32+
this.blockNumber = blockNumber;
33+
this.blockHash = blockHash;
34+
this.blockTimestamp = blockTimestamp;
35+
}
36+
}

src/main/java/com/iexec/core/metric/MetricService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.iexec.core.metric;
1818

1919
import com.iexec.core.chain.DealWatcherService;
20+
import com.iexec.core.chain.event.LatestBlockEvent;
2021
import com.iexec.core.task.TaskStatus;
2122
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
2223
import com.iexec.core.worker.AliveWorkerMetrics;
@@ -31,6 +32,7 @@ public class MetricService {
3132
private final DealWatcherService dealWatcherService;
3233
private final WorkerService workerService;
3334
private LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;
35+
private PlatformMetric.LatestBlockMetric latestBlock;
3436

3537
public MetricService(final DealWatcherService dealWatcherService,
3638
final WorkerService workerService) {
@@ -49,6 +51,7 @@ public PlatformMetric getPlatformMetrics() {
4951
.aliveComputingGpu(aliveWorkerMetrics.aliveComputingGpu())
5052
.aliveRegisteredGpu(aliveWorkerMetrics.aliveRegisteredGpu())
5153
.currentTaskStatusesCount(currentTaskStatusesCount)
54+
.latestBlockMetric(latestBlock)
5255
.dealEventsCount(dealWatcherService.getDealEventsCount())
5356
.dealsCount(dealWatcherService.getDealsCount())
5457
.replayDealsCount(dealWatcherService.getReplayDealsCount())
@@ -60,6 +63,11 @@ public PlatformMetric getPlatformMetrics() {
6063
.build();
6164
}
6265

66+
@EventListener
67+
private void onLatestBlockEvent(final LatestBlockEvent event) {
68+
latestBlock = new PlatformMetric.LatestBlockMetric(event.getBlockNumber(), event.getBlockHash(), event.getBlockTimestamp());
69+
}
70+
6371
@EventListener
6472
void onTaskStatusesCountUpdateEvent(final TaskStatusesCountUpdatedEvent event) {
6573
this.currentTaskStatusesCount = event.getCurrentTaskStatusesCount();

0 commit comments

Comments
 (0)