Skip to content

Commit 68ec17d

Browse files
authored
Add @PreDetroy annotation in services implementing Purgeable (#621)
1 parent 8adb3f2 commit 68ec17d

File tree

8 files changed

+142
-96
lines changed

8 files changed

+142
-96
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ All notable changes to this project will be documented in this file.
2121
- Resolve deprecations caused by `TaskDescription` in `AppComputeService`, `TaskManagerService`, and `ResultService`. (#616)
2222
- Replace `SignatureUtils#hashAndSign` deprecated calls in `LoginServiceTests`. (#618)
2323
- Rename `executor` package to `task` package. (#619)
24+
- Add missing `@PreDestroy` annotation in services implementing `Purgeable`. (#621)
2425

2526
### Dependency Upgrades
2627

src/main/java/com/iexec/worker/chain/IexecHubService.java

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.web3j.protocol.core.methods.response.Log;
3131
import org.web3j.protocol.core.methods.response.TransactionReceipt;
3232

33+
import javax.annotation.PreDestroy;
3334
import java.nio.charset.StandardCharsets;
3435
import java.util.List;
3536
import java.util.Objects;
@@ -67,15 +68,15 @@ public IexecHubService(SignerService signerService,
6768
}
6869

6970
// region contribute
70-
IexecHubContract.TaskContributeEventResponse contribute(Contribution contribution) {
71+
IexecHubContract.TaskContributeEventResponse contribute(final Contribution contribution) {
7172
log.info("contribute request [chainTaskId:{}, waitingTxCount:{}]", contribution.getChainTaskId(), getWaitingTransactionCount());
7273
return sendContributeTransaction(contribution);
7374
}
7475

75-
private IexecHubContract.TaskContributeEventResponse sendContributeTransaction(Contribution contribution) {
76-
String chainTaskId = contribution.getChainTaskId();
76+
private IexecHubContract.TaskContributeEventResponse sendContributeTransaction(final Contribution contribution) {
77+
final String chainTaskId = contribution.getChainTaskId();
7778

78-
RemoteCall<TransactionReceipt> contributeCall = iexecHubContract.contribute(
79+
final RemoteCall<TransactionReceipt> contributeCall = iexecHubContract.contribute(
7980
stringToBytes(chainTaskId),
8081
stringToBytes(contribution.getResultHash()),
8182
stringToBytes(contribution.getResultSeal()),
@@ -84,17 +85,17 @@ private IexecHubContract.TaskContributeEventResponse sendContributeTransaction(C
8485
stringToBytes(contribution.getWorkerPoolSignature()));
8586
log.info("Sent contribute [chainTaskId:{}, contribution:{}]", chainTaskId, contribution);
8687

87-
TransactionReceipt contributeReceipt = submit(chainTaskId, "contribute", contributeCall);
88+
final TransactionReceipt contributeReceipt = submit(chainTaskId, "contribute", contributeCall);
8889

89-
List<IexecHubContract.TaskContributeEventResponse> contributeEvents =
90+
final List<IexecHubContract.TaskContributeEventResponse> contributeEvents =
9091
IexecHubContract.getTaskContributeEvents(contributeReceipt).stream()
9192
.filter(event -> Objects.equals(bytesToString(event.taskid), chainTaskId)
9293
&& Objects.equals(event.worker, signerService.getAddress()))
9394
.collect(Collectors.toList());
9495
log.debug("contributeEvents count {} [chainTaskId: {}]", contributeEvents.size(), chainTaskId);
9596

9697
if (!contributeEvents.isEmpty()) {
97-
IexecHubContract.TaskContributeEventResponse contributeEvent = contributeEvents.get(0);
98+
final IexecHubContract.TaskContributeEventResponse contributeEvent = contributeEvents.get(0);
9899
if (isSuccessTx(chainTaskId, contributeEvent.log, CONTRIBUTED)) {
99100
log.info("Contributed [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]",
100101
chainTaskId, contribution, contributeReceipt.getGasUsed(), contributeEvent.log);
@@ -108,28 +109,28 @@ private IexecHubContract.TaskContributeEventResponse sendContributeTransaction(C
108109
// endregion
109110

110111
// region reveal
111-
IexecHubContract.TaskRevealEventResponse reveal(String chainTaskId, String resultDigest) {
112+
IexecHubContract.TaskRevealEventResponse reveal(final String chainTaskId, final String resultDigest) {
112113
log.info("reveal request [chainTaskId:{}, waitingTxCount:{}]", chainTaskId, getWaitingTransactionCount());
113114
return sendRevealTransaction(chainTaskId, resultDigest);
114115
}
115116

116-
private IexecHubContract.TaskRevealEventResponse sendRevealTransaction(String chainTaskId, String resultDigest) {
117-
RemoteCall<TransactionReceipt> revealCall = iexecHubContract.reveal(
117+
private IexecHubContract.TaskRevealEventResponse sendRevealTransaction(final String chainTaskId, final String resultDigest) {
118+
final RemoteCall<TransactionReceipt> revealCall = iexecHubContract.reveal(
118119
stringToBytes(chainTaskId),
119120
stringToBytes(resultDigest));
120121
log.info("Sent reveal [chainTaskId:{}, resultDigest:{}]", chainTaskId, resultDigest);
121122

122-
TransactionReceipt revealReceipt = submit(chainTaskId, "reveal", revealCall);
123+
final TransactionReceipt revealReceipt = submit(chainTaskId, "reveal", revealCall);
123124

124-
List<IexecHubContract.TaskRevealEventResponse> revealEvents =
125+
final List<IexecHubContract.TaskRevealEventResponse> revealEvents =
125126
IexecHubContract.getTaskRevealEvents(revealReceipt).stream()
126127
.filter(event -> Objects.equals(bytesToString(event.taskid), chainTaskId)
127128
&& Objects.equals(event.worker, signerService.getAddress()))
128129
.collect(Collectors.toList());
129130
log.debug("revealEvents count {} [chainTaskId:{}]", revealEvents.size(), chainTaskId);
130131

131132
if (!revealEvents.isEmpty()) {
132-
IexecHubContract.TaskRevealEventResponse revealEvent = revealEvents.get(0);
133+
final IexecHubContract.TaskRevealEventResponse revealEvent = revealEvents.get(0);
133134
if (isSuccessTx(chainTaskId, revealEvent.log, REVEALED)) {
134135
log.info("Revealed [chainTaskId:{}, resultDigest:{}, gasUsed:{}, log:{}]",
135136
chainTaskId, resultDigest, revealReceipt.getGasUsed(), revealEvent.log);
@@ -143,18 +144,21 @@ private IexecHubContract.TaskRevealEventResponse sendRevealTransaction(String ch
143144
// endregion reveal
144145

145146
// region contributeAndFinalize
146-
public Optional<ChainReceipt> contributeAndFinalize(Contribution contribution, String resultLink, String callbackData) {
147+
public Optional<ChainReceipt> contributeAndFinalize(final Contribution contribution, final String resultLink,
148+
final String callbackData) {
147149
log.info("contributeAndFinalize request [chainTaskId:{}, waitingTxCount:{}]",
148150
contribution.getChainTaskId(), getWaitingTransactionCount());
149-
IexecHubContract.TaskFinalizeEventResponse finalizeEvent = sendContributeAndFinalizeTransaction(contribution, resultLink, callbackData);
151+
final IexecHubContract.TaskFinalizeEventResponse finalizeEvent = sendContributeAndFinalizeTransaction(contribution, resultLink, callbackData);
150152
return Optional.ofNullable(finalizeEvent)
151153
.map(event -> ChainUtils.buildChainReceipt(event.log, contribution.getChainTaskId(), getLatestBlockNumber()));
152154
}
153155

154-
private IexecHubContract.TaskFinalizeEventResponse sendContributeAndFinalizeTransaction(Contribution contribution, String resultLink, String callbackData) {
155-
String chainTaskId = contribution.getChainTaskId();
156+
private IexecHubContract.TaskFinalizeEventResponse sendContributeAndFinalizeTransaction(final Contribution contribution,
157+
final String resultLink,
158+
final String callbackData) {
159+
final String chainTaskId = contribution.getChainTaskId();
156160

157-
RemoteCall<TransactionReceipt> contributeAndFinalizeCall = iexecHubContract.contributeAndFinalize(
161+
final RemoteCall<TransactionReceipt> contributeAndFinalizeCall = iexecHubContract.contributeAndFinalize(
158162
stringToBytes(chainTaskId),
159163
stringToBytes(contribution.getResultDigest()),
160164
StringUtils.isNotEmpty(resultLink) ? resultLink.getBytes(StandardCharsets.UTF_8) : new byte[0],
@@ -165,16 +169,16 @@ private IexecHubContract.TaskFinalizeEventResponse sendContributeAndFinalizeTran
165169
log.info("Sent contributeAndFinalize [chainTaskId:{}, contribution:{}, resultLink:{}, callbackData:{}]",
166170
chainTaskId, contribution, resultLink, callbackData);
167171

168-
TransactionReceipt receipt = submit(chainTaskId, "contributeAndFinalize", contributeAndFinalizeCall);
172+
final TransactionReceipt receipt = submit(chainTaskId, "contributeAndFinalize", contributeAndFinalizeCall);
169173

170-
List<IexecHubContract.TaskFinalizeEventResponse> finalizeEvents =
174+
final List<IexecHubContract.TaskFinalizeEventResponse> finalizeEvents =
171175
IexecHubContract.getTaskFinalizeEvents(receipt).stream()
172176
.filter(event -> Objects.equals(bytesToString(event.taskid), chainTaskId))
173177
.collect(Collectors.toList());
174178
log.debug("finalizeEvents count {} [chainTaskId:{}]", finalizeEvents.size(), chainTaskId);
175179

176180
if (!finalizeEvents.isEmpty()) {
177-
IexecHubContract.TaskFinalizeEventResponse finalizeEvent = finalizeEvents.get(0);
181+
final IexecHubContract.TaskFinalizeEventResponse finalizeEvent = finalizeEvents.get(0);
178182
if (isSuccessTx(chainTaskId, finalizeEvent.log, REVEALED)) {
179183
log.info("contributeAndFinalize done [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]",
180184
chainTaskId, contribution, receipt.getGasUsed(), finalizeEvent.log);
@@ -192,7 +196,7 @@ private long getWaitingTransactionCount() {
192196
return executor.getTaskCount() - 1 - executor.getCompletedTaskCount();
193197
}
194198

195-
boolean isSuccessTx(String chainTaskId, Log eventLog, ChainContributionStatus pretendedStatus) {
199+
boolean isSuccessTx(final String chainTaskId, final Log eventLog, final ChainContributionStatus pretendedStatus) {
196200
if (eventLog == null) {
197201
return false;
198202
}
@@ -205,12 +209,12 @@ boolean isSuccessTx(String chainTaskId, Log eventLog, ChainContributionStatus pr
205209
return true;
206210
}
207211

208-
private boolean isContributionStatusValidOnChain(String chainTaskId, ChainContributionStatus chainContributionStatus) {
209-
Optional<ChainContribution> chainContribution = getChainContribution(chainTaskId);
212+
private boolean isContributionStatusValidOnChain(final String chainTaskId, final ChainContributionStatus chainContributionStatus) {
213+
final Optional<ChainContribution> chainContribution = getChainContribution(chainTaskId);
210214
return chainContribution.isPresent() && chainContribution.get().getStatus() == chainContributionStatus;
211215
}
212216

213-
private boolean isStatusValidOnChainAfterPendingReceipt(String chainTaskId, ChainContributionStatus onchainStatus) {
217+
private boolean isStatusValidOnChainAfterPendingReceipt(final String chainTaskId, final ChainContributionStatus onchainStatus) {
214218
long maxWaitingTime = 10 * web3jService.getBlockTime().toMillis();
215219
log.info("Waiting for on-chain status after pending receipt " +
216220
"[chainTaskId:{}, status:{}, maxWaitingTime:{}]",
@@ -238,7 +242,7 @@ private boolean isStatusValidOnChainAfterPendingReceipt(String chainTaskId, Chai
238242
}
239243
// endregion
240244

241-
Optional<ChainContribution> getChainContribution(String chainTaskId) {
245+
Optional<ChainContribution> getChainContribution(final String chainTaskId) {
242246
return getChainContribution(chainTaskId, signerService.getAddress());
243247
}
244248

@@ -254,27 +258,30 @@ public long getLatestBlockNumber() {
254258
return web3jService.getLatestBlockNumber();
255259
}
256260

257-
boolean isChainTaskActive(String chainTaskId) {
258-
Optional<ChainTask> chainTask = getChainTask(chainTaskId);
261+
boolean isChainTaskActive(final String chainTaskId) {
262+
final Optional<ChainTask> chainTask = getChainTask(chainTaskId);
259263
return chainTask.filter(task -> task.getStatus() == ChainTaskStatus.ACTIVE).isPresent();
260264
}
261265

262-
boolean isChainTaskRevealing(String chainTaskId) {
263-
Optional<ChainTask> chainTask = getChainTask(chainTaskId);
266+
boolean isChainTaskRevealing(final String chainTaskId) {
267+
final Optional<ChainTask> chainTask = getChainTask(chainTaskId);
264268
return chainTask.filter(task -> task.getStatus() == ChainTaskStatus.REVEALING).isPresent();
265269
}
266270

267271
@Override
268-
public boolean purgeTask(String chainTaskId) {
272+
public boolean purgeTask(final String chainTaskId) {
273+
log.debug("purgeTask [chainTaskId:{}]", chainTaskId);
269274
return super.purgeTask(chainTaskId);
270275
}
271276

272277
@Override
278+
@PreDestroy
273279
public void purgeAllTasksData() {
280+
log.info("Method purgeAllTasksData() called to perform task data cleanup.");
274281
super.purgeAllTasksData();
275282
}
276283

277-
TransactionReceipt submit(String chainTaskId, String transactionType, RemoteCall<TransactionReceipt> remoteCall) {
284+
TransactionReceipt submit(final String chainTaskId, final String transactionType, final RemoteCall<TransactionReceipt> remoteCall) {
278285
try {
279286
final RemoteCallTask remoteCallSend = new RemoteCallTask(chainTaskId, transactionType, remoteCall);
280287
return submit(remoteCallSend);
@@ -285,13 +292,13 @@ TransactionReceipt submit(String chainTaskId, String transactionType, RemoteCall
285292
Thread.currentThread().interrupt();
286293
}
287294
// return non-null receipt with empty logs on failure
288-
TransactionReceipt receipt = new TransactionReceipt();
295+
final TransactionReceipt receipt = new TransactionReceipt();
289296
receipt.setLogs(List.of());
290297
return receipt;
291298
}
292299

293-
TransactionReceipt submit(RemoteCallTask remoteCallTask) throws ExecutionException, InterruptedException {
294-
Future<TransactionReceipt> future = executor.submit(remoteCallTask);
300+
TransactionReceipt submit(final RemoteCallTask remoteCallTask) throws ExecutionException, InterruptedException {
301+
final Future<TransactionReceipt> future = executor.submit(remoteCallTask);
295302
return future.get();
296303
}
297304

@@ -311,7 +318,7 @@ public RemoteCallTask(String chainTaskId, String transactionType, RemoteCall<Tra
311318

312319
@Override
313320
public TransactionReceipt call() throws Exception {
314-
TransactionReceipt receipt = remoteCall.send();
321+
final TransactionReceipt receipt = remoteCall.send();
315322
log.debug("{} transaction hash {} at block {} [chainTaskId:{}]",
316323
transactionType, receipt.getTransactionHash(), receipt.getBlockNumber(), chainTaskId);
317324
log.info("{} receipt [chainTaskId:{}]", transactionType, chainTaskId);

src/main/java/com/iexec/worker/chain/WorkerpoolAuthorizationService.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.commons.lang3.StringUtils;
2828
import org.springframework.stereotype.Service;
2929

30+
import javax.annotation.PreDestroy;
3031
import java.util.Map;
3132

3233

@@ -44,15 +45,15 @@ public WorkerpoolAuthorizationService(SchedulerConfiguration schedulerConfigurat
4445
workerpoolAuthorizations = ExpiringTaskMapFactory.getExpiringTaskMap();
4546
}
4647

47-
public boolean isWorkerpoolAuthorizationValid(WorkerpoolAuthorization auth, String signerAddress) {
48+
public boolean isWorkerpoolAuthorizationValid(final WorkerpoolAuthorization auth, final String signerAddress) {
4849
// create the hash that was used in the signature in the core
49-
byte[] message = BytesUtils.stringToBytes(
50+
final byte[] message = BytesUtils.stringToBytes(
5051
HashUtils.concatenateAndHash(auth.getWorkerWallet(), auth.getChainTaskId(), auth.getEnclaveChallenge()));
5152

5253
return SignatureUtils.isSignatureValid(message, auth.getSignature(), signerAddress);
5354
}
5455

55-
public boolean putWorkerpoolAuthorization(WorkerpoolAuthorization workerpoolAuthorization) {
56+
public boolean putWorkerpoolAuthorization(final WorkerpoolAuthorization workerpoolAuthorization) {
5657
if (workerpoolAuthorization == null || workerpoolAuthorization.getChainTaskId() == null) {
5758
log.error("Cant putWorkerpoolAuthorization (null) [workerpoolAuthorization:{}]", workerpoolAuthorization);
5859
return false;
@@ -72,7 +73,7 @@ public boolean putWorkerpoolAuthorization(WorkerpoolAuthorization workerpoolAuth
7273
return true;
7374
}
7475

75-
WorkerpoolAuthorization getWorkerpoolAuthorization(String chainTaskId) {
76+
WorkerpoolAuthorization getWorkerpoolAuthorization(final String chainTaskId) {
7677
return workerpoolAuthorizations.get(chainTaskId);
7778
}
7879

@@ -84,13 +85,16 @@ WorkerpoolAuthorization getWorkerpoolAuthorization(String chainTaskId) {
8485
* {@literal false} otherwise.
8586
*/
8687
@Override
87-
public boolean purgeTask(String chainTaskId) {
88+
public boolean purgeTask(final String chainTaskId) {
89+
log.debug("purgeTask [chainTaskId:{}]", chainTaskId);
8890
workerpoolAuthorizations.remove(chainTaskId);
8991
return !workerpoolAuthorizations.containsKey(chainTaskId);
9092
}
9193

9294
@Override
95+
@PreDestroy
9396
public void purgeAllTasksData() {
97+
log.info("Method purgeAllTasksData() called to perform task data cleanup.");
9498
workerpoolAuthorizations.clear();
9599
}
96100
}

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public boolean purgeTask(final String chainTaskId) {
104104
@Override
105105
@PreDestroy
106106
public void purgeAllTasksData() {
107+
log.info("Method purgeAllTasksData() called to perform task data cleanup.");
107108
this.chainTaskIdToSubscription.keySet().forEach(this::purgeTask);
108109
this.chainTaskIdToSubscription.clear();
109110
}

0 commit comments

Comments
 (0)