1+ /*
2+ * Copyright 2021 IEXEC BLOCKCHAIN TECH
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package com .iexec .core .chain .adapter ;
18+
19+ import com .iexec .common .chain .adapter .CommandStatus ;
20+ import com .iexec .common .chain .adapter .args .TaskFinalizeArgs ;
21+ import lombok .extern .slf4j .Slf4j ;
22+ import org .apache .commons .lang3 .StringUtils ;
23+ import org .springframework .http .ResponseEntity ;
24+ import org .springframework .stereotype .Service ;
25+
26+ import java .util .Optional ;
27+ import java .util .function .Function ;
28+
29+ import static java .util .concurrent .TimeUnit .SECONDS ;
30+
31+ @ Slf4j
32+ @ Service
33+ public class BlockchainAdapterService {
34+
35+ public static final int WATCH_PERIOD_SECONDS = 1 ;//To tune
36+ public static final int MAX_ATTEMPTS = 50 ;
37+ private final BlockchainAdapterClient blockchainAdapterClient ;
38+
39+ public BlockchainAdapterService (BlockchainAdapterClient blockchainAdapterClient ) {
40+ this .blockchainAdapterClient = blockchainAdapterClient ;
41+ }
42+
43+ /**
44+ * Request on-chain initialization of the task.
45+ *
46+ * @param chainDealId ID of the deal
47+ * @param taskIndex index of the task in the deal
48+ * @return chain task ID is initialization is properly requested
49+ */
50+ public Optional <String > requestInitialize (String chainDealId , int taskIndex ) {
51+ try {
52+ ResponseEntity <String > initializeResponseEntity =
53+ blockchainAdapterClient .requestInitializeTask (chainDealId , taskIndex );
54+ if (initializeResponseEntity .getStatusCode ().is2xxSuccessful ()
55+ && !StringUtils .isEmpty (initializeResponseEntity .getBody ())) {
56+ String chainTaskId = initializeResponseEntity .getBody ();
57+ log .info ("Requested initialize [chainTaskId:{}, chainDealId:{}, " +
58+ "taskIndex:{}]" , chainTaskId , chainDealId , taskIndex );
59+ return Optional .of (chainTaskId );
60+ }
61+ } catch (Throwable e ) {
62+ log .error ("Failed to requestInitialize [chainDealId:{}, " +
63+ "taskIndex:{}]" , chainDealId , taskIndex , e );
64+ }
65+ return Optional .empty ();
66+ }
67+
68+ /**
69+ * Verify if the initialize task command is completed on-chain.
70+ *
71+ * @param chainTaskId ID of the task
72+ * @return true if the tx is mined, false if reverted or empty for other
73+ * cases (too long since still RECEIVED or PROCESSING, adapter error)
74+ */
75+ public Optional <Boolean > isInitialized (String chainTaskId ) {
76+ return isCommandCompleted (blockchainAdapterClient ::getStatusForInitializeTaskRequest ,
77+ chainTaskId , SECONDS .toMillis (WATCH_PERIOD_SECONDS ), MAX_ATTEMPTS , 0 );
78+ }
79+
80+ /**
81+ * Request on-chain finalization of the task.
82+ *
83+ * @param chainTaskId ID of the deal
84+ * @param resultLink link of the result to be published on-chain
85+ * @param callbackData optional data for on-chain callback
86+ * @return chain task ID is initialization is properly requested
87+ */
88+ public Optional <String > requestFinalize (String chainTaskId ,
89+ String resultLink ,
90+ String callbackData ) {
91+ try {
92+ ResponseEntity <String > finalizeResponseEntity =
93+ blockchainAdapterClient .requestFinalizeTask (chainTaskId ,
94+ new TaskFinalizeArgs (resultLink , callbackData ));
95+ if (finalizeResponseEntity .getStatusCode ().is2xxSuccessful ()
96+ && !StringUtils .isEmpty (finalizeResponseEntity .getBody ())) {
97+ log .info ("Requested finalize [chainTaskId:{}, resultLink:{}, " +
98+ "callbackData:{}]" , chainTaskId , resultLink , callbackData );
99+ return Optional .of (chainTaskId );
100+ }
101+ } catch (Throwable e ) {
102+ log .error ("Failed to requestFinalize [chainTaskId:{}, resultLink:{}, " +
103+ "callbackData:{}]" , chainTaskId , resultLink , callbackData , e );
104+ }
105+ return Optional .empty ();
106+ }
107+
108+ /**
109+ * Verify if the finalize task command is completed on-chain.
110+ *
111+ * @param chainTaskId ID of the task
112+ * @return true if the tx is mined, false if reverted or empty for other
113+ * cases (too long since still RECEIVED or PROCESSING, adapter error)
114+ */
115+ public Optional <Boolean > isFinalized (String chainTaskId ) {
116+ return isCommandCompleted (blockchainAdapterClient ::getStatusForFinalizeTaskRequest ,
117+ chainTaskId , SECONDS .toMillis (WATCH_PERIOD_SECONDS ), MAX_ATTEMPTS , 0 );
118+ }
119+
120+ /**
121+ * Verify if a command sent to the adapter is completed on-chain.
122+ *
123+ * @param getCommandStatusFunction method for checking the command is completed
124+ * @param chainTaskId ID of the task
125+ * @param period period in ms between checks
126+ * @param maxAttempts maximum number of attempts for checking
127+ * @param attempt current attempt number
128+ * @return true if the tx is mined, false if reverted or empty for other
129+ * cases (too long since still RECEIVED or PROCESSING, adapter error)
130+ */
131+ Optional <Boolean > isCommandCompleted (
132+ Function <String , ResponseEntity <CommandStatus >> getCommandStatusFunction ,
133+ String chainTaskId ,
134+ long period , int maxAttempts , int attempt ) {
135+ if (attempt >= maxAttempts ) {
136+ log .error ("Reached max retry while waiting command completion " +
137+ "[chainTaskId:{}, maxAttempts:{}]" ,
138+ chainTaskId , maxAttempts );
139+ return Optional .empty ();
140+ }
141+ ResponseEntity <CommandStatus > commandStatusEntity ;
142+ try {
143+ commandStatusEntity = getCommandStatusFunction .apply (chainTaskId );
144+ if (!commandStatusEntity .getStatusCode ().is2xxSuccessful ()
145+ || commandStatusEntity .getBody () == null ) {
146+ return Optional .empty ();
147+ }
148+ CommandStatus status = commandStatusEntity .getBody ();
149+ if (CommandStatus .SUCCESS .equals (status )
150+ || CommandStatus .FAILURE .equals (status )) {
151+ return Optional .of (status .equals (CommandStatus .SUCCESS ));
152+ }
153+ // RECEIVED, PROCESSING
154+ log .warn ("Waiting command completion [chainTaskId:{}, " +
155+ "status:{}, period:{}ms, attempt:{}, maxAttempts:{}]" ,
156+ chainTaskId , status , period , attempt , maxAttempts );
157+ Thread .sleep (period );
158+ return isCommandCompleted (getCommandStatusFunction , chainTaskId ,
159+ period , maxAttempts , attempt + 1 );
160+ } catch (Throwable e ) {
161+ log .error ("Unexpected error while waiting command completion " +
162+ "[chainTaskId:{}, period:{}ms, attempt:{}, maxAttempts:{}]" ,
163+ chainTaskId , period , attempt , maxAttempts , e );
164+ }
165+ return Optional .empty ();
166+ }
167+
168+ }
0 commit comments