Skip to content

Commit e1c3be7

Browse files
author
Jérémy James Toussaint
committed
Only detect worker lost when possible
1 parent a2891e8 commit e1c3be7

File tree

4 files changed

+84
-28
lines changed

4 files changed

+84
-28
lines changed

src/main/java/com/iexec/core/detector/WorkerLostDetector.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.iexec.core.detector;
1818

19+
import com.google.common.collect.ImmutableSet;
20+
import com.iexec.common.replicate.ReplicateStatus;
1921
import com.iexec.core.replicate.ReplicatesService;
2022
import com.iexec.core.task.TaskService;
2123
import com.iexec.core.worker.Worker;
@@ -30,14 +32,20 @@
3032
@Slf4j
3133
public class WorkerLostDetector implements Detector {
3234

35+
public static final ImmutableSet<ReplicateStatus> SHOULD_NOT_UPDATE_STATUSES =
36+
new ImmutableSet.Builder<ReplicateStatus>()
37+
.add(WORKER_LOST)
38+
.addAll(ReplicateStatus.getFinalStatuses())
39+
.build();
40+
3341
private final ReplicatesService replicatesService;
3442
private final WorkerService workerService;
3543
private final TaskService taskService;
3644

3745
public WorkerLostDetector(
38-
ReplicatesService replicatesService,
39-
WorkerService workerService,
40-
TaskService taskService
46+
ReplicatesService replicatesService,
47+
WorkerService workerService,
48+
TaskService taskService
4149
) {
4250
this.replicatesService = replicatesService;
4351
this.workerService = workerService;
@@ -57,13 +65,13 @@ public void detect() {
5765
replicatesService
5866
.getReplicate(chainTaskId, workerWallet)
5967
.ifPresent(replicate -> {
60-
if (!replicate.getCurrentStatus().equals(WORKER_LOST)) {
61-
replicatesService.updateReplicateStatus(
68+
if (!SHOULD_NOT_UPDATE_STATUSES.contains(replicate.getCurrentStatus())) {
69+
replicatesService.updateReplicateStatus(
6270
chainTaskId,
6371
workerWallet,
6472
WORKER_LOST
65-
);
66-
}
73+
);
74+
}
6775
});
6876
}
6977
}

src/main/java/com/iexec/core/task/listener/ReplicateListeners.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,10 @@
3434
@Component
3535
public class ReplicateListeners {
3636

37-
private TaskService taskService;
38-
private WorkerService workerService;
39-
private ContributionUnnotifiedDetector contributionUnnotifiedDetector;
40-
private ReplicatesService replicatesService;
41-
37+
private final TaskService taskService;
38+
private final WorkerService workerService;
39+
private final ContributionUnnotifiedDetector contributionUnnotifiedDetector;
40+
private final ReplicatesService replicatesService;
4241

4342
public ReplicateListeners(WorkerService workerService,
4443
TaskService taskService,
@@ -55,10 +54,13 @@ public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) {
5554
log.debug("Received ReplicateUpdatedEvent [chainTaskId:{}] ", event.getChainTaskId());
5655
ReplicateStatusUpdate statusUpdate = event.getReplicateStatusUpdate();
5756
ReplicateStatus newStatus = statusUpdate.getStatus();
58-
ReplicateStatusCause cause = statusUpdate.getDetails() != null ? statusUpdate.getDetails().getCause(): null;
57+
ReplicateStatusCause cause = statusUpdate.getDetails() != null ? statusUpdate.getDetails().getCause() : null;
5958

6059
taskService.updateTask(event.getChainTaskId());
6160

61+
/*
62+
* Should release 1 CPU of the given worker for this replicate if status is COMPUTED
63+
* */
6264
if (newStatus.equals(ReplicateStatus.COMPUTED)) {
6365
workerService.removeComputedChainTaskIdFromWorker(event.getChainTaskId(), event.getWalletAddress());
6466
}
@@ -83,9 +85,9 @@ public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) {
8385
}
8486

8587
/*
86-
* Should release one CPU for this replicate if status is FAILED
88+
* Should release the given worker for this replicate if status is COMPLETED or FAILED
8789
* */
88-
if (newStatus.equals(ReplicateStatus.FAILED)) {
90+
if (ReplicateStatus.getFinalStatuses().contains(newStatus)) {
8991
workerService.removeChainTaskIdFromWorker(event.getChainTaskId(), event.getWalletAddress());
9092
}
9193
}

src/test/java/com/iexec/core/detector/WorkerLostDetectorTests.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737

3838
import static com.iexec.common.utils.DateTimeUtils.addMinutesToDate;
3939
import static org.mockito.ArgumentMatchers.anyString;
40-
import static org.mockito.Mockito.any;
41-
import static org.mockito.Mockito.when;
40+
import static org.mockito.Mockito.*;
4241

4342
public class WorkerLostDetectorTests {
4443

@@ -97,12 +96,9 @@ public void shouldUpdateOneReplicateToWorkerLost(){
9796

9897
// Similar to the previous test, except that the Replicate is already in WORKER_LOST status.
9998
@Test
100-
public void shouldNotUpdateOneReplicateToWorkerLostSinceAlreadyUpdated(){
101-
Date twoMinutesAgo = addMinutesToDate(new Date(), -2);
102-
99+
public void shouldNotUpdateToWorkerLostSinceAlreadyUpdated(){
103100
Worker worker = Worker.builder()
104101
.walletAddress(WALLET_WORKER)
105-
.lastAliveDate(twoMinutesAgo)
106102
.participatingChainTaskIds(Collections.singletonList(CHAIN_TASK_ID))
107103
.build();
108104

@@ -115,13 +111,48 @@ public void shouldNotUpdateOneReplicateToWorkerLostSinceAlreadyUpdated(){
115111
when(taskService.isExpired(anyString())).thenReturn(false);
116112

117113
workerLostDetector.detect();
118-
// verify that the call on the update is correct
119-
Mockito.verify(replicatesService, Mockito.times(0))
114+
Mockito.verify(replicatesService, never())
120115
.updateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER, ReplicateStatus.WORKER_LOST);
116+
}
121117

122-
// verify that the worker should remove the taskId from its current tasks
123-
Mockito.verify(workerService, Mockito.times(0))
124-
.removeChainTaskIdFromWorker(CHAIN_TASK_ID, WALLET_WORKER);
118+
@Test
119+
public void shouldNotUpdateToWorkerLostSinceFailed(){
120+
Worker worker = Worker.builder()
121+
.walletAddress(WALLET_WORKER)
122+
.participatingChainTaskIds(Collections.singletonList(CHAIN_TASK_ID))
123+
.build();
124+
125+
Replicate replicate = new Replicate(WALLET_WORKER, CHAIN_TASK_ID);
126+
replicate.updateStatus(ReplicateStatus.STARTING, ReplicateStatusModifier.WORKER);
127+
replicate.updateStatus(ReplicateStatus.FAILED, ReplicateStatusModifier.POOL_MANAGER);
128+
129+
when(workerService.getLostWorkers()).thenReturn(Collections.singletonList(worker));
130+
when(replicatesService.getReplicate(CHAIN_TASK_ID, WALLET_WORKER)).thenReturn(Optional.of(replicate));
131+
when(taskService.isExpired(anyString())).thenReturn(false);
132+
133+
workerLostDetector.detect();
134+
Mockito.verify(replicatesService, never())
135+
.updateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER, ReplicateStatus.WORKER_LOST);
136+
}
137+
138+
@Test
139+
public void shouldNotUpdateToWorkerLostSinceCompleted(){
140+
Worker worker = Worker.builder()
141+
.walletAddress(WALLET_WORKER)
142+
.participatingChainTaskIds(Collections.singletonList(CHAIN_TASK_ID))
143+
.build();
144+
145+
Replicate replicate = new Replicate(WALLET_WORKER, CHAIN_TASK_ID);
146+
replicate.updateStatus(ReplicateStatus.STARTING, ReplicateStatusModifier.WORKER);
147+
replicate.updateStatus(ReplicateStatus.COMPLETED, ReplicateStatusModifier.POOL_MANAGER);
148+
149+
when(workerService.getLostWorkers()).thenReturn(Collections.singletonList(worker));
150+
when(replicatesService.getReplicate(CHAIN_TASK_ID, WALLET_WORKER)).thenReturn(Optional.of(replicate));
151+
when(taskService.isExpired(anyString())).thenReturn(false);
152+
153+
workerLostDetector.detect();
154+
Mockito.verify(replicatesService, never())
155+
.updateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER, ReplicateStatus.WORKER_LOST);
125156
}
126157

127158
@Test

src/test/java/com/iexec/core/replicate/ReplicateListenersTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,21 @@ public void shouldNotAddFailedStatusSinceCompletableReplicateStatus() {
145145
}
146146

147147
@Test
148-
public void shouldRemoveComputedChainTaskIdFromWorkerSinceFailed() {
148+
public void shouldRemoveChainTaskIdFromWorkerSinceCompleted() {
149+
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
150+
.chainTaskId(CHAIN_TASK_ID)
151+
.walletAddress(WORKER_WALLET)
152+
.replicateStatusUpdate(new ReplicateStatusUpdate(COMPLETED))
153+
.build();
154+
155+
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
156+
157+
Mockito.verify(workerService, Mockito.times(1))
158+
.removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
159+
}
160+
161+
@Test
162+
public void shouldRemoveChainTaskIdFromWorkerSinceFailed() {
149163
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
150164
.chainTaskId(CHAIN_TASK_ID)
151165
.walletAddress(WORKER_WALLET)
@@ -159,8 +173,9 @@ public void shouldRemoveComputedChainTaskIdFromWorkerSinceFailed() {
159173
}
160174

161175
@Test
162-
public void shouldNotRemoveComputedChainTaskIdFromWorker() {
176+
public void shouldNotRemoveChainTaskIdFromWorker() {
163177
List<ReplicateStatus> someStatuses = ReplicateStatus.getSuccessStatuses(); //not exhaustive
178+
someStatuses.remove(COMPLETED);
164179
someStatuses.remove(FAILED);
165180

166181
for (ReplicateStatus randomStatus: someStatuses){

0 commit comments

Comments
 (0)