Skip to content

Commit d65cd4a

Browse files
author
Ugo Plouviez
committed
Add contribute check before computation
1 parent c7d55c3 commit d65cd4a

File tree

2 files changed

+102
-34
lines changed

2 files changed

+102
-34
lines changed

src/main/java/com/iexec/worker/chain/ContributionService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,13 @@ private boolean isSignatureValid(byte[] message, Signature sign, String signerAd
176176
public boolean hasEnoughGas() {
177177
return iexecHubService.hasEnoughGas();
178178
}
179+
180+
public boolean isContributionDeadlineReached(String chainTaskId) {
181+
Optional<ChainTask> oTask = iexecHubService.getChainTask(chainTaskId);
182+
if (!oTask.isPresent()) {
183+
return true;
184+
}
185+
ChainTask task = oTask.get();
186+
return isBeforeContributionDeadlineToContribute(task);
187+
}
179188
}

src/main/java/com/iexec/worker/executor/TaskExecutorService.java

Lines changed: 93 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ public void tryToContribute(ContributionAuthorization contributionAuth) {
126126
return;
127127
}
128128

129+
// TODO: not sure if this is needed !
130+
// if contribution deadline is passed then no need to contribute
131+
if (contributionService.isContributionDeadlineReached(chainTaskId)) {
132+
log.error("The contribution deadline has been already reached, no more contributions are allowed [chain]"
133+
+ " [chainTaskId:{}, contribAuth:{}]", chainTaskId, contributionAuth);
134+
return;
135+
}
136+
129137
boolean isResultAvailable = resultService.isResultAvailable(chainTaskId);
130138

131139
if (!isResultAvailable) {
@@ -140,7 +148,7 @@ public void tryToContribute(ContributionAuthorization contributionAuth) {
140148
@Async
141149
private String compute(ContributionAuthorization contributionAuth) {
142150
String chainTaskId = contributionAuth.getChainTaskId();
143-
String stdout = "";
151+
String message = "";
144152

145153
if (!contributionService.isChainTaskInitialized(chainTaskId)) {
146154
log.error("Task not initialized onchain yet [ChainTaskId:{}]", chainTaskId);
@@ -150,9 +158,9 @@ private String compute(ContributionAuthorization contributionAuth) {
150158
Optional<TaskDescription> taskDescriptionFromChain = iexecHubService.getTaskDescriptionFromChain(chainTaskId);
151159

152160
if (!taskDescriptionFromChain.isPresent()) {
153-
stdout = "AvailableReplicateModel not found";
154-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
155-
return stdout;
161+
message = "AvailableReplicateModel not found";
162+
log.error(message + " [chainTaskId:{}]", chainTaskId);
163+
return message;
156164
}
157165

158166
TaskDescription taskDescription = taskDescriptionFromChain.get();
@@ -166,33 +174,23 @@ private String compute(ContributionAuthorization contributionAuth) {
166174
// check app type
167175
customFeignClient.updateReplicateStatus(chainTaskId, RUNNING);
168176
if (!taskDescription.getAppType().equals(DappType.DOCKER)) {
169-
stdout = "Application is not of type Docker";
170-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
171-
return stdout;
177+
message = "Application is not of type Docker";
178+
log.error(message + " [chainTaskId:{}]", chainTaskId);
179+
return message;
172180
}
173181

174-
// pull app
175-
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADING);
176-
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, taskDescription.getAppUri());
177-
if (!isAppDownloaded) {
178-
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOAD_FAILED);
179-
stdout = "Failed to pull application image, URI:" + taskDescription.getAppUri();
180-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
181-
return stdout;
182+
// Try to downloadApp
183+
String errorDwnlApp = tryToDownloadApp(taskDescription);
184+
if (!errorDwnlApp.isEmpty()) {
185+
return errorDwnlApp;
182186
}
183-
184187
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADED);
185188

186-
// pull data
187-
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADING);
188-
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, taskDescription.getDatasetUri());
189-
if (!isDatasetDownloaded) {
190-
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOAD_FAILED);
191-
stdout = "Failed to pull dataset, URI:" + taskDescription.getDatasetUri();
192-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
193-
return stdout;
189+
// Try to downloadData
190+
String errorDwnlData = tryToDownloadData(taskDescription);
191+
if (!errorDwnlData.isEmpty()) {
192+
return errorDwnlData;
194193
}
195-
196194
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADED);
197195

198196
boolean isFetched = smsService.fetchTaskSecrets(contributionAuth);
@@ -212,24 +210,85 @@ private String compute(ContributionAuthorization contributionAuth) {
212210

213211
if (isDatasetDecryptionNeeded && !isDatasetDecrypted) {
214212
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);
215-
stdout = "Failed to decrypt dataset, URI:" + taskDescription.getDatasetUri();
216-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
217-
return stdout;
213+
message = "Failed to decrypt dataset, URI:" + taskDescription.getDatasetUri();
214+
log.error(message + " [chainTaskId:{}]", chainTaskId);
215+
return message;
216+
}
217+
218+
String error = checkContributionAbility(chainTaskId);
219+
if (!error.isEmpty()) {
220+
return error;
218221
}
219222

220223
// compute
221224
String datasetFilename = datasetService.getDatasetFilename(taskDescription.getDatasetUri());
222-
stdout = dockerComputationService.dockerRunAndGetLogs(taskDescription, datasetFilename);
225+
message = dockerComputationService.dockerRunAndGetLogs(taskDescription, datasetFilename);
223226

224-
if (stdout.isEmpty()) {
227+
if (message.isEmpty()) {
225228
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);
226-
stdout = "Failed to start computation";
227-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
228-
return stdout;
229+
message = "Failed to start computation";
230+
log.error(message + " [chainTaskId:{}]", chainTaskId);
231+
return message;
229232
}
230233

231234
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTED);
232-
return stdout;
235+
return message;
236+
}
237+
238+
private String tryToDownloadApp(TaskDescription taskDescription) {
239+
String chainTaskId = taskDescription.getChainTaskId();
240+
241+
String error = checkContributionAbility(chainTaskId);
242+
if (!error.isEmpty()) {
243+
return error;
244+
}
245+
246+
// pull app
247+
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADING);
248+
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, taskDescription.getAppUri());
249+
if (!isAppDownloaded) {
250+
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOAD_FAILED);
251+
String errorMessage = "Failed to pull application image, URI:" + taskDescription.getAppUri();
252+
log.error(errorMessage + " [chainTaskId:{}]", chainTaskId);
253+
return errorMessage;
254+
}
255+
256+
return "";
257+
}
258+
259+
private String tryToDownloadData(TaskDescription taskDescription) {
260+
String chainTaskId = taskDescription.getChainTaskId();
261+
262+
String error = checkContributionAbility(chainTaskId);
263+
if (!error.isEmpty()) {
264+
return error;
265+
}
266+
267+
// pull data
268+
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADING);
269+
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, taskDescription.getDatasetUri());
270+
if (!isDatasetDownloaded) {
271+
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOAD_FAILED);
272+
String errorMessage = "Failed to pull dataset, URI:" + taskDescription.getDatasetUri();
273+
log.error(errorMessage + " [chainTaskId:{}]", chainTaskId);
274+
return errorMessage;
275+
}
276+
277+
return "";
278+
}
279+
280+
private String checkContributionAbility(String chainTaskId) {
281+
String errorMessage = "";
282+
283+
Optional<ReplicateStatus> statusBeforeDownloadApp = contributionService.getCanContributeStatus(chainTaskId);
284+
if(statusBeforeDownloadApp.isPresent() && !statusBeforeDownloadApp.get().equals(CAN_CONTRIBUTE)) {
285+
errorMessage = "The worker cannot contribute";
286+
log.error(errorMessage + " [chainTaskId:{}, replicateStatus:{}]", chainTaskId, statusBeforeDownloadApp.get());
287+
customFeignClient.updateReplicateStatus(chainTaskId, statusBeforeDownloadApp.get());
288+
return errorMessage;
289+
}
290+
291+
return errorMessage;
233292
}
234293

235294
@Async

0 commit comments

Comments
 (0)