diff --git a/common/remote/rpc/rpc_client.go b/common/remote/rpc/rpc_client.go index 06d8b48e..e0a42cf9 100644 --- a/common/remote/rpc/rpc_client.go +++ b/common/remote/rpc/rpc_client.go @@ -97,7 +97,7 @@ type RpcClient struct { ctx context.Context name string labels map[string]string - currentConnection IConnection + currentConnection atomic.Value rpcClientStatus RpcClientStatus eventChan chan ConnectionEvent reconnectionChan chan ReconnectContext @@ -125,6 +125,18 @@ type ConnectionEvent struct { eventType ConnectionStatus } +func (r *RpcClient) getCurrentConnection() IConnection { + if conn, ok := r.currentConnection.Load().(IConnection); ok { + return conn + } else { + return nil + } +} + +func (r *RpcClient) setCurrentConnection(conn IConnection) { + r.currentConnection.Store(conn) +} + func (r *RpcClient) putAllLabels(labels map[string]string) { for k, v := range labels { r.labels[k] = v @@ -233,7 +245,7 @@ func (r *RpcClient) Start() { if currentConnection != nil { logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name, currentConnection.getServerInfo(), currentConnection.getConnectionId()) - r.currentConnection = currentConnection + r.setCurrentConnection(currentConnection) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) r.notifyConnectionChange(CONNECTED) } else { @@ -246,11 +258,12 @@ func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) { } func (r *RpcClient) notifyServerSrvChange() { - if r.currentConnection == nil { + currentConnection := r.getCurrentConnection() + if currentConnection == nil { r.switchServerAsync(ServerInfo{}, false) return } - curServerInfo := r.currentConnection.getServerInfo() + curServerInfo := currentConnection.getServerInfo() var found bool for _, ele := range r.nacosServer.GetServerList() { if ele.IpAddr == curServerInfo.serverIp { @@ -308,7 +321,11 @@ func (r *RpcClient) switchServerAsync(recommendServerInfo ServerInfo, onRequestF func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) { if onRequestFail && r.sendHealthCheck() { - logger.Infof("%s server check success, currentServer is %+v", r.name, r.currentConnection.getServerInfo()) + var serverInfo interface{} = nil + if currentConnection := r.getCurrentConnection(); currentConnection != nil { + serverInfo = currentConnection.getServerInfo() + } + logger.Infof("%s server check success, currentServer is %+v", r.name, serverInfo) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) return } @@ -334,14 +351,14 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) { if connectionNew != nil && err == nil { logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo, connectionNew.getConnectionId()) - - if r.currentConnection != nil { + currentConnection := r.getCurrentConnection() + if currentConnection != nil { logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo, - r.currentConnection.getConnectionId()) - r.currentConnection.setAbandon(true) + currentConnection.getConnectionId()) + currentConnection.setAbandon(true) r.closeConnection() } - r.currentConnection = connectionNew + r.setCurrentConnection(connectionNew) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) r.notifyConnectionChange(CONNECTED) return @@ -367,8 +384,9 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) { } func (r *RpcClient) closeConnection() { - if r.currentConnection != nil { - r.currentConnection.close() + currentConnection := r.getCurrentConnection() + if currentConnection != nil { + currentConnection.close() r.notifyConnectionChange(DISCONNECTED) } } @@ -379,7 +397,11 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) { if len(listeners) == 0 { return } - logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId()) + var connectionId string + if currentConnection := r.getCurrentConnection(); currentConnection != nil { + connectionId = currentConnection.getConnectionId() + } + logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), connectionId) for _, v := range listeners { if event.isConnected() { v.OnConnected() @@ -401,10 +423,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) { r.lastActiveTimestamp.Store(time.Now()) return } else { - if r.currentConnection == nil || r.isShutdown() { + currentConnection := r.getCurrentConnection() + if currentConnection == nil || r.isShutdown() { return } - logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, r.currentConnection.getConnectionId()) + logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, currentConnection.getConnectionId()) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY)) reconnectContext = ReconnectContext{onRequestFail: false} } @@ -412,10 +435,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) { } func (r *RpcClient) sendHealthCheck() bool { - if r.currentConnection == nil { + currentConnection := r.getCurrentConnection() + if currentConnection == nil { return false } - response, err := r.currentConnection.request(rpc_request.NewHealthCheckRequest(), + response, err := currentConnection.request(rpc_request.NewHealthCheckRequest(), constant.DEFAULT_TIMEOUT_MILLS, r) if err != nil { logger.Errorf("client sendHealthCheck failed,err=%v", err) @@ -480,14 +504,19 @@ func (c *ConnectionEvent) toString() string { func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) { retryTimes := 0 start := util.CurrentMillis() - var currentErr error + var ( + currentErr error + currentConnection IConnection + ) for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills { - if r.currentConnection == nil || !r.IsRunning() { + currentConnection = r.getCurrentConnection() + if currentConnection == nil || !r.IsRunning() { + rpcClientStatus := RpcClientStatus(atomic.LoadInt32((*int32)(&r.rpcClientStatus))) currentErr = waitReconnect(timeoutMills, &retryTimes, request, - errors.Errorf("client not connected, current status:%s", r.rpcClientStatus.getDesc())) + errors.Errorf("client not connected, current status:%s", rpcClientStatus.getDesc())) continue } - response, err := r.currentConnection.request(request, timeoutMills, r) + response, err := currentConnection.request(request, timeoutMills, r) if err != nil { currentErr = waitReconnect(timeoutMills, &retryTimes, request, err) continue @@ -497,7 +526,7 @@ func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (r r.mux.Lock() if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) { logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s", - r.currentConnection.getConnectionId(), request.GetRequestType()) + currentConnection.getConnectionId(), request.GetRequestType()) r.switchServerAsync(ServerInfo{}, false) } r.mux.Unlock()