Skip to content

Commit c5cf832

Browse files
author
Ugo Plouviez
authored
Merge pull request #193 from iExecBlockchainComputing/block
Check that the consensus reached block number is available to the wor…
2 parents de4b827 + c7732f3 commit c5cf832

File tree

9 files changed

+37
-46
lines changed

9 files changed

+37
-46
lines changed

src/main/java/com/iexec/worker/amnesia/AmnesiaRecoveryService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void recoverReplicate(InterruptedReplicateModel interruptedReplicate,
122122
case REVEAL:
123123
subscriptionService.subscribeToTopic(chainTaskId);
124124
resultService.saveResultInfo(chainTaskId, replicateModel);
125-
taskExecutorService.reveal(chainTaskId);
125+
taskExecutorService.reveal(chainTaskId, iexecHubService.getLatestBlockNumber());
126126
break;
127127

128128
case UPLOAD_RESULT:

src/main/java/com/iexec/worker/chain/ContributionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private boolean hasEnoughtStakeToContribute(ChainTask chainTask) {
7979
}
8080

8181
private boolean isTaskActiveToContribute(ChainTask chainTask) {
82-
return iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTask.getChainTaskId());
82+
return iexecHubService.isChainTaskActive(chainTask.getChainTaskId());
8383
}
8484

8585
private boolean isBeforeContributionDeadlineToContribute(ChainTask chainTask) {

src/main/java/com/iexec/worker/chain/IexecHubService.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -204,23 +204,7 @@ private boolean isBlockchainReadTrueWhenNodeNotSync(String chainTaskId, Function
204204
return false;
205205
}
206206

207-
boolean isChainTaskActiveWhenNodeNotSync(String chainTaskId) {
208-
boolean isChainTaskStatusActive = isBlockchainReadTrueWhenNodeNotSync(chainTaskId, this::isChainTaskActive);
209-
if (!isChainTaskStatusActive){
210-
log.error("ChainTask status is still not in 'active' stage after maxWaitingTime [chainTaskId:{}]", chainTaskId);
211-
}
212-
return isChainTaskStatusActive;
213-
}
214-
215-
boolean isChainTaskRevealingWhenNodeNotSync(String chainTaskId) {
216-
boolean isChainTaskStatusRevealing = isBlockchainReadTrueWhenNodeNotSync(chainTaskId, this::isChainTaskRevealing);
217-
if (!isChainTaskStatusRevealing){
218-
log.error("ChainTask status is still not in 'revealing' stage after maxWaitingTime [chainTaskId:{}]", chainTaskId);
219-
}
220-
return isChainTaskStatusRevealing;
221-
}
222-
223-
private Boolean isChainTaskActive(String chainTaskId){
207+
Boolean isChainTaskActive(String chainTaskId){
224208
Optional<ChainTask> chainTask = getChainTask(chainTaskId);
225209
if (chainTask.isPresent()){
226210
switch (chainTask.get().getStatus()){
@@ -239,7 +223,7 @@ private Boolean isChainTaskActive(String chainTaskId){
239223
return false;
240224
}
241225

242-
private Boolean isChainTaskRevealing(String chainTaskId){
226+
public Boolean isChainTaskRevealing(String chainTaskId){
243227
Optional<ChainTask> chainTask = getChainTask(chainTaskId);
244228
if (chainTask.isPresent()){
245229
switch (chainTask.get().getStatus()){

src/main/java/com/iexec/worker/chain/RevealService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,17 @@ public boolean canReveal(String chainTaskId) {
3030

3131
Optional<ChainTask> optionalChainTask = iexecHubService.getChainTask(chainTaskId);
3232
if (!optionalChainTask.isPresent()) {
33+
log.error("Task couldn't be retrieved [chainTaskId:{}]", chainTaskId);
3334
return false;
3435
}
3536
ChainTask chainTask = optionalChainTask.get();
3637

37-
boolean isChainTaskRevealing = iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId);
38+
boolean isChainTaskRevealing = iexecHubService.isChainTaskRevealing(chainTaskId);
3839
boolean isRevealDeadlineReached = chainTask.getRevealDeadline() < new Date().getTime();
3940

4041
Optional<ChainContribution> optionalContribution = iexecHubService.getChainContribution(chainTaskId);
4142
if (!optionalContribution.isPresent()) {
43+
log.error("Contribution couldn't be retrieved [chainTaskId:{}]", chainTaskId);
4244
return false;
4345
}
4446
ChainContribution chainContribution = optionalContribution.get();
@@ -65,7 +67,7 @@ public boolean canReveal(String chainTaskId) {
6567
log.info("All the conditions are valid for the reveal to happen [chainTaskId:{}]", chainTaskId);
6668
} else {
6769
log.warn("One or more conditions are not met for the reveal to happen [chainTaskId:{}, " +
68-
"isChainTaskRevealingWhenNodeNotSync:{}, isRevealDeadlineReached:{}, " +
70+
"isChainTaskRevealing:{}, isRevealDeadlineReached:{}, " +
6971
"isChainContributionStatusContributed:{}, isContributionResultHashConsensusValue:{}, " +
7072
"isContributionResultHashCorrect:{}, isContributionResultSealCorrect:{}]", chainTaskId,
7173
isChainTaskRevealing, isRevealDeadlineReached,

src/main/java/com/iexec/worker/executor/TaskExecutorService.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.iexec.worker.chain.ContributionService;
1313
import com.iexec.worker.chain.IexecHubService;
1414
import com.iexec.worker.chain.RevealService;
15+
import com.iexec.worker.chain.Web3jService;
1516
import com.iexec.worker.config.WorkerConfigurationService;
1617
import com.iexec.worker.dataset.DatasetService;
1718
import com.iexec.worker.docker.DockerComputationService;
@@ -51,6 +52,7 @@ public class TaskExecutorService {
5152
private WorkerConfigurationService workerConfigurationService;
5253
private IexecHubService iexecHubService;
5354
private SmsService smsService;
55+
private Web3jService web3jService;
5456

5557
// internal variables
5658
private int maxNbExecutions;
@@ -64,17 +66,18 @@ public TaskExecutorService(DatasetService datasetService,
6466
RevealService revealService,
6567
WorkerConfigurationService workerConfigurationService,
6668
IexecHubService iexecHubService,
67-
SmsService smsService) {
69+
SmsService smsService,
70+
Web3jService web3jService) {
6871
this.datasetService = datasetService;
6972
this.dockerComputationService = dockerComputationService;
7073
this.resultService = resultService;
7174
this.contributionService = contributionService;
7275
this.customFeignClient = customFeignClient;
7376
this.revealService = revealService;
74-
this.customFeignClient = customFeignClient;
7577
this.workerConfigurationService = workerConfigurationService;
7678
this.iexecHubService = iexecHubService;
7779
this.smsService = smsService;
80+
this.web3jService = web3jService;
7881

7982
maxNbExecutions = Runtime.getRuntime().availableProcessors() - 1;
8083
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxNbExecutions);
@@ -244,8 +247,14 @@ public void contribute(ContributionAuthorization contribAuth) {
244247
}
245248

246249
@Async
247-
public void reveal(String chainTaskId) {
250+
public void reveal(String chainTaskId, long consensusBlock) {
248251
log.info("Trying to reveal [chainTaskId:{}]", chainTaskId);
252+
if (!web3jService.isBlockAvailable(consensusBlock)) {
253+
log.warn("Sync issues before canReveal (latestBlock before consensusBlock) [chainTaskId:{}, latestBlock:{}, " +
254+
"consensusBlock:{}]", chainTaskId, web3jService.getLatestBlockNumber(), consensusBlock);
255+
return;
256+
}
257+
249258
if (!revealService.canReveal(chainTaskId)) {
250259
log.warn("The worker will not be able to reveal [chainTaskId:{}]", chainTaskId);
251260
customFeignClient.updateReplicateStatus(chainTaskId, CANT_REVEAL);

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private void handleTaskNotification(TaskNotification notif) {
191191
break;
192192

193193
case PLEASE_REVEAL:
194-
taskExecutorService.reveal(chainTaskId);
194+
taskExecutorService.reveal(chainTaskId, notif.getBlockNumber());
195195
break;
196196

197197
case PLEASE_UPLOAD:

src/test/java/com/iexec/worker/amnesia/AmnesiaRecoveryServiceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void shouldNotRecoverByRevealingWhenResultNotFound() {
161161
assertThat(recovered).isEmpty();
162162

163163
Mockito.verify(taskExecutorService, Mockito.times(0))
164-
.reveal(CHAIN_TASK_ID);
164+
.reveal(CHAIN_TASK_ID, blockNumber);
165165
}
166166

167167
@Test
@@ -179,7 +179,7 @@ public void shouldRecoverByRevealingWhenResultFound() {
179179
assertThat(recovered.get(0)).isEqualTo(CHAIN_TASK_ID);
180180

181181
Mockito.verify(taskExecutorService, Mockito.times(1))
182-
.reveal(CHAIN_TASK_ID);
182+
.reveal(CHAIN_TASK_ID, blockNumber);
183183
}
184184

185185
@Test

src/test/java/com/iexec/worker/chain/ContributionServiceTests.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,11 @@
99
import org.mockito.InjectMocks;
1010
import org.mockito.Mock;
1111
import org.mockito.MockitoAnnotations;
12-
import org.web3j.crypto.Sign;
1312

1413
import java.math.BigInteger;
1514
import java.util.Date;
1615
import java.util.Optional;
1716

18-
import static com.iexec.common.chain.ChainTaskStatus.ACTIVE;
19-
import static com.iexec.common.chain.ChainTaskStatus.REVEALING;
2017
import static com.iexec.worker.chain.ContributionService.computeResultHash;
2118
import static com.iexec.worker.chain.ContributionService.computeResultSeal;
2219
import static org.assertj.core.api.Java6Assertions.assertThat;
@@ -105,7 +102,7 @@ public void GetCanContributeStatusShouldReturnCanContribute() {
105102
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
106103
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
107104
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
108-
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
105+
when(iexecHubService.isChainTaskActive(chainTaskId)).thenReturn(true);
109106

110107
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
111108
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CAN_CONTRIBUTE)).isTrue();
@@ -127,7 +124,7 @@ public void GetCanContributeStatusShouldReturnStakeTooLoww() {
127124
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(0).build()));
128125
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
129126
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
130-
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
127+
when(iexecHubService.isChainTaskActive(chainTaskId)).thenReturn(true);
131128

132129
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
133130
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_STAKE_TOO_LOW)).isTrue();
@@ -149,7 +146,7 @@ public void GetCanContributeStatusShouldReturnTaskNotActive() {
149146
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
150147
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
151148
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
152-
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(false);
149+
when(iexecHubService.isChainTaskActive(chainTaskId)).thenReturn(false);
153150

154151
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
155152
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_TASK_NOT_ACTIVE)).isTrue();
@@ -171,7 +168,7 @@ public void GetCanContributeStatusShouldReturnAfterDeadline() {
171168
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
172169
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
173170
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
174-
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
171+
when(iexecHubService.isChainTaskActive(chainTaskId)).thenReturn(true);
175172

176173
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
177174
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_AFTER_DEADLINE)).isTrue();
@@ -193,7 +190,7 @@ public void GetCanContributeStatusShouldReturnContributed() {
193190
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
194191
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
195192
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.CONTRIBUTED).build()));
196-
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
193+
when(iexecHubService.isChainTaskActive(chainTaskId)).thenReturn(true);
197194

198195
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
199196
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_CONTRIBUTION_ALREADY_SET)).isTrue();

src/test/java/com/iexec/worker/chain/RevealServiceTests.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.iexec.common.chain.ChainContribution;
44
import com.iexec.common.chain.ChainContributionStatus;
55
import com.iexec.common.chain.ChainTask;
6-
import com.iexec.common.chain.ChainTaskStatus;
76
import com.iexec.common.contract.generated.IexecHubABILegacy;
87
import com.iexec.common.utils.HashUtils;
98
import com.iexec.worker.result.ResultService;
@@ -71,7 +70,7 @@ public void canRevealAllValid() {
7170
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
7271
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn(deterministHash);
7372
when(credentialsService.getCredentials()).thenReturn(credentials);
74-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
73+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
7574

7675
assertThat(revealService.canReveal(chainTaskId)).isTrue();
7776
}
@@ -102,7 +101,7 @@ public void cannotRevealSinceChainTaskStatusWrong() {
102101
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
103102
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn(deterministHash);
104103
when(credentialsService.getCredentials()).thenReturn(credentials);
105-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(false);
104+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(false);
106105

107106
assertThat(revealService.canReveal(chainTaskId)).isFalse();
108107
}
@@ -133,7 +132,7 @@ public void cannotRevealSinceRevealDeadlineReached() {
133132
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
134133
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn(deterministHash);
135134
when(credentialsService.getCredentials()).thenReturn(credentials);
136-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
135+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
137136

138137
assertThat(revealService.canReveal(chainTaskId)).isFalse();
139138
}
@@ -164,7 +163,7 @@ public void cannotRevealSinceChainContributionStatusWrong() {
164163
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
165164
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn(deterministHash);
166165
when(credentialsService.getCredentials()).thenReturn(credentials);
167-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
166+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
168167

169168
assertThat(revealService.canReveal(chainTaskId)).isFalse();
170169
}
@@ -195,7 +194,7 @@ public void cannotRevealSinceHashDoesntMatchConsensus() {
195194
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
196195
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn(deterministHash);
197196
when(credentialsService.getCredentials()).thenReturn(credentials);
198-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
197+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
199198

200199
assertThat(revealService.canReveal(chainTaskId)).isFalse();
201200
}
@@ -226,7 +225,7 @@ public void cannotRevealSinceContributionResultHashWrong() {
226225
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
227226
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn(deterministHash);
228227
when(credentialsService.getCredentials()).thenReturn(credentials);
229-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
228+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
230229

231230
assertThat(revealService.canReveal(chainTaskId)).isFalse();
232231
}
@@ -255,7 +254,7 @@ public void cannotRevealSinceContributionResultSealWrong() {
255254
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
256255
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn(deterministHash);
257256
when(credentialsService.getCredentials()).thenReturn(credentials);
258-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
257+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
259258

260259
assertThat(revealService.canReveal(chainTaskId)).isFalse();
261260
}
@@ -281,7 +280,7 @@ public void cannotRevealSinceCannotFindChainContribution() {
281280
.build());
282281
when(iexecHubService.getChainTask(chainTaskId)).thenReturn(optionalChainTask);
283282
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.empty());
284-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
283+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
285284

286285

287286
assertThat(revealService.canReveal(chainTaskId)).isFalse();
@@ -313,7 +312,7 @@ public void cannotRevealSinceDeterministHashIsEmpty() {
313312
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(optionalChainContribution);
314313
when(resultService.getDeterministHashForTask(chainTaskId)).thenReturn("");
315314
when(credentialsService.getCredentials()).thenReturn(credentials);
316-
when(iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId)).thenReturn(true);
315+
when(iexecHubService.isChainTaskRevealing(chainTaskId)).thenReturn(true);
317316

318317
assertThat(revealService.canReveal(chainTaskId)).isFalse();
319318
}

0 commit comments

Comments
 (0)