Skip to content

Commit cc426a6

Browse files
authored
Merge pull request #396 from iExecBlockchainComputing/feature/deal-block-as-landmark
Init for deal block as landmark for a task
2 parents 896b7c4 + 4ef7144 commit cc426a6

File tree

8 files changed

+78
-52
lines changed

8 files changed

+78
-52
lines changed

src/main/java/com/iexec/core/chain/DealWatcherService.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,21 @@ public class DealWatcherService {
4141
private final ConfigurationService configurationService;
4242
private final ApplicationEventPublisher applicationEventPublisher;
4343
private final TaskService taskService;
44+
private final Web3jService web3jService;
4445
// internal variables
4546
private Disposable dealEventSubscriptionReplay;
4647

4748
@Autowired
4849
public DealWatcherService(IexecHubService iexecHubService,
4950
ConfigurationService configurationService,
5051
ApplicationEventPublisher applicationEventPublisher,
51-
TaskService taskService) {
52+
TaskService taskService,
53+
Web3jService web3jService) {
5254
this.iexecHubService = iexecHubService;
5355
this.configurationService = configurationService;
5456
this.applicationEventPublisher = applicationEventPublisher;
5557
this.taskService = taskService;
58+
this.web3jService = web3jService;
5659
}
5760

5861
/**
@@ -85,20 +88,30 @@ Disposable subscribeToDealEventFromOneBlockToLatest(BigInteger from) {
8588
* @param dealEvent
8689
*/
8790
private void onDealEvent(DealEvent dealEvent) {
88-
log.info("Received deal [dealId:{}, block:{}]", dealEvent.getChainDealId(), dealEvent.getBlockNumber());
89-
this.handleDeal(dealEvent.getChainDealId());
90-
if (configurationService.getLastSeenBlockWithDeal().compareTo(dealEvent.getBlockNumber()) < 0) {
91-
configurationService.setLastSeenBlockWithDeal(dealEvent.getBlockNumber());
91+
String dealId = dealEvent.getChainDealId();
92+
BigInteger dealBlock = dealEvent.getBlockNumber();
93+
log.info("Received deal [dealId:{}, block:{}]", dealId,
94+
dealBlock);
95+
if (dealBlock == null || dealBlock.equals(BigInteger.ZERO)){
96+
log.warn("Deal block number is empty, fetching later blockchain " +
97+
"events will be more expensive [chainDealId:{}, dealBlock:{}, " +
98+
"lastBlock:{}]", dealId, dealBlock, web3jService.getLatestBlockNumber());
99+
dealEvent.setBlockNumber(BigInteger.ZERO);
100+
}
101+
this.handleDeal(dealEvent);
102+
if (configurationService.getLastSeenBlockWithDeal().compareTo(dealBlock) < 0) {
103+
configurationService.setLastSeenBlockWithDeal(dealBlock);
92104
}
93105
}
94106

95107
/**
96108
* Handle new onchain deals and add its tasks
97109
* to db.
98-
*
99-
* @param chainDealId
110+
*
111+
* @param dealEvent
100112
*/
101-
private void handleDeal(String chainDealId) {
113+
private void handleDeal(DealEvent dealEvent) {
114+
String chainDealId = dealEvent.getChainDealId();
102115
Optional<ChainDeal> oChainDeal = iexecHubService.getChainDeal(chainDealId);
103116
if (oChainDeal.isEmpty()) {
104117
log.error("Could not get chain deal [chainDealId:{}]", chainDealId);
@@ -117,14 +130,14 @@ private void handleDeal(String chainDealId) {
117130
Optional<Task> optional = taskService.addTask(
118131
chainDealId,
119132
taskIndex,
133+
dealEvent.getBlockNumber().longValue(),
120134
BytesUtils.hexStringToAscii(chainDeal.getChainApp().getUri()),
121135
chainDeal.getParams().getIexecArgs(),
122136
chainDeal.getTrust().intValue(),
123137
chainDeal.getChainCategory().getMaxExecutionTime(),
124138
chainDeal.getTag(),
125139
iexecHubService.getChainDealContributionDeadline(chainDeal),
126-
iexecHubService.getChainDealFinalDeadline(chainDeal)
127-
);
140+
iexecHubService.getChainDealFinalDeadline(chainDeal));
128141
optional.ifPresent(task -> applicationEventPublisher
129142
.publishEvent(new TaskCreatedEvent(task.getChainTaskId())));
130143
}

src/main/java/com/iexec/core/chain/IexecHubService.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,11 @@ private Optional<Pair<String, ChainReceipt>> sendInitializeTransaction(String ch
203203

204204
if (isSuccessTx(computedChainTaskId, initializeEvent, ACTIVE)) {
205205
String chainTaskId = BytesUtils.bytesToString(initializeEvent.taskid);
206-
207-
ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(initializeEvent.log, chainTaskId, web3jService.getLatestBlockNumber());
208-
209-
log.info("Initialized [chainTaskId:{}, chainDealId:{}, taskIndex:{}, gasUsed:{}]",
210-
computedChainTaskId, chainDealId, taskIndex, initializeReceipt.getGasUsed());
206+
ChainReceipt chainReceipt = buildChainReceipt(initializeReceipt);
207+
log.info("Initialized [chainTaskId:{}, chainDealId:{}, taskIndex:{}, " +
208+
"gasUsed:{}, block:{}]",
209+
computedChainTaskId, chainDealId, taskIndex,
210+
initializeReceipt.getGasUsed(), chainReceipt.getBlockNumber());
211211
return Optional.of(Pair.of(chainTaskId, chainReceipt));
212212
}
213213

@@ -281,10 +281,11 @@ private Optional<ChainReceipt> sendFinalizeTransaction(String chainTaskId, Strin
281281
}
282282

283283
if (isSuccessTx(chainTaskId, finalizeEvent, COMPLETED)) {
284-
ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(finalizeEvents.get(0).log, chainTaskId, web3jService.getLatestBlockNumber());
285-
286-
log.info("Finalized [chainTaskId:{}, resultLink:{}, callbackData:{}, shouldSendCallback:{}, gasUsed:{}]", chainTaskId,
287-
resultLink, callbackData, shouldSendCallback, finalizeReceipt.getGasUsed());
284+
ChainReceipt chainReceipt = buildChainReceipt(finalizeReceipt);
285+
log.info("Finalized [chainTaskId:{}, resultLink:{}, callbackData:{}, " +
286+
"shouldSendCallback:{}, gasUsed:{}, block:{}]",
287+
chainTaskId, resultLink, callbackData, shouldSendCallback,
288+
finalizeReceipt.getGasUsed(), chainReceipt.getBlockNumber());
288289
return Optional.of(chainReceipt);
289290
}
290291

@@ -354,9 +355,9 @@ private Optional<ChainReceipt> sendReopenTransaction(String chainTaskId) {
354355
return Optional.empty();
355356
}
356357

357-
log.info("Reopened [chainTaskId:{}, gasUsed:{}]", chainTaskId, receipt.getGasUsed());
358-
ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(eventsList.get(0).log, chainTaskId, web3jService.getLatestBlockNumber());
359-
358+
ChainReceipt chainReceipt = buildChainReceipt(receipt);
359+
log.info("Reopened [chainTaskId:{}, gasUsed:{}, block:{}]",
360+
chainTaskId, receipt.getGasUsed(), chainReceipt.getBlockNumber());
360361
return Optional.of(chainReceipt);
361362
}
362363

@@ -396,5 +397,12 @@ private Boolean isTaskStatusValidOnChain(String chainTaskId, ChainStatus chainTa
396397
return false;
397398
}
398399

400+
private ChainReceipt buildChainReceipt(TransactionReceipt receipt) {
401+
return ChainReceipt.builder()
402+
.txHash(receipt.getTransactionHash())
403+
.blockNumber(receipt.getBlockNumber() != null?
404+
receipt.getBlockNumber().longValue() : 0)
405+
.build();
406+
}
399407

400408
}

src/main/java/com/iexec/core/detector/replicate/UnnotifiedAbstractDetector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ void dectectOnchainCompleted(List<TaskStatus> detectWhenOffChainTaskStatuses,
109109
*/
110110
private void updateReplicateStatuses(Task task, Replicate replicate, ReplicateStatus offchainCompleted) {
111111
String chainTaskId = task.getChainTaskId();
112-
long initBlocknumber = task.getInitializationBlockNumber();
112+
long initBlockNumber = task.getInitializationBlockNumber();
113113
List<ReplicateStatus> statusesToUpdate;
114114
if (replicate.getCurrentStatus().equals(WORKER_LOST)) {
115115
statusesToUpdate = getMissingStatuses(replicate.getLastButOneStatus(), offchainCompleted);
@@ -125,15 +125,15 @@ private void updateReplicateStatuses(Task task, Replicate replicate, ReplicateSt
125125
case CONTRIBUTED:
126126
// retrieve the contribution block for that wallet
127127
ChainReceipt contributedBlock = iexecHubService.getContributionBlock(chainTaskId,
128-
wallet, initBlocknumber);
128+
wallet, initBlockNumber);
129129
long contributedBlockNumber = contributedBlock != null ? contributedBlock.getBlockNumber() : 0;
130130
replicatesService.updateReplicateStatus(chainTaskId, wallet,
131131
statusToUpdate, new ReplicateStatusDetails(contributedBlockNumber));
132132
break;
133133
case REVEALED:
134134
// retrieve the reveal block for that wallet
135135
ChainReceipt revealedBlock = iexecHubService.getRevealBlock(chainTaskId, wallet,
136-
initBlocknumber);
136+
initBlockNumber);
137137
long revealedBlockNumber = revealedBlock != null ? revealedBlock.getBlockNumber() : 0;
138138
replicatesService.updateReplicateStatus(chainTaskId, wallet,
139139
statusToUpdate, new ReplicateStatusDetails(revealedBlockNumber));

src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.iexec.core.task.TaskStatus;
3434
import com.iexec.core.worker.Worker;
3535
import com.iexec.core.worker.WorkerService;
36+
import lombok.extern.slf4j.Slf4j;
3637
import org.springframework.dao.OptimisticLockingFailureException;
3738
import org.springframework.retry.annotation.Retryable;
3839
import org.springframework.stereotype.Service;
@@ -43,17 +44,18 @@
4344
import static com.iexec.common.replicate.ReplicateStatus.*;
4445

4546

47+
@Slf4j
4648
@Service
4749
public class ReplicateSupplyService {
4850

49-
private ReplicatesService replicatesService;
50-
private SignatureService signatureService;
51-
private TaskService taskService;
52-
private WorkerService workerService;
53-
private SmsService smsService;
54-
private Web3jService web3jService;
55-
private ContributionTimeoutTaskDetector contributionTimeoutTaskDetector;
56-
private ConsensusService consensusService;
51+
private final ReplicatesService replicatesService;
52+
private final SignatureService signatureService;
53+
private final TaskService taskService;
54+
private final WorkerService workerService;
55+
private final SmsService smsService;
56+
private final Web3jService web3jService;
57+
private final ContributionTimeoutTaskDetector contributionTimeoutTaskDetector;
58+
private final ConsensusService consensusService;
5759

5860
public ReplicateSupplyService(ReplicatesService replicatesService,
5961
SignatureService signatureService,
@@ -176,10 +178,10 @@ Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlo
176178
}
177179

178180
private boolean isFewBlocksAfterInitialization(Task task) {
179-
long coreLastBlock = web3jService.getLatestBlockNumber();
181+
long lastBlock = web3jService.getLatestBlockNumber();
180182
long initializationBlock = task.getInitializationBlockNumber();
181-
boolean isFewBlocksAfterInitialization = coreLastBlock >= initializationBlock + 2;
182-
return coreLastBlock > 0 && initializationBlock > 0 && isFewBlocksAfterInitialization;
183+
boolean isFewBlocksAfterInitialization = lastBlock >= initializationBlock + 2;
184+
return lastBlock > 0 && initializationBlock > 0 && isFewBlocksAfterInitialization;
183185
}
184186

185187
public List<TaskNotification> getMissedTaskNotifications(long blockNumber, String walletAddress) {

src/main/java/com/iexec/core/task/Task.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class Task {
5252

5353
private String chainDealId;
5454
private int taskIndex;
55+
private long dealBlockNumber;
5556
private long maxExecutionTime;
5657
private String tag;
5758
private DappType dappType;

src/main/java/com/iexec/core/task/TaskService.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public TaskService(
7575
*
7676
* @param chainDealId
7777
* @param taskIndex
78+
* @param dealBlockNumber
7879
* @param imageName
7980
* @param commandLine
8081
* @param trust
@@ -88,6 +89,7 @@ public TaskService(
8889
public Optional<Task> addTask(
8990
String chainDealId,
9091
int taskIndex,
92+
long dealBlockNumber,
9193
String imageName,
9294
String commandLine,
9395
int trust,
@@ -107,6 +109,7 @@ public Optional<Task> addTask(
107109
.orElseGet(() -> {
108110
Task newTask = new Task(chainDealId, taskIndex, imageName,
109111
commandLine, trust, maxExecutionTime, tag);
112+
newTask.setDealBlockNumber(dealBlockNumber);
110113
newTask.setFinalDeadline(finalDeadline);
111114
newTask.setContributionDeadline(contributionDeadline);
112115
newTask = taskRepository.save(newTask);
@@ -367,15 +370,13 @@ private void initializing2Initialized(Task task) {
367370

368371
private void initializing2Initialized(Task task, ChainReceipt chainReceipt) {
369372
String chainTaskId = task.getChainTaskId();
370-
long currentBlockNumber = web3jService.getLatestBlockNumber();
371-
long receiptBlockNumber = chainReceipt != null ? chainReceipt.getBlockNumber() : currentBlockNumber;
372-
if (receiptBlockNumber != 0) {
373-
task.setInitializationBlockNumber(receiptBlockNumber);
374-
}
375-
376-
if (chainReceipt == null) {
377-
chainReceipt = ChainReceipt.builder().blockNumber(currentBlockNumber).build();
373+
long initializationBlock = chainReceipt != null? chainReceipt.getBlockNumber() : 0;
374+
if (initializationBlock == 0){
375+
log.warn("Initialization block is empty, using deal block [chainTaskId:{}" +
376+
", dealBlock{}]", chainTaskId, task.getDealBlockNumber());
377+
initializationBlock = task.getDealBlockNumber();
378378
}
379+
task.setInitializationBlockNumber(initializationBlock);
379380
updateTaskStatusAndSave(task, INITIALIZED, chainReceipt);
380381
replicatesService.createEmptyReplicateList(chainTaskId);
381382
}

src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void shouldUpdateLastSeenBlockWhenOneDealAndCreateTask() {
118118

119119
when(iexecHubService.getDealEventObservableToLatest(from)).thenReturn(Flowable.just(dealEvent));
120120
when(iexecHubService.getChainDeal(dealEvent.get().getChainDealId())).thenReturn(Optional.of(chainDeal));
121-
when(taskService.addTask(any(), Mockito.anyInt(), any(), any(), Mockito.anyInt(), anyLong(), any(), any(), any()))
121+
when(taskService.addTask(any(), Mockito.anyInt(), anyLong(), any(), any(), Mockito.anyInt(), anyLong(), any(), any(), any()))
122122
.thenReturn(Optional.of(task));
123123
when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from);
124124
when(iexecHubService.isBeforeContributionDeadline(chainDeal)).thenReturn(true);
@@ -170,9 +170,9 @@ public void shouldUpdateLastSeenBlockWhenOneDealAndNotCreateTaskSinceDealIsExpir
170170
verify(applicationEventPublisher, never())
171171
.publishEvent(any());
172172
verify(taskService, never())
173-
.addTask(anyString(), anyInt(), anyString(),
174-
anyString(), anyInt(), anyLong(), anyString(),
175-
any(), any());
173+
.addTask(anyString(), anyInt(), anyLong(),
174+
anyString(), anyString(), anyInt(), anyLong(),
175+
anyString(), any(), any());
176176
}
177177

178178
@Test

src/test/java/com/iexec/core/task/TaskServiceTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.mockito.MockitoAnnotations;
4040
import org.springframework.context.ApplicationEventPublisher;
4141

42+
import java.math.BigInteger;
4243
import java.time.Instant;
4344
import java.time.temporal.ChronoUnit;
4445
import java.util.*;
@@ -129,8 +130,8 @@ public void shouldAddTask() {
129130
task.changeStatus(TaskStatus.INITIALIZED);
130131

131132
when(taskRepository.save(any())).thenReturn(task);
132-
Optional<Task> saved = taskService.addTask(CHAIN_DEAL_ID, 0, DAPP_NAME, COMMAND_LINE, 2,
133-
maxExecutionTime, "0x0", contributionDeadline, finalDeadline);
133+
Optional<Task> saved = taskService.addTask(CHAIN_DEAL_ID, 0, 0, DAPP_NAME, COMMAND_LINE,
134+
2, maxExecutionTime, "0x0", contributionDeadline, finalDeadline);
134135
assertThat(saved).isPresent();
135136
assertThat(saved).isEqualTo(Optional.of(task));
136137
}
@@ -140,8 +141,8 @@ public void shouldNotAddTask() {
140141
Task task = getStubTask();
141142
task.changeStatus(TaskStatus.INITIALIZED);
142143
when(taskRepository.findByChainDealIdAndTaskIndex(CHAIN_DEAL_ID, 0)).thenReturn(Optional.of(task));
143-
Optional<Task> saved = taskService.addTask(CHAIN_DEAL_ID, 0, DAPP_NAME, COMMAND_LINE, 2,
144-
maxExecutionTime, "0x0", contributionDeadline, finalDeadline);
144+
Optional<Task> saved = taskService.addTask(CHAIN_DEAL_ID, 0, 0, DAPP_NAME, COMMAND_LINE,
145+
2, maxExecutionTime, "0x0", contributionDeadline, finalDeadline);
145146
assertThat(saved).isEqualTo(Optional.empty());
146147
}
147148

0 commit comments

Comments
 (0)