Skip to content

Commit 35e6285

Browse files
authored
fix: refactor event lookup on-chain to use LogTopic (#763)
1 parent 4707de3 commit 35e6285

File tree

7 files changed

+133
-188
lines changed

7 files changed

+133
-188
lines changed

src/main/java/com/iexec/core/chain/IexecHubService.java

Lines changed: 64 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,22 @@
1919
import com.iexec.common.lifecycle.purge.Purgeable;
2020
import com.iexec.commons.poco.chain.*;
2121
import com.iexec.commons.poco.contract.generated.IexecHubContract;
22-
import com.iexec.commons.poco.utils.BytesUtils;
22+
import com.iexec.commons.poco.encoding.LogTopic;
2323
import io.reactivex.Flowable;
2424
import jakarta.annotation.PreDestroy;
2525
import lombok.extern.slf4j.Slf4j;
2626
import org.springframework.stereotype.Service;
27-
import org.web3j.abi.EventEncoder;
28-
import org.web3j.abi.datatypes.Event;
2927
import org.web3j.protocol.core.DefaultBlockParameter;
3028
import org.web3j.protocol.core.methods.request.EthFilter;
29+
import org.web3j.protocol.core.methods.response.EthLog;
30+
import org.web3j.protocol.core.methods.response.Log;
3131

3232
import java.math.BigInteger;
33+
import java.util.Arrays;
3334
import java.util.Date;
3435

3536
import static com.iexec.commons.poco.chain.ChainContributionStatus.CONTRIBUTED;
3637
import static com.iexec.commons.poco.chain.ChainContributionStatus.REVEALED;
37-
import static com.iexec.commons.poco.contract.generated.IexecHubContract.*;
3838

3939
@Slf4j
4040
@Service
@@ -43,9 +43,9 @@ public class IexecHubService extends IexecHubAbstractService implements Purgeabl
4343
private final SignerService signerService;
4444
private final Web3jService web3jService;
4545

46-
public IexecHubService(SignerService signerService,
47-
Web3jService web3jService,
48-
ChainConfig chainConfig) {
46+
public IexecHubService(final SignerService signerService,
47+
final Web3jService web3jService,
48+
final ChainConfig chainConfig) {
4949
super(
5050
signerService.getCredentials(),
5151
web3jService,
@@ -156,124 +156,95 @@ public boolean isRevealed(String... args) {
156156
// endregion
157157

158158
// region get event blocks
159-
public ChainReceipt getContributionBlock(String chainTaskId,
160-
String workerWallet,
161-
long fromBlock) {
162-
long latestBlock = web3jService.getLatestBlockNumber();
159+
public ChainReceipt getInitializeBlock(final String chainTaskId,
160+
final long fromBlock) {
161+
log.debug("getInitializeBlock [chainTaskId:{}]", chainTaskId);
162+
final long latestBlock = web3jService.getLatestBlockNumber();
163163
if (fromBlock > latestBlock) {
164164
return ChainReceipt.builder().build();
165165
}
166-
167-
EthFilter ethFilter = createContributeEthFilter(fromBlock, latestBlock);
168-
169-
// filter only taskContribute events for the chainTaskId and the worker's wallet
170-
// and retrieve the block number of the event
171-
return iexecHubContract.taskContributeEventFlowable(ethFilter)
172-
.filter(eventResponse ->
173-
chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)) &&
174-
workerWallet.equals(eventResponse.worker)
175-
)
176-
.map(eventResponse -> ChainReceipt.builder()
177-
.blockNumber(eventResponse.log.getBlockNumber().longValue())
178-
.txHash(eventResponse.log.getTransactionHash())
179-
.build())
166+
final EthFilter ethFilter = createEthFilter(
167+
fromBlock, latestBlock, LogTopic.TASK_INITIALIZE_EVENT, chainTaskId);
168+
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
169+
.map(this::createChainReceipt)
180170
.blockingFirst();
181171
}
182172

183-
public ChainReceipt getConsensusBlock(String chainTaskId, long fromBlock) {
173+
public ChainReceipt getContributionBlock(final String chainTaskId,
174+
final String workerWallet,
175+
final long fromBlock) {
176+
log.debug("getContributionBlock [chainTaskId:{}]", chainTaskId);
184177
long latestBlock = web3jService.getLatestBlockNumber();
185178
if (fromBlock > latestBlock) {
186179
return ChainReceipt.builder().build();
187180
}
188-
189-
EthFilter ethFilter = createConsensusEthFilter(fromBlock, latestBlock);
190-
191-
// filter only taskConsensus events for the chainTaskId (there should be only one)
192-
// and retrieve the block number of the event
193-
return iexecHubContract.taskConsensusEventFlowable(ethFilter)
194-
.filter(eventResponse -> chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)))
195-
.map(eventResponse -> ChainReceipt.builder()
196-
.blockNumber(eventResponse.log.getBlockNumber().longValue())
197-
.txHash(eventResponse.log.getTransactionHash())
198-
.build())
181+
final EthFilter ethFilter = createEthFilter(
182+
fromBlock, latestBlock, LogTopic.TASK_CONTRIBUTE_EVENT, chainTaskId, workerWallet);
183+
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
184+
.map(this::createChainReceipt)
199185
.blockingFirst();
200186
}
201187

202-
public ChainReceipt getRevealBlock(String chainTaskId,
203-
String workerWallet,
204-
long fromBlock) {
188+
public ChainReceipt getConsensusBlock(final String chainTaskId, final long fromBlock) {
189+
log.debug("getConsensusBlock [chainTaskId:{}]", chainTaskId);
205190
long latestBlock = web3jService.getLatestBlockNumber();
206191
if (fromBlock > latestBlock) {
207192
return ChainReceipt.builder().build();
208193
}
209-
210-
EthFilter ethFilter = createRevealEthFilter(fromBlock, latestBlock);
211-
212-
// filter only taskReveal events for the chainTaskId and the worker's wallet
213-
// and retrieve the block number of the event
214-
return iexecHubContract.taskRevealEventFlowable(ethFilter)
215-
.filter(eventResponse ->
216-
chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)) &&
217-
workerWallet.equals(eventResponse.worker)
218-
)
219-
.map(eventResponse -> ChainReceipt.builder()
220-
.blockNumber(eventResponse.log.getBlockNumber().longValue())
221-
.txHash(eventResponse.log.getTransactionHash())
222-
.build())
194+
final EthFilter ethFilter = createEthFilter(
195+
fromBlock, latestBlock, LogTopic.TASK_CONSENSUS_EVENT, chainTaskId);
196+
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
197+
.map(this::createChainReceipt)
223198
.blockingFirst();
224199
}
225200

226-
public ChainReceipt getFinalizeBlock(String chainTaskId, long fromBlock) {
201+
public ChainReceipt getRevealBlock(final String chainTaskId,
202+
final String workerWallet,
203+
final long fromBlock) {
204+
log.debug("getRevealBlock [chainTaskId:{}]", chainTaskId);
227205
long latestBlock = web3jService.getLatestBlockNumber();
228206
if (fromBlock > latestBlock) {
229207
return ChainReceipt.builder().build();
230208
}
231-
232-
EthFilter ethFilter = createFinalizeEthFilter(fromBlock, latestBlock);
233-
234-
// filter only taskFinalize events for the chainTaskId (there should be only one)
235-
// and retrieve the block number of the event
236-
return iexecHubContract.taskFinalizeEventFlowable(ethFilter)
237-
.filter(eventResponse ->
238-
chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid))
239-
)
240-
.map(eventResponse -> ChainReceipt.builder()
241-
.blockNumber(eventResponse.log.getBlockNumber().longValue())
242-
.txHash(eventResponse.log.getTransactionHash())
243-
.build())
209+
final EthFilter ethFilter = createEthFilter(
210+
fromBlock, latestBlock, LogTopic.TASK_REVEAL_EVENT, chainTaskId, workerWallet);
211+
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
212+
.map(this::createChainReceipt)
244213
.blockingFirst();
245214
}
246215

247-
private EthFilter createContributeEthFilter(long fromBlock, long toBlock) {
248-
return createEthFilter(fromBlock, toBlock, TASKCONTRIBUTE_EVENT);
249-
}
250-
251-
private EthFilter createConsensusEthFilter(long fromBlock, long toBlock) {
252-
return createEthFilter(fromBlock, toBlock, TASKCONSENSUS_EVENT);
253-
}
254-
255-
private EthFilter createRevealEthFilter(long fromBlock, long toBlock) {
256-
return createEthFilter(fromBlock, toBlock, TASKREVEAL_EVENT);
216+
public ChainReceipt getFinalizeBlock(final String chainTaskId,
217+
final long fromBlock) {
218+
log.debug("getFinalizeBlock [chainTaskId:{}]", chainTaskId);
219+
long latestBlock = web3jService.getLatestBlockNumber();
220+
if (fromBlock > latestBlock) {
221+
return ChainReceipt.builder().build();
222+
}
223+
final EthFilter ethFilter = createEthFilter(
224+
fromBlock, latestBlock, LogTopic.TASK_FINALIZE_EVENT, chainTaskId);
225+
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
226+
.map(this::createChainReceipt)
227+
.blockingFirst();
257228
}
258229

259-
private EthFilter createFinalizeEthFilter(long fromBlock, long toBlock) {
260-
return createEthFilter(fromBlock, toBlock, TASKFINALIZE_EVENT);
230+
private ChainReceipt createChainReceipt(final EthLog ethLog) {
231+
final Log logEvent = (Log) ethLog.getLogs().get(0);
232+
return ChainReceipt.builder()
233+
.blockNumber(logEvent.getBlockNumber().longValue())
234+
.txHash(logEvent.getTransactionHash())
235+
.build();
261236
}
262237

263-
private EthFilter createEthFilter(long fromBlock, long toBlock, Event event) {
264-
IexecHubContract iexecHub = getHubContract();
265-
DefaultBlockParameter startBlock =
266-
DefaultBlockParameter.valueOf(BigInteger.valueOf(fromBlock));
267-
DefaultBlockParameter endBlock =
268-
DefaultBlockParameter.valueOf(BigInteger.valueOf(toBlock));
269-
270-
// define the filter
271-
EthFilter ethFilter = new EthFilter(
272-
startBlock,
273-
endBlock,
274-
iexecHub.getContractAddress()
238+
private EthFilter createEthFilter(final long fromBlock,
239+
final long toBlock,
240+
final String... topics) {
241+
log.debug("createEthFilter [from:{}, to:{}]", fromBlock, toBlock);
242+
final EthFilter ethFilter = new EthFilter(
243+
DefaultBlockParameter.valueOf(BigInteger.valueOf(fromBlock)),
244+
DefaultBlockParameter.valueOf(BigInteger.valueOf(toBlock)),
245+
iexecHubAddress
275246
);
276-
ethFilter.addSingleTopic(EventEncoder.encode(event));
247+
Arrays.stream(topics).forEach(ethFilter::addSingleTopic);
277248

278249
return ethFilter;
279250
}

src/main/java/com/iexec/core/detector/task/InitializedTaskDetector.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,8 +28,6 @@
2828
import org.springframework.scheduling.annotation.Scheduled;
2929
import org.springframework.stereotype.Service;
3030

31-
import java.util.Optional;
32-
3331
@Slf4j
3432
@Service
3533
public class InitializedTaskDetector implements Detector {
@@ -38,9 +36,9 @@ public class InitializedTaskDetector implements Detector {
3836
private final TaskUpdateRequestManager taskUpdateRequestManager;
3937
private final IexecHubService iexecHubService;
4038

41-
public InitializedTaskDetector(TaskService taskService,
42-
TaskUpdateRequestManager taskUpdateRequestManager,
43-
IexecHubService iexecHubService) {
39+
public InitializedTaskDetector(final TaskService taskService,
40+
final TaskUpdateRequestManager taskUpdateRequestManager,
41+
final IexecHubService iexecHubService) {
4442
this.taskService = taskService;
4543
this.taskUpdateRequestManager = taskUpdateRequestManager;
4644
this.iexecHubService = iexecHubService;
@@ -54,8 +52,8 @@ public InitializedTaskDetector(TaskService taskService,
5452
public void detect() {
5553
log.debug("Trying to detect initializable tasks");
5654
for (Task task : taskService.getInitializableTasks()) {
57-
Optional<ChainTask> chainTask = iexecHubService.getChainTask(task.getChainTaskId());
58-
if (chainTask.isEmpty() || chainTask.get().getStatus().equals(ChainTaskStatus.UNSET)) {
55+
final ChainTask chainTask = iexecHubService.getChainTask(task.getChainTaskId()).orElse(null);
56+
if (chainTask == null || chainTask.getStatus() == ChainTaskStatus.UNSET) {
5957
continue;
6058
}
6159
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",

src/main/java/com/iexec/core/task/update/TaskUpdateManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,9 @@ void initializing2Initialized(final Task task) {
242242
log.info("Initialized on blockchain (tx mined) [chainTaskId:{}]", task.getChainTaskId());
243243
final Update update = new Update();
244244
// Without receipt, using deal block for initialization block
245-
task.setInitializationBlockNumber(task.getDealBlockNumber());
246-
update.set("initializationBlockNumber", task.getDealBlockNumber());
245+
final ChainReceipt chainReceipt = iexecHubService.getInitializeBlock(task.getChainTaskId(), task.getDealBlockNumber());
246+
task.setInitializationBlockNumber(chainReceipt.getBlockNumber());
247+
update.set("initializationBlockNumber", chainReceipt.getBlockNumber());
247248

248249
// Create enclave challenge after task has been initialized on-chain
249250
final Optional<String> enclaveChallenge = smsService.getEnclaveChallenge(

src/test/java/com/iexec/core/TestUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class TestUtils {
3030
public static final String CHAIN_DEAL_ID = "0xd82223e5feff6720792ffed1665e980da95e5d32b177332013eaba8edc07f31c";
3131
public static final String CHAIN_TASK_ID = "0x65bc5e94ed1486b940bd6cc0013c418efad58a0a52a3d08cee89faaa21970426";
3232
public static final int TASK_INDEX = 0;
33+
public static final long DEAL_BLOCK = 1_000L;
3334

3435
public static final String WORKER_ADDRESS = "0x87ae2b87b5db23830572988fb1f51242fbc471ce";
3536
public static final String WALLET_WORKER_1 = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248";
@@ -45,6 +46,7 @@ public class TestUtils {
4546

4647
public static Task getStubTask() {
4748
final Task task = new Task(CHAIN_DEAL_ID, 0, DAPP_NAME, COMMAND_LINE, 1, 60000, NO_TEE_TAG);
49+
task.setDealBlockNumber(DEAL_BLOCK);
4850
task.setContributionDeadline(Date.from(Instant.now().plus(1, ChronoUnit.MINUTES)));
4951
task.setFinalDeadline(Date.from(Instant.now().plus(1, ChronoUnit.MINUTES)));
5052
return task;

0 commit comments

Comments
 (0)