Skip to content

Commit e4ebcbe

Browse files
committed
Merge branch 'master' into log
2 parents f107ba9 + e473af3 commit e4ebcbe

File tree

9 files changed

+128
-30
lines changed

9 files changed

+128
-30
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
iexecCommonVersion=3.0.0
22
nexusUser=fake
33
nexusPassword=fake
4-
version=3.0.0
4+
version=3.0.1-SNAPSHOT

src/main/java/com/iexec/worker/chain/ContributionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private boolean hasEnoughtStakeToContribute(ChainTask chainTask) {
7979
}
8080

8181
private boolean isTaskActiveToContribute(ChainTask chainTask) {
82-
return chainTask.getStatus().equals(ChainTaskStatus.ACTIVE);
82+
return iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTask.getChainTaskId());
8383
}
8484

8585
private boolean isBeforeContributionDeadlineToContribute(ChainTask chainTask) {

src/main/java/com/iexec/worker/chain/IexecHubService.java

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,16 @@
1010
import org.springframework.beans.factory.annotation.Autowired;
1111
import org.springframework.stereotype.Service;
1212
import org.web3j.protocol.Web3j;
13-
import org.web3j.protocol.core.DefaultBlockParameterName;
1413
import org.web3j.protocol.core.RemoteCall;
1514
import org.web3j.protocol.core.methods.response.TransactionReceipt;
1615

17-
import java.io.IOException;
1816
import java.util.List;
1917
import java.util.Optional;
2018
import java.util.concurrent.CompletableFuture;
2119
import java.util.concurrent.ExecutionException;
2220
import java.util.concurrent.Executors;
2321
import java.util.concurrent.ThreadPoolExecutor;
22+
import java.util.function.Function;
2423

2524
import static com.iexec.common.chain.ChainContributionStatus.CONTRIBUTED;
2625
import static com.iexec.common.chain.ChainContributionStatus.REVEALED;
@@ -173,6 +172,10 @@ public long getLatestBlockNumber() {
173172
return web3jService.getLatestBlockNumber();
174173
}
175174

175+
public long getMaxWaitingTimeWhenNotSync() {
176+
return web3jService.getMaxWaitingTimeWhenPendingReceipt();
177+
}
178+
176179
private Boolean isContributionStatusValidOnChain(String chainTaskId, ChainStatus chainContributionStatus) {
177180
if (chainContributionStatus instanceof ChainContributionStatus) {
178181
Optional<ChainContribution> chainContribution = getChainContribution(chainTaskId);
@@ -181,4 +184,78 @@ private Boolean isContributionStatusValidOnChain(String chainTaskId, ChainStatus
181184
return false;
182185
}
183186

187+
private boolean isBlockchainReadTrueWhenNodeNotSync(String chainTaskId, Function<String, Boolean> booleanBlockchainReadFunction) {
188+
long maxWaitingTime = web3jService.getMaxWaitingTimeWhenPendingReceipt();
189+
long startTime = System.currentTimeMillis();
190+
191+
for(long duration = 0L; duration < maxWaitingTime; duration = System.currentTimeMillis() - startTime) {
192+
try {
193+
if (booleanBlockchainReadFunction.apply(chainTaskId)) {
194+
return true;
195+
}
196+
197+
Thread.sleep(500L);
198+
} catch (InterruptedException e) {
199+
log.error("Error in checking the latest block number [chainTaskId:{}, maxWaitingTime:{}]",
200+
chainTaskId, maxWaitingTime);
201+
}
202+
}
203+
204+
return false;
205+
}
206+
207+
boolean isChainTaskActiveWhenNodeNotSync(String chainTaskId) {
208+
boolean isChainTaskStatusActive = isBlockchainReadTrueWhenNodeNotSync(chainTaskId, this::isChainTaskActive);
209+
if (!isChainTaskStatusActive){
210+
log.error("ChainTask status is still not in 'active' stage after maxWaitingTime [chainTaskId:{}]", chainTaskId);
211+
}
212+
return isChainTaskStatusActive;
213+
}
214+
215+
boolean isChainTaskRevealingWhenNodeNotSync(String chainTaskId) {
216+
boolean isChainTaskStatusRevealing = isBlockchainReadTrueWhenNodeNotSync(chainTaskId, this::isChainTaskRevealing);
217+
if (!isChainTaskStatusRevealing){
218+
log.error("ChainTask status is still not in 'revealing' stage after maxWaitingTime [chainTaskId:{}]", chainTaskId);
219+
}
220+
return isChainTaskStatusRevealing;
221+
}
222+
223+
private Boolean isChainTaskActive(String chainTaskId){
224+
Optional<ChainTask> chainTask = getChainTask(chainTaskId);
225+
if (chainTask.isPresent()){
226+
switch (chainTask.get().getStatus()){
227+
case UNSET:
228+
break;//Could happen if node not synchronized. Should wait.
229+
case ACTIVE:
230+
return true;
231+
case REVEALING:
232+
return false;
233+
case COMPLETED:
234+
return false;
235+
case FAILLED:
236+
return false;
237+
}
238+
}
239+
return false;
240+
}
241+
242+
private Boolean isChainTaskRevealing(String chainTaskId){
243+
Optional<ChainTask> chainTask = getChainTask(chainTaskId);
244+
if (chainTask.isPresent()){
245+
switch (chainTask.get().getStatus()){
246+
case UNSET:
247+
break;//Should not happen
248+
case ACTIVE:
249+
break;//Could happen if node not synchronized. Should wait.
250+
case REVEALING:
251+
return true;
252+
case COMPLETED:
253+
return false;
254+
case FAILLED:
255+
return false;
256+
}
257+
}
258+
return false;
259+
}
260+
184261
}

src/main/java/com/iexec/worker/chain/RevealService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public boolean canReveal(String chainTaskId) {
3434
}
3535
ChainTask chainTask = optionalChainTask.get();
3636

37-
boolean isChainTaskStatusRevealing = chainTask.getStatus().equals(ChainTaskStatus.REVEALING);
37+
boolean isChainTaskRevealing = iexecHubService.isChainTaskRevealingWhenNodeNotSync(chainTaskId);
3838
boolean isRevealDeadlineReached = chainTask.getRevealDeadline() < new Date().getTime();
3939

4040
Optional<ChainContribution> optionalContribution = iexecHubService.getChainContribution(chainTaskId);
@@ -57,18 +57,18 @@ public boolean canReveal(String chainTaskId) {
5757
);
5858
}
5959

60-
boolean ret = isChainTaskStatusRevealing && !isRevealDeadlineReached &&
60+
boolean ret = isChainTaskRevealing && !isRevealDeadlineReached &&
6161
isChainContributionStatusContributed && isContributionResultHashConsensusValue &&
6262
isContributionResultHashCorrect && isContributionResultSealCorrect;
6363

6464
if (ret) {
6565
log.info("All the conditions are valid for the reveal to happen [chainTaskId:{}]", chainTaskId);
6666
} else {
6767
log.warn("One or more conditions are not met for the reveal to happen [chainTaskId:{}, " +
68-
"isChainTaskStatusRevealing:{}, isRevealDeadlineReached:{}, " +
68+
"isChainTaskRevealingWhenNodeNotSync:{}, isRevealDeadlineReached:{}, " +
6969
"isChainContributionStatusContributed:{}, isContributionResultHashConsensusValue:{}, " +
7070
"isContributionResultHashCorrect:{}, isContributionResultSealCorrect:{}]", chainTaskId,
71-
isChainTaskStatusRevealing, isRevealDeadlineReached,
71+
isChainTaskRevealing, isRevealDeadlineReached,
7272
isChainContributionStatusContributed, isContributionResultHashConsensusValue,
7373
isContributionResultHashCorrect, isContributionResultSealCorrect);
7474
}

src/main/java/com/iexec/worker/executor/TaskExecutorService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,12 @@ public void contribute(ContributionAuthorization contribAuth) {
229229
return;
230230
}
231231

232+
if (oChainReceipt.get().getBlockNumber() == 0) {
233+
log.warn("The blocknumber of the receipt is equal to 0, the CONTRIBUTED status will not be " +
234+
"sent to the core [chainTaskId:{}]", chainTaskId);
235+
return;
236+
}
237+
232238
customFeignClient.updateReplicateStatus(chainTaskId, CONTRIBUTED,
233239
ReplicateDetails.builder().chainReceipt(oChainReceipt.get()).build());
234240
}
@@ -239,6 +245,7 @@ public void reveal(String chainTaskId) {
239245
if (!revealService.canReveal(chainTaskId)) {
240246
log.warn("The worker will not be able to reveal [chainTaskId:{}]", chainTaskId);
241247
customFeignClient.updateReplicateStatus(chainTaskId, CANT_REVEAL);
248+
return;
242249
}
243250

244251
if (!revealService.hasEnoughGas()) {
@@ -256,6 +263,12 @@ public void reveal(String chainTaskId) {
256263
return;
257264
}
258265

266+
if (optionalChainReceipt.get().getBlockNumber() == 0) {
267+
log.warn("The blocknumber of the receipt is equal to 0, the REVEALED status will not be " +
268+
"sent to the core [chainTaskId:{}]", chainTaskId);
269+
return;
270+
}
271+
259272
customFeignClient.updateReplicateStatus(chainTaskId, REVEALED,
260273
ReplicateDetails.builder().chainReceipt(optionalChainReceipt.get()).build());
261274
}

src/main/java/com/iexec/worker/replicate/ReplicateDemandService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,18 @@ public ReplicateDemandService(TaskExecutorService taskExecutorService,
4545
@Scheduled(fixedRateString = "#{publicConfigurationService.askForReplicatePeriod}")
4646
public void askForReplicate() {
4747
// check if the worker can run a task or not
48-
long lastAvailableBlockNumber = iexecHubService.getLatestBlockNumber();
49-
if (!taskExecutorService.canAcceptMoreReplicates() && lastAvailableBlockNumber == 0) {
48+
if (!taskExecutorService.canAcceptMoreReplicates()) {
5049
log.info("The worker is already full, it can't accept more tasks");
5150
return;
5251
}
5352

53+
long lastAvailableBlockNumber = iexecHubService.getLatestBlockNumber();
54+
if (lastAvailableBlockNumber == 0) {
55+
log.error("Can't askForReplicate, your blockchain node seams unsync [lastAvailableBlockNumber:{}]",
56+
lastAvailableBlockNumber);
57+
return;
58+
}
59+
5460
Optional<ContributionAuthorization> oContributionAuth =
5561
customFeignClient.getAvailableReplicate(lastAvailableBlockNumber);
5662

src/main/java/com/iexec/worker/result/ResultRepoService.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44
import com.iexec.common.result.eip712.Eip712Challenge;
55
import com.iexec.worker.config.PublicConfigurationService;
66
import com.iexec.worker.feign.ResultRepoClient;
7-
87
import feign.FeignException;
98
import lombok.extern.slf4j.Slf4j;
10-
119
import org.springframework.http.ResponseEntity;
10+
import org.springframework.retry.annotation.Backoff;
1211
import org.springframework.retry.annotation.Recover;
1312
import org.springframework.retry.annotation.Retryable;
1413
import org.springframework.stereotype.Service;
@@ -31,7 +30,7 @@ public ResultRepoService(ResultRepoClient resultRepoClient,
3130

3231
@Retryable(value = FeignException.class)
3332
public Optional<Eip712Challenge> getChallenge() {
34-
return Optional.of(resultRepoClient.getChallenge(publicConfigurationService.getChainId()));
33+
return Optional.of(resultRepoClient.getChallenge(publicConfigurationService.getChainId()));
3534
}
3635

3736
@Recover
@@ -41,19 +40,21 @@ public Optional<Eip712Challenge> getResultRepoChallenge(FeignException e, Intege
4140
return Optional.empty();
4241
}
4342

44-
@Retryable(value = FeignException.class)
43+
@Retryable(value = FeignException.class,
44+
maxAttempts = 5,
45+
backoff = @Backoff(delay = 3000))
4546
public String uploadResult(String authorizationToken, ResultModel resultModel) {
4647
ResponseEntity<String> responseEntity =
4748
resultRepoClient.uploadResult(authorizationToken, resultModel);
48-
49+
4950
return responseEntity.getStatusCode().is2xxSuccessful()
5051
? responseEntity.getBody()
5152
: "";
5253
}
5354

5455
@Recover
5556
public String uploadResult(FeignException e, String authorizationToken, ResultModel resultModel) {
56-
log.error("Failed to upload result [attempts:3]");
57+
log.error("Failed to upload result [attempts:5]");
5758
e.printStackTrace();
5859
return "";
5960
}

src/test/java/com/iexec/worker/chain/ContributionServiceTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ public void GetCanContributeStatusShouldReturnCanContribute() {
9696
ChainTask chainTask = ChainTask.builder()
9797
.dealid(chainDealId)
9898
.idx(0)
99-
.status(ACTIVE)
10099
.contributionDeadline(new Date().getTime() + 1000)
101100
.build();
102101

@@ -106,6 +105,7 @@ public void GetCanContributeStatusShouldReturnCanContribute() {
106105
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
107106
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
108107
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
108+
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
109109

110110
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
111111
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CAN_CONTRIBUTE)).isTrue();
@@ -118,7 +118,6 @@ public void GetCanContributeStatusShouldReturnStakeTooLoww() {
118118
ChainTask chainTask = ChainTask.builder()
119119
.dealid(chainDealId)
120120
.idx(0)
121-
.status(ACTIVE)
122121
.contributionDeadline(new Date().getTime() + 1000)
123122
.build();
124123

@@ -128,6 +127,7 @@ public void GetCanContributeStatusShouldReturnStakeTooLoww() {
128127
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(0).build()));
129128
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
130129
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
130+
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
131131

132132
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
133133
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_STAKE_TOO_LOW)).isTrue();
@@ -140,7 +140,6 @@ public void GetCanContributeStatusShouldReturnTaskNotActive() {
140140
ChainTask chainTask = ChainTask.builder()
141141
.dealid(chainDealId)
142142
.idx(0)
143-
.status(REVEALING)
144143
.contributionDeadline(new Date().getTime() + 1000)
145144
.build();
146145

@@ -150,6 +149,7 @@ public void GetCanContributeStatusShouldReturnTaskNotActive() {
150149
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
151150
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
152151
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
152+
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(false);
153153

154154
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
155155
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_TASK_NOT_ACTIVE)).isTrue();
@@ -162,7 +162,6 @@ public void GetCanContributeStatusShouldReturnAfterDeadline() {
162162
ChainTask chainTask = ChainTask.builder()
163163
.dealid(chainDealId)
164164
.idx(0)
165-
.status(ACTIVE)
166165
.contributionDeadline(new Date().getTime() - 1000)
167166
.build();
168167

@@ -172,6 +171,7 @@ public void GetCanContributeStatusShouldReturnAfterDeadline() {
172171
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
173172
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
174173
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.UNSET).build()));
174+
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
175175

176176
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
177177
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_AFTER_DEADLINE)).isTrue();
@@ -184,7 +184,6 @@ public void GetCanContributeStatusShouldReturnContributed() {
184184
ChainTask chainTask = ChainTask.builder()
185185
.dealid(chainDealId)
186186
.idx(0)
187-
.status(ACTIVE)
188187
.contributionDeadline(new Date().getTime() + 1000)
189188
.build();
190189

@@ -194,6 +193,7 @@ public void GetCanContributeStatusShouldReturnContributed() {
194193
when(iexecHubService.getChainAccount()).thenReturn(Optional.of(ChainAccount.builder().deposit(1000).build()));
195194
when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().workerStake(BigInteger.valueOf(5)).build()));
196195
when(iexecHubService.getChainContribution(chainTaskId)).thenReturn(Optional.of(ChainContribution.builder().status(ChainContributionStatus.CONTRIBUTED).build()));
196+
when(iexecHubService.isChainTaskActiveWhenNodeNotSync(chainTaskId)).thenReturn(true);
197197

198198
assertThat(contributionService.getCanContributeStatus(chainTaskId).isPresent()).isTrue();
199199
assertThat(contributionService.getCanContributeStatus(chainTaskId).get().equals(ReplicateStatus.CANT_CONTRIBUTE_SINCE_CONTRIBUTION_ALREADY_SET)).isTrue();

0 commit comments

Comments
 (0)