Skip to content

Commit d32a577

Browse files
authored
Merge pull request #442 from iExecBlockchainComputing/fix/optimistic-locking-failure-exception
Fix optimistic locking failure exception
2 parents 43bc204 + 866c2ca commit d32a577

File tree

6 files changed

+114
-9
lines changed

6 files changed

+114
-9
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=6.1.3
1+
version=6.1.4
22
iexecCommonVersion=5.5.1
33
nexusUser=fake
44
nexusPassword=fake

src/main/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetector.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434
@Service
3535
public class ContributionUnnotifiedDetector extends UnnotifiedAbstractDetector {
3636

37-
private static final int DETECTOR_MULTIPLIER = 10;
37+
private static final int LESS_OFTEN_DETECTOR_FREQUENCY = 10;
3838
private final List<TaskStatus> dectectWhenOffchainTaskStatuses;
3939
private final ReplicateStatus offchainCompleting;
4040
private final ReplicateStatus offchainCompleted;
4141
private final ChainContributionStatus onchainCompleted;
4242
private final int detectorRate;
4343

44+
private int detectorOccurrence;
45+
4446
public ContributionUnnotifiedDetector(TaskService taskService,
4547
ReplicatesService replicatesService,
4648
IexecHubService iexecHubService,
@@ -54,12 +56,26 @@ public ContributionUnnotifiedDetector(TaskService taskService,
5456
this.detectorRate = cronConfiguration.getContribute();
5557
}
5658

59+
/**
60+
* Detects onchain CONTRIBUTED only if replicates are offchain CONTRIBUTING and
61+
* onchain CONTRIBUTED if replicates are not CONTRIBUTED.
62+
* The second detection is not always ran, depending on the detector run occurrences.
63+
*/
64+
@Scheduled(fixedRateString = "#{@cronConfiguration.getContribute()}")
65+
public void detectOnChainChanges() {
66+
detectOnchainContributedWhenOffchainContributing();
67+
68+
detectorOccurrence++;
69+
if (detectorOccurrence % LESS_OFTEN_DETECTOR_FREQUENCY == 0) {
70+
detectOnchainContributed();
71+
}
72+
}
73+
5774
/*
5875
* Detecting onchain CONTRIBUTED only if replicates are offchain CONTRIBUTING
5976
* (worker didn't notify last offchain CONTRIBUTED)
6077
* We want to detect them very often since it's highly probable
6178
*/
62-
@Scheduled(fixedRateString = "#{@cronConfiguration.getContribute()}")
6379
public void detectOnchainContributedWhenOffchainContributing() {
6480
log.debug("Detect onchain Contributed (when offchain Contributing) [retryIn:{}]",
6581
this.detectorRate);
@@ -78,9 +94,8 @@ public void detectOnchainContributedWhenOffchainContributing() {
7894
* - Frequently but no so often since it's eth node resource consuming and less probable
7995
* - When we receive a CANT_CONTRIBUTE_SINCE_TASK_NOT_ACTIVE
8096
*/
81-
@Scheduled(fixedRateString = "#{@cronConfiguration.getContribute() * " + DETECTOR_MULTIPLIER + "}")
8297
public void detectOnchainContributed() {
83-
log.debug("Detect onchain Contributed [retryIn:{}]", this.detectorRate * DETECTOR_MULTIPLIER);
98+
log.debug("Detect onchain Contributed [retryIn:{}]", this.detectorRate * LESS_OFTEN_DETECTOR_FREQUENCY);
8499
dectectOnchainCompleted(
85100
dectectWhenOffchainTaskStatuses,
86101
offchainCompleting,

src/main/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetector.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434
@Service
3535
public class RevealUnnotifiedDetector extends UnnotifiedAbstractDetector {
3636

37-
private static final int DETECTOR_MULTIPLIER = 10;
37+
private static final int LESS_OFTEN_DETECTOR_FREQUENCY = 10;
3838
private final List<TaskStatus> dectectWhenTaskStatuses;
3939
private final ReplicateStatus offchainCompleting;
4040
private final ReplicateStatus offchainCompleted;
4141
private final ChainContributionStatus onchainCompleted;
4242
private final CronConfiguration cronConfiguration;
4343

44+
private int detectorOccurrence;
45+
4446
public RevealUnnotifiedDetector(TaskService taskService,
4547
ReplicatesService replicatesService,
4648
IexecHubService iexecHubService,
@@ -54,12 +56,21 @@ public RevealUnnotifiedDetector(TaskService taskService,
5456
onchainCompleted = ChainContributionStatus.REVEALED;
5557
}
5658

59+
@Scheduled(fixedRateString = "#{@cronConfiguration.getReveal()}")
60+
public void detectOnChainChanges() {
61+
detectOnchainRevealedWhenOffchainRevealed();
62+
63+
detectorOccurrence++;
64+
if (detectorOccurrence % LESS_OFTEN_DETECTOR_FREQUENCY == 0) {
65+
detectOnchainRevealed();
66+
}
67+
}
68+
5769
/*
5870
* Detecting onchain REVEALED only if replicates are offchain REVEALING
5971
* (worker didn't notify last offchain REVEALED)
6072
* We want to detect them very often since it's highly probable
6173
*/
62-
@Scheduled(fixedRateString = "#{@cronConfiguration.getReveal()}")
6374
public void detectOnchainRevealedWhenOffchainRevealed() {
6475
log.debug("Detect onchain Revealed (when offchain Revealing) [retryIn:{}]",
6576
cronConfiguration.getReveal());
@@ -78,10 +89,9 @@ public void detectOnchainRevealedWhenOffchainRevealed() {
7889
* - Frequently but no so often since it's eth node resource consuming and less probable
7990
* - When we receive a CANT_REVEAL
8091
*/
81-
@Scheduled(fixedRateString = "#{@cronConfiguration.getReveal() * " + DETECTOR_MULTIPLIER + "}")
8292
public void detectOnchainRevealed() {
8393
log.debug("Detect onchain Revealed [retryIn:{}]",
84-
cronConfiguration.getReveal() * DETECTOR_MULTIPLIER);
94+
cronConfiguration.getReveal() * LESS_OFTEN_DETECTOR_FREQUENCY);
8595
dectectOnchainCompleted(
8696
dectectWhenTaskStatuses,
8797
offchainCompleting,

src/main/java/com/iexec/core/replicate/ReplicatesService.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,17 +226,31 @@ private boolean isStatusBeforeWorkerLostEqualsTo(Replicate replicate, ReplicateS
226226

227227
/*
228228
* This implicitly sets the modifier to POOL_MANAGER
229+
*
230+
* @Retryable is needed as it isn't triggered by a call from within the class itself.
229231
*/
232+
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
230233
public void updateReplicateStatus(String chainTaskId,
231234
String walletAddress,
232235
ReplicateStatus newStatus) {
233236
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(newStatus);
234237
updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);
235238
}
236239

240+
@Recover
241+
public void updateReplicateStatus(OptimisticLockingFailureException exception,
242+
String chainTaskId,
243+
String walletAddress,
244+
ReplicateStatus newStatus) {
245+
logUpdateReplicateStatusRecover(exception);
246+
}
247+
237248
/*
238249
* This implicitly sets the modifier to POOL_MANAGER
250+
*
251+
* @Retryable is needed as it isn't triggered by a call from within the class itself.
239252
*/
253+
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
240254
public void updateReplicateStatus(String chainTaskId,
241255
String walletAddress,
242256
ReplicateStatus newStatus,
@@ -245,6 +259,15 @@ public void updateReplicateStatus(String chainTaskId,
245259
updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);
246260
}
247261

262+
@Recover
263+
public void updateReplicateStatus(OptimisticLockingFailureException exception,
264+
String chainTaskId,
265+
String walletAddress,
266+
ReplicateStatus newStatus,
267+
ReplicateStatusDetails details) {
268+
logUpdateReplicateStatusRecover(exception);
269+
}
270+
248271
/*
249272
* We retry up to 100 times in case the task has been modified between
250273
* reading and writing it.
@@ -333,6 +356,10 @@ public void updateReplicateStatus(OptimisticLockingFailureException exception,
333356
String chainTaskId,
334357
String walletAddress,
335358
ReplicateStatusUpdate statusUpdate) {
359+
logUpdateReplicateStatusRecover(exception);
360+
}
361+
362+
private void logUpdateReplicateStatusRecover(OptimisticLockingFailureException exception) {
336363
log.error("Could not update replicate status, maximum number of retries reached");
337364
exception.printStackTrace();
338365
}

src/test/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetectorTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Collections;
3636

3737
import static com.iexec.common.replicate.ReplicateStatus.*;
38+
import static com.iexec.common.replicate.ReplicateStatusModifier.WORKER;
3839
import static org.mockito.ArgumentMatchers.*;
3940
import static org.mockito.Mockito.when;
4041

@@ -68,6 +69,32 @@ public void init() {
6869
MockitoAnnotations.initMocks(this);
6970
}
7071

72+
// Detector aggregator
73+
@Test
74+
public void shouldDetectBothChangesOnChain() {
75+
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
76+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
77+
78+
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
79+
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTING).modifier(WORKER).build();
80+
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
81+
82+
when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate));
83+
when(iexecHubService.isStatusTrueOnChain(any(), any(), any())).thenReturn(true);
84+
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
85+
when(iexecHubService.getContributionBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder()
86+
.blockNumber(10L)
87+
.txHash("0xabcef")
88+
.build());
89+
90+
for (int i = 0; i < 10; i++) {
91+
contributionDetector.detectOnChainChanges();
92+
}
93+
94+
Mockito.verify(replicatesService, Mockito.times(11)) // 10 detectors #1 & 1 detector #2
95+
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
96+
}
97+
7198

7299
//Detector#1 after contributing
73100

src/test/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetectorTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,32 @@ public void init() {
6868
MockitoAnnotations.initMocks(this);
6969
}
7070

71+
// Detector aggregator
72+
@Test
73+
public void shouldDetectBothChangesOnChain() {
74+
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
75+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task));
76+
77+
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
78+
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(REVEALING).modifier(WORKER).build();
79+
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
80+
81+
when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate));
82+
when(iexecHubService.isStatusTrueOnChain(any(), any(), any())).thenReturn(true);
83+
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
84+
when(iexecHubService.getRevealBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder()
85+
.blockNumber(10L)
86+
.txHash("0xabcef")
87+
.build());
88+
89+
for (int i = 0; i < 10; i++) {
90+
revealDetector.detectOnChainChanges();
91+
}
92+
93+
Mockito.verify(replicatesService, Mockito.times(11)) // 10 detectors #1 & 1 detector #2
94+
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
95+
}
96+
7197

7298
//Detector#1 after contributing
7399

0 commit comments

Comments
 (0)