2020import com .iexec .core .configuration .WorkerConfiguration ;
2121import com .mongodb .client .result .UpdateResult ;
2222import io .micrometer .core .instrument .Metrics ;
23- import jakarta .annotation .PostConstruct ;
2423import lombok .Data ;
2524import lombok .Getter ;
2625import lombok .extern .slf4j .Slf4j ;
@@ -50,15 +49,19 @@ public class WorkerService {
5049
5150 private static final String WALLET_ADDRESS_FIELD = "walletAddress" ;
5251 public static final String METRIC_WORKERS_GAUGE = "iexec.core.workers" ; // gauge name: number of alive workers
53- public static final String METRIC_CPU_TOTAL_GAUGE = "iexec.core.cpu.total" ;
54- public static final String METRIC_CPU_AVAILABLE_GAUGE = "iexec.core.cpu.available" ;
52+ public static final String METRIC_CPU_COMPUTING_GAUGE = "iexec.core.cpu.computing" ; // gauge name: CPUs in use (one per computing task) on alive non-GPU workers
53+ public static final String METRIC_CPU_REGISTERED_GAUGE = "iexec.core.cpu.registered" ; // gauge name: sum of cpuNb over alive non-GPU workers
54+ public static final String METRIC_GPU_COMPUTING_GAUGE = "iexec.core.gpu.computing" ; // gauge name: alive GPU-enabled workers with at least one computing task
55+ public static final String METRIC_GPU_REGISTERED_GAUGE = "iexec.core.gpu.registered" ; // gauge name: alive GPU-enabled workers
5556 private final MongoTemplate mongoTemplate ;
5657 private final WorkerRepository workerRepository ;
5758 private final WorkerConfiguration workerConfiguration ;
5859 private final ContextualLockRunner <String > contextualLockRunner ;
59- private AtomicInteger aliveWorkersGauge ;
60- private AtomicInteger aliveTotalCpuGauge ;
61- private AtomicInteger aliveAvailableCpuGauge ;
60+ private final AtomicInteger aliveWorkersGauge ; // gauge state holders: created at 0 in the constructor, overwritten by updateMetrics
61+ private final AtomicInteger aliveComputingCpuGauge ;
62+ private final AtomicInteger aliveRegisteredCpuGauge ;
63+ private final AtomicInteger aliveComputingGpuGauge ;
64+ private final AtomicInteger aliveRegisteredGpuGauge ;
6265 @ Getter
6366 private final ConcurrentHashMap <String , WorkerStats > workerStatsMap = new ConcurrentHashMap <>();
6467
@@ -80,38 +83,52 @@ public WorkerService(MongoTemplate mongoTemplate,
8083 this .workerRepository = workerRepository ;
8184 this .workerConfiguration = workerConfiguration ;
8285 this .contextualLockRunner = new ContextualLockRunner <>();
83- }
8486
85- @ PostConstruct
86- void init () {
87- aliveWorkersGauge = Metrics .gauge (METRIC_WORKERS_GAUGE , new AtomicInteger (getAliveWorkers (). size () ));
88- aliveTotalCpuGauge = Metrics .gauge (METRIC_CPU_TOTAL_GAUGE , new AtomicInteger (getAliveTotalCpu () ));
89- aliveAvailableCpuGauge = Metrics .gauge (METRIC_CPU_AVAILABLE_GAUGE , new AtomicInteger (getAliveAvailableCpu () ));
87+ this . aliveWorkersGauge = Metrics . gauge ( METRIC_WORKERS_GAUGE , new AtomicInteger ( 0 ));
88+ this . aliveComputingCpuGauge = Metrics . gauge ( METRIC_CPU_COMPUTING_GAUGE , new AtomicInteger ( 0 ));
89+ this . aliveRegisteredCpuGauge = Metrics .gauge (METRIC_CPU_REGISTERED_GAUGE , new AtomicInteger (0 ));
90+ this . aliveComputingGpuGauge = Metrics .gauge (METRIC_GPU_COMPUTING_GAUGE , new AtomicInteger (0 ));
91+ this . aliveRegisteredGpuGauge = Metrics .gauge (METRIC_GPU_REGISTERED_GAUGE , new AtomicInteger (0 ));
9092 }
9193
9294 /**
9395 * Refreshes every alive-worker gauge (worker count, registered and computing CPUs and GPUs) from one snapshot of getAliveWorkers.
9496 */
95- @ Scheduled (fixedDelayString = "${cron.metrics.refresh.period}" , initialDelayString = "${cron.metrics.refresh.period}" )
97+ @ Scheduled (fixedDelayString = "${cron.metrics.refresh.period}" )
9698 void updateMetrics () {
9799 // Every counter is accumulated in a single pass so that getAliveWorkers is called only once per refresh
98- int availableCpus = 0 ;
99- int totalCpus = 0 ;
100- List <Worker > workers = getAliveWorkers ();
101- for (Worker worker : workers ) {
100+ int computingCpus = 0 ;
101+ int registeredCpus = 0 ;
102+ int computingGpus = 0 ;
103+ int registeredGpus = 0 ;
104+ final List <Worker > workers = getAliveWorkers ();
105+ for (final Worker worker : workers ) {
102106 if (worker .isGpuEnabled ()) {
103- continue ;
107+ registeredGpus ++; // a GPU-enabled worker is counted as exactly one GPU
108+ if (!worker .getComputingChainTaskIds ().isEmpty ()) { // busy as soon as it has at least one computing task
109+ computingGpus ++;
110+ }
111+ } else {
112+ registeredCpus += worker .getCpuNb (); // only non-GPU workers contribute to the CPU counters
113+ computingCpus += worker .getComputingChainTaskIds ().size (); // one CPU counted per computing task
104114 }
105- int workerCpuNb = worker .getCpuNb ();
106- int computingReplicateNb = worker .getComputingChainTaskIds ().size ();
107- int availableCpu = workerCpuNb - computingReplicateNb ;
108- totalCpus += workerCpuNb ;
109- availableCpus += availableCpu ;
110115 }
111116
112117 aliveWorkersGauge .set (workers .size ());
113- aliveTotalCpuGauge .set (totalCpus );
114- aliveAvailableCpuGauge .set (availableCpus );
118+ aliveComputingCpuGauge .set (computingCpus );
119+ aliveRegisteredCpuGauge .set (registeredCpus );
120+ aliveComputingGpuGauge .set (computingGpus );
121+ aliveRegisteredGpuGauge .set (registeredGpus );
122+ }
123+
124+ public AliveWorkerMetrics getAliveWorkerMetrics () { // builds a snapshot from the values currently stored in the gauges; nothing is recomputed here
125+ return AliveWorkerMetrics .builder ()
126+ .aliveWorkers (aliveWorkersGauge .get ())
127+ .aliveComputingCpu (aliveComputingCpuGauge .get ())
128+ .aliveRegisteredCpu (aliveRegisteredCpuGauge .get ())
129+ .aliveComputingGpu (aliveComputingGpuGauge .get ())
130+ .aliveRegisteredGpu (aliveRegisteredGpuGauge .get ())
131+ .build ();
115132 }
116133
117134 // region Read methods
@@ -120,12 +137,9 @@ public Optional<Worker> getWorker(String walletAddress) {
120137 }
121138
122139 public boolean isAllowedToJoin (String workerAddress ) { // true when the configured whitelist is empty or contains workerAddress
123- List <String > whitelist = workerConfiguration .getWhitelist ();
140+ final List <String > whitelist = workerConfiguration .getWhitelist ();
124141 // an empty whitelist means no restriction: every worker address is allowed to join
125- if (whitelist .isEmpty ()) {
126- return true ;
127- }
128- return whitelist .contains (workerAddress );
142+ return whitelist .isEmpty () || whitelist .contains (workerAddress );
129143 }
130144
131145 public boolean isWorkerAllowedToAskReplicate (String walletAddress ) {
@@ -190,56 +204,6 @@ public boolean canAcceptMoreWorks(Worker worker) {
190204
191205 return true ;
192206 }
193-
194- public int getAliveAvailableCpu () { // sums (cpuNb - number of computing tasks) over alive workers that are not GPU-enabled
195- int availableCpus = 0 ;
196- for (Worker worker : getAliveWorkers ()) {
197- if (worker .isGpuEnabled ()) {
198- continue ;
199- }
200-
201- int workerCpuNb = worker .getCpuNb ();
202- int computingReplicateNb = worker .getComputingChainTaskIds ().size ();
203- int availableCpu = workerCpuNb - computingReplicateNb ;
204- availableCpus += availableCpu ;
205- }
206- return availableCpus ;
207- }
208-
209- public int getAliveTotalCpu () { // sums cpuNb over alive workers that are not GPU-enabled
210- int totalCpus = 0 ;
211- for (Worker worker : getAliveWorkers ()) {
212- if (worker .isGpuEnabled ()) {
213- continue ;
214- }
215- totalCpus += worker .getCpuNb ();
216- }
217- return totalCpus ;
218- }
219-
220- // Assumes, for now, that each GPU-enabled worker has exactly one GPU
221- public int getAliveTotalGpu () { // counts the alive workers that are GPU-enabled
222- int totalGpus = 0 ;
223- for (Worker worker : getAliveWorkers ()) {
224- if (worker .isGpuEnabled ()) {
225- totalGpus ++;
226- }
227- }
228- return totalGpus ;
229- }
230-
231- public int getAliveAvailableGpu () { // total GPU count minus the GPU-enabled workers that have at least one computing task
232- int availableGpus = getAliveTotalGpu (); // NOTE: getAliveTotalGpu makes its own getAliveWorkers call, separate from the loop below
233- for (Worker worker : getAliveWorkers ()) {
234- if (worker .isGpuEnabled ()) {
235- boolean isWorking = !worker .getComputingChainTaskIds ().isEmpty ();
236- if (isWorking ) {
237- availableGpus = availableGpus - 1 ;
238- }
239- }
240- }
241- return availableGpus ;
242- }
243207 // endregion
244208
245209 // region Read-and-write methods
0 commit comments