11/*
2- * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
2+ * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
3232import com .iexec .worker .compute .post .PostComputeResponse ;
3333import com .iexec .worker .compute .pre .PreComputeResponse ;
3434import com .iexec .worker .dataset .DataService ;
35- import com .iexec .worker .docker .DockerService ;
3635import com .iexec .worker .pubsub .SubscriptionService ;
3736import com .iexec .worker .result .ResultService ;
37+ import com .iexec .worker .sms .SmsService ;
3838import com .iexec .worker .tee .TeeService ;
3939import com .iexec .worker .tee .TeeServicesManager ;
4040import com .iexec .worker .utils .LoggingUtils ;
4343import org .springframework .stereotype .Service ;
4444
4545import java .util .Optional ;
46- import java .util .function .Predicate ;
4746
4847import static com .iexec .common .replicate .ReplicateStatus .APP_DOWNLOAD_FAILED ;
4948import static com .iexec .common .replicate .ReplicateStatus .DATA_DOWNLOAD_FAILED ;
5049import static com .iexec .common .replicate .ReplicateStatusCause .*;
5150import static java .util .Objects .requireNonNull ;
5251
53-
5452@ Slf4j
5553@ Service
5654public class TaskManagerService {
@@ -65,7 +63,7 @@ public class TaskManagerService {
6563 private final TeeServicesManager teeServicesManager ;
6664 private final DataService dataService ;
6765 private final ResultService resultService ;
68- private final DockerService dockerService ;
66+ private final SmsService smsService ;
6967 private final SubscriptionService subscriptionService ;
7068 private final PurgeService purgeService ;
7169 private final String workerWalletAddress ;
@@ -78,7 +76,7 @@ public TaskManagerService(
7876 TeeServicesManager teeServicesManager ,
7977 DataService dataService ,
8078 ResultService resultService ,
81- DockerService dockerService ,
79+ SmsService smsService ,
8280 SubscriptionService subscriptionService ,
8381 PurgeService purgeService ,
8482 String workerWalletAddress ) {
@@ -89,7 +87,7 @@ public TaskManagerService(
8987 this .teeServicesManager = teeServicesManager ;
9088 this .dataService = dataService ;
9189 this .resultService = resultService ;
92- this .dockerService = dockerService ;
90+ this .smsService = smsService ;
9391 this .subscriptionService = subscriptionService ;
9492 this .purgeService = purgeService ;
9593 this .workerWalletAddress = workerWalletAddress ;
@@ -121,7 +119,12 @@ ReplicateActionResponse start(TaskDescription taskDescription) {
121119 log .error ("TEE prerequisites are not met [chainTaskId: {}, issue: {}]" , chainTaskId , teePrerequisitesIssue .get ());
122120 return getFailureResponseAndPrintError (teePrerequisitesIssue .get (), context , chainTaskId );
123121 }
122+
123+ final WorkerpoolAuthorization workerpoolAuthorization = contributionService .getWorkerpoolAuthorization (chainTaskId );
124+ final String token = resultService .getIexecUploadToken (workerpoolAuthorization );
125+ smsService .pushToken (workerpoolAuthorization , token );
124126 }
127+
125128 return ReplicateActionResponse .success ();
126129 }
127130
@@ -342,8 +345,9 @@ private ReplicateActionResponse contributeOrContributeAndFinalize(String chainTa
342345 context , chainTaskId );
343346 }
344347
348+ final WorkerpoolAuthorization workerpoolAuthorization = contributionService .getWorkerpoolAuthorization (chainTaskId );
345349 String callbackData = computedFile .getCallbackData ();
346- String resultLink = resultService .uploadResultAndGetLink (chainTaskId );
350+ String resultLink = resultService .uploadResultAndGetLink (workerpoolAuthorization );
347351 log .debug ("contributeAndFinalize [contribution:{}, resultLink:{}, callbackData:{}]" ,
348352 contribution , resultLink , callbackData );
349353 Optional <ChainReceipt > oChainReceipt = iexecHubService .contributeAndFinalize (contribution , resultLink , callbackData );
@@ -418,16 +422,16 @@ ReplicateActionResponse reveal(String chainTaskId,
418422 }
419423
420424 ReplicateActionResponse uploadResult (String chainTaskId ) {
421- String resultLink = resultService .uploadResultAndGetLink (chainTaskId );
425+ final WorkerpoolAuthorization workerpoolAuthorization = contributionService .getWorkerpoolAuthorization (chainTaskId );
426+ String resultLink = resultService .uploadResultAndGetLink (workerpoolAuthorization );
422427 String context = "upload result" ;
423428 if (resultLink .isEmpty ()) {
424429 return getFailureResponseAndPrintError (RESULT_LINK_MISSING ,
425430 context , chainTaskId
426431 );
427432 }
428433
429- ComputedFile computedFile =
430- resultService .getComputedFile (chainTaskId );
434+ ComputedFile computedFile = resultService .getComputedFile (chainTaskId );
431435 String callbackData = computedFile != null ?
432436 computedFile .getCallbackData () : "" ;
433437
@@ -456,18 +460,18 @@ ReplicateActionResponse complete(String chainTaskId) {
456460 * related to the task in question, unsubscribe from the task's notifications,
457461 * then remove result folders.
458462 *
459- * @param chainTaskId
460- * @return
463+ * @param chainTaskId Task ID
464+ * @return {@literal true} if all cleanup operations went well, {@literal false} otherwise
461465 */
462466 boolean abort (String chainTaskId ) {
463467 log .info ("Aborting task [chainTaskId:{}]" , chainTaskId );
464- Predicate <String > containsChainTaskId = name -> name .contains (chainTaskId );
465- dockerService .stopRunningContainersWithNamePredicate (containsChainTaskId );
466- log .info ("Stopped task containers [chainTaskId:{}]" , chainTaskId );
467468 subscriptionService .unsubscribeFromTopic (chainTaskId );
468- boolean isSuccess = purgeService .purgeAllServices (chainTaskId );
469+ boolean allContainersStopped = computeManagerService .abort (chainTaskId );
470+ boolean allServicesPurged = purgeService .purgeAllServices (chainTaskId );
471+ final boolean isSuccess = allContainersStopped && allServicesPurged ;
469472 if (!isSuccess ) {
470- log .error ("Failed to abort task [chainTaskId:{}]" , chainTaskId );
473+ log .error ("Failed to abort task [chainTaskId:{}, containers:{}, services:{}]" ,
474+ chainTaskId , allContainersStopped , allServicesPurged );
471475 }
472476 return isSuccess ;
473477 }
0 commit comments