Skip to content

Commit 81c994f

Browse files
authored
Merge pull request #550 from iExecBlockchainComputing/feature/update-iexec-hub-service
Feature/update iexec hub service
2 parents a0e5adf + 6e0c333 commit 81c994f

File tree

3 files changed

+110
-72
lines changed

3 files changed

+110
-72
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ All notable changes to this project will be documented in this file.
99
### Bug fixes
1010
- Implement thread-safe login on scheduler. (#541)
1111
- Fix and harmonize `Dockerfile entrypoint` in all Spring Boot applications. (#548)
12+
- Remove potential `NullPointerException` and add `isStatusValidOnChainAfterPendingReceipt` in `IexecHubService`. (#550)
1213
### Quality
1314
- Remove `nexus.intra.iex.ec` repository. (#539)
1415
- Remove `Graylog` support. Fetch logs with a sidecar to push them to your log infrastructure. (#540)
1516
- Rename scontain registry to `registry.scontain.com`. (#542)
1617
- Upgrade to Gradle 8.2.1 with up-to-date plugins. (#545)
1718
- Fix log format in `LasService`. (#546)
1819
- Do not retry calls to fetch replicate from a scheduler, those calls are already scheduled. (#547)
20+
- Remove dead code in `IexecHubService`. (#550)
1921
### Dependency Upgrades
2022
- Upgrade to `iexec-common` 8.2.1-NEXT-SNAPSHOT. (#538)
2123
- Remove `logstash-gelf` dependency. (#540)

src/main/java/com/iexec/worker/chain/IexecHubService.java

Lines changed: 71 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.springframework.beans.factory.annotation.Autowired;
2828
import org.springframework.stereotype.Service;
2929
import org.web3j.protocol.core.RemoteCall;
30-
import org.web3j.protocol.core.methods.response.BaseEventResponse;
30+
import org.web3j.protocol.core.methods.response.Log;
3131
import org.web3j.protocol.core.methods.response.TransactionReceipt;
3232

3333
import java.nio.charset.StandardCharsets;
@@ -38,7 +38,6 @@
3838
import java.util.concurrent.ExecutionException;
3939
import java.util.concurrent.Executors;
4040
import java.util.concurrent.ThreadPoolExecutor;
41-
import java.util.function.Function;
4241
import java.util.stream.Collectors;
4342

4443
import static com.iexec.commons.poco.chain.ChainContributionStatus.CONTRIBUTED;
@@ -70,6 +69,7 @@ public IexecHubService(CredentialsService credentialsService,
7069
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
7170
}
7271

72+
// region contribute
7373
IexecHubContract.TaskContributeEventResponse contribute(Contribution contribution) {
7474
try {
7575
return CompletableFuture.supplyAsync(() -> {
@@ -113,33 +113,21 @@ private IexecHubContract.TaskContributeEventResponse sendContributeTransaction(C
113113
.collect(Collectors.toList());
114114
log.debug("contributeEvents count {} [chainTaskId: {}]", contributeEvents.size(), chainTaskId);
115115

116-
IexecHubContract.TaskContributeEventResponse contributeEvent = null;
117116
if (!contributeEvents.isEmpty()) {
118-
contributeEvent = contributeEvents.get(0);
119-
}
120-
121-
if (isSuccessTx(chainTaskId, contributeEvent, CONTRIBUTED)) {
122-
log.info("Contributed [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]",
123-
chainTaskId, contribution, contributeReceipt.getGasUsed(), contributeEvent.log);
124-
return contributeEvent;
117+
IexecHubContract.TaskContributeEventResponse contributeEvent = contributeEvents.get(0);
118+
if (isSuccessTx(chainTaskId, contributeEvent.log, CONTRIBUTED)) {
119+
log.info("Contributed [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]",
120+
chainTaskId, contribution, contributeReceipt.getGasUsed(), contributeEvent.log);
121+
return contributeEvent;
122+
}
125123
}
126124

127125
log.error("Failed to contribute [chainTaskId:{}]", chainTaskId);
128126
return null;
129127
}
128+
// endregion
130129

131-
boolean isSuccessTx(String chainTaskId, BaseEventResponse txEvent, ChainContributionStatus pretendedStatus) {
132-
if (txEvent == null || txEvent.log == null) {
133-
return false;
134-
}
135-
136-
if (txEvent.log.getType() == null || txEvent.log.getType().equals(PENDING_RECEIPT_STATUS)) {
137-
return isStatusValidOnChainAfterPendingReceipt(chainTaskId, pretendedStatus, this::isContributionStatusValidOnChain);
138-
}
139-
140-
return true;
141-
}
142-
130+
// region reveal
143131
IexecHubContract.TaskRevealEventResponse reveal(String chainTaskId, String resultDigest) {
144132
try {
145133
return CompletableFuture.supplyAsync(() -> {
@@ -176,21 +164,21 @@ private IexecHubContract.TaskRevealEventResponse sendRevealTransaction(String ch
176164
.collect(Collectors.toList());
177165
log.debug("revealEvents count {} [chainTaskId:{}]", revealEvents.size(), chainTaskId);
178166

179-
IexecHubContract.TaskRevealEventResponse revealEvent = null;
180167
if (!revealEvents.isEmpty()) {
181-
revealEvent = revealEvents.get(0);
182-
}
183-
184-
if (isSuccessTx(chainTaskId, revealEvent, REVEALED)) {
185-
log.info("Revealed [chainTaskId:{}, resultDigest:{}, gasUsed:{}, log:{}]",
186-
chainTaskId, resultDigest, revealReceipt.getGasUsed(), revealEvent.log);
187-
return revealEvent;
168+
IexecHubContract.TaskRevealEventResponse revealEvent = revealEvents.get(0);
169+
if (isSuccessTx(chainTaskId, revealEvent.log, REVEALED)) {
170+
log.info("Revealed [chainTaskId:{}, resultDigest:{}, gasUsed:{}, log:{}]",
171+
chainTaskId, resultDigest, revealReceipt.getGasUsed(), revealEvent.log);
172+
return revealEvent;
173+
}
188174
}
189175

190176
log.error("Failed to reveal [chainTaskId:{}]", chainTaskId);
191177
return null;
192178
}
179+
// endregion reveal
193180

181+
// region contributeAndFinalize
194182
public Optional<ChainReceipt> contributeAndFinalize(Contribution contribution, String resultLink, String callbackData) {
195183
try {
196184
return CompletableFuture.supplyAsync(() -> {
@@ -238,72 +226,86 @@ private IexecHubContract.TaskFinalizeEventResponse sendContributeAndFinalizeTran
238226
.collect(Collectors.toList());
239227
log.debug("finalizeEvents count {} [chainTaskId:{}]", finalizeEvents.size(), chainTaskId);
240228

241-
IexecHubContract.TaskFinalizeEventResponse finalizeEvent = null;
242229
if (!finalizeEvents.isEmpty()) {
243-
finalizeEvent = finalizeEvents.get(0);
244-
}
245-
246-
if (isSuccessTx(chainTaskId, finalizeEvent, REVEALED)) {
247-
log.info("contributeAndFinalize done [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]",
248-
chainTaskId, contribution, receipt.getGasUsed(), finalizeEvent.log);
249-
return finalizeEvent;
230+
IexecHubContract.TaskFinalizeEventResponse finalizeEvent = finalizeEvents.get(0);
231+
if (isSuccessTx(chainTaskId, finalizeEvent.log, REVEALED)) {
232+
log.info("contributeAndFinalize done [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]",
233+
chainTaskId, contribution, receipt.getGasUsed(), finalizeEvent.log);
234+
return finalizeEvent;
235+
}
250236
}
251237

252238
log.error("contributeAndFinalize failed [chainTaskId:{}]", chainTaskId);
253239
return null;
254240
}
241+
// endregion
255242

243+
// region isSuccessTx
256244
private long getWaitingTransactionCount() {
257245
return executor.getTaskCount() - 1 - executor.getCompletedTaskCount();
258246
}
259247

260-
Optional<ChainContribution> getChainContribution(String chainTaskId) {
261-
return getChainContribution(chainTaskId, credentialsService.getCredentials().getAddress());
262-
}
263-
264-
Optional<ChainAccount> getChainAccount() {
265-
return getChainAccount(credentialsService.getCredentials().getAddress());
266-
}
267-
268-
public boolean hasEnoughGas() {
269-
return web3jService.hasEnoughGas(credentialsService.getCredentials().getAddress());
270-
}
248+
boolean isSuccessTx(String chainTaskId, Log eventLog, ChainContributionStatus pretendedStatus) {
249+
if (eventLog == null) {
250+
return false;
251+
}
271252

272-
public long getLatestBlockNumber() {
273-
return web3jService.getLatestBlockNumber();
274-
}
253+
log.info("event log type {}", eventLog.getType());
254+
if (PENDING_RECEIPT_STATUS.equals(eventLog.getType())) {
255+
return isStatusValidOnChainAfterPendingReceipt(chainTaskId, pretendedStatus);
256+
}
275257

276-
public long getMaxWaitingTimeWhenNotSync() {
277-
return web3jService.getMaxWaitingTimeWhenPendingReceipt();
258+
return true;
278259
}
279260

280-
private Boolean isContributionStatusValidOnChain(String chainTaskId, ChainStatus chainContributionStatus) {
281-
if (chainContributionStatus instanceof ChainContributionStatus) {
282-
Optional<ChainContribution> chainContribution = getChainContribution(chainTaskId);
283-
return chainContribution.isPresent() && chainContribution.get().getStatus().equals(chainContributionStatus);
284-
}
285-
return false;
261+
private boolean isContributionStatusValidOnChain(String chainTaskId, ChainContributionStatus chainContributionStatus) {
262+
Optional<ChainContribution> chainContribution = getChainContribution(chainTaskId);
263+
return chainContribution.isPresent() && chainContribution.get().getStatus() == chainContributionStatus;
286264
}
287265

288-
private boolean isBlockchainReadTrueWhenNodeNotSync(String chainTaskId, Function<String, Boolean> booleanBlockchainReadFunction) {
289-
long maxWaitingTime = web3jService.getMaxWaitingTimeWhenPendingReceipt();
290-
long startTime = System.currentTimeMillis();
266+
private boolean isStatusValidOnChainAfterPendingReceipt(String chainTaskId, ChainContributionStatus onchainStatus) {
267+
long maxWaitingTime = 10 * web3jService.getBlockTime().toMillis();
268+
log.info("Waiting for on-chain status after pending receipt " +
269+
"[chainTaskId:{}, status:{}, maxWaitingTime:{}]",
270+
chainTaskId, onchainStatus, maxWaitingTime);
291271

292-
for (long duration = 0L; duration < maxWaitingTime; duration = System.currentTimeMillis() - startTime) {
272+
final long startTime = System.currentTimeMillis();
273+
long duration = 0;
274+
while (duration < maxWaitingTime) {
293275
try {
294-
if (booleanBlockchainReadFunction.apply(chainTaskId)) {
276+
if (isContributionStatusValidOnChain(chainTaskId, onchainStatus)) {
295277
return true;
296278
}
297-
298-
Thread.sleep(500L);
279+
Thread.sleep(500);
299280
} catch (InterruptedException e) {
300-
log.error("Error in checking the latest block number [chainTaskId:{}, maxWaitingTime:{}]",
301-
chainTaskId, maxWaitingTime);
281+
log.error("Error in checking the latest block number", e);
282+
Thread.currentThread().interrupt();
302283
}
284+
duration = System.currentTimeMillis() - startTime;
303285
}
304286

287+
log.error("Timeout reached after waiting for on-chain status " +
288+
"[chainTaskId:{}, maxWaitingTime:{}]",
289+
chainTaskId, maxWaitingTime);
305290
return false;
306291
}
292+
// endregion
293+
294+
Optional<ChainContribution> getChainContribution(String chainTaskId) {
295+
return getChainContribution(chainTaskId, credentialsService.getCredentials().getAddress());
296+
}
297+
298+
Optional<ChainAccount> getChainAccount() {
299+
return getChainAccount(credentialsService.getCredentials().getAddress());
300+
}
301+
302+
public boolean hasEnoughGas() {
303+
return web3jService.hasEnoughGas(credentialsService.getCredentials().getAddress());
304+
}
305+
306+
public long getLatestBlockNumber() {
307+
return web3jService.getLatestBlockNumber();
308+
}
307309

308310
boolean isChainTaskActive(String chainTaskId) {
309311
Optional<ChainTask> chainTask = getChainTask(chainTaskId);

src/test/java/com/iexec/worker/chain/IexecHubServiceTests.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
package com.iexec.worker.chain;
1818

1919
import com.iexec.common.contribution.Contribution;
20-
import com.iexec.commons.poco.chain.ChainReceipt;
21-
import com.iexec.commons.poco.chain.ChainTask;
22-
import com.iexec.commons.poco.chain.ChainTaskStatus;
20+
import com.iexec.commons.poco.chain.*;
2321
import com.iexec.commons.poco.contract.generated.IexecHubContract;
2422
import com.iexec.worker.config.BlockchainAdapterConfigurationService;
2523
import lombok.extern.slf4j.Slf4j;
@@ -269,6 +267,42 @@ void shouldNotContributeAndFinalizeWhenInterrupted() throws ExecutionException,
269267
}
270268
// endregion
271269

270+
// region isSuccessTx
271+
@ParameterizedTest
272+
@EnumSource(value = ChainContributionStatus.class)
273+
void shouldTxBeSuccess(ChainContributionStatus chainContributionStatus) {
274+
Log log = new Log();
275+
log.setType("");
276+
assertThat(iexecHubService.isSuccessTx(CHAIN_TASK_ID, log, chainContributionStatus)).isTrue();
277+
}
278+
279+
@ParameterizedTest
280+
@EnumSource(value = ChainContributionStatus.class)
281+
void shouldTxNotBeSuccessWhenLogIsNull(ChainContributionStatus chainContributionStatus) {
282+
assertThat(iexecHubService.isSuccessTx(CHAIN_TASK_ID, null, chainContributionStatus)).isFalse();
283+
}
284+
285+
@Test
286+
void shouldTxNotBeSuccessWhenTimeout() {
287+
Log log = new Log();
288+
log.setType("pending");
289+
when(web3jService.getBlockTime()).thenReturn(Duration.ofMillis(100L));
290+
doReturn(Optional.empty()).when(iexecHubService).getChainContribution(CHAIN_TASK_ID);
291+
assertThat(iexecHubService.isSuccessTx(CHAIN_TASK_ID, log, ChainContributionStatus.CONTRIBUTED)).isFalse();
292+
}
293+
294+
@ParameterizedTest
295+
@EnumSource(value = ChainContributionStatus.class)
296+
void test(ChainContributionStatus chainContributionStatus) {
297+
Log log = new Log();
298+
log.setType("pending");
299+
ChainContribution chainContribution = ChainContribution.builder().status(chainContributionStatus).build();
300+
when(web3jService.getBlockTime()).thenReturn(Duration.ofMillis(100L));
301+
doReturn(Optional.of(chainContribution)).when(iexecHubService).getChainContribution(CHAIN_TASK_ID);
302+
assertThat(iexecHubService.isSuccessTx(CHAIN_TASK_ID, log, chainContributionStatus)).isTrue();
303+
}
304+
// endregion
305+
272306
// region ChainTask status
273307
@ParameterizedTest
274308
@EnumSource(value = ChainTaskStatus.class, names = "ACTIVE")

0 commit comments

Comments
 (0)