2020import cn .hutool .core .lang .generator .SnowflakeGenerator ;
2121import com .alipay .remoting .Connection ;
2222import com .alipay .remoting .ConnectionEventType ;
23+ import com .alipay .remoting .config .BoltClientOption ;
2324import com .alipay .remoting .config .Configs ;
2425import com .alipay .remoting .exception .RemotingException ;
2526import com .alipay .remoting .rpc .RpcClient ;
2627import com .alipay .remoting .serialization .HessianSerializer ;
2728import com .alipay .remoting .serialization .SerializerManager ;
28- import lombok .Getter ;
29- import lombok .Setter ;
3029import lombok .extern .slf4j .Slf4j ;
30+ import org .apache .commons .lang3 .StringUtils ;
3131import org .dromara .dynamictp .client .cluster .AdminClusterManager ;
3232import org .dromara .dynamictp .client .cluster .AdminNode ;
33- import org .dromara .dynamictp .client .processor .ClientUserProcessor ;
34- import org .dromara .dynamictp .client .processor .CloseEventProcessor ;
35- import org .dromara .dynamictp .client .processor .ConnectEventProcessor ;
3633import org .dromara .dynamictp .client .loadbalance .NodeSelector ;
3734import org .dromara .dynamictp .client .loadbalance .RandomNodeSelector ;
3835import org .dromara .dynamictp .client .loadbalance .RoundRobinNodeSelector ;
3936import org .dromara .dynamictp .client .loadbalance .WeightedRoundRobinNodeSelector ;
37+ import org .dromara .dynamictp .client .processor .ClientUserProcessor ;
38+ import org .dromara .dynamictp .client .processor .CloseEventProcessor ;
39+ import org .dromara .dynamictp .client .processor .ConnectEventProcessor ;
40+ import org .dromara .dynamictp .client .properties .AdminClientProperties ;
4041import org .dromara .dynamictp .common .entity .RpcRequest ;
41- import org .springframework .beans .factory .annotation .Value ;
4242
4343import javax .annotation .PostConstruct ;
4444import javax .annotation .PreDestroy ;
4545import java .util .Collections ;
4646import java .util .List ;
47+ import java .util .Objects ;
4748import java .util .concurrent .Executors ;
4849import java .util .concurrent .ScheduledExecutorService ;
4950import java .util .concurrent .TimeUnit ;
5960@ Slf4j
6061public class AdminClient {
6162
62- @ Value ("${dynamictp.adminNodes:}" )
63- private String adminNodes ;
64-
65- @ Value ("${dynamictp.loadBalanceStrategy:roundRobin}" )
66- private String loadBalanceStrategy ;
67-
68- @ Setter
69- @ Value ("${dynamictp.clientName:${spring.application.name}}" )
63+ private final String adminNodes ;
64+ private final String loadBalanceStrategy ;
7065 private String clientName ;
71-
72- @ Setter
73- @ Value ("${dynamictp.serviceName:${spring.application.name}}" )
74- private String serviceName ;
75-
76- @ Value ("${dynamictp.adminEnabled:false}" )
77- private Boolean adminEnabled ;
78-
79- @ Getter
66+ private final String serviceName ;
67+ private final Boolean adminEnabled ;
8068 private static final SnowflakeGenerator SNOWFLAKE_GENERATOR = new SnowflakeGenerator ();
81-
8269 private final RpcClient client = new RpcClient ();
83-
84- @ Getter
8570 private static final HessianSerializer SERIALIZER = new HessianSerializer ();
86-
87- /**
88- * Use AtomicReference to ensure thread safety
89- */
9071 private static final AtomicReference <Connection > CONNECTION_REF = new AtomicReference <>();
91-
92- public static Connection getConnection () {
93- return CONNECTION_REF .get ();
94- }
95-
96- public static void setConnection (Connection connection ) {
97- CONNECTION_REF .set (connection );
98- }
99-
100- /**
101- * Cluster manager
102- */
10372 private AdminClusterManager clusterManager ;
73+ private ScheduledExecutorService heartbeatExecutor ;
10474
10575 /**
10676 * Connection state management
10777 */
10878 private final AtomicBoolean isConnected = new AtomicBoolean (false );
10979 private final AtomicInteger retryCount = new AtomicInteger (0 );
11080
111- private ScheduledExecutorService heartbeatExecutor ;
81+ public AdminClient (ClientUserProcessor clientUserProcessor , AdminClientProperties properties ) {
82+ this .clientName = properties .getClientName ();
83+ this .serviceName = properties .getServiceName ();
84+ this .adminNodes = properties .getNodes ();
85+ this .loadBalanceStrategy = properties .getLoadBalanceStrategy ();
86+ this .adminEnabled = properties .isEnabled ();
11287
113- public AdminClient (ClientUserProcessor clientUserProcessor ) {
114- this (clientUserProcessor , "" );
88+ initRpcClient (clientUserProcessor );
11589 }
11690
117- public AdminClient ( ClientUserProcessor clientUserProcessor , String clientName ) {
118- this ( clientUserProcessor , clientName , "" , "" , "" , false );
91+ public static Connection getConnection ( ) {
92+ return CONNECTION_REF . get ( );
11993 }
12094
121- public AdminClient (ClientUserProcessor clientUserProcessor , String clientName , String serviceName , String adminNodes , String loadBalanceStrategy , Boolean adminEnabled ) {
122- if (!clientName .isEmpty ()) {
123- this .clientName = clientName ;
124- }
125- if (!serviceName .isEmpty ()) {
126- this .serviceName = serviceName ;
127- }
128- if (!adminNodes .isEmpty ()) {
129- this .adminNodes = adminNodes ;
130- }
131- if (!loadBalanceStrategy .isEmpty ()) {
132- this .loadBalanceStrategy = loadBalanceStrategy ;
133- }
134- this .adminEnabled = adminEnabled ;
95+ public static void setConnection (Connection connection ) {
96+ CONNECTION_REF .set (connection );
97+ }
13598
99+ private void initRpcClient (ClientUserProcessor clientUserProcessor ) {
136100 client .addConnectionEventProcessor (ConnectionEventType .CONNECT , new ConnectEventProcessor (this ));
137101 client .addConnectionEventProcessor (ConnectionEventType .CLOSE , new CloseEventProcessor (this ));
138102 client .registerUserProcessor (clientUserProcessor );
139- client .enableReconnectSwitch ( );
103+ client .option ( BoltClientOption . CONN_RECONNECT_SWITCH , true );
140104 client .startup ();
141105 SerializerManager .addSerializer (1 , SERIALIZER );
142106 System .setProperty (Configs .SERIALIZER , String .valueOf (SERIALIZER ));
@@ -145,12 +109,14 @@ public AdminClient(ClientUserProcessor clientUserProcessor, String clientName, S
145109 @ PostConstruct
146110 public void init () {
147111 try {
112+ if (StringUtils .isBlank (clientName )) {
113+ clientName = serviceName ;
114+ }
148115 initClusterManager ();
149116 createConnection ();
150117 startHeartbeat ();
151118 } catch (Exception e ) {
152119 log .error ("Failed to initialize AdminClient" , e );
153- // If initialization fails, don't start heartbeat to avoid continuous retries
154120 if (heartbeatExecutor != null ) {
155121 stopHeartbeat ();
156122 }
0 commit comments