2525import com .iexec .core .replicate .Replicate ;
2626import com .iexec .core .replicate .ReplicatesService ;
2727import com .iexec .core .task .event .*;
28- import com .iexec .core .task .executor .TaskExecutorEngine ;
28+ import com .iexec .core .task .update .TaskUpdateRequestConsumer ;
29+ import com .iexec .core .task .update .TaskUpdateRequestManager ;
2930import lombok .extern .slf4j .Slf4j ;
3031import org .apache .commons .lang3 .tuple .Pair ;
3132import org .springframework .context .ApplicationEventPublisher ;
3233import org .springframework .stereotype .Service ;
3334
34- import java .util .Arrays ;
35- import java .util .Date ;
36- import java .util .List ;
37- import java .util .Optional ;
38- import java .util .concurrent .CompletableFuture ;
39- import java .util .concurrent .ConcurrentHashMap ;
35+ import java .util .*;
36+ import java .util .concurrent .*;
4037import java .util .stream .Collectors ;
4138
4239import static com .iexec .core .task .TaskStatus .*;
4340
4441@ Slf4j
4542@ Service
46- public class TaskService {
43+ public class TaskService implements TaskUpdateRequestConsumer {
4744
4845 private final ConcurrentHashMap <String , Boolean >
4946 taskAccessForNewReplicateLock = new ConcurrentHashMap <>();
5047
51- private TaskRepository taskRepository ;
52- private TaskExecutorEngine taskExecutorEngine ;
53- private IexecHubService iexecHubService ;
54- private ReplicatesService replicatesService ;
55- private ApplicationEventPublisher applicationEventPublisher ;
56- private Web3jService web3jService ;
48+ private final TaskRepository taskRepository ;
49+ private final TaskUpdateRequestManager taskUpdateRequestManager ;
50+ private final IexecHubService iexecHubService ;
51+ private final ReplicatesService replicatesService ;
52+ private final ApplicationEventPublisher applicationEventPublisher ;
53+ private final Web3jService web3jService ;
5754
5855 public TaskService (
5956 TaskRepository taskRepository ,
60- TaskExecutorEngine taskExecutorEngine ,
57+ TaskUpdateRequestManager taskUpdateRequestManager ,
6158 IexecHubService iexecHubService ,
6259 ReplicatesService replicatesService ,
6360 ApplicationEventPublisher applicationEventPublisher ,
6461 Web3jService web3jService
6562 ) {
6663 this .taskRepository = taskRepository ;
67- this .taskExecutorEngine = taskExecutorEngine ;
64+ this .taskUpdateRequestManager = taskUpdateRequestManager ;
6865 this .iexecHubService = iexecHubService ;
6966 this .replicatesService = replicatesService ;
7067 this .applicationEventPublisher = applicationEventPublisher ;
7168 this .web3jService = web3jService ;
69+ this .taskUpdateRequestManager .setRequestConsumer (this );
7270 }
7371
7472 /**
@@ -143,6 +141,10 @@ public List<Task> getTasksInNonFinalStatuses() {
143141 return taskRepository .findByCurrentStatusNotIn (TaskStatus .getFinalStatuses ());
144142 }
145143
144+ public List <Task > getTasksWhereFinalDeadlineIsPossible () {
145+ return taskRepository .findByCurrentStatusNotIn (TaskStatus .getStatusesWhereFinalDeadlineIsImpossible ());
146+ }
147+
146148 public List <String > getChainTaskIdsOfTasksExpiredBefore (Date expirationDate ) {
147149 return taskRepository .findChainTaskIdsByFinalDeadlineBefore (expirationDate )
148150 .stream ()
@@ -191,47 +193,22 @@ public Date getTaskFinalDeadline(String chainTaskId) {
191193 }
192194
193195 /**
194- * Update task asynchronously.
195- *
196+ * Async method for requesting task update
196197 * @param chainTaskId
197198 * @return
198199 */
199- // TODO change this mechanism of update
200- public CompletableFuture <Void > updateTask (String chainTaskId ) {
201- Optional <Task > oTask = getTaskByChainTaskId (chainTaskId );
202- if (oTask .isEmpty ()) {
203- return CompletableFuture .completedFuture (null );
204- }
205- Task task = oTask .get ();
206- Date finalDeadline = task .getFinalDeadline ();
207- if (finalDeadline == null ) {
208- log .error ("Cannot update task without final deadline [chainTaskId:{}]" ,
209- chainTaskId );
210- return CompletableFuture .completedFuture (null );
211- }
212- long expiration = finalDeadline .getTime () - new Date ().getTime ();
213- return taskExecutorEngine .run (
214- chainTaskId ,
215- expiration ,
216- () -> updateTaskRunnable (chainTaskId )
217- );
200+ public CompletableFuture <Boolean > updateTask (String chainTaskId ) {
201+ return taskUpdateRequestManager .publishRequest (chainTaskId );
218202 }
219203
220204 /**
221- * Remove task's executor if task is
222- * in final status.
223- *
224- * @param task
205+ * Async called when a task update request is received
206+ * @param chainTaskId
225207 */
226- public void removeTaskExecutor (Task task ) {
227- if (!TaskStatus .isFinalStatus (task .getCurrentStatus ())) {
228- log .error ("Cannot remove executor for unfinished " +
229- "task [chainTaskId:{}]" , task .getChainTaskId ());
230- return ;
231- }
232- taskExecutorEngine .removeExecutor (task .getChainTaskId ());
233- log .info ("Removed task executor [chainTaskId:{}]" ,
234- task .getChainTaskId ());
208+ @ Override
209+ public void onTaskUpdateRequest (String chainTaskId ) {
210+ log .info ("Received task update request [chainTaskId:{}]" , chainTaskId );
211+ this .updateTaskRunnable (chainTaskId );
235212 }
236213
237214 void updateTaskRunnable (String chainTaskId ) {
@@ -240,8 +217,20 @@ void updateTaskRunnable(String chainTaskId) {
240217 return ;
241218 }
242219 Task task = optional .get ();
220+ TaskStatus currentStatus = task .getCurrentStatus ();
221+
222+ boolean isFinalDeadlinePossible =
223+ !TaskStatus .getStatusesWhereFinalDeadlineIsImpossible ().contains (currentStatus );
224+ if (isFinalDeadlinePossible && new Date ().after (task .getFinalDeadline ())){
225+ updateTaskStatusAndSave (task , FINAL_DEADLINE_REACHED );
226+ // Eventually should fire a "final deadline reached" notification to worker,
227+ // but here let's just trigger an updateTask() leading to a failed status
228+ // which will itself fire a generic "abort" notification
229+ updateTask (chainTaskId );
230+ return ;
231+ }
243232
244- switch (task . getCurrentStatus () ) {
233+ switch (currentStatus ) {
245234 case RECEIVED :
246235 received2Initialized (task );
247236 break ;
@@ -303,9 +292,11 @@ void updateTaskRunnable(String chainTaskId) {
303292 case FINALIZE_FAILED :
304293 toFailed (task );
305294 break ;
295+ case FINAL_DEADLINE_REACHED :
296+ toFailed (task );
297+ break ;
306298 case COMPLETED :
307299 case FAILED :
308- removeTaskExecutor (task );
309300 break ;
310301 }
311302 }
@@ -692,6 +683,7 @@ private void finalizedToCompleted(Task task) {
692683
693684 private void toFailed (Task task ) {
694685 updateTaskStatusAndSave (task , FAILED );
686+ applicationEventPublisher .publishEvent (new TaskFailedEvent (task .getChainTaskId ()));
695687 }
696688 public void initializeTaskAccessForNewReplicateLock (String chainTaskId ) {
697689 taskAccessForNewReplicateLock .putIfAbsent (chainTaskId , false );
@@ -713,4 +705,5 @@ private void setTaskAccessForNewReplicateLock(String chainTaskId, boolean isTask
713705 taskAccessForNewReplicateLock .replace (chainTaskId , isTaskBeingAccessedForNewReplicate );
714706 }
715707
708+
716709}
0 commit comments