Skip to content

Commit 34148bc

Browse files
authored
Add ConsensusReachedTaskDetector to detect missed TaskConsensus on-chain events (#683)
1 parent c0f3ac1 commit 34148bc

File tree

5 files changed

+174
-1
lines changed

5 files changed

+174
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
44

55
## [[NEXT]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/vNEXT) 2024
66

7+
### New Features
8+
9+
- Add `ConsensusReachedTaskDetector` to detect missed `TaskConsensus` on-chain events. (#683)
10+
711
### Bug Fixes
812

913
- Keep a single `updateReplicateStatus` method in `ReplicatesService`. (#670)

src/main/java/com/iexec/core/configuration/CronConfiguration.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ public class CronConfiguration {
2323
@Value("${cron.detector.chain.contribute}")
2424
private int contribute;
2525

26+
@Value("${cron.detector.chain.consensus-reached}")
27+
private int consensusReached;
28+
2629
@Value("${cron.detector.chain.reveal}")
2730
private int reveal;
2831

@@ -42,5 +45,5 @@ public class CronConfiguration {
4245
private int revealTimeout;
4346

4447
@Value("${cron.detector.timeout.result-upload}")
45-
private int resultUploadTimeout;
48+
private int resultUploadTimeout;
4649
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2024 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.core.detector.task;
18+
19+
import com.iexec.core.chain.IexecHubService;
20+
import com.iexec.core.detector.Detector;
21+
import com.iexec.core.task.Task;
22+
import com.iexec.core.task.TaskService;
23+
import com.iexec.core.task.TaskStatus;
24+
import com.iexec.core.task.update.TaskUpdateRequestManager;
25+
import lombok.extern.slf4j.Slf4j;
26+
import org.springframework.scheduling.annotation.Scheduled;
27+
28+
import static com.iexec.commons.poco.chain.ChainTaskStatus.REVEALING;
29+
import static com.iexec.core.task.TaskStatus.RUNNING;
30+
31+
@Slf4j
32+
public class ConsensusReachedTaskDetector implements Detector {
33+
34+
private final IexecHubService iexecHubService;
35+
private final TaskService taskService;
36+
private final TaskUpdateRequestManager taskUpdateRequestManager;
37+
38+
public ConsensusReachedTaskDetector(IexecHubService iexecHubService,
39+
TaskService taskService,
40+
TaskUpdateRequestManager taskUpdateRequestManager) {
41+
this.iexecHubService = iexecHubService;
42+
this.taskService = taskService;
43+
this.taskUpdateRequestManager = taskUpdateRequestManager;
44+
}
45+
46+
@Scheduled(fixedRateString = "#{@cronConfiguration.getConsensusReached()}")
47+
@Override
48+
public void detect() {
49+
taskService.findByCurrentStatus(RUNNING).stream()
50+
.filter(this::isConsensusReached)
51+
.forEach(this::publishTaskUpdateRequest);
52+
}
53+
54+
private boolean isConsensusReached(Task task) {
55+
return iexecHubService.getChainTask(task.getChainTaskId()).stream()
56+
.allMatch(chainTask -> chainTask.getStatus() == REVEALING);
57+
}
58+
59+
private void publishTaskUpdateRequest(Task task) {
60+
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
61+
task.getCurrentStatus(), TaskStatus.CONSENSUS_REACHED, task.getChainTaskId());
62+
taskUpdateRequestManager.publishRequest(task.getChainTaskId());
63+
}
64+
}

src/main/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ cron:
2727
unstarted-tx: 300000 # 5m
2828
initialize: 30000 # 30s
2929
contribute: 30000 # 30s
30+
consensus-reached: 30000 # 30s
3031
reveal: 30000 # 30s
3132
contribute-and-finalize: 30000 # 30s
3233
finalize: 30000 # 30s
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2024 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.core.detector.task;
18+
19+
import com.iexec.commons.poco.chain.ChainTask;
20+
import com.iexec.commons.poco.chain.ChainTaskStatus;
21+
import com.iexec.core.chain.IexecHubService;
22+
import com.iexec.core.task.TaskService;
23+
import com.iexec.core.task.TaskStatus;
24+
import com.iexec.core.task.update.TaskUpdateRequestManager;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.ExtendWith;
28+
import org.junit.jupiter.params.ParameterizedTest;
29+
import org.junit.jupiter.params.provider.EnumSource;
30+
import org.mockito.InjectMocks;
31+
import org.mockito.Mock;
32+
import org.mockito.MockitoAnnotations;
33+
import org.springframework.boot.test.system.CapturedOutput;
34+
import org.springframework.boot.test.system.OutputCaptureExtension;
35+
import org.springframework.test.context.DynamicPropertyRegistry;
36+
import org.springframework.test.context.DynamicPropertySource;
37+
import org.testcontainers.containers.MongoDBContainer;
38+
import org.testcontainers.junit.jupiter.Container;
39+
import org.testcontainers.utility.DockerImageName;
40+
41+
import java.util.List;
42+
import java.util.Optional;
43+
44+
import static com.iexec.core.task.TaskTestsUtils.CHAIN_TASK_ID;
45+
import static com.iexec.core.task.TaskTestsUtils.getStubTask;
46+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
47+
import static org.mockito.Mockito.*;
48+
49+
@ExtendWith(OutputCaptureExtension.class)
50+
class ConsensusReachedTaskDetectorTests {
51+
52+
@Container
53+
private static final MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse(System.getProperty("mongo.image")));
54+
55+
@DynamicPropertySource
56+
static void registerProperties(DynamicPropertyRegistry registry) {
57+
registry.add("spring.data.mongodb.host", mongoDBContainer::getHost);
58+
registry.add("spring.data.mongodb.port", () -> mongoDBContainer.getMappedPort(27017));
59+
}
60+
61+
@Mock
62+
private IexecHubService iexecHubService;
63+
@Mock
64+
private TaskService taskService;
65+
@Mock
66+
private TaskUpdateRequestManager taskUpdateRequestManager;
67+
68+
@InjectMocks
69+
private ConsensusReachedTaskDetector detector;
70+
71+
@BeforeEach
72+
void init() {
73+
MockitoAnnotations.openMocks(this);
74+
when(taskService.findByCurrentStatus(TaskStatus.RUNNING)).thenReturn(List.of(getStubTask()));
75+
}
76+
77+
@Test
78+
void shouldDetectWhenOnChainTaskRevealing(CapturedOutput output) {
79+
final ChainTask chainTask = ChainTask.builder()
80+
.chainTaskId(CHAIN_TASK_ID)
81+
.status(ChainTaskStatus.REVEALING)
82+
.build();
83+
when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(chainTask));
84+
detector.detect();
85+
assertThat(output.getOut()).contains("Detected confirmed missing update (task)");
86+
verify(taskUpdateRequestManager).publishRequest(CHAIN_TASK_ID);
87+
}
88+
89+
@ParameterizedTest
90+
@EnumSource(value = ChainTaskStatus.class, names = "REVEALING", mode = EnumSource.Mode.EXCLUDE)
91+
void shouldNotDetectWhenOnChainTaskNotRevealing(ChainTaskStatus chainTaskStatus, CapturedOutput output) {
92+
final ChainTask chainTask = ChainTask.builder()
93+
.chainTaskId(CHAIN_TASK_ID)
94+
.status(chainTaskStatus)
95+
.build();
96+
when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(chainTask));
97+
detector.detect();
98+
assertThat(output.getOut()).doesNotContain("Detected confirmed missing update (task)");
99+
verifyNoInteractions(taskUpdateRequestManager);
100+
}
101+
}

0 commit comments

Comments
 (0)