Skip to content

Commit 57bc15c

Browse files
author
Jérémy James Toussaint
committed
Removed InterruptedReplicateModel
1 parent 9dbe02e commit 57bc15c

File tree

8 files changed

+93
-122
lines changed

8 files changed

+93
-122
lines changed
Lines changed: 10 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.iexec.worker.amnesia;
22

33
import com.iexec.common.chain.ContributionAuthorization;
4-
import com.iexec.common.disconnection.InterruptedReplicateModel;
54
import com.iexec.common.notification.TaskNotification;
65
import com.iexec.common.notification.TaskNotificationType;
76
import com.iexec.common.replicate.AvailableReplicateModel;
@@ -51,21 +50,21 @@ public AmnesiaRecoveryService(CustomFeignClient customFeignClient,
5150

5251
public List<String> recoverInterruptedReplicates() {
5352
long lasAvailableBlockNumber = iexecHubService.getLatestBlockNumber();
54-
List<InterruptedReplicateModel> interruptedReplicates = customFeignClient.getInterruptedReplicates(
53+
List<TaskNotification> missedTaskNotifications = customFeignClient.getMissedTaskNotifications(
5554
lasAvailableBlockNumber);
5655
List<String> recoveredChainTaskIds = new ArrayList<>();
5756

58-
if (interruptedReplicates == null || interruptedReplicates.isEmpty()) {
57+
if (missedTaskNotifications == null || missedTaskNotifications.isEmpty()) {
5958
log.info("No interrupted tasks to recover");
6059
return Collections.emptyList();
6160
}
6261

63-
for (InterruptedReplicateModel interruptedReplicate : interruptedReplicates) {
62+
for (TaskNotification missedTaskNotification : missedTaskNotifications) {
6463

65-
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
66-
TaskNotificationType taskNotificationType = interruptedReplicate.getTaskNotificationType();
64+
ContributionAuthorization contributionAuth = missedTaskNotification.getTaskNotificationExtra().getContributionAuthorization();
65+
TaskNotificationType taskNotificationType = missedTaskNotification.getTaskNotificationType();
6766
String chainTaskId = contributionAuth.getChainTaskId();
68-
boolean isResultAvailable = isResultAvailable(chainTaskId);
67+
boolean isResultAvailable = resultService.isResultAvailable(chainTaskId);
6968

7069
log.info("Recovering interrupted task [chainTaskId:{}, taskNotificationType:{}]",
7170
chainTaskId, taskNotificationType);
@@ -86,96 +85,20 @@ public List<String> recoverInterruptedReplicates() {
8685
}
8786

8887
AvailableReplicateModel replicateModel = oReplicateModel.get();
89-
recoverReplicate(interruptedReplicate, replicateModel);
88+
recoverReplicate(missedTaskNotification, replicateModel);
9089
recoveredChainTaskIds.add(chainTaskId);
9190
}
9291

9392
return recoveredChainTaskIds;
9493
}
9594

96-
public void recoverReplicate(InterruptedReplicateModel interruptedReplicate,
95+
public void recoverReplicate(TaskNotification taskNotification,
9796
AvailableReplicateModel replicateModel) {
98-
99-
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
100-
String chainTaskId = contributionAuth.getChainTaskId();
97+
String chainTaskId = taskNotification.getChainTaskId();
10198

10299
subscriptionService.subscribeToTopic(chainTaskId);
103100
resultService.saveResultInfo(chainTaskId, replicateModel);
104-
105-
TaskNotification taskNotification = null;
106-
107-
switch (interruptedReplicate.getTaskNotificationType()) {
108-
case PLEASE_CONTRIBUTE:
109-
recoverReplicateByContributing(contributionAuth, replicateModel);
110-
break;
111-
case PLEASE_ABORT_CONSENSUS_REACHED:
112-
taskNotification = TaskNotification.builder()
113-
.chainTaskId(chainTaskId)
114-
.taskNotificationType(TaskNotificationType.PLEASE_ABORT_CONSENSUS_REACHED)
115-
.build();
116-
break;
117-
case PLEASE_ABORT_CONTRIBUTION_TIMEOUT:
118-
taskNotification = TaskNotification.builder()
119-
.chainTaskId(chainTaskId)
120-
.taskNotificationType(TaskNotificationType.PLEASE_ABORT_CONTRIBUTION_TIMEOUT)
121-
.build();
122-
break;
123-
124-
case PLEASE_REVEAL:
125-
taskNotification = TaskNotification.builder()
126-
.chainTaskId(chainTaskId)
127-
.taskNotificationType(TaskNotificationType.PLEASE_REVEAL)
128-
.blockNumber(iexecHubService.getLatestBlockNumber())
129-
.build();
130-
break;
131-
case PLEASE_UPLOAD:
132-
taskNotification = TaskNotification.builder()
133-
.chainTaskId(chainTaskId)
134-
.taskNotificationType(TaskNotificationType.PLEASE_UPLOAD)
135-
.build();
136-
break;
137-
case PLEASE_COMPLETE:
138-
taskNotification = TaskNotification.builder()
139-
.chainTaskId(chainTaskId)
140-
.taskNotificationType(TaskNotificationType.PLEASE_COMPLETE)
141-
.build();
142-
break;
143-
default:
144-
break;
145-
}
146-
147-
if (taskNotification != null) {
148-
subscriptionService.handleTaskNotification(taskNotification);
149-
}
150-
151-
152-
}
153-
154-
private boolean isResultAvailable(String chainTaskId) {
155-
boolean isResultZipFound = resultService.isResultZipFound(chainTaskId);
156-
boolean isResultFolderFound = resultService.isResultFolderFound(chainTaskId);
157-
158-
if (!isResultZipFound && !isResultFolderFound) return false;
159-
160-
if (!isResultZipFound) resultService.zipResultFolder(chainTaskId);
161-
162-
return true;
101+
subscriptionService.handleTaskNotification(taskNotification);
163102
}
164103

165-
public void recoverReplicateByContributing(ContributionAuthorization contributionAuth,
166-
AvailableReplicateModel replicateModel) {
167-
168-
String chainTaskId = contributionAuth.getChainTaskId();
169-
boolean isResultAvailable = isResultAvailable(chainTaskId);
170-
171-
if (!isResultAvailable) {
172-
log.info("Result not found, re-running computation to recover task " +
173-
"[chainTaskId:{}, recoveryAction:CONTRIBUTE]", chainTaskId);
174-
taskExecutorService.addReplicate(replicateModel);
175-
return;
176-
}
177-
178-
resultService.saveResultInfo(chainTaskId, replicateModel);
179-
taskExecutorService.contribute(contributionAuth);
180-
}
181104
}

src/main/java/com/iexec/worker/executor/TaskExecutorService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,22 @@ public CompletableFuture<Void> addReplicate(AvailableReplicateModel replicateMod
104104
});
105105
}
106106

107+
108+
public void tryToContribute(ContributionAuthorization contributionAuth,
109+
AvailableReplicateModel replicateModel) {
110+
111+
String chainTaskId = contributionAuth.getChainTaskId();
112+
boolean isResultAvailable = resultService.isResultAvailable(chainTaskId);
113+
114+
if (!isResultAvailable) {
115+
log.info("Result not found, will restart task from RUNNING [chainTaskId:{}]", chainTaskId);
116+
addReplicate(replicateModel);
117+
} else {
118+
log.info("Result found, will restart task from CONTRIBUTING [chainTaskId:{}]", chainTaskId);
119+
contribute(contributionAuth);
120+
}
121+
}
122+
107123
@Async
108124
private String compute(AvailableReplicateModel replicateModel) {
109125
ContributionAuthorization contributionAuth = replicateModel.getContributionAuthorization();

src/main/java/com/iexec/worker/feign/CustomFeignClient.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@
33
import com.iexec.common.chain.ContributionAuthorization;
44
import com.iexec.common.config.PublicConfiguration;
55
import com.iexec.common.config.WorkerConfigurationModel;
6-
import com.iexec.common.disconnection.InterruptedReplicateModel;
6+
import com.iexec.common.notification.TaskNotification;
77
import com.iexec.common.replicate.ReplicateDetails;
88
import com.iexec.common.replicate.ReplicateStatus;
99
import com.iexec.common.security.Signature;
1010
import com.iexec.common.utils.SignatureUtils;
1111
import com.iexec.worker.chain.CredentialsService;
1212
import com.iexec.worker.config.CoreConfigurationService;
13-
1413
import feign.FeignException;
1514
import lombok.extern.slf4j.Slf4j;
1615
import org.springframework.http.HttpStatus;
@@ -107,15 +106,15 @@ public void registerWorker(WorkerConfigurationModel model) {
107106
}
108107
}
109108

110-
public List<InterruptedReplicateModel> getInterruptedReplicates(long lastAvailableBlockNumber) {
111-
List<InterruptedReplicateModel> interruptedReplicates = new ArrayList<>();
109+
public List<TaskNotification> getMissedTaskNotifications(long lastAvailableBlockNumber) {
110+
List<TaskNotification> interruptedReplicates = new ArrayList<>();
112111

113112
try {
114-
interruptedReplicates = replicateClient.getInterruptedReplicates(lastAvailableBlockNumber, getToken());
113+
interruptedReplicates = replicateClient.getMissedTaskNotifications(lastAvailableBlockNumber, getToken());
115114
} catch (FeignException e) {
116115
if (e.status() == HttpStatus.UNAUTHORIZED.value()) {
117116
generateNewToken();
118-
interruptedReplicates = replicateClient.getInterruptedReplicates(lastAvailableBlockNumber, getToken());
117+
interruptedReplicates = replicateClient.getMissedTaskNotifications(lastAvailableBlockNumber, getToken());
119118
} else {
120119
log.error("Failed to get interrupted replicates [instance:{}]", coreURL);
121120
e.printStackTrace();

src/main/java/com/iexec/worker/feign/ReplicateClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package com.iexec.worker.feign;
22

3-
import com.iexec.common.replicate.ReplicateDetails;
4-
import java.util.List;
5-
63
import com.iexec.common.chain.ContributionAuthorization;
7-
import com.iexec.common.disconnection.InterruptedReplicateModel;
4+
import com.iexec.common.notification.TaskNotification;
5+
import com.iexec.common.replicate.ReplicateDetails;
86
import com.iexec.common.replicate.ReplicateStatus;
97
import feign.FeignException;
108
import org.springframework.cloud.openfeign.FeignClient;
119
import org.springframework.web.bind.annotation.*;
1210

11+
import java.util.List;
12+
1313

1414
@FeignClient(name = "ReplicateClient", url = "http://${core.host}:${core.port}")
1515
public interface ReplicateClient {
@@ -21,7 +21,7 @@ ContributionAuthorization getAvailableReplicate(
2121
) throws FeignException;
2222

2323
@GetMapping("/replicates/interrupted")
24-
List<InterruptedReplicateModel> getInterruptedReplicates(
24+
List<TaskNotification> getMissedTaskNotifications(
2525
@RequestParam(name = "blockNumber") long blockNumber,
2626
@RequestHeader("Authorization") String bearerToken
2727
) throws FeignException;

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.iexec.worker.pubsub;
22

3+
import com.iexec.common.chain.ContributionAuthorization;
34
import com.iexec.common.notification.TaskNotification;
45
import com.iexec.common.notification.TaskNotificationType;
6+
import com.iexec.common.replicate.AvailableReplicateModel;
57
import com.iexec.worker.config.CoreConfigurationService;
68
import com.iexec.worker.config.WorkerConfigurationService;
79
import com.iexec.worker.executor.TaskExecutorService;
10+
import com.iexec.worker.replicate.ReplicateService;
811
import lombok.extern.slf4j.Slf4j;
912
import org.springframework.http.ResponseEntity;
1013
import org.springframework.lang.Nullable;
@@ -24,10 +27,7 @@
2427

2528
import javax.annotation.PostConstruct;
2629
import java.lang.reflect.Type;
27-
import java.util.ArrayList;
28-
import java.util.Arrays;
29-
import java.util.List;
30-
import java.util.Map;
30+
import java.util.*;
3131
import java.util.concurrent.ConcurrentHashMap;
3232

3333

@@ -39,6 +39,7 @@ public class SubscriptionService extends StompSessionHandlerAdapter {
3939
private final int corePort;
4040
private final String workerWalletAddress;
4141
private RestTemplate restTemplate;
42+
private ReplicateService replicateService;
4243
// external services
4344
private TaskExecutorService taskExecutorService;
4445

@@ -51,13 +52,15 @@ public class SubscriptionService extends StompSessionHandlerAdapter {
5152
public SubscriptionService(CoreConfigurationService coreConfigurationService,
5253
WorkerConfigurationService workerConfigurationService,
5354
TaskExecutorService taskExecutorService,
54-
RestTemplate restTemplate) {
55+
RestTemplate restTemplate,
56+
ReplicateService replicateService) {
5557
this.taskExecutorService = taskExecutorService;
5658

5759
this.coreHost = coreConfigurationService.getHost();
5860
this.corePort = coreConfigurationService.getPort();
5961
this.workerWalletAddress = workerConfigurationService.getWorkerWalletAddress();
6062
this.restTemplate = restTemplate;
63+
this.replicateService = replicateService;
6164

6265
chainTaskIdToSubscription = new ConcurrentHashMap<>();
6366
url = "http://" + coreHost + ":" + corePort + "/connect";
@@ -180,18 +183,27 @@ public void handleTaskNotification(TaskNotification notif) {
180183
String chainTaskId = notif.getChainTaskId();
181184

182185
switch (type) {
186+
case PLEASE_CONTRIBUTE:
187+
ContributionAuthorization contribAuth = notif.getTaskNotificationExtra().getContributionAuthorization();
188+
Optional<AvailableReplicateModel> replicateModel =
189+
replicateService.contributionAuthToReplicate(contribAuth);
190+
if (replicateModel.isPresent()){
191+
taskExecutorService.tryToContribute(contribAuth, replicateModel.get());
192+
} else {
193+
log.error("Empty AvailableReplicateModel for PLEASE_CONTRIBUTE[chainTaskId:{}]", chainTaskId);
194+
}
195+
break;
183196
case PLEASE_ABORT_CONTRIBUTION_TIMEOUT:
184197
unsubscribeFromTopic(chainTaskId);
185198
taskExecutorService.abortContributionTimeout(chainTaskId);
186199
break;
187-
188200
case PLEASE_ABORT_CONSENSUS_REACHED:
189201
unsubscribeFromTopic(chainTaskId);
190202
taskExecutorService.abortConsensusReached(chainTaskId);
191203
break;
192204

193205
case PLEASE_REVEAL:
194-
taskExecutorService.reveal(chainTaskId, notif.getBlockNumber());
206+
taskExecutorService.reveal(chainTaskId, notif.getTaskNotificationExtra().getBlockNumber());
195207
break;
196208

197209
case PLEASE_UPLOAD:
@@ -209,6 +221,9 @@ public void handleTaskNotification(TaskNotification notif) {
209221
}
210222
}
211223

224+
225+
226+
212227
private String getTaskTopicName(String chainTaskId) {
213228
return "/topic/task/" + chainTaskId;
214229
}

src/main/java/com/iexec/worker/replicate/ReplicateDemandService.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package com.iexec.worker.replicate;
22

3+
import java.util.Collections;
34
import java.util.Optional;
45

56
import com.iexec.common.chain.ContributionAuthorization;
7+
import com.iexec.common.notification.TaskNotification;
8+
import com.iexec.common.notification.TaskNotificationExtra;
9+
import com.iexec.common.notification.TaskNotificationType;
610
import com.iexec.common.replicate.AvailableReplicateModel;
711
import com.iexec.worker.chain.ContributionService;
812
import com.iexec.worker.chain.IexecHubService;
@@ -73,13 +77,15 @@ public void askForReplicate() {
7377
return;
7478
}
7579

76-
Optional<AvailableReplicateModel> oReplicateModel =
77-
replicateService.contributionAuthToReplicate(contributionAuth);
80+
subscriptionService.subscribeToTopic(chainTaskId);
7881

79-
if (!oReplicateModel.isPresent()) return;
82+
TaskNotification taskNotification = TaskNotification.builder()
83+
.chainTaskId(chainTaskId)
84+
.workersAddress(Collections.emptyList())
85+
.taskNotificationType(TaskNotificationType.PLEASE_CONTRIBUTE)
86+
.taskNotificationExtra(TaskNotificationExtra.builder().contributionAuthorization(contributionAuth).build())
87+
.build();
8088

81-
subscriptionService.subscribeToTopic(chainTaskId);
82-
83-
taskExecutorService.addReplicate(oReplicateModel.get());
89+
subscriptionService.handleTaskNotification(taskNotification);
8490
}
8591
}

src/main/java/com/iexec/worker/result/ResultService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,4 +349,16 @@ public String uploadResult(String chainTaskId) {
349349

350350
return resultRepoService.uploadResult(authorizationToken, getResultModelWithZip(chainTaskId));
351351
}
352+
353+
354+
public boolean isResultAvailable(String chainTaskId) {
355+
boolean isResultZipFound = isResultZipFound(chainTaskId);
356+
boolean isResultFolderFound = isResultFolderFound(chainTaskId);
357+
358+
if (!isResultZipFound && !isResultFolderFound) return false;
359+
360+
if (!isResultZipFound) zipResultFolder(chainTaskId);
361+
362+
return true;
363+
}
352364
}

0 commit comments

Comments
 (0)