1919import com .iexec .common .replicate .*;
2020import com .iexec .common .utils .ContextualLockRunner ;
2121import com .iexec .commons .poco .chain .ChainContribution ;
22+ import com .iexec .commons .poco .chain .ChainTask ;
2223import com .iexec .commons .poco .notification .TaskNotificationType ;
2324import com .iexec .commons .poco .task .TaskDescription ;
2425import com .iexec .core .chain .IexecHubService ;
3031import lombok .extern .slf4j .Slf4j ;
3132import org .springframework .context .ApplicationEventPublisher ;
3233import org .springframework .dao .OptimisticLockingFailureException ;
34+ import org .springframework .data .mongodb .core .MongoTemplate ;
35+ import org .springframework .data .mongodb .core .query .Criteria ;
36+ import org .springframework .data .mongodb .core .query .Query ;
37+ import org .springframework .data .mongodb .core .query .Update ;
3338import org .springframework .retry .annotation .Recover ;
3439import org .springframework .retry .annotation .Retryable ;
3540import org .springframework .stereotype .Service ;
3641import org .springframework .util .StringUtils ;
42+ import org .web3j .utils .Numeric ;
3743
3844import java .util .Collections ;
3945import java .util .List ;
4753@ Service
4854public class ReplicatesService {
4955
56+ private final MongoTemplate mongoTemplate ;
5057 private final ReplicatesRepository replicatesRepository ;
5158 private final IexecHubService iexecHubService ;
5259 private final ApplicationEventPublisher applicationEventPublisher ;
@@ -57,12 +64,14 @@ public class ReplicatesService {
5764 private final ContextualLockRunner <String > replicatesUpdateLockRunner =
5865 new ContextualLockRunner <>(10 , TimeUnit .MINUTES );
5966
60- public ReplicatesService (ReplicatesRepository replicatesRepository ,
67+ public ReplicatesService (MongoTemplate mongoTemplate ,
68+ ReplicatesRepository replicatesRepository ,
6169 IexecHubService iexecHubService ,
6270 ApplicationEventPublisher applicationEventPublisher ,
6371 Web3jService web3jService ,
6472 ResultService resultService ,
6573 TaskLogsService taskLogsService ) {
74+ this .mongoTemplate = mongoTemplate ;
6675 this .replicatesRepository = replicatesRepository ;
6776 this .iexecHubService = iexecHubService ;
6877 this .applicationEventPublisher = applicationEventPublisher ;
@@ -101,26 +110,16 @@ public Optional<ReplicatesList> getReplicatesList(String chainTaskId) {
101110 }
102111
103112 public List <Replicate > getReplicates (String chainTaskId ) {
104- Optional <ReplicatesList > optionalList = getReplicatesList (chainTaskId );
105- if (optionalList .isEmpty ()) {
106- return Collections .emptyList ();
107- }
108- return optionalList .get ().getReplicates ();
113+ return getReplicatesList (chainTaskId )
114+ .map (ReplicatesList ::getReplicates )
115+ .orElse (Collections .emptyList ());
109116 }
110117
111118 public Optional <Replicate > getReplicate (String chainTaskId , String walletAddress ) {
112- Optional <ReplicatesList > optional = getReplicatesList (chainTaskId );
113- if (optional .isEmpty ()) {
114- return Optional .empty ();
115- }
116-
117- for (Replicate replicate : optional .get ().getReplicates ()) {
118- if (replicate .getWalletAddress ().equals (walletAddress )) {
119- return Optional .of (replicate );
120- }
121- }
122-
123- return Optional .empty ();
119+ return getReplicates (chainTaskId )
120+ .stream ()
121+ .filter (replicate -> replicate .getWalletAddress ().equals (walletAddress ))
122+ .findFirst ();
124123 }
125124
126125 public int getNbReplicatesWithCurrentStatus (String chainTaskId , ReplicateStatus ... listStatus ) {
@@ -250,6 +249,15 @@ public UpdateReplicateStatusArgs computeUpdateReplicateStatusArgs(String chainTa
250249 }
251250 }
252251
252+ if (statusUpdate .getStatus () == CONTRIBUTE_AND_FINALIZE_DONE ) {
253+ // TODO read chainCallbackData if CONTRIBUTE_AND_FINALIZE becomes applicable some day in the future and if latest ABI is used
254+ resultLink = iexecHubService .getChainTask (chainTaskId )
255+ .map (ChainTask ::getResults )
256+ .map (Numeric ::hexStringToByteArray )
257+ .map (String ::new )
258+ .orElse (null );
259+ }
260+
253261 return UpdateReplicateStatusArgs .builder ()
254262 .workerWeight (workerWeight )
255263 .chainContribution (chainContribution )
@@ -402,31 +410,34 @@ Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatusWi
402410 log .info ("Replicate update request [status:{}, chainTaskId:{}, walletAddress:{}, details:{}]" ,
403411 statusUpdate .getStatus (), chainTaskId , walletAddress , statusUpdate .getDetailsWithoutLogs ());
404412
405- final Optional < ReplicatesList > oReplicatesList = getReplicatesList (chainTaskId );
406- final Optional < Replicate > oReplicate = oReplicatesList
407- .flatMap ( replicatesList -> replicatesList . getReplicateOfWorker ( walletAddress ) );
408- if (oReplicatesList . isEmpty () || oReplicate . isEmpty () ) {
413+ final Replicate replicate = getReplicatesList (chainTaskId )
414+ . flatMap ( replicatesList -> replicatesList . getReplicateOfWorker ( walletAddress ))
415+ .orElse ( null );
416+ if (replicate == null ) {
409417 log .error ("Cannot update replicate, could not get replicate [chainTaskId:{}, UpdateRequest:{}]" ,
410418 chainTaskId , statusUpdate );
411419 return Either .left (ReplicateStatusUpdateError .UNKNOWN_REPLICATE );
412420 }
413- final ReplicatesList replicatesList = oReplicatesList .get ();
414- final Replicate replicate = oReplicate .get ();
415421 final ReplicateStatus newStatus = statusUpdate .getStatus ();
416422
417423 final ReplicateStatusUpdateError error = canUpdateReplicateStatus (replicate , statusUpdate , updateReplicateStatusArgs );
418424 if (ReplicateStatusUpdateError .NO_ERROR != error ) {
419425 return Either .left (error );
420426 }
421427
428+ Update update = new Update ();
422429 if (newStatus == CONTRIBUTED || newStatus == CONTRIBUTE_AND_FINALIZE_DONE ) {
423430 replicate .setContributionHash (updateReplicateStatusArgs .getChainContribution ().getResultHash ());
424431 replicate .setWorkerWeight (updateReplicateStatusArgs .getWorkerWeight ());
432+ update .set ("replicates.$.contributionHash" , updateReplicateStatusArgs .getChainContribution ().getResultHash ());
433+ update .set ("replicates.$.workerWeight" , updateReplicateStatusArgs .getWorkerWeight ());
425434 }
426435
427436 if (newStatus == RESULT_UPLOADED || newStatus == CONTRIBUTE_AND_FINALIZE_DONE ) {
428437 replicate .setResultLink (updateReplicateStatusArgs .getResultLink ());
429438 replicate .setChainCallbackData (updateReplicateStatusArgs .getChainCallbackData ());
439+ update .set ("replicates.$.resultLink" , updateReplicateStatusArgs .getResultLink ());
440+ update .set ("replicates.$.chainCallbackData" , updateReplicateStatusArgs .getChainCallbackData ());
430441 }
431442
432443 if (statusUpdate .getDetails () != null &&
@@ -436,10 +447,13 @@ Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatusWi
436447 taskLogsService .addComputeLogs (chainTaskId , computeLogs );
437448 statusUpdate .getDetails ().setComputeLogs (null );//using null here to keep light replicate
438449 replicate .setAppComputeLogsPresent (true );
450+ update .set ("replicates.$.appComputeLogsPresent" , true );
439451 }
440452
453+ update .push ("replicates.$.statusUpdateList" , statusUpdate );
454+ Query query = Query .query (Criteria .where ("chainTaskId" ).is (chainTaskId ).and ("replicates.walletAddress" ).is (walletAddress ));
455+ mongoTemplate .updateFirst (query , update , ReplicatesList .class );
441456 replicate .updateStatus (statusUpdate );
442- replicatesRepository .save (replicatesList );
443457 applicationEventPublisher .publishEvent (new ReplicateUpdatedEvent (chainTaskId , walletAddress , statusUpdate ));
444458 ReplicateStatusCause newStatusCause = statusUpdate .getDetails () != null ?
445459 statusUpdate .getDetails ().getCause () : null ;
0 commit comments