1818
1919import com .iexec .common .utils .ContextualLockRunner ;
2020import com .iexec .core .configuration .WorkerConfiguration ;
21+ import lombok .Data ;
22+ import lombok .Getter ;
2123import lombok .extern .slf4j .Slf4j ;
2224import org .springframework .stereotype .Service ;
2325
24- import java .util .Collections ;
25- import java .util .Date ;
26- import java .util .List ;
27- import java .util .Optional ;
26+ import java .util .*;
27+ import java .util .concurrent .ConcurrentHashMap ;
28+ import java .util .stream .Collectors ;
2829
2930import static com .iexec .common .utils .DateTimeUtils .addMinutesToDate ;
3031
@@ -42,6 +43,19 @@ public class WorkerService {
4243 private final WorkerRepository workerRepository ;
4344 private final WorkerConfiguration workerConfiguration ;
4445 private final ContextualLockRunner <String > contextualLockRunner ;
46+ @ Getter
47+ private final ConcurrentHashMap <String , WorkerStats > workerStatsMap = new ConcurrentHashMap <>();
48+
49+ @ Data
50+ public static class WorkerStats {
51+ private String walletAddress ;
52+ private Date lastAliveDate ;
53+ private Date lastReplicateDemandDate ;
54+
55+ public WorkerStats (String walletAddress ) {
56+ this .walletAddress = walletAddress ;
57+ }
58+ }
4559
4660 public WorkerService (WorkerRepository workerRepository ,
4761 WorkerConfiguration workerConfiguration ) {
@@ -65,29 +79,19 @@ public boolean isAllowedToJoin(String workerAddress){
6579 }
6680
6781 public boolean isWorkerAllowedToAskReplicate (String walletAddress ) {
68- Optional < Date > oDate = getLastReplicateDemand (walletAddress );
69- if (oDate . isEmpty () ) {
82+ Date lastReplicateDemandDate = workerStatsMap . get (walletAddress ). getLastReplicateDemandDate ( );
83+ if (lastReplicateDemandDate == null ) {
7084 return true ;
7185 }
7286
7387 // the difference between now and the last time the worker asked for work should be less than the period allowed
7488 // in the configuration (500ms since (now - lastAsk) can still be slightly too small even if the worker behave nicely)
7589 long now = new Date ().getTime ();
76- long lastAsk = oDate . get () .getTime ();
90+ long lastAsk = lastReplicateDemandDate .getTime ();
7791
7892 return (now - lastAsk ) + 500 > workerConfiguration .getAskForReplicatePeriod ();
7993 }
8094
81- public Optional <Date > getLastReplicateDemand (String walletAddress ) {
82- Optional <Worker > optional = workerRepository .findByWalletAddress (walletAddress );
83- if (optional .isEmpty ()) {
84- return Optional .empty ();
85- }
86- Worker worker = optional .get ();
87-
88- return Optional .ofNullable (worker .getLastReplicateDemandDate ());
89- }
90-
9195 public List <String > getChainTaskIds (String walletAddress ) {
9296 Optional <Worker > optional = workerRepository .findByWalletAddress (walletAddress );
9397 if (optional .isPresent ()) {
@@ -110,13 +114,23 @@ public List<String> getComputingTaskIds(String walletAddress) {
110114 // worker is considered lost if it didn't ping for 1 minute
111115 public List <Worker > getLostWorkers () {
112116 Date oneMinuteAgo = addMinutesToDate (new Date (), -1 );
113- return workerRepository .findByLastAliveDateBefore (oneMinuteAgo );
117+ List <String > lostWorkers = workerStatsMap .entrySet ()
118+ .stream ()
119+ .filter (entry -> entry .getValue ().getLastAliveDate ().getTime () < oneMinuteAgo .getTime ())
120+ .map (Map .Entry ::getKey )
121+ .collect (Collectors .toList ());
122+ return workerRepository .findAllByWalletAddress (lostWorkers );
114123 }
115124
116- // worker is considered alive if it ping after 1 minute
125+ // worker is considered alive if it received a ping during the last minute
117126 public List <Worker > getAliveWorkers () {
118127 Date oneMinuteAgo = addMinutesToDate (new Date (), -1 );
119- return workerRepository .findByLastAliveDateAfter (oneMinuteAgo );
128+ List <String > aliveWorkers = workerStatsMap .entrySet ()
129+ .stream ()
130+ .filter (entry -> entry .getValue ().getLastAliveDate ().getTime () > oneMinuteAgo .getTime ())
131+ .map (Map .Entry ::getKey )
132+ .collect (Collectors .toList ());
133+ return workerRepository .findAllByWalletAddress (aliveWorkers );
120134 }
121135
122136 public boolean canAcceptMoreWorks (String walletAddress ) {
@@ -130,7 +144,7 @@ public boolean canAcceptMoreWorks(String walletAddress) {
130144 int runningReplicateNb = worker .getComputingChainTaskIds ().size ();
131145
132146 if (runningReplicateNb >= workerMaxNbTasks ) {
133- log .debug ("Worker asking for too many replicates [walletAddress: {}, runningReplicateNb:{}, workerMaxNbTasks:{}]" ,
147+ log .debug ("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]" ,
134148 walletAddress , runningReplicateNb , workerMaxNbTasks );
135149 return false ;
136150 }
@@ -191,6 +205,7 @@ public int getAliveAvailableGpu () {
191205
192206 // region Read-and-write methods
193207 public Worker addWorker (Worker worker ) {
208+ updateLastAlive (worker .getWalletAddress ());
194209 return contextualLockRunner .applyWithLock (
195210 worker .getWalletAddress (),
196211 address -> addWorkerWithoutThreadSafety (worker )
@@ -213,42 +228,14 @@ private Worker addWorkerWithoutThreadSafety(Worker worker) {
213228 return workerRepository .save (worker );
214229 }
215230
216- public Optional <Worker > updateLastAlive (String walletAddress ) {
217- return contextualLockRunner .applyWithLock (
218- walletAddress ,
219- this ::updateLastAliveWithoutThreadSafety
220- );
231+ public void updateLastAlive (String walletAddress ) {
232+ workerStatsMap .computeIfAbsent (walletAddress , WorkerStats ::new )
233+ .setLastAliveDate (new Date ());
221234 }
222235
223- private Optional <Worker > updateLastAliveWithoutThreadSafety (String walletAddress ) {
224- Optional <Worker > optional = workerRepository .findByWalletAddress (walletAddress );
225- if (optional .isPresent ()) {
226- Worker worker = optional .get ();
227- worker .setLastAliveDate (new Date ());
228- workerRepository .save (worker );
229- return Optional .of (worker );
230- }
231-
232- return Optional .empty ();
233- }
234-
235- public Optional <Worker > updateLastReplicateDemandDate (String walletAddress ) {
236- return contextualLockRunner .applyWithLock (
237- walletAddress ,
238- this ::updateLastReplicateDemandDateWithoutThreadSafety
239- );
240- }
241-
242- private Optional <Worker > updateLastReplicateDemandDateWithoutThreadSafety (String walletAddress ) {
243- Optional <Worker > optional = workerRepository .findByWalletAddress (walletAddress );
244- if (optional .isPresent ()) {
245- Worker worker = optional .get ();
246- worker .setLastReplicateDemandDate (new Date ());
247- workerRepository .save (worker );
248- return Optional .of (worker );
249- }
250-
251- return Optional .empty ();
236+ public void updateLastReplicateDemandDate (String walletAddress ) {
237+ workerStatsMap .computeIfAbsent (walletAddress , WorkerStats ::new )
238+ .setLastReplicateDemandDate (new Date ());
252239 }
253240
254241 public Optional <Worker > addChainTaskIdToWorker (String chainTaskId , String walletAddress ) {
0 commit comments