Skip to content

Commit 5105645

Browse files
authored
Use less MongoDB calls to update a task to its final status (#649)
1 parent 2a089a3 commit 5105645

File tree

3 files changed

+114
-72
lines changed

3 files changed

+114
-72
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ All notable changes to this project will be documented in this file.
2828
- Move `TaskModel` and `ReplicateModel` instances creation methods to `Task` and `Replicate` classes. (#625)
2929
- Expose `TaskLogsModel` on `TaskController` instead of `TaskLogs`. (#631)
3030
- Remove duplicated MongoDB read on `ReplicatesList` during replicate status update. (#647)
31+
- Use less MongoDB calls when updating a task to a final status. (#649)
3132

3233
### Dependency Upgrades
3334

src/main/java/com/iexec/core/task/update/TaskUpdateManager.java

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ Future<Void> init() {
116116
);
117117
}
118118

119-
@SuppressWarnings("DuplicateBranchesInSwitch")
120119
void updateTask(String chainTaskId) {
121120
Optional<Task> optional = taskService.getTaskByChainTaskId(chainTaskId);
122121
if (optional.isEmpty()) {
@@ -128,11 +127,10 @@ void updateTask(String chainTaskId) {
128127
boolean isFinalDeadlinePossible =
129128
!TaskStatus.getStatusesWhereFinalDeadlineIsImpossible().contains(currentStatus);
130129
if (isFinalDeadlinePossible && new Date().after(task.getFinalDeadline())) {
131-
updateTaskStatusAndSave(task, FINAL_DEADLINE_REACHED);
132130
// Eventually should fire a "final deadline reached" notification to worker,
133131
// but here let's just trigger an toFailed(task) leading to a failed status
134132
// which will itself fire a generic "abort" notification
135-
toFailed(task);
133+
toFailed(task, FINAL_DEADLINE_REACHED);
136134
return;
137135
}
138136

@@ -147,25 +145,16 @@ void updateTask(String chainTaskId) {
147145
initialized2Running(task);
148146
initializedOrRunning2ContributionTimeout(task);
149147
break;
150-
case INITIALIZE_FAILED:
151-
toFailed(task);
152-
break;
153148
case RUNNING:
154149
running2Finalized2Completed(task); // running2Finalized2Completed must be the first call to prevent other transition execution
155150
running2ConsensusReached(task);
156151
running2RunningFailed(task);
157152
initializedOrRunning2ContributionTimeout(task);
158153
break;
159-
case RUNNING_FAILED:
160-
toFailed(task);
161-
break;
162154
case CONSENSUS_REACHED:
163155
consensusReached2AtLeastOneReveal2ResultUploading(task);
164156
consensusReached2Reopening(task);
165157
break;
166-
case CONTRIBUTION_TIMEOUT:
167-
toFailed(task);
168-
break;
169158
case AT_LEAST_ONE_REVEALED:
170159
requestUpload(task);
171160
break;
@@ -175,28 +164,25 @@ void updateTask(String chainTaskId) {
175164
case REOPENED:
176165
updateTaskStatusAndSave(task, INITIALIZED);
177166
break;
178-
case REOPEN_FAILED:
179-
toFailed(task);
180-
break;
181167
case RESULT_UPLOADING:
182168
resultUploading2Uploaded(task);
183169
resultUploading2UploadTimeout(task);
184170
break;
185171
case RESULT_UPLOADED:
186172
resultUploaded2Finalizing(task);
187173
break;
188-
case RESULT_UPLOAD_TIMEOUT:
189-
toFailed(task);
190-
break;
191174
case FINALIZING:
192175
finalizing2Finalized2Completed(task);
193176
break;
194177
case FINALIZED:
195178
finalizedToCompleted(task);
196179
break;
180+
case INITIALIZE_FAILED:
181+
case RUNNING_FAILED:
182+
case CONTRIBUTION_TIMEOUT:
183+
case REOPEN_FAILED:
184+
case RESULT_UPLOAD_TIMEOUT:
197185
case FINALIZE_FAILED:
198-
toFailed(task);
199-
break;
200186
case FINAL_DEADLINE_REACHED:
201187
toFailed(task);
202188
break;
@@ -206,25 +192,49 @@ void updateTask(String chainTaskId) {
206192
}
207193
}
208194

209-
Task updateTaskStatusAndSave(Task task, TaskStatus newStatus) {
210-
return updateTaskStatusAndSave(task, newStatus, null);
195+
/**
196+
* Creates one or several task status changes for the task before committing all of them to the database.
197+
*
198+
* @param task The task
199+
* @param statuses List of statuses to append to the task {@code dateStatusList}
200+
*/
201+
void updateTaskStatusesAndSave(Task task, TaskStatus... statuses) {
202+
TaskStatus initialStatus = task.getCurrentStatus();
203+
TaskStatus lastStatus = statuses[statuses.length - 1];
204+
for (TaskStatus newStatus : statuses) {
205+
log.info("Create TaskStatusChange succeeded [chainTaskId:{}, currentStatus:{}, newStatus:{}]",
206+
task.getChainTaskId(), task.getCurrentStatus(), newStatus);
207+
task.changeStatus(newStatus, null);
208+
}
209+
saveTask(task, initialStatus, lastStatus);
211210
}
212211

213-
Task updateTaskStatusAndSave(Task task, TaskStatus newStatus, ChainReceipt chainReceipt) {
214-
TaskStatus currentStatus = task.getCurrentStatus();
212+
void updateTaskStatusAndSave(Task task, TaskStatus newStatus) {
213+
updateTaskStatusAndSave(task, newStatus, null);
214+
}
215+
216+
void updateTaskStatusAndSave(Task task, TaskStatus newStatus, ChainReceipt chainReceipt) {
217+
TaskStatus initialStatus = task.getCurrentStatus();
215218
task.changeStatus(newStatus, chainReceipt);
216-
Optional<Task> savedTask = taskService.updateTask(task);
219+
saveTask(task, initialStatus, newStatus);
220+
}
217221

222+
/**
223+
* Saves the task to the database.
224+
*
225+
* @param task The task
226+
* @param currentStatus The current status in database
227+
* @param newStatus The new status in database after save
228+
*/
229+
void saveTask(Task task, TaskStatus currentStatus, TaskStatus newStatus) {
230+
Optional<Task> savedTask = taskService.updateTask(task);
218231
// `savedTask.isPresent()` should always be true if the task exists in the repository.
219232
if (savedTask.isPresent()) {
220233
updateMetricsAfterStatusUpdate(currentStatus, newStatus);
221234
log.info("UpdateTaskStatus succeeded [chainTaskId:{}, currentStatus:{}, newStatus:{}]", task.getChainTaskId(), currentStatus, newStatus);
222-
return savedTask.get();
223235
} else {
224-
log.warn("UpdateTaskStatus failed. Chain Task is probably unknown." +
225-
" [chainTaskId:{}, currentStatus:{}, wishedStatus:{}]",
236+
log.warn("UpdateTaskStatus failed. Chain Task is probably unknown [chainTaskId:{}, currentStatus:{}, wishedStatus:{}]",
226237
task.getChainTaskId(), currentStatus, newStatus);
227-
return null;
228238
}
229239
}
230240

@@ -253,8 +263,7 @@ void received2Initializing(Task task) {
253263
Optional<String> smsUrl = smsService.getVerifiedSmsUrl(task.getChainTaskId(), task.getTag());
254264
if (smsUrl.isEmpty()) {
255265
log.error("Couldn't get verified SMS url [chainTaskId: {}]", task.getChainTaskId());
256-
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
257-
updateTaskStatusAndSave(task, FAILED);
266+
toFailed(task, INITIALIZE_FAILED);
258267
return;
259268
}
260269
task.setSmsUrl(smsUrl.get()); //SMS URL source of truth for the task
@@ -271,8 +280,7 @@ void received2Initializing(Task task) {
271280
if (enclaveChallenge.isEmpty()) {
272281
log.error("Can't initialize task, enclave challenge is empty [chainTaskId:{}]",
273282
chainTaskId);
274-
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
275-
updateTask(chainTaskId);
283+
toFailed(task, INITIALIZE_FAILED);
276284
return;
277285
}
278286
task.setEnclaveChallenge(enclaveChallenge.get());
@@ -282,8 +290,7 @@ void received2Initializing(Task task) {
282290
}, () -> {
283291
log.error("Failed to request initialize on blockchain [chainTaskId:{}]",
284292
task.getChainTaskId());
285-
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
286-
updateTaskStatusAndSave(task, FAILED);
293+
toFailed(task, INITIALIZE_FAILED);
287294
});
288295
}
289296

@@ -306,8 +313,7 @@ void initializing2Initialized(Task task) {
306313
}
307314
log.error("Initialization failed on blockchain (tx reverted) [chainTaskId:{}]",
308315
task.getChainTaskId());
309-
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
310-
updateTaskStatusAndSave(task, FAILED);
316+
toFailed(task, INITIALIZE_FAILED);
311317
}, () -> log.error("Unable to check initialization on blockchain " +
312318
"(likely too long), should use a detector [chainTaskId:{}]",
313319
task.getChainTaskId()));
@@ -407,12 +413,11 @@ void running2Finalized2Completed(Task task) {
407413
} else if (nbReplicatesWithContributeAndFinalizeStatus > 1) {
408414
log.error("Too many replicates in ContributeAndFinalize status"
409415
+ " [chainTaskId:{}, nbReplicates:{}]", chainTaskId, nbReplicatesWithContributeAndFinalizeStatus);
410-
toFailed(task);
416+
toFailed(task, RUNNING_FAILED);
411417
return;
412418
}
413419

414-
updateTaskStatusAndSave(task, FINALIZED);
415-
finalizedToCompleted(task);
420+
toFinalizedToCompleted(task);
416421
}
417422

418423
void initializedOrRunning2ContributionTimeout(Task task) {
@@ -421,8 +426,7 @@ void initializedOrRunning2ContributionTimeout(Task task) {
421426
boolean isNowAfterContributionDeadline = task.getContributionDeadline() != null && new Date().after(task.getContributionDeadline());
422427

423428
if (isInitializedOrRunningTask && isNowAfterContributionDeadline) {
424-
updateTaskStatusAndSave(task, CONTRIBUTION_TIMEOUT);
425-
updateTaskStatusAndSave(task, FAILED);
429+
updateTaskStatusesAndSave(task, CONTRIBUTION_TIMEOUT, FAILED);
426430
applicationEventPublisher.publishEvent(new ContributionTimeoutEvent(task.getChainTaskId()));
427431
}
428432
}
@@ -473,8 +477,7 @@ void running2RunningFailed(Task task) {
473477
// If all alive workers have failed on this task, its computation should be stopped.
474478
// It could denote that the task is wrong
475479
// - e.g. failing script, dataset can't be retrieved, app can't be downloaded, ...
476-
updateTaskStatusAndSave(task, RUNNING_FAILED);
477-
updateTaskStatusAndSave(task, FAILED);
480+
updateTaskStatusesAndSave(task, RUNNING_FAILED, FAILED);
478481
applicationEventPublisher.publishEvent(new TaskRunningFailedEvent(task.getChainTaskId()));
479482
}
480483

@@ -595,9 +598,8 @@ void resultUploading2UploadTimeout(Task task) {
595598
&& new Date().after(task.getFinalDeadline());
596599

597600
if (isTaskInResultUploading && isNowAfterFinalDeadline) {
598-
updateTaskStatusAndSave(task, RESULT_UPLOAD_TIMEOUT);
599601
applicationEventPublisher.publishEvent(new ResultUploadTimeoutEvent(task.getChainTaskId()));
600-
updateTaskStatusAndSave(task, FAILED);
602+
toFailed(task, RESULT_UPLOAD_TIMEOUT);
601603
}
602604
}
603605

@@ -660,8 +662,7 @@ void resultUploaded2Finalizing(Task task) {
660662
}, () -> {
661663
log.error("Failed to request finalize on blockchain [chainTaskId:{}]",
662664
task.getChainTaskId());
663-
updateTaskStatusAndSave(task, FINALIZE_FAILED);
664-
updateTaskStatusAndSave(task, FAILED);
665+
toFailed(task, FINALIZE_FAILED);
665666
});
666667
}
667668

@@ -672,14 +673,12 @@ void finalizing2Finalized2Completed(Task task) {
672673
if (Boolean.TRUE.equals(isSuccess)) {
673674
log.info("Finalized on blockchain (tx mined) [chainTaskId:{}]",
674675
task.getChainTaskId());
675-
updateTaskStatusAndSave(task, FINALIZED, null);
676-
finalizedToCompleted(task);
676+
toFinalizedToCompleted(task);
677677
return;
678678
}
679679
log.error("Finalization failed on blockchain (tx reverted) [chainTaskId:{}]",
680680
task.getChainTaskId());
681-
updateTaskStatusAndSave(task, FINALIZE_FAILED);
682-
updateTaskStatusAndSave(task, FAILED);
681+
toFailed(task, FINALIZE_FAILED);
683682
}, () -> log.error("Unable to check finalization on blockchain " +
684683
"(likely too long), should use a detector [chainTaskId:{}]",
685684
task.getChainTaskId()));
@@ -693,11 +692,21 @@ void finalizedToCompleted(Task task) {
693692
applicationEventPublisher.publishEvent(new TaskCompletedEvent(task));
694693
}
695694

695+
void toFinalizedToCompleted(Task task) {
696+
updateTaskStatusesAndSave(task, FINALIZED, COMPLETED);
697+
applicationEventPublisher.publishEvent(new TaskCompletedEvent(task));
698+
}
699+
696700
void toFailed(Task task) {
697701
updateTaskStatusAndSave(task, FAILED);
698702
applicationEventPublisher.publishEvent(new TaskFailedEvent(task.getChainTaskId()));
699703
}
700704

705+
void toFailed(Task task, TaskStatus reason) {
706+
updateTaskStatusesAndSave(task, reason, FAILED);
707+
applicationEventPublisher.publishEvent(new TaskFailedEvent(task.getChainTaskId()));
708+
}
709+
701710
void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
702711
currentTaskStatusesCount.get(previousStatus).decrementAndGet();
703712
currentTaskStatusesCount.get(newStatus).incrementAndGet();

0 commit comments

Comments
 (0)