1818
1919import com .iexec .common .lifecycle .purge .Purgeable ;
2020import com .iexec .commons .poco .chain .*;
21- import com .iexec .commons .poco .contract .generated .IexecHubContract ;
21+ import com .iexec .commons .poco .encoding .LogTopic ;
22+ import com .iexec .commons .poco .encoding .PoCoDataEncoder ;
2223import com .iexec .worker .config .ConfigServerConfigurationService ;
24+ import io .micrometer .core .instrument .Counter ;
25+ import io .micrometer .core .instrument .Metrics ;
2326import jakarta .annotation .PreDestroy ;
2427import lombok .extern .slf4j .Slf4j ;
2528import org .apache .commons .lang3 .StringUtils ;
2629import org .springframework .beans .factory .annotation .Autowired ;
2730import org .springframework .stereotype .Service ;
28- import org .web3j .protocol .core .RemoteCall ;
2931import org .web3j .protocol .core .methods .response .Log ;
3032import org .web3j .protocol .core .methods .response .TransactionReceipt ;
33+ import org .web3j .protocol .exceptions .TransactionException ;
3134
35+ import java .io .IOException ;
36+ import java .math .BigInteger ;
3237import java .nio .charset .StandardCharsets ;
3338import java .util .List ;
34- import java .util .Objects ;
3539import java .util .Optional ;
36- import java .util .concurrent .*;
40+ import java .util .concurrent .Executors ;
41+ import java .util .concurrent .ThreadPoolExecutor ;
3742
3843import static com .iexec .commons .poco .chain .ChainContributionStatus .CONTRIBUTED ;
3944import static com .iexec .commons .poco .chain .ChainContributionStatus .REVEALED ;
40- import static com .iexec .commons .poco .utils .BytesUtils .bytesToString ;
4145import static com .iexec .commons .poco .utils .BytesUtils .stringToBytes ;
4246
4347@ Slf4j
@@ -48,53 +52,54 @@ public class IexecHubService extends IexecHubAbstractService implements Purgeabl
4852 private final SignerService signerService ;
4953 private final ThreadPoolExecutor executor ;
5054 private final Web3jService web3jService ;
55+ private final String hubContractAddress ;
56+ private final Counter failureCounter = Metrics .counter ("iexec.poco.transaction" , "status" , "failure" );
57+ private final Counter successCounter = Metrics .counter ("iexec.poco.transaction" , "status" , "success" );
5158
5259 @ Autowired
53- public IexecHubService (SignerService signerService ,
54- Web3jService web3jService ,
55- ConfigServerConfigurationService configServerConfigurationService ) {
60+ public IexecHubService (final SignerService signerService ,
61+ final Web3jService web3jService ,
62+ final ConfigServerConfigurationService configServerConfigurationService ) {
5663 super (signerService .getCredentials (),
5764 web3jService ,
5865 configServerConfigurationService .getIexecHubContractAddress (),
5966 1 ,
6067 5 );
68+ this .hubContractAddress = configServerConfigurationService .getIexecHubContractAddress ();
6169 this .signerService = signerService ;
6270 this .web3jService = web3jService ;
6371 this .executor = (ThreadPoolExecutor ) Executors .newFixedThreadPool (1 );
6472 }
6573
6674 // region contribute
67- IexecHubContract . TaskContributeEventResponse contribute (final Contribution contribution ) {
75+ Log contribute (final Contribution contribution ) {
6876 log .info ("contribute request [chainTaskId:{}, waitingTxCount:{}]" , contribution .chainTaskId (), getWaitingTransactionCount ());
69- return sendContributeTransaction (contribution );
70- }
7177
72- private IexecHubContract .TaskContributeEventResponse sendContributeTransaction (final Contribution contribution ) {
7378 final String chainTaskId = contribution .chainTaskId ();
74-
75- final RemoteCall <TransactionReceipt > contributeCall = iexecHubContract .contribute (
76- stringToBytes (chainTaskId ),
77- stringToBytes (contribution .resultHash ()),
78- stringToBytes (contribution .resultSeal ()),
79+ final String txData = PoCoDataEncoder .encodeContribute (
80+ chainTaskId ,
81+ contribution .resultHash (),
82+ contribution .resultSeal (),
7983 contribution .enclaveChallenge (),
80- stringToBytes (contribution .enclaveSignature ()),
81- stringToBytes (contribution .workerPoolSignature ()));
84+ contribution .enclaveSignature (),
85+ contribution .workerPoolSignature ()
86+ );
8287 log .info ("Sent contribute [chainTaskId:{}, contribution:{}]" , chainTaskId , contribution );
8388
84- final TransactionReceipt contributeReceipt = submit (chainTaskId , "contribute" , contributeCall );
89+ final TransactionReceipt receipt = submit ("contribute" , txData );
90+ log .debug ("receipt {}" , receipt );
8591
86- final List <IexecHubContract .TaskContributeEventResponse > contributeEvents =
87- IexecHubContract .getTaskContributeEvents (contributeReceipt ).stream ()
88- .filter (event -> Objects .equals (bytesToString (event .taskid ), chainTaskId )
89- && Objects .equals (event .worker , signerService .getAddress ()))
90- .toList ();
92+ final List <Log > contributeEvents = receipt .getLogs ().stream ()
93+ .filter (log -> log .getTopics ().get (0 ).equals (LogTopic .TASK_CONTRIBUTE_EVENT )
94+ && log .getTopics ().get (1 ).equals (chainTaskId ))
95+ .toList ();
9196 log .debug ("contributeEvents count {} [chainTaskId: {}]" , contributeEvents .size (), chainTaskId );
9297
9398 if (!contributeEvents .isEmpty ()) {
94- final IexecHubContract . TaskContributeEventResponse contributeEvent = contributeEvents .get (0 );
95- if (isSuccessTx (chainTaskId , contributeEvent . log , CONTRIBUTED )) {
96- log .info ("Contributed [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]" ,
97- chainTaskId , contribution , contributeReceipt .getGasUsed (), contributeEvent . log );
99+ final Log contributeEvent = contributeEvents .get (0 );
100+ if (isSuccessTx (chainTaskId , contributeEvent , CONTRIBUTED )) {
101+ log .info ("contribute done [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]" ,
102+ chainTaskId , contribution , receipt .getGasUsed (), contributeEvent );
98103 return contributeEvent ;
99104 }
100105 }
@@ -105,31 +110,28 @@ private IexecHubContract.TaskContributeEventResponse sendContributeTransaction(f
105110 // endregion
106111
107112 // region reveal
108- IexecHubContract . TaskRevealEventResponse reveal (final String chainTaskId , final String resultDigest ) {
113+ Log reveal (final String chainTaskId , final String resultDigest ) {
109114 log .info ("reveal request [chainTaskId:{}, waitingTxCount:{}]" , chainTaskId , getWaitingTransactionCount ());
110- return sendRevealTransaction (chainTaskId , resultDigest );
111- }
112115
113- private IexecHubContract . TaskRevealEventResponse sendRevealTransaction ( final String chainTaskId , final String resultDigest ) {
114- final RemoteCall < TransactionReceipt > revealCall = iexecHubContract . reveal (
115- stringToBytes ( chainTaskId ),
116- stringToBytes ( resultDigest ) );
116+ final String txData = PoCoDataEncoder . encodeReveal (
117+ chainTaskId ,
118+ resultDigest
119+ );
117120 log .info ("Sent reveal [chainTaskId:{}, resultDigest:{}]" , chainTaskId , resultDigest );
118121
119- final TransactionReceipt revealReceipt = submit (chainTaskId , "reveal" , revealCall );
122+ final TransactionReceipt receipt = submit ("reveal" , txData );
120123
121- final List <IexecHubContract .TaskRevealEventResponse > revealEvents =
122- IexecHubContract .getTaskRevealEvents (revealReceipt ).stream ()
123- .filter (event -> Objects .equals (bytesToString (event .taskid ), chainTaskId )
124- && Objects .equals (event .worker , signerService .getAddress ()))
125- .toList ();
124+ final List <Log > revealEvents = receipt .getLogs ().stream ()
125+ .filter (log -> log .getTopics ().get (0 ).equals (LogTopic .TASK_REVEAL_EVENT )
126+ && log .getTopics ().get (1 ).equals (chainTaskId ))
127+ .toList ();
126128 log .debug ("revealEvents count {} [chainTaskId:{}]" , revealEvents .size (), chainTaskId );
127129
128130 if (!revealEvents .isEmpty ()) {
129- final IexecHubContract . TaskRevealEventResponse revealEvent = revealEvents .get (0 );
130- if (isSuccessTx (chainTaskId , revealEvent . log , REVEALED )) {
131- log .info ("Revealed [chainTaskId:{}, resultDigest:{}, gasUsed:{}, log:{}]" ,
132- chainTaskId , resultDigest , revealReceipt .getGasUsed (), revealEvent . log );
131+ final Log revealEvent = revealEvents .get (0 );
132+ if (isSuccessTx (chainTaskId , revealEvent , REVEALED )) {
133+ log .info ("reveal done [chainTaskId:{}, resultDigest:{}, gasUsed:{}, log:{}]" ,
134+ chainTaskId , resultDigest , receipt .getGasUsed (), revealEvent );
133135 return revealEvent ;
134136 }
135137 }
@@ -144,46 +146,40 @@ public Optional<ChainReceipt> contributeAndFinalize(final Contribution contribut
144146 final String callbackData ) {
145147 log .info ("contributeAndFinalize request [chainTaskId:{}, waitingTxCount:{}]" ,
146148 contribution .chainTaskId (), getWaitingTransactionCount ());
147- final IexecHubContract .TaskFinalizeEventResponse finalizeEvent = sendContributeAndFinalizeTransaction (contribution , resultLink , callbackData );
148- return Optional .ofNullable (finalizeEvent )
149- .map (event -> ChainUtils .buildChainReceipt (event .log , contribution .chainTaskId (), getLatestBlockNumber ()));
150- }
151149
152- private IexecHubContract .TaskFinalizeEventResponse sendContributeAndFinalizeTransaction (final Contribution contribution ,
153- final String resultLink ,
154- final String callbackData ) {
155150 final String chainTaskId = contribution .chainTaskId ();
156-
157- final RemoteCall <TransactionReceipt > contributeAndFinalizeCall = iexecHubContract .contributeAndFinalize (
158- stringToBytes (chainTaskId ),
159- stringToBytes (contribution .resultDigest ()),
151+ final String txData = PoCoDataEncoder .encodeContributeAndFinalize (
152+ chainTaskId ,
153+ contribution .resultDigest (),
160154 StringUtils .isNotEmpty (resultLink ) ? resultLink .getBytes (StandardCharsets .UTF_8 ) : new byte [0 ],
161155 StringUtils .isNotEmpty (callbackData ) ? stringToBytes (callbackData ) : new byte [0 ],
162156 contribution .enclaveChallenge (),
163- stringToBytes (contribution .enclaveSignature ()),
164- stringToBytes (contribution .workerPoolSignature ()));
157+ contribution .enclaveSignature (),
158+ contribution .workerPoolSignature ()
159+ );
165160 log .info ("Sent contributeAndFinalize [chainTaskId:{}, contribution:{}, resultLink:{}, callbackData:{}]" ,
166161 chainTaskId , contribution , resultLink , callbackData );
167162
168- final TransactionReceipt receipt = submit (chainTaskId , "contributeAndFinalize" , contributeAndFinalizeCall );
163+ final TransactionReceipt receipt = submit ("contributeAndFinalize" , txData );
169164
170- final List <IexecHubContract . TaskFinalizeEventResponse > finalizeEvents =
171- IexecHubContract . getTaskFinalizeEvents ( receipt ). stream ( )
172- . filter ( event -> Objects . equals ( bytesToString ( event . taskid ), chainTaskId ))
173- .toList ();
165+ final List <Log > finalizeEvents = receipt . getLogs (). stream ()
166+ . filter ( log -> log . getTopics (). get ( 0 ). equals ( LogTopic . TASK_FINALIZE_EVENT )
167+ && log . getTopics (). get ( 1 ). equals ( chainTaskId ))
168+ .toList ();
174169 log .debug ("finalizeEvents count {} [chainTaskId:{}]" , finalizeEvents .size (), chainTaskId );
175170
176171 if (!finalizeEvents .isEmpty ()) {
177- final IexecHubContract . TaskFinalizeEventResponse finalizeEvent = finalizeEvents .get (0 );
178- if (isSuccessTx (chainTaskId , finalizeEvent . log , REVEALED )) {
172+ final Log finalizeEvent = finalizeEvents .get (0 );
173+ if (isSuccessTx (chainTaskId , finalizeEvent , REVEALED )) {
179174 log .info ("contributeAndFinalize done [chainTaskId:{}, contribution:{}, gasUsed:{}, log:{}]" ,
180- chainTaskId , contribution , receipt .getGasUsed (), finalizeEvent .log );
181- return finalizeEvent ;
175+ chainTaskId , contribution , receipt .getGasUsed (), finalizeEvent );
176+ return Optional .of (
177+ ChainUtils .buildChainReceipt (finalizeEvent , contribution .chainTaskId (), getLatestBlockNumber ()));
182178 }
183179 }
184180
185181 log .error ("contributeAndFinalize failed [chainTaskId:{}]" , chainTaskId );
186- return null ;
182+ return Optional . empty () ;
187183 }
188184 // endregion
189185
@@ -267,48 +263,32 @@ public void purgeAllTasksData() {
267263 super .purgeAllTasksData ();
268264 }
269265
270- TransactionReceipt submit (final String chainTaskId , final String transactionType , final RemoteCall < TransactionReceipt > remoteCall ) {
266+ synchronized TransactionReceipt submit (final String function , final String txData ) {
271267 try {
272- final RemoteCallTask remoteCallSend = new RemoteCallTask (chainTaskId , transactionType , remoteCall );
273- return submit (remoteCallSend );
274- } catch (ExecutionException e ) {
275- log .error ("{} asynchronous execution did not complete" , transactionType , e );
276- } catch (InterruptedException e ) {
277- log .error ("{} thread has been interrupted" , transactionType , e );
278- Thread .currentThread ().interrupt ();
268+ final BigInteger nonce = signerService .getNonce ();
269+ final BigInteger gasLimit = "contributeAndFinalize" .equals (function )
270+ ? signerService .estimateGas (hubContractAddress , txData ).add (getCallbackGas ())
271+ : PoCoDataEncoder .getGasLimitForFunction (function );
272+ return waitTxMined (signerService .signAndSendTransaction (
273+ nonce , web3jService .getUserGasPrice (), gasLimit , hubContractAddress , txData ));
274+ } catch (Exception e ) {
275+ log .error ("{} asynchronous execution did not complete" , function , e );
279276 }
280277 // return non-null receipt with empty logs on failure
281278 final TransactionReceipt receipt = new TransactionReceipt ();
282279 receipt .setLogs (List .of ());
283280 return receipt ;
284281 }
285282
286- TransactionReceipt submit (final RemoteCallTask remoteCallTask ) throws ExecutionException , InterruptedException {
287- final Future <TransactionReceipt > future = executor .submit (remoteCallTask );
288- return future .get ();
289- }
290-
291- /**
292- * Sends a transaction to the blockchain and returns its receipt.
293- */
294- static class RemoteCallTask implements Callable <TransactionReceipt > {
295- private final String chainTaskId ;
296- private final String transactionType ;
297- private final RemoteCall <TransactionReceipt > remoteCall ;
298-
299- public RemoteCallTask (String chainTaskId , String transactionType , RemoteCall <TransactionReceipt > remoteCall ) {
300- this .chainTaskId = chainTaskId ;
301- this .transactionType = transactionType ;
302- this .remoteCall = remoteCall ;
303- }
304-
305- @ Override
306- public TransactionReceipt call () throws Exception {
307- final TransactionReceipt receipt = remoteCall .send ();
308- log .debug ("{} transaction hash {} at block {} [chainTaskId:{}]" ,
309- transactionType , receipt .getTransactionHash (), receipt .getBlockNumber (), chainTaskId );
310- log .info ("{} receipt [chainTaskId:{}]" , transactionType , chainTaskId );
311- return receipt ;
283+ TransactionReceipt waitTxMined (final String txHash ) throws IOException , TransactionException {
284+ final TransactionReceipt receipt = txReceiptProcessor .waitForTransactionReceipt (txHash );
285+ log .info ("Transaction receipt [hash:{}, status:{}, revert-reason:{}]" ,
286+ txHash , receipt .getStatus (), receipt .getRevertReason ());
287+ if (receipt .isStatusOK ()) {
288+ successCounter .increment ();
289+ } else {
290+ failureCounter .increment ();
312291 }
292+ return receipt ;
313293 }
314294}
0 commit comments