|
19 | 19 | import com.iexec.common.lifecycle.purge.Purgeable; |
20 | 20 | import com.iexec.commons.poco.chain.*; |
21 | 21 | import com.iexec.commons.poco.contract.generated.IexecHubContract; |
22 | | -import com.iexec.commons.poco.utils.BytesUtils; |
| 22 | +import com.iexec.commons.poco.encoding.LogTopic; |
23 | 23 | import io.reactivex.Flowable; |
24 | 24 | import jakarta.annotation.PreDestroy; |
25 | 25 | import lombok.extern.slf4j.Slf4j; |
26 | 26 | import org.springframework.stereotype.Service; |
27 | | -import org.web3j.abi.EventEncoder; |
28 | | -import org.web3j.abi.datatypes.Event; |
29 | 27 | import org.web3j.protocol.core.DefaultBlockParameter; |
30 | 28 | import org.web3j.protocol.core.methods.request.EthFilter; |
| 29 | +import org.web3j.protocol.core.methods.response.EthLog; |
| 30 | +import org.web3j.protocol.core.methods.response.Log; |
31 | 31 |
|
32 | 32 | import java.math.BigInteger; |
| 33 | +import java.util.Arrays; |
33 | 34 | import java.util.Date; |
34 | 35 |
|
35 | 36 | import static com.iexec.commons.poco.chain.ChainContributionStatus.CONTRIBUTED; |
36 | 37 | import static com.iexec.commons.poco.chain.ChainContributionStatus.REVEALED; |
37 | | -import static com.iexec.commons.poco.contract.generated.IexecHubContract.*; |
38 | 38 |
|
39 | 39 | @Slf4j |
40 | 40 | @Service |
@@ -156,124 +156,95 @@ public boolean isRevealed(String... args) { |
156 | 156 | // endregion |
157 | 157 |
|
158 | 158 | // region get event blocks |
159 | | - public ChainReceipt getContributionBlock(String chainTaskId, |
160 | | - String workerWallet, |
161 | | - long fromBlock) { |
162 | | - long latestBlock = web3jService.getLatestBlockNumber(); |
| 159 | + public ChainReceipt getInitializeBlock(final String chainTaskId, |
| 160 | + final long fromBlock) { |
| 161 | + log.debug("getInitializeBlock [chainTaskId:{}]", chainTaskId); |
| 162 | + final long latestBlock = web3jService.getLatestBlockNumber(); |
163 | 163 | if (fromBlock > latestBlock) { |
164 | 164 | return ChainReceipt.builder().build(); |
165 | 165 | } |
166 | | - |
167 | | - EthFilter ethFilter = createContributeEthFilter(fromBlock, latestBlock); |
168 | | - |
169 | | - // filter only taskContribute events for the chainTaskId and the worker's wallet |
170 | | - // and retrieve the block number of the event |
171 | | - return iexecHubContract.taskContributeEventFlowable(ethFilter) |
172 | | - .filter(eventResponse -> |
173 | | - chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)) && |
174 | | - workerWallet.equals(eventResponse.worker) |
175 | | - ) |
176 | | - .map(eventResponse -> ChainReceipt.builder() |
177 | | - .blockNumber(eventResponse.log.getBlockNumber().longValue()) |
178 | | - .txHash(eventResponse.log.getTransactionHash()) |
179 | | - .build()) |
| 166 | + final EthFilter ethFilter = createEthFilter( |
| 167 | + fromBlock, latestBlock, LogTopic.TASK_INITIALIZE_EVENT, chainTaskId); |
| 168 | + return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable() |
| 169 | + .map(this::createChainReceipt) |
180 | 170 | .blockingFirst(); |
181 | 171 | } |
182 | 172 |
|
183 | | - public ChainReceipt getConsensusBlock(String chainTaskId, long fromBlock) { |
| 173 | + public ChainReceipt getContributionBlock(final String chainTaskId, |
| 174 | + final String workerWallet, |
| 175 | + final long fromBlock) { |
| 176 | + log.debug("getContributionBlock [chainTaskId:{}]", chainTaskId); |
184 | 177 | long latestBlock = web3jService.getLatestBlockNumber(); |
185 | 178 | if (fromBlock > latestBlock) { |
186 | 179 | return ChainReceipt.builder().build(); |
187 | 180 | } |
188 | | - |
189 | | - EthFilter ethFilter = createConsensusEthFilter(fromBlock, latestBlock); |
190 | | - |
191 | | - // filter only taskConsensus events for the chainTaskId (there should be only one) |
192 | | - // and retrieve the block number of the event |
193 | | - return iexecHubContract.taskConsensusEventFlowable(ethFilter) |
194 | | - .filter(eventResponse -> chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid))) |
195 | | - .map(eventResponse -> ChainReceipt.builder() |
196 | | - .blockNumber(eventResponse.log.getBlockNumber().longValue()) |
197 | | - .txHash(eventResponse.log.getTransactionHash()) |
198 | | - .build()) |
| 181 | + final EthFilter ethFilter = createEthFilter( |
| 182 | + fromBlock, latestBlock, LogTopic.TASK_CONTRIBUTE_EVENT, chainTaskId, workerWallet); |
| 183 | + return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable() |
| 184 | + .map(this::createChainReceipt) |
199 | 185 | .blockingFirst(); |
200 | 186 | } |
201 | 187 |
|
202 | | - public ChainReceipt getRevealBlock(String chainTaskId, |
203 | | - String workerWallet, |
204 | | - long fromBlock) { |
| 188 | + public ChainReceipt getConsensusBlock(final String chainTaskId, final long fromBlock) { |
| 189 | + log.debug("getConsensusBlock [chainTaskId:{}]", chainTaskId); |
205 | 190 | long latestBlock = web3jService.getLatestBlockNumber(); |
206 | 191 | if (fromBlock > latestBlock) { |
207 | 192 | return ChainReceipt.builder().build(); |
208 | 193 | } |
209 | | - |
210 | | - EthFilter ethFilter = createRevealEthFilter(fromBlock, latestBlock); |
211 | | - |
212 | | - // filter only taskReveal events for the chainTaskId and the worker's wallet |
213 | | - // and retrieve the block number of the event |
214 | | - return iexecHubContract.taskRevealEventFlowable(ethFilter) |
215 | | - .filter(eventResponse -> |
216 | | - chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)) && |
217 | | - workerWallet.equals(eventResponse.worker) |
218 | | - ) |
219 | | - .map(eventResponse -> ChainReceipt.builder() |
220 | | - .blockNumber(eventResponse.log.getBlockNumber().longValue()) |
221 | | - .txHash(eventResponse.log.getTransactionHash()) |
222 | | - .build()) |
| 194 | + final EthFilter ethFilter = createEthFilter( |
| 195 | + fromBlock, latestBlock, LogTopic.TASK_CONSENSUS_EVENT, chainTaskId); |
| 196 | + return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable() |
| 197 | + .map(this::createChainReceipt) |
223 | 198 | .blockingFirst(); |
224 | 199 | } |
225 | 200 |
|
226 | | - public ChainReceipt getFinalizeBlock(String chainTaskId, long fromBlock) { |
| 201 | + public ChainReceipt getRevealBlock(final String chainTaskId, |
| 202 | + final String workerWallet, |
| 203 | + final long fromBlock) { |
| 204 | + log.debug("getRevealBlock [chainTaskId:{}]", chainTaskId); |
227 | 205 | long latestBlock = web3jService.getLatestBlockNumber(); |
228 | 206 | if (fromBlock > latestBlock) { |
229 | 207 | return ChainReceipt.builder().build(); |
230 | 208 | } |
231 | | - |
232 | | - EthFilter ethFilter = createFinalizeEthFilter(fromBlock, latestBlock); |
233 | | - |
234 | | - // filter only taskFinalize events for the chainTaskId (there should be only one) |
235 | | - // and retrieve the block number of the event |
236 | | - return iexecHubContract.taskFinalizeEventFlowable(ethFilter) |
237 | | - .filter(eventResponse -> |
238 | | - chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)) |
239 | | - ) |
240 | | - .map(eventResponse -> ChainReceipt.builder() |
241 | | - .blockNumber(eventResponse.log.getBlockNumber().longValue()) |
242 | | - .txHash(eventResponse.log.getTransactionHash()) |
243 | | - .build()) |
| 209 | + final EthFilter ethFilter = createEthFilter( |
| 210 | + fromBlock, latestBlock, LogTopic.TASK_REVEAL_EVENT, chainTaskId, workerWallet); |
| 211 | + return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable() |
| 212 | + .map(this::createChainReceipt) |
244 | 213 | .blockingFirst(); |
245 | 214 | } |
246 | 215 |
|
247 | | - private EthFilter createContributeEthFilter(long fromBlock, long toBlock) { |
248 | | - return createEthFilter(fromBlock, toBlock, TASKCONTRIBUTE_EVENT); |
249 | | - } |
250 | | - |
251 | | - private EthFilter createConsensusEthFilter(long fromBlock, long toBlock) { |
252 | | - return createEthFilter(fromBlock, toBlock, TASKCONSENSUS_EVENT); |
253 | | - } |
254 | | - |
255 | | - private EthFilter createRevealEthFilter(long fromBlock, long toBlock) { |
256 | | - return createEthFilter(fromBlock, toBlock, TASKREVEAL_EVENT); |
| 216 | + public ChainReceipt getFinalizeBlock(final String chainTaskId, |
| 217 | + final long fromBlock) { |
| 218 | + log.debug("getFinalizeBlock [chainTaskId:{}]", chainTaskId); |
| 219 | + long latestBlock = web3jService.getLatestBlockNumber(); |
| 220 | + if (fromBlock > latestBlock) { |
| 221 | + return ChainReceipt.builder().build(); |
| 222 | + } |
| 223 | + final EthFilter ethFilter = createEthFilter( |
| 224 | + fromBlock, latestBlock, LogTopic.TASK_FINALIZE_EVENT, chainTaskId); |
| 225 | + return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable() |
| 226 | + .map(this::createChainReceipt) |
| 227 | + .blockingFirst(); |
257 | 228 | } |
258 | 229 |
|
259 | | - private EthFilter createFinalizeEthFilter(long fromBlock, long toBlock) { |
260 | | - return createEthFilter(fromBlock, toBlock, TASKFINALIZE_EVENT); |
| 230 | + private ChainReceipt createChainReceipt(final EthLog ethLog) { |
| 231 | + final Log logEvent = (Log) ethLog.getLogs().get(0); |
| 232 | + return ChainReceipt.builder() |
| 233 | + .blockNumber(logEvent.getBlockNumber().longValue()) |
| 234 | + .txHash(logEvent.getTransactionHash()) |
| 235 | + .build(); |
261 | 236 | } |
262 | 237 |
|
263 | | - private EthFilter createEthFilter(long fromBlock, long toBlock, Event event) { |
264 | | - IexecHubContract iexecHub = getHubContract(); |
265 | | - DefaultBlockParameter startBlock = |
266 | | - DefaultBlockParameter.valueOf(BigInteger.valueOf(fromBlock)); |
267 | | - DefaultBlockParameter endBlock = |
268 | | - DefaultBlockParameter.valueOf(BigInteger.valueOf(toBlock)); |
269 | | - |
270 | | - // define the filter |
271 | | - EthFilter ethFilter = new EthFilter( |
272 | | - startBlock, |
273 | | - endBlock, |
274 | | - iexecHub.getContractAddress() |
| 238 | + private EthFilter createEthFilter(final long fromBlock, |
| 239 | + final long toBlock, |
| 240 | + final String... topics) { |
| 241 | + log.debug("createEthFilter [from:{}, to:{}]", fromBlock, toBlock); |
| 242 | + final EthFilter ethFilter = new EthFilter( |
| 243 | + DefaultBlockParameter.valueOf(BigInteger.valueOf(fromBlock)), |
| 244 | + DefaultBlockParameter.valueOf(BigInteger.valueOf(toBlock)), |
| 245 | + iexecHubAddress |
275 | 246 | ); |
276 | | - ethFilter.addSingleTopic(EventEncoder.encode(event)); |
| 247 | + Arrays.stream(topics).forEach(ethFilter::addSingleTopic); |
277 | 248 |
|
278 | 249 | return ethFilter; |
279 | 250 | } |
|
0 commit comments