1717
1818package com .huaweicloud .service .engine .common .transport ;
1919
20+ import java .net .URI ;
2021import java .util .Collections ;
2122import java .util .HashMap ;
2223import java .util .Map ;
2627
2728import org .apache .commons .lang3 .StringUtils ;
2829import org .apache .servicecomb .foundation .auth .AuthHeaderProvider ;
29- import org .apache .servicecomb .http .client .event .EngineConnectChangedEvent ;
30- import org .apache .servicecomb .service .center .client .OperationEvents ;
30+ import org .apache .servicecomb .http .client .event .OperationEvents ;
3131import org .apache .servicecomb .service .center .client .ServiceCenterClient ;
3232import org .apache .servicecomb .service .center .client .model .RbacTokenRequest ;
3333import org .apache .servicecomb .service .center .client .model .RbacTokenResponse ;
4141import com .google .common .eventbus .Subscribe ;
4242import com .google .common .util .concurrent .Futures ;
4343import com .google .common .util .concurrent .ListenableFuture ;
44+ import com .huaweicloud .common .event .EventManager ;
4445import com .huaweicloud .service .engine .common .configration .bootstrap .BootstrapProperties ;
4546import com .huaweicloud .service .engine .common .configration .bootstrap .DiscoveryBootstrapProperties ;
46- import com .huaweicloud .service .engine .common .configration .bootstrap .MicroserviceProperties ;
4747import com .huaweicloud .service .engine .common .configration .bootstrap .ServiceCombRBACProperties ;
4848import com .huaweicloud .service .engine .common .configration .bootstrap .ServiceCombSSLProperties ;
4949import com .huaweicloud .service .engine .common .disovery .ServiceCenterUtils ;
50- import com .huaweicloud .common .event .EventManager ;
5150
5251import jakarta .ws .rs .core .Response .Status ;
5352
@@ -59,37 +58,26 @@ public class RBACRequestAuthHeaderProvider implements AuthHeaderProvider {
5958 // e.g. not found: will query token after token expired period
6059 public static final String INVALID_TOKEN = "invalid" ;
6160
62- private static final String UN_AUTHORIZED_CODE_HALF_OPEN = "401302" ;
63-
6461 public static final String CACHE_KEY = "token" ;
6562
6663 public static final String AUTH_HEADER = "Authorization" ;
6764
6865 private static final long TOKEN_REFRESH_TIME_IN_SECONDS = 20 * 60 * 1000 ;
6966
70- private final DiscoveryBootstrapProperties discoveryProperties ;
71-
72- private final ServiceCombSSLProperties serviceCombSSLProperties ;
67+ private static final Object LOCK = new Object ();
7368
7469 private final ServiceCombRBACProperties serviceCombRBACProperties ;
7570
76- private final MicroserviceProperties microserviceProperties ;
77-
7871 private ExecutorService executorService ;
7972
8073 private LoadingCache <String , String > cache ;
8174
82- private String lastErrorCode = "401302" ;
83-
84- private int lastStatusCode = 401 ;
85-
8675 private ServiceCenterClient serviceCenterClient ;
8776
8877 public RBACRequestAuthHeaderProvider (BootstrapProperties bootstrapProperties , Environment env ) {
89- this . discoveryProperties = bootstrapProperties .getDiscoveryBootstrapProperties ();
90- this . serviceCombSSLProperties = bootstrapProperties .getServiceCombSSLProperties ();
78+ DiscoveryBootstrapProperties discoveryProperties = bootstrapProperties .getDiscoveryBootstrapProperties ();
79+ ServiceCombSSLProperties serviceCombSSLProperties = bootstrapProperties .getServiceCombSSLProperties ();
9180 this .serviceCombRBACProperties = bootstrapProperties .getServiceCombRBACProperties ();
92- this .microserviceProperties = bootstrapProperties .getMicroserviceProperties ();
9381
9482 if (enabled ()) {
9583 serviceCenterClient = ServiceCenterUtils .serviceCenterClient (discoveryProperties ,
@@ -98,38 +86,41 @@ public RBACRequestAuthHeaderProvider(BootstrapProperties bootstrapProperties, En
9886
9987 executorService = Executors .newFixedThreadPool (1 , t -> new Thread (t , "rbac-executor" ));
10088 cache = CacheBuilder .newBuilder ()
101- .maximumSize (1 )
89+ .maximumSize (10 )
10290 .refreshAfterWrite (refreshTime (), TimeUnit .MILLISECONDS )
10391 .build (new CacheLoader <String , String >() {
10492 @ Override
10593 public String load (String key ) {
106- return createHeaders ();
94+ return createHeaders (key );
10795 }
10896
10997 @ Override
11098 public ListenableFuture <String > reload (String key , String oldValue ) {
111- return Futures .submit (() -> createHeaders (), executorService );
99+ return Futures .submit (() -> createHeaders (key ), executorService );
112100 }
113101 });
114102 }
115103 }
116104
117105 @ Subscribe
118- public void onNotPermittedEvent (OperationEvents .UnAuthorizedOperationEvent event ) {
119- this .executorService .submit (this ::retryRefresh );
106+ public void onUnAuthorizedOperationEvent (OperationEvents .UnAuthorizedOperationEvent event ) {
107+ LOGGER .warn ("address {} unAuthorized, refresh cache token!" , event .getAddress ());
108+ cache .refresh (getHostByAddress (event .getAddress ()));
120109 }
121110
122- @ Subscribe
123- public void onEngineConnectChangedEvent (EngineConnectChangedEvent event ) {
124- cache .refresh (CACHE_KEY );
111+ private static String getHostByAddress (String address ) {
112+ try {
113+ URI uri = URI .create (address );
114+ return uri .getHost ();
115+ } catch (Exception e ) {
116+ LOGGER .error ("get host by address [{}] error!" , address , e );
117+ return CACHE_KEY ;
118+ }
125119 }
126120
127- protected String createHeaders () {
128- LOGGER .info ("start to create RBAC headers" );
129-
130- RbacTokenResponse rbacTokenResponse = callCreateHeaders ();
131- lastErrorCode = rbacTokenResponse .getErrorCode ();
132- lastStatusCode = rbacTokenResponse .getStatusCode ();
121+ protected String createHeaders (String key ) {
122+ LOGGER .info ("start to create server [{}] RBAC headers" , key );
123+ RbacTokenResponse rbacTokenResponse = callCreateHeaders (key );
133124
134125 if (Status .UNAUTHORIZED .getStatusCode () == rbacTokenResponse .getStatusCode ()
135126 || Status .FORBIDDEN .getStatusCode () == rbacTokenResponse .getStatusCode ()) {
@@ -140,51 +131,68 @@ protected String createHeaders() {
140131 // service center not support, do not try
141132 LOGGER .warn ("service center do not support RBAC token, you should not config account info" );
142133 return INVALID_TOKEN ;
134+ } else if (Status .INTERNAL_SERVER_ERROR .getStatusCode () == rbacTokenResponse .getStatusCode ()) {
135+ // return null for server_error, so the token information can be re-fetched on the next call.
136+ // It will prompt 'CacheLoader returned null for key xxx'
137+ LOGGER .warn ("service center query RBAC token error!" );
138+ return null ;
143139 }
144140
145- LOGGER .info ("refresh token successfully {}" , rbacTokenResponse .getStatusCode ());
141+ LOGGER .info ("refresh server [{}] token successfully {}" , key , rbacTokenResponse .getStatusCode ());
146142 return rbacTokenResponse .getToken ();
147143 }
148144
149- protected RbacTokenResponse callCreateHeaders () {
145+ protected RbacTokenResponse callCreateHeaders (String host ) {
150146 RbacTokenRequest request = new RbacTokenRequest ();
151147 request .setName (serviceCombRBACProperties .getName ());
152148 request .setPassword (serviceCombRBACProperties .getPassword ());
153-
154- return serviceCenterClient .queryToken (request );
149+ try {
150+ return serviceCenterClient .queryToken (request , host );
151+ } catch (Exception e ) {
152+ LOGGER .error ("query token from server [{}] error!" , host , e );
153+ }
154+ RbacTokenResponse response = new RbacTokenResponse ();
155+ response .setStatusCode (Status .INTERNAL_SERVER_ERROR .getStatusCode ());
156+ return response ;
155157 }
156158
157159 protected long refreshTime () {
158160 return TOKEN_REFRESH_TIME_IN_SECONDS ;
159161 }
160162
163+ /**
164+ * Retrieve the corresponding engine token cache information based on the host in the request
165+ * to resolve cross-engine authentication issues in dual-engine disaster recovery scenarios.
166+ *
167+ * @param host host
168+ * @return token info
169+ */
161170 @ Override
162- public Map <String , String > authHeaders () {
171+ public Map <String , String > authHeaders (String host ) {
163172 if (!enabled ()) {
164173 return Collections .emptyMap ();
165174 }
166-
167- try {
168- String header = cache .get (CACHE_KEY );
169- if (!StringUtils .isEmpty (header )) {
170- Map <String , String > tokens = new HashMap <>(1 );
171- tokens .put (AUTH_HEADER , "Bearer " + header );
172- return tokens ;
175+ String address = host ;
176+ if (StringUtils .isEmpty (address )) {
177+ address = CACHE_KEY ;
178+ }
179+ synchronized (LOCK ) {
180+ try {
181+ String header = cache .get (address );
182+ if (!StringUtils .isEmpty (header )) {
183+ Map <String , String > tokens = new HashMap <>(1 );
184+ tokens .put (AUTH_HEADER , "Bearer " + header );
185+ return tokens ;
186+ }
187+ } catch (Exception e ) {
188+ LOGGER .error ("Get auth headers failed" , e );
173189 }
174- } catch (Exception e ) {
175- LOGGER .error ("Get auth headers failed" , e );
190+ return Collections .emptyMap ();
176191 }
177- return Collections .emptyMap ();
178192 }
179193
180194 private boolean enabled () {
181195 return !StringUtils .isEmpty (serviceCombRBACProperties .getName ()) && !StringUtils
182196 .isEmpty (serviceCombRBACProperties .getPassword ());
183197 }
184-
185- private void retryRefresh () {
186- if (Status .UNAUTHORIZED .getStatusCode () == lastStatusCode && UN_AUTHORIZED_CODE_HALF_OPEN .equals (lastErrorCode )) {
187- cache .refresh (microserviceProperties .getName ());
188- }
189- }
190198}
0 commit comments