Skip to content

Commit 621135c

Browse files
authored
Merge pull request #456 from iExecBlockchainComputing/release/6.3.0
Release version 6.3.0
2 parents 69b2ed0 + e271448 commit 621135c

File tree

11 files changed

+109
-65
lines changed

11 files changed

+109
-65
lines changed

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=6.2.0
2-
iexecCommonVersion=5.6.0
1+
version=6.3.0
2+
iexecCommonVersion=5.7.0
33
nexusUser=fake
44
nexusPassword=fake

src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.iexec.common.replicate.ReplicateStatus;
2424
import com.iexec.common.replicate.ReplicateStatusDetails;
2525
import com.iexec.common.replicate.ReplicateStatusUpdate;
26+
import com.iexec.common.task.TaskAbortCause;
2627
import com.iexec.core.chain.SignatureService;
2728
import com.iexec.core.chain.Web3jService;
2829
import com.iexec.core.detector.task.ContributionTimeoutTaskDetector;
@@ -178,36 +179,37 @@ Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlo
178179
return Optional.empty();
179180
}
180181

181-
private boolean isFewBlocksAfterInitialization(Task task) {
182-
long lastBlock = web3jService.getLatestBlockNumber();
183-
long initializationBlock = task.getInitializationBlockNumber();
184-
boolean isFewBlocksAfterInitialization = lastBlock >= initializationBlock + 2;
185-
return lastBlock > 0 && initializationBlock > 0 && isFewBlocksAfterInitialization;
186-
}
187-
182+
/**
183+
* Get notifications missed by the worker during the time it was absent.
184+
*
185+
* @param blockNumber last seen blocknumber by the worker
186+
* @param walletAddress of the worker
187+
* @return list of missed notifications. Can be empty if no notification is found
188+
*/
188189
public List<TaskNotification> getMissedTaskNotifications(long blockNumber, String walletAddress) {
189-
190190
List<String> chainTaskIdList = workerService.getChainTaskIds(walletAddress);
191191
List<Task> tasksWithWorkerParticipation = taskService.getTasksByChainTaskIds(chainTaskIdList);
192192
List<TaskNotification> taskNotifications = new ArrayList<>();
193-
194193
for (Task task : tasksWithWorkerParticipation) {
195194
String chainTaskId = task.getChainTaskId();
196195

197196
Optional<Replicate> oReplicate = replicatesService.getReplicate(chainTaskId, walletAddress);
198-
if (!oReplicate.isPresent()) continue;
199-
197+
if (!oReplicate.isPresent()) {
198+
continue;
199+
}
200200
Replicate replicate = oReplicate.get();
201-
202201
boolean isRecoverable = replicate.isRecoverable();
203-
if (!isRecoverable) continue;
204-
202+
if (!isRecoverable) {
203+
continue;
204+
}
205205
String enclaveChallenge = smsService.getEnclaveChallenge(chainTaskId, task.isTeeTask());
206-
if (task.isTeeTask() && enclaveChallenge.isEmpty()) continue;
207-
206+
if (task.isTeeTask() && enclaveChallenge.isEmpty()) {
207+
continue;
208+
}
208209
Optional<TaskNotificationType> taskNotificationType = getTaskNotificationType(task, replicate, blockNumber);
209-
if (!taskNotificationType.isPresent()) continue;
210-
210+
if (!taskNotificationType.isPresent()) {
211+
continue;
212+
}
211213
TaskNotificationExtra taskNotificationExtra =
212214
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);
213215

@@ -240,6 +242,8 @@ private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificati
240242
case PLEASE_REVEAL:
241243
taskNotificationExtra.setBlockNumber(task.getConsensusReachedBlockNumber());
242244
break;
245+
case PLEASE_ABORT:
246+
taskNotificationExtra.setTaskAbortCause(getTaskAbortCause(task));
243247
default:
244248
break;
245249
}
@@ -251,13 +255,11 @@ public Optional<TaskNotificationType> getTaskNotificationType(Task task, Replica
251255
if (task.inContributionPhase()) {
252256
return recoverReplicateInContributionPhase(task, replicate, blockNumber);
253257
}
254-
255-
if (task.getCurrentStatus().equals(TaskStatus.CONTRIBUTION_TIMEOUT)) {
256-
return Optional.of(TaskNotificationType.PLEASE_ABORT_CONTRIBUTION_TIMEOUT);
257-
}
258-
259-
if (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED) && !replicate.containsContributedStatus()) {
260-
return Optional.of(TaskNotificationType.PLEASE_ABORT_CONSENSUS_REACHED);
258+
// CONTRIBUTION_TIMEOUT or CONSENSUS_REACHED without contribution
259+
if (task.getCurrentStatus().equals(TaskStatus.CONTRIBUTION_TIMEOUT)
260+
|| (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED)
261+
&& !replicate.containsContributedStatus())) {
262+
return Optional.of(TaskNotificationType.PLEASE_ABORT);
261263
}
262264

263265
Optional<TaskNotificationType> oRecoveryAction = Optional.empty();
@@ -449,4 +451,14 @@ private Optional<TaskNotificationType> recoverReplicateIfRevealed(Replicate repl
449451
return Optional.empty();
450452
}
451453

454+
private TaskAbortCause getTaskAbortCause(Task task) {
455+
switch (task.getCurrentStatus()) {
456+
case CONSENSUS_REACHED:
457+
return TaskAbortCause.CONSENSUS_REACHED;
458+
case CONTRIBUTION_TIMEOUT:
459+
return TaskAbortCause.CONTRIBUTION_TIMEOUT;
460+
default:
461+
return TaskAbortCause.UNKNOWN;
462+
}
463+
}
452464
}

src/main/java/com/iexec/core/task/TaskService.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@
1616

1717
package com.iexec.core.task;
1818

19-
import com.iexec.common.chain.ChainTask;
20-
import com.iexec.common.chain.ChainTaskStatus;
21-
import com.iexec.core.chain.IexecHubService;
22-
import com.iexec.core.replicate.ReplicatesService;
2319
import lombok.extern.slf4j.Slf4j;
2420
import org.springframework.stereotype.Service;
2521

@@ -40,16 +36,9 @@ public class TaskService {
4036
taskAccessForNewReplicateLock = new ConcurrentHashMap<>();
4137

4238
private final TaskRepository taskRepository;
43-
private final IexecHubService iexecHubService;
44-
private final ReplicatesService replicatesService;
4539

46-
public TaskService(
47-
TaskRepository taskRepository,
48-
IexecHubService iexecHubService,
49-
ReplicatesService replicatesService) {
40+
public TaskService(TaskRepository taskRepository) {
5041
this.taskRepository = taskRepository;
51-
this.iexecHubService = iexecHubService;
52-
this.replicatesService = replicatesService;
5342
}
5443

5544
/**

src/main/java/com/iexec/core/task/TaskUpdateManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void updateTask(String chainTaskId) {
127127
toFailed(task);
128128
break;
129129
case CONSENSUS_REACHED:
130-
consensusReached2AtLeastOneReveal2UploadRequested(task);
130+
consensusReached2AtLeastOneReveal2ResultUploading(task);
131131
consensusReached2Reopening(task);
132132
break;
133133
case CONTRIBUTION_TIMEOUT:
@@ -367,7 +367,7 @@ void running2RunningFailed(Task task) {
367367
.build());
368368
}
369369

370-
void consensusReached2AtLeastOneReveal2UploadRequested(Task task) {
370+
void consensusReached2AtLeastOneReveal2ResultUploading(Task task) {
371371
boolean condition1 = task.getCurrentStatus().equals(CONSENSUS_REACHED);
372372
boolean condition2 = replicatesService.getNbReplicatesWithCurrentStatus(task.getChainTaskId(), ReplicateStatus.REVEALED) > 0;
373373

@@ -496,7 +496,8 @@ void resultUploading2UploadTimeout(Task task) {
496496
void requestUpload(Task task) {
497497
boolean isThereAWorkerUploading = replicatesService
498498
.getNbReplicatesWithCurrentStatus(task.getChainTaskId(),
499-
ReplicateStatus.RESULT_UPLOADING) > 0;
499+
ReplicateStatus.RESULT_UPLOADING,
500+
ReplicateStatus.RESULT_UPLOAD_REQUESTED) > 0;
500501

501502
if (isThereAWorkerUploading) {
502503
log.info("Upload is requested but an upload is already in process. [chainTaskId: {}]", task.getChainTaskId());

src/main/java/com/iexec/core/task/listener/ReplicateListeners.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.iexec.core.detector.replicate.ContributionUnnotifiedDetector;
2323
import com.iexec.core.replicate.ReplicateUpdatedEvent;
2424
import com.iexec.core.replicate.ReplicatesService;
25-
import com.iexec.core.task.TaskService;
2625
import com.iexec.core.task.TaskUpdateManager;
2726
import com.iexec.core.worker.WorkerService;
2827
import lombok.extern.slf4j.Slf4j;

src/main/java/com/iexec/core/task/listener/TaskListeners.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import com.iexec.common.notification.TaskNotification;
2020
import com.iexec.common.notification.TaskNotificationExtra;
2121
import com.iexec.common.notification.TaskNotificationType;
22+
import com.iexec.common.task.TaskAbortCause;
2223
import com.iexec.core.pubsub.NotificationService;
2324
import com.iexec.core.replicate.Replicate;
2425
import com.iexec.core.replicate.ReplicatesService;
2526
import com.iexec.core.task.Task;
26-
import com.iexec.core.task.TaskService;
2727
import com.iexec.core.task.TaskUpdateManager;
2828
import com.iexec.core.task.event.*;
2929
import com.iexec.core.worker.WorkerService;
@@ -75,7 +75,10 @@ public void onTaskContributionTimeout(ContributionTimeoutEvent event) {
7575
notificationService.sendTaskNotification(TaskNotification.builder()
7676
.chainTaskId(chainTaskId)
7777
.workersAddress(workerAddresses)
78-
.taskNotificationType(TaskNotificationType.PLEASE_ABORT_CONTRIBUTION_TIMEOUT)
78+
.taskNotificationType(TaskNotificationType.PLEASE_ABORT)
79+
.taskNotificationExtra(TaskNotificationExtra.builder()
80+
.taskAbortCause(TaskAbortCause.CONTRIBUTION_TIMEOUT)
81+
.build())
7982
.build());
8083
log.info("NotifyAbortContributionTimeout completed[workerAddresses:{}]", workerAddresses);
8184
}
@@ -114,9 +117,13 @@ public void onTaskConsensusReached(ConsensusReachedEvent event) {
114117
// losers: please abort
115118
if (!losers.isEmpty()) {
116119
notificationService.sendTaskNotification(TaskNotification.builder()
117-
.taskNotificationType(TaskNotificationType.PLEASE_ABORT_CONSENSUS_REACHED)
118120
.chainTaskId(chainTaskId)
119-
.workersAddress(losers).build()
121+
.workersAddress(losers)
122+
.taskNotificationType(TaskNotificationType.PLEASE_ABORT)
123+
.taskNotificationExtra(TaskNotificationExtra.builder()
124+
.taskAbortCause(TaskAbortCause.CONSENSUS_REACHED)
125+
.build())
126+
.build()
120127
);
121128
}
122129
}

src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ cron:
3131

3232
workers:
3333
askForReplicatePeriod: ${IEXEC_ASK_REPLICATE_PERIOD:5000}
34-
requiredWorkerVersion: ${IEXEC_CORE_REQUIRED_WORKER_VERSION:6.1.0} #leave empty will allow any worker version
34+
requiredWorkerVersion: ${IEXEC_CORE_REQUIRED_WORKER_VERSION:} #leave empty will allow any worker version
3535
# the whitelist format should be as follow (comma separated on one or multiple lines:
3636
# whitelist: ${IEXEC_WHITELIST:address1,
3737
# address2,

src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import com.iexec.common.chain.WorkerpoolAuthorization;
2020
import com.iexec.common.notification.TaskNotification;
21+
import com.iexec.common.notification.TaskNotificationExtra;
2122
import com.iexec.common.notification.TaskNotificationType;
2223
import com.iexec.common.replicate.ReplicateStatus;
2324
import com.iexec.common.replicate.ReplicateStatusDetails;
2425
import com.iexec.common.replicate.ReplicateStatusModifier;
2526
import com.iexec.common.replicate.ReplicateStatusUpdate;
27+
import com.iexec.common.task.TaskAbortCause;
2628
import com.iexec.common.utils.BytesUtils;
2729
import com.iexec.core.chain.SignatureService;
2830
import com.iexec.core.chain.Web3jService;
@@ -728,7 +730,9 @@ public void shouldTellReplicateToAbortSinceContributionTimeout() {
728730

729731
assertThat(missedTaskNotifications).isNotEmpty();
730732
TaskNotificationType taskNotificationType = missedTaskNotifications.get(0).getTaskNotificationType();
731-
assertThat(taskNotificationType).isEqualTo(TaskNotificationType.PLEASE_ABORT_CONTRIBUTION_TIMEOUT);
733+
assertThat(taskNotificationType).isEqualTo(TaskNotificationType.PLEASE_ABORT);
734+
TaskNotificationExtra notificationExtra = missedTaskNotifications.get(0).getTaskNotificationExtra();
735+
assertThat(notificationExtra.getTaskAbortCause()).isEqualTo(TaskAbortCause.CONTRIBUTION_TIMEOUT);
732736

733737
Mockito.verify(replicatesService, Mockito.times(1))
734738
.updateReplicateStatus(anyString(), anyString(), any(ReplicateStatusUpdate.class)); // RECOVERING
@@ -754,7 +758,9 @@ public void shouldTellReplicateToWaitSinceConsensusReachedAndItDidNotContribute(
754758

755759
assertThat(missedTaskNotifications).isNotEmpty();
756760
TaskNotificationType taskNotificationType = missedTaskNotifications.get(0).getTaskNotificationType();
757-
assertThat(taskNotificationType).isEqualTo(TaskNotificationType.PLEASE_ABORT_CONSENSUS_REACHED);
761+
assertThat(taskNotificationType).isEqualTo(TaskNotificationType.PLEASE_ABORT);
762+
TaskNotificationExtra notificationExtra = missedTaskNotifications.get(0).getTaskNotificationExtra();
763+
assertThat(notificationExtra.getTaskAbortCause()).isEqualTo(TaskAbortCause.CONSENSUS_REACHED);
758764

759765
Mockito.verify(replicatesService, Mockito.times(1))
760766
.updateReplicateStatus(anyString(), anyString(), any(ReplicateStatusUpdate.class)); // RECOVERING

src/test/java/com/iexec/core/task/TaskUpdateManagerTest.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,7 +1602,11 @@ public void shouldRequestUpload() {
16021602

16031603
when(taskService.getTaskByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task));
16041604
when(taskRepository.save(task)).thenReturn(task);
1605-
when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID,ReplicateStatus.RESULT_UPLOADING)).thenReturn(0);
1605+
when(replicatesService.getNbReplicatesWithCurrentStatus(
1606+
CHAIN_TASK_ID,
1607+
ReplicateStatus.RESULT_UPLOADING,
1608+
ReplicateStatus.RESULT_UPLOAD_REQUESTED)
1609+
).thenReturn(0);
16061610
when(replicatesService.getRandomReplicateWithRevealStatus(CHAIN_TASK_ID)).thenReturn(Optional.of(replicate));
16071611
doNothing().when(replicatesService).updateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, ReplicateStatus.RESULT_UPLOAD_REQUESTED);
16081612
doNothing().when(applicationEventPublisher).publishEvent(any());
@@ -1620,7 +1624,11 @@ public void shouldNotRequestUpload() {
16201624
task.changeStatus(AT_LEAST_ONE_REVEALED);
16211625
task.setChainTaskId(CHAIN_TASK_ID);
16221626

1623-
when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID,ReplicateStatus.RESULT_UPLOADING)).thenReturn(0);
1627+
when(replicatesService.getNbReplicatesWithCurrentStatus(
1628+
CHAIN_TASK_ID,
1629+
ReplicateStatus.RESULT_UPLOADING,
1630+
ReplicateStatus.RESULT_UPLOAD_REQUESTED)
1631+
).thenReturn(0);
16241632
// For example, this could happen if replicate is lost after having revealed.
16251633
when(replicatesService.getRandomReplicateWithRevealStatus(CHAIN_TASK_ID)).thenReturn(Optional.empty());
16261634

@@ -1634,7 +1642,7 @@ public void shouldNotRequestUpload() {
16341642
}
16351643

16361644
@Test
1637-
public void shouldRequestUploadSinceUploadInProgress() {
1645+
public void shouldNotRequestUploadSinceUploadInProgress() {
16381646
Task task = getStubTask(maxExecutionTime);
16391647
task.setChainTaskId(CHAIN_TASK_ID);
16401648
task.changeStatus(AT_LEAST_ONE_REVEALED);
@@ -1644,11 +1652,17 @@ public void shouldRequestUploadSinceUploadInProgress() {
16441652

16451653
when(taskService.getTaskByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task));
16461654
when(taskRepository.save(task)).thenReturn(task);
1647-
when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.RESULT_UPLOADING)).thenReturn(1);
1655+
when(replicatesService.getNbReplicatesWithCurrentStatus(
1656+
CHAIN_TASK_ID,
1657+
ReplicateStatus.RESULT_UPLOADING,
1658+
ReplicateStatus.RESULT_UPLOAD_REQUESTED)
1659+
).thenReturn(1);
16481660

16491661
taskUpdateManager.requestUpload(task);
16501662

16511663
assertThat(task.getCurrentStatus()).isEqualTo(AT_LEAST_ONE_REVEALED);
1664+
verify(replicatesService, Mockito.times(0))
1665+
.getRandomReplicateWithRevealStatus(CHAIN_TASK_ID);
16521666
verify(replicatesService, Mockito.times(0))
16531667
.updateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, ReplicateStatus.RESULT_UPLOAD_REQUESTED);
16541668
verify(applicationEventPublisher, Mockito.times(0))

0 commit comments

Comments
 (0)