66import com .iexec .common .notification .TaskNotification ;
77import com .iexec .common .replicate .ReplicateDetails ;
88import com .iexec .common .replicate .ReplicateStatus ;
9- import com .iexec .common .security .Signature ;
10- import com .iexec .common .utils .SignatureUtils ;
11- import com .iexec .worker .chain .CredentialsService ;
129import com .iexec .worker .config .CoreConfigurationService ;
10+ import com .iexec .worker .security .TokenService ;
1311import feign .FeignException ;
1412import lombok .extern .slf4j .Slf4j ;
1513import org .springframework .http .HttpStatus ;
1614import org .springframework .stereotype .Service ;
17- import org .web3j .crypto .ECKeyPair ;
1815
19- import java .util .ArrayList ;
20- import java .util .Collections ;
2116import java .util .List ;
2217import java .util .Optional ;
2318
2621@ Slf4j
2722public class CustomFeignClient {
2823
29-
3024 private static final int RETRY_TIME = 5000 ;
31- private static final String TOKEN_PREFIX = "Bearer " ;
3225 private final String coreURL ;
33- private String currentToken ;
34-
26+ private TokenService tokenService ;
3527 private CoreClient coreClient ;
3628 private WorkerClient workerClient ;
3729 private ReplicateClient replicateClient ;
38- private CredentialsService credentialsService ;
3930
4031 public CustomFeignClient (CoreClient coreClient ,
4132 WorkerClient workerClient ,
4233 ReplicateClient replicateClient ,
43- CredentialsService credentialsService ,
44- CoreConfigurationService coreConfigurationService ) {
34+ CoreConfigurationService coreConfigurationService ,
35+ TokenService tokenService ) {
4536 this .coreClient = coreClient ;
4637 this .workerClient = workerClient ;
4738 this .replicateClient = replicateClient ;
48- this .credentialsService = credentialsService ;
4939 this .coreURL = coreConfigurationService .getUrl ();
50- this .currentToken = "" ;
40+ this .tokenService = tokenService ;
5141 }
5242
53- public PublicConfiguration getPublicConfiguration () {
43+ //TODO: Make a generic method for REST API calls (Unauthorized, Retry, ..)
44+ public String ping () {
5445 try {
55- return workerClient .getPublicConfiguration ( );
46+ return workerClient .ping ( tokenService . getToken () );
5647 } catch (FeignException e ) {
57- if (e .status () == 0 ) {
58- log . error ( "Failed to getPublicConfiguration, will retry" );
59- sleep ();
60- return getPublicConfiguration ( );
48+ if (HttpStatus . valueOf ( e .status ()). equals ( HttpStatus . UNAUTHORIZED ) ) {
49+ tokenService . expireToken ( );
50+ } else {
51+ log . error ( "Failed to ping (will retry) [instance:{}, status:{}]" , coreURL , e . status () );
6152 }
53+ sleep ();
54+ return ping ();
6255 }
63- return null ;
6456 }
6557
66- public String getCoreVersion () {
58+ public PublicConfiguration getPublicConfiguration () {
6759 try {
68- return coreClient . getCoreVersion ();
60+ return workerClient . getPublicConfiguration ();
6961 } catch (FeignException e ) {
70- if (e .status () == 0 ) {
71- log . error ( "Failed to getCoreVersion, will retry" );
72- sleep ();
73- return getCoreVersion ( );
62+ if (HttpStatus . valueOf ( e .status ()). equals ( HttpStatus . UNAUTHORIZED ) ) {
63+ tokenService . expireToken ( );
64+ } else {
65+ log . error ( "Failed to getPublicConfiguration (will retry) [instance:{}, status:{}]" , coreURL , e . status () );
7466 }
67+ sleep ();
68+ return getPublicConfiguration ();
7569 }
76- return null ;
7770 }
7871
79- public String ping () {
72+ public String getCoreVersion () {
8073 try {
81- return workerClient . ping ( getToken () );
74+ return coreClient . getCoreVersion ( );
8275 } catch (FeignException e ) {
83- if (e .status () == 0 ) {
84- log .error ("Failed to ping [instance:{}]" , coreURL );
85- } else if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
86- generateNewToken ();
87- return workerClient .ping (getToken ());
76+ if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
77+ tokenService .expireToken ();
78+ } else {
79+ log .error ("Failed to getCoreVersion (will retry) [instance:{}, status:{}]" , coreURL , e .status ());
8880 }
81+ sleep ();
82+ return getCoreVersion ();
8983 }
90-
91- return "" ;
9284 }
9385
86+ //TODO: Make registerWorker return Worker
9487 public void registerWorker (WorkerConfigurationModel model ) {
9588 try {
96- workerClient .registerWorker (getToken (), model );
89+ workerClient .registerWorker (tokenService . getToken (), model );
9790 } catch (FeignException e ) {
98- if (e .status () == 0 ) {
99- log .error ("Failed to registerWorker, will retry [instance:{}]" , coreURL );
100- sleep ();
101- registerWorker (model );
102- } else if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
103- generateNewToken ();
104- workerClient .registerWorker (getToken (), model );
91+ if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
92+ tokenService .expireToken ();
93+ } else {
94+ log .error ("Failed to registerWorker (will retry) [instance:{}, status:{}]" , coreURL , e .status ());
10595 }
96+ sleep ();
97+ registerWorker (model );
10698 }
10799 }
108100
109101 public List <TaskNotification > getMissedTaskNotifications (long lastAvailableBlockNumber ) {
110- List <TaskNotification > interruptedReplicates = new ArrayList <>();
111-
112102 try {
113- interruptedReplicates = replicateClient .getMissedTaskNotifications (lastAvailableBlockNumber , getToken ());
103+ return replicateClient .getMissedTaskNotifications (lastAvailableBlockNumber , tokenService . getToken ());
114104 } catch (FeignException e ) {
115- if (e .status () == HttpStatus .UNAUTHORIZED .value ()) {
116- generateNewToken ();
117- interruptedReplicates = replicateClient .getMissedTaskNotifications (lastAvailableBlockNumber , getToken ());
105+ if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
106+ tokenService .expireToken ();
118107 } else {
119- log .error ("Failed to get interrupted replicates [instance:{}]" , coreURL );
120- e .printStackTrace ();
108+ log .error ("Failed to getMissedTaskNotifications (will retry) [instance:{}, status:{}]" , coreURL , e .status ());
121109 }
110+ sleep ();
111+ return getMissedTaskNotifications (lastAvailableBlockNumber );
122112 }
123-
124- return interruptedReplicates ;
125113 }
126114
127115 public List <String > getTasksInProgress () {
128116 try {
129- return workerClient .getCurrentTasks (getToken ());
117+ return workerClient .getCurrentTasks (tokenService . getToken ());
130118 } catch (FeignException e ) {
131- if (e .status () == 0 ) {
132- log .error ("Failed to get tasks in progress, will retry [instance:{}]" , coreURL );
133- sleep ();
134- } else if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
135- generateNewToken ();
136- return workerClient .getCurrentTasks (getToken ());
119+ if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
120+ tokenService .expireToken ();
121+ } else {
122+ log .error ("Failed to getTasksInProgress (will retry) [instance:{}, status:{}]" , coreURL , e .status ());
137123 }
124+ sleep ();
125+ return getTasksInProgress ();
138126 }
139-
140- return Collections .emptyList ();
141127 }
142128
143129 public Optional <ContributionAuthorization > getAvailableReplicate (long lastAvailableBlockNumber ) {
144- ContributionAuthorization contributionAuth = null ;
145-
146130 try {
147- contributionAuth = replicateClient .getAvailableReplicate (lastAvailableBlockNumber , getToken ());
131+ ContributionAuthorization contributionAuth = replicateClient .getAvailableReplicate (lastAvailableBlockNumber , tokenService .getToken ());
132+ return contributionAuth == null ? Optional .empty () : Optional .of (contributionAuth );
148133 } catch (FeignException e ) {
149- if (e .status () == HttpStatus .UNAUTHORIZED .value ()) {
150- generateNewToken ();
151- contributionAuth = replicateClient .getAvailableReplicate (lastAvailableBlockNumber , getToken ());
134+ if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
135+ tokenService .expireToken ();
152136 } else {
153- log .error ("Failed to get an available replicate [instance:{}]" , coreURL );
154- e .printStackTrace ();
137+ log .error ("Failed to getAvailableReplicate (will retry) [instance:{}, status:{}]" , coreURL , e .status ());
155138 }
139+ sleep ();
140+ return getAvailableReplicate (lastAvailableBlockNumber );
156141 }
157-
158- return contributionAuth == null ? Optional .empty () : Optional .of (contributionAuth );
159142 }
160143
161144 public void updateReplicateStatus (String chainTaskId , ReplicateStatus status ) {
162145 updateReplicateStatus (chainTaskId , status , ReplicateDetails .builder ().build ());
163146 }
164147
165148 public void updateReplicateStatus (String chainTaskId , ReplicateStatus status , ReplicateDetails details ) {
166- log .info (status .toString () + " [chainTaskId:{}]" , chainTaskId );
167-
168149 try {
169- replicateClient .updateReplicateStatus (chainTaskId , status , getToken (), details );
150+ replicateClient .updateReplicateStatus (chainTaskId , status , tokenService .getToken (), details );
151+ log .info (status .toString () + " [chainTaskId:{}]" , chainTaskId );
170152 } catch (FeignException e ) {
171- if (e .status () == 0 ) {
172- log .error ("Failed to updateReplicateStatus, will retry [instance:{}]" , coreURL );
173- sleep ();
174- updateReplicateStatus (chainTaskId , status , details );
175- return ;
176- }
177-
178153 if (HttpStatus .valueOf (e .status ()).equals (HttpStatus .UNAUTHORIZED )) {
179- generateNewToken ();
180- log .info (status .toString () + " [chainTaskId:{}]" , chainTaskId );
181- replicateClient .updateReplicateStatus (chainTaskId , status , getToken (), details );
182- }
183- }
184- }
185-
186- private String getChallenge (String workerAddress ) {
187- try {
188- return workerClient .getChallenge (workerAddress );
189- } catch (FeignException e ) {
190- if (e .status () == 0 ) {
191- log .error ("Failed to get core challenge, will retry [instance:{}]" , coreURL );
192- sleep ();
193- return getChallenge (workerAddress );
194- }
195- }
196- return null ;
197- }
198-
199- private String login (String workerAddress , Signature signature ) {
200- try {
201- return workerClient .login (workerAddress , signature );
202- } catch (FeignException e ) {
203- if (e .status () == 0 ) {
204- log .error ("Failed to login, will retry [instance:{}]" , coreURL );
205- sleep ();
206- return login (workerAddress , signature );
154+ tokenService .expireToken ();
155+ } else {
156+ log .error ("Failed to updateReplicateStatus (will retry) [instance:{}, status:{}]" , coreURL , e .status ());
207157 }
158+ sleep ();
159+ updateReplicateStatus (chainTaskId , status , details );
208160 }
209- return null ;
210161 }
211162
212163 private void sleep () {
@@ -216,26 +167,4 @@ private void sleep() {
216167 }
217168 }
218169
219- private String getToken () {
220- if (currentToken .isEmpty ()) {
221- String workerAddress = credentialsService .getCredentials ().getAddress ();
222- ECKeyPair ecKeyPair = credentialsService .getCredentials ().getEcKeyPair ();
223- String challenge = getChallenge (workerAddress );
224-
225- Signature signature = SignatureUtils .hashAndSign (challenge , workerAddress , ecKeyPair );
226- currentToken = TOKEN_PREFIX + login (workerAddress , signature );
227- }
228-
229- return currentToken ;
230- }
231-
232- private void expireToken () {
233- currentToken = "" ;
234- }
235-
236- private String generateNewToken () {
237- expireToken ();
238- return getToken ();
239- }
240-
241170}
0 commit comments