Skip to content

Commit 39cf75e

Browse files
committed
Expose current task statuses count on /metrics endpoint
1 parent 1cdeed2 commit 39cf75e

File tree

7 files changed

+143
-37
lines changed

7 files changed

+143
-37
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ All notable changes to this project will be documented in this file.
1212
- Add prometheus endpoint with custom metrics. (#632)
1313
- Expose version through prometheus endpoint. (#637, #639)
1414
- Stop fetching completed tasks count from DB. (#638)
15-
- Expose current task statuses count to Prometheus. (#640)
15+
- Expose current task statuses count to Prometheus and `/metrics` endpoint. (#640, #654)
1616
- Add `tasks` endpoints to `iexec-core-library`. (#645)
1717

1818
### Quality

iexec-core-library/src/main/java/com/iexec/core/metric/PlatformMetric.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
2020
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
21+
import com.iexec.core.task.TaskStatus;
2122
import lombok.Builder;
2223
import lombok.Value;
2324

2425
import java.math.BigInteger;
26+
import java.util.LinkedHashMap;
27+
import java.util.concurrent.atomic.AtomicLong;
2528

2629
@Value
2730
@Builder
@@ -32,7 +35,7 @@ public class PlatformMetric {
3235
int aliveAvailableCpu;
3336
int aliveTotalGpu;
3437
int aliveAvailableGpu;
35-
long completedTasks;
38+
LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusCounts;
3639
long dealEventsCount;
3740
long dealsCount;
3841
long replayDealsCount;

src/main/java/com/iexec/core/metric/MetricService.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,27 @@
1717
package com.iexec.core.metric;
1818

1919
import com.iexec.core.chain.DealWatcherService;
20-
import com.iexec.core.task.TaskService;
20+
import com.iexec.core.task.TaskStatus;
21+
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
2122
import com.iexec.core.worker.WorkerService;
23+
import org.springframework.context.event.EventListener;
2224
import org.springframework.stereotype.Service;
2325

26+
import java.util.LinkedHashMap;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
2429
@Service
2530
public class MetricService {
2631
private final DealWatcherService dealWatcherService;
2732
private final WorkerService workerService;
28-
private final TaskService taskService;
33+
private LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;
2934

3035
public MetricService(DealWatcherService dealWatcherService,
31-
WorkerService workerService,
32-
TaskService taskService) {
36+
WorkerService workerService) {
3337
this.dealWatcherService = dealWatcherService;
3438
this.workerService = workerService;
35-
this.taskService = taskService;
39+
40+
this.currentTaskStatusesCount = new LinkedHashMap<>();
3641
}
3742

3843
public PlatformMetric getPlatformMetrics() {
@@ -42,11 +47,16 @@ public PlatformMetric getPlatformMetrics() {
4247
.aliveAvailableCpu(workerService.getAliveAvailableCpu())
4348
.aliveTotalGpu(workerService.getAliveTotalGpu())
4449
.aliveAvailableGpu(workerService.getAliveAvailableGpu())
45-
.completedTasks(taskService.getCompletedTasksCount())
50+
.currentTaskStatusCounts(currentTaskStatusesCount)
4651
.dealEventsCount(dealWatcherService.getDealEventsCount())
4752
.dealsCount(dealWatcherService.getDealsCount())
4853
.replayDealsCount(dealWatcherService.getReplayDealsCount())
4954
.latestBlockNumberWithDeal(dealWatcherService.getLatestBlockNumberWithDeal())
5055
.build();
5156
}
57+
58+
@EventListener
59+
void onTaskStatusesCountUpdateEvent(TaskStatusesCountUpdatedEvent event) {
60+
this.currentTaskStatusesCount = event.getCurrentTaskStatusesCount();
61+
}
5262
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2024-2024 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.core.task.event;
18+
19+
import com.iexec.core.task.TaskStatus;
20+
import lombok.Value;
21+
22+
import java.util.LinkedHashMap;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
25+
@Value
26+
public class TaskStatusesCountUpdatedEvent {
27+
LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;
28+
}

src/main/java/com/iexec/core/task/update/TaskUpdateManager.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class TaskUpdateManager {
6565
private final BlockchainAdapterService blockchainAdapterService;
6666
private final SmsService smsService;
6767

68-
private final Map<TaskStatus, AtomicLong> currentTaskStatusesCount;
68+
private final LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;
6969
private final ExecutorService taskStatusesCountExecutor;
7070

7171
public TaskUpdateManager(TaskService taskService,
@@ -104,18 +104,25 @@ public TaskUpdateManager(TaskService taskService,
104104
@PostConstruct
105105
Future<Void> init() {
106106
return taskStatusesCountExecutor.submit(
107-
// The following could take a bit of time, depending on how many tasks are in DB.
108-
// It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
109-
// As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
110-
// even though new deals are detected during the count.
111-
() -> currentTaskStatusesCount
112-
.entrySet()
113-
.parallelStream()
114-
.forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey()))),
107+
this::initializeCurrentTaskStatusesCount,
115108
null // Trick to get a `Future<Void>` instead of a `Future<?>`
116109
);
117110
}
118111

112+
/**
113+
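* Adds the number of tasks currently in each status, as counted from the DB, to the in-memory counters,
* then publishes a count update. This could take a bit of time, depending on how many tasks are in DB.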
114+
* It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
115+
* As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
116+
* even if new deals are detected during the count.
117+
*/
118+
private void initializeCurrentTaskStatusesCount() {
119+
currentTaskStatusesCount
120+
.entrySet()
121+
.parallelStream()
122+
.forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey())));
123+
publishTaskStatusesCountUpdate();
124+
}
125+
119126
void updateTask(String chainTaskId) {
120127
Optional<Task> optional = taskService.getTaskByChainTaskId(chainTaskId);
121128
if (optional.isEmpty()) {
@@ -710,10 +717,17 @@ void toFailed(Task task, TaskStatus reason) {
710717
void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
711718
currentTaskStatusesCount.get(previousStatus).decrementAndGet();
712719
currentTaskStatusesCount.get(newStatus).incrementAndGet();
720+
publishTaskStatusesCountUpdate();
713721
}
714722

715723
@EventListener(TaskCreatedEvent.class)
716724
void onTaskCreatedEvent() {
717725
currentTaskStatusesCount.get(RECEIVED).incrementAndGet();
726+
publishTaskStatusesCountUpdate();
727+
}
728+
729+
private void publishTaskStatusesCountUpdate() {
730+
final TaskStatusesCountUpdatedEvent event = new TaskStatusesCountUpdatedEvent(currentTaskStatusesCount);
731+
applicationEventPublisher.publishEvent(event);
718732
}
719733
}

src/test/java/com/iexec/core/metric/MetricServiceTests.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@
1717
package com.iexec.core.metric;
1818

1919
import com.iexec.core.chain.DealWatcherService;
20-
import com.iexec.core.task.TaskService;
20+
import com.iexec.core.task.TaskStatus;
21+
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
2122
import com.iexec.core.worker.Worker;
2223
import com.iexec.core.worker.WorkerService;
24+
import org.junit.jupiter.api.Assertions;
2325
import org.junit.jupiter.api.BeforeEach;
2426
import org.junit.jupiter.api.Test;
2527
import org.mockito.InjectMocks;
2628
import org.mockito.Mock;
2729
import org.mockito.MockitoAnnotations;
2830

2931
import java.math.BigInteger;
32+
import java.util.LinkedHashMap;
3033
import java.util.List;
34+
import java.util.concurrent.atomic.AtomicLong;
3135

3236
import static org.assertj.core.api.Assertions.assertThat;
3337
import static org.mockito.Mockito.when;
@@ -38,8 +42,6 @@ class MetricServiceTests {
3842
private DealWatcherService dealWatcherService;
3943
@Mock
4044
private WorkerService workerService;
41-
@Mock
42-
private TaskService taskService;
4345

4446
@InjectMocks
4547
private MetricService metricService;
@@ -51,30 +53,60 @@ void init() {
5153

5254
@Test
5355
void shouldGetPlatformMetrics() {
56+
final LinkedHashMap<TaskStatus, AtomicLong> expectedCurrentTaskStatusesCount = createExpectedCurrentTaskStatusesCount();
57+
5458
List<Worker> aliveWorkers = List.of(new Worker());
5559
when(workerService.getAliveWorkers()).thenReturn(aliveWorkers);
5660
when(workerService.getAliveTotalCpu()).thenReturn(1);
5761
when(workerService.getAliveAvailableCpu()).thenReturn(1);
5862
when(workerService.getAliveTotalGpu()).thenReturn(1);
5963
when(workerService.getAliveAvailableGpu()).thenReturn(1);
60-
when(taskService.getCompletedTasksCount())
61-
.thenReturn(10L);
6264
when(dealWatcherService.getDealEventsCount()).thenReturn(10L);
6365
when(dealWatcherService.getDealsCount()).thenReturn(8L);
6466
when(dealWatcherService.getReplayDealsCount()).thenReturn(2L);
6567
when(dealWatcherService.getLatestBlockNumberWithDeal()).thenReturn(BigInteger.valueOf(255L));
6668

6769
PlatformMetric metric = metricService.getPlatformMetrics();
68-
assertThat(metric.getAliveWorkers()).isEqualTo(aliveWorkers.size());
69-
assertThat(metric.getAliveTotalCpu()).isEqualTo(1);
70-
assertThat(metric.getAliveAvailableCpu()).isEqualTo(1);
71-
assertThat(metric.getAliveTotalGpu()).isEqualTo(1);
72-
assertThat(metric.getAliveAvailableGpu()).isEqualTo(1);
73-
assertThat(metric.getCompletedTasks()).isEqualTo(10L);
74-
assertThat(metric.getDealEventsCount()).isEqualTo(10);
75-
assertThat(metric.getDealsCount()).isEqualTo(8);
76-
assertThat(metric.getReplayDealsCount()).isEqualTo(2);
77-
assertThat(metric.getLatestBlockNumberWithDeal()).isEqualTo(255);
70+
Assertions.assertAll(
71+
() -> assertThat(metric.getAliveWorkers()).isEqualTo(aliveWorkers.size()),
72+
() -> assertThat(metric.getAliveTotalCpu()).isEqualTo(1),
73+
() -> assertThat(metric.getAliveAvailableCpu()).isEqualTo(1),
74+
() -> assertThat(metric.getAliveTotalGpu()).isEqualTo(1),
75+
() -> assertThat(metric.getAliveAvailableGpu()).isEqualTo(1),
76+
() -> assertThat(metric.getCurrentTaskStatusCounts()).isEqualTo(expectedCurrentTaskStatusesCount),
77+
() -> assertThat(metric.getDealEventsCount()).isEqualTo(10),
78+
() -> assertThat(metric.getDealsCount()).isEqualTo(8),
79+
() -> assertThat(metric.getReplayDealsCount()).isEqualTo(2),
80+
() -> assertThat(metric.getLatestBlockNumberWithDeal()).isEqualTo(255)
81+
);
82+
}
83+
84+
private LinkedHashMap<TaskStatus, AtomicLong> createExpectedCurrentTaskStatusesCount() {
85+
final LinkedHashMap<TaskStatus, AtomicLong> expectedCurrentTaskStatusesCount = new LinkedHashMap<>(TaskStatus.values().length);
86+
expectedCurrentTaskStatusesCount.put(TaskStatus.RECEIVED, new AtomicLong(1));
87+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZING, new AtomicLong(2));
88+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZED, new AtomicLong(3));
89+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZE_FAILED, new AtomicLong(4));
90+
expectedCurrentTaskStatusesCount.put(TaskStatus.RUNNING, new AtomicLong(5));
91+
expectedCurrentTaskStatusesCount.put(TaskStatus.RUNNING_FAILED, new AtomicLong(6));
92+
expectedCurrentTaskStatusesCount.put(TaskStatus.CONTRIBUTION_TIMEOUT, new AtomicLong(7));
93+
expectedCurrentTaskStatusesCount.put(TaskStatus.CONSENSUS_REACHED, new AtomicLong(8));
94+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPENING, new AtomicLong(9));
95+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPENED, new AtomicLong(10));
96+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPEN_FAILED, new AtomicLong(11));
97+
expectedCurrentTaskStatusesCount.put(TaskStatus.AT_LEAST_ONE_REVEALED, new AtomicLong(12));
98+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOADING, new AtomicLong(13));
99+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOADED, new AtomicLong(14));
100+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOAD_TIMEOUT, new AtomicLong(15));
101+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZING, new AtomicLong(16));
102+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZED, new AtomicLong(17));
103+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZE_FAILED, new AtomicLong(18));
104+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINAL_DEADLINE_REACHED, new AtomicLong(19));
105+
expectedCurrentTaskStatusesCount.put(TaskStatus.COMPLETED, new AtomicLong(20));
106+
expectedCurrentTaskStatusesCount.put(TaskStatus.FAILED, new AtomicLong(21));
107+
metricService.onTaskStatusesCountUpdateEvent(new TaskStatusesCountUpdatedEvent(expectedCurrentTaskStatusesCount));
108+
109+
return expectedCurrentTaskStatusesCount;
78110
}
79111

80112
}

src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.iexec.core.task.TaskService;
4040
import com.iexec.core.task.TaskStatus;
4141
import com.iexec.core.task.event.PleaseUploadEvent;
42+
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
4243
import com.iexec.core.worker.Worker;
4344
import com.iexec.core.worker.WorkerService;
4445
import io.micrometer.core.instrument.Gauge;
@@ -53,15 +54,14 @@
5354
import org.mockito.Mockito;
5455
import org.mockito.MockitoAnnotations;
5556
import org.springframework.context.ApplicationEventPublisher;
57+
import org.springframework.test.util.ReflectionTestUtils;
5658

5759
import java.math.BigInteger;
5860
import java.time.Instant;
5961
import java.time.temporal.ChronoUnit;
60-
import java.util.ArrayList;
61-
import java.util.Date;
62-
import java.util.List;
63-
import java.util.Optional;
62+
import java.util.*;
6463
import java.util.concurrent.ExecutionException;
64+
import java.util.concurrent.atomic.AtomicLong;
6565
import java.util.stream.Collectors;
6666

6767
import static com.iexec.core.task.TaskStatus.*;
@@ -128,7 +128,7 @@ void afterEach() {
128128

129129
// region init
130130
@Test
131-
void shouldBuildGauges() throws ExecutionException, InterruptedException {
131+
void shouldBuildGaugesAndFireEvent() throws ExecutionException, InterruptedException {
132132
for (final TaskStatus status : TaskStatus.values()) {
133133
// Give a unique initial count for each status
134134
when(taskService.countByCurrentStatus(status)).thenReturn((long) status.ordinal());
@@ -143,6 +143,8 @@ void shouldBuildGauges() throws ExecutionException, InterruptedException {
143143
.extracting(Gauge::value)
144144
.isEqualTo(((double) status.ordinal()));
145145
}
146+
147+
verify(applicationEventPublisher, times(1)).publishEvent(any(TaskStatusesCountUpdatedEvent.class));
146148
}
147149
// endregion
148150

@@ -2034,6 +2036,23 @@ void shouldUpdateMetricsAfterStatusUpdate() throws ExecutionException, Interrupt
20342036

20352037
assertThat(currentReceivedTasks.value()).isZero();
20362038
assertThat(currentInitializingTasks.value()).isOne();
2039+
// Called a first time during init, then a second time during update
2040+
verify(applicationEventPublisher, times(2)).publishEvent(any(TaskStatusesCountUpdatedEvent.class));
2041+
}
2042+
// endregion
2043+
2044+
// region onTaskCreatedEvent
2045+
@Test
2046+
void shouldUpdateCurrentReceivedCountAndFireEvent() {
2047+
final AtomicLong receivedCount =
2048+
(AtomicLong) ((LinkedHashMap<?, ?>) ReflectionTestUtils.getField(taskUpdateManager, "currentTaskStatusesCount"))
2049+
.get(RECEIVED);
2050+
final long initialCount = receivedCount.get();
2051+
2052+
taskUpdateManager.onTaskCreatedEvent();
2053+
2054+
assertThat(receivedCount.get() - initialCount).isOne();
2055+
verify(applicationEventPublisher, times(1)).publishEvent(any(TaskStatusesCountUpdatedEvent.class));
20372056
}
20382057
// endregion
20392058

0 commit comments

Comments
 (0)