Skip to content

Commit 1245cde

Browse files
authored
Lock when read watch map, to keep thread safe. (#75)
1 parent 2065eaa commit 1245cde

File tree

1 file changed

+60
-62
lines changed

1 file changed

+60
-62
lines changed

client.go

Lines changed: 60 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,75 +1016,73 @@ func (c *Client) Close() error {
10161016
func (c *Client) WatchMicroServiceWithExtraHandle(microServiceID string, callback func(e *MicroServiceInstanceChangedEvent),
10171017
extraHandle func(action string, opts ...CallOption)) error {
10181018
openlog.Info(fmt.Sprintf("WatchMicroServiceWithExtraHandle, microServiceID:%s", microServiceID))
1019+
c.mutex.Lock()
10191020
if ready, ok := c.watchers[microServiceID]; !ok || !ready {
1020-
c.mutex.Lock()
1021-
if ready, ok := c.watchers[microServiceID]; !ok || !ready {
1022-
openlog.Info(fmt.Sprintf("WatchMicroServiceWithExtraHandle watch, microServiceID:%s", microServiceID))
1023-
c.watchers[microServiceID] = true
1024-
scheme := "wss"
1025-
if !c.opt.EnableSSL {
1026-
scheme = "ws"
1027-
}
1028-
host := c.GetAddress()
1029-
u := url.URL{
1030-
Scheme: scheme,
1031-
Host: host,
1032-
Path: fmt.Sprintf("%s%s/%s%s", MSAPIPath,
1033-
MicroservicePath, microServiceID, WatchPath),
1034-
}
1035-
conn, _, err := c.dialWebsocket(&u)
1036-
if err != nil {
1037-
c.watchers[microServiceID] = false
1038-
c.mutex.Unlock()
1039-
return fmt.Errorf("watching microservice dial catch an exception,microServiceID: %s, error:%s", microServiceID, err.Error())
1040-
}
1021+
openlog.Info(fmt.Sprintf("WatchMicroServiceWithExtraHandle watch, microServiceID:%s", microServiceID))
1022+
c.watchers[microServiceID] = true
1023+
scheme := "wss"
1024+
if !c.opt.EnableSSL {
1025+
scheme = "ws"
1026+
}
1027+
host := c.GetAddress()
1028+
u := url.URL{
1029+
Scheme: scheme,
1030+
Host: host,
1031+
Path: fmt.Sprintf("%s%s/%s%s", MSAPIPath,
1032+
MicroservicePath, microServiceID, WatchPath),
1033+
}
1034+
conn, _, err := c.dialWebsocket(&u)
1035+
if err != nil {
1036+
c.watchers[microServiceID] = false
1037+
c.mutex.Unlock()
1038+
return fmt.Errorf("watching microservice dial catch an exception,microServiceID: %s, error:%s", microServiceID, err.Error())
1039+
}
10411040

1042-
c.conns[microServiceID] = conn
1043-
// After successfully subscribing to the service, pull the dependency again.
1044-
// This prevents the event from not being notified after one of the dual engines fails and the other has no dependencies.
1045-
extraHandle("watchSucceed", WithAddress(host))
1046-
go func() {
1047-
for {
1048-
messageType, message, err := conn.ReadMessage()
1041+
c.conns[microServiceID] = conn
1042+
// After successfully subscribing to the service, pull the dependency again.
1043+
// This prevents the event from not being notified after one of the dual engines fails and the other has no dependencies.
1044+
extraHandle("watchSucceed", WithAddress(host))
1045+
go func() {
1046+
for {
1047+
messageType, message, err := conn.ReadMessage()
1048+
if err != nil {
1049+
openlog.Error(fmt.Sprintf("%s:%s", "conn.ReadMessage()", err.Error()))
1050+
break
1051+
}
1052+
if messageType == websocket.TextMessage {
1053+
var response MicroServiceInstanceChangedEvent
1054+
err := json.Unmarshal(message, &response)
10491055
if err != nil {
1050-
openlog.Error(fmt.Sprintf("%s:%s", "conn.ReadMessage()", err.Error()))
1051-
break
1052-
}
1053-
if messageType == websocket.TextMessage {
1054-
var response MicroServiceInstanceChangedEvent
1055-
err := json.Unmarshal(message, &response)
1056-
if err != nil {
1057-
if strings.Contains(string(message), "service does not exist") {
1058-
openlog.Error(fmt.Sprintf("%s:%s", "json.Unmarshal(message, &response), message", string(message)))
1059-
c.mutex.Lock()
1060-
delete(c.conns, microServiceID)
1061-
delete(c.watchers, microServiceID)
1062-
c.mutex.Unlock()
1063-
openlog.Info(fmt.Sprintf("delete conn, microServiceID:%s", microServiceID))
1064-
extraHandle("serviceNotExist")
1065-
return
1066-
}
1056+
if strings.Contains(string(message), "service does not exist") {
10671057
openlog.Error(fmt.Sprintf("%s:%s", "json.Unmarshal(message, &response), message", string(message)))
1068-
openlog.Error(fmt.Sprintf("%s:%s", "json.Unmarshal(message, &response)", err.Error()))
1069-
break
1058+
c.mutex.Lock()
1059+
delete(c.conns, microServiceID)
1060+
delete(c.watchers, microServiceID)
1061+
c.mutex.Unlock()
1062+
openlog.Info(fmt.Sprintf("delete conn, microServiceID:%s", microServiceID))
1063+
extraHandle("serviceNotExist")
1064+
return
10701065
}
1071-
callback(&response)
1066+
openlog.Error(fmt.Sprintf("%s:%s", "json.Unmarshal(message, &response), message", string(message)))
1067+
openlog.Error(fmt.Sprintf("%s:%s", "json.Unmarshal(message, &response)", err.Error()))
1068+
break
10721069
}
1070+
callback(&response)
10731071
}
1074-
err = conn.Close()
1075-
if err != nil {
1076-
openlog.Error(fmt.Sprintf("%s:%s", "conn.Close()", err.Error()))
1077-
}
1078-
c.mutex.Lock()
1079-
delete(c.conns, microServiceID)
1080-
delete(c.watchers, microServiceID)
1081-
c.mutex.Unlock()
1082-
openlog.Info(fmt.Sprintf("conn stop, microServiceID:%s", microServiceID))
1083-
c.startBackOffWithExtraHandle(microServiceID, callback, extraHandle)
1084-
}()
1085-
}
1086-
c.mutex.Unlock()
1087-
}
1072+
}
1073+
err = conn.Close()
1074+
if err != nil {
1075+
openlog.Error(fmt.Sprintf("%s:%s", "conn.Close()", err.Error()))
1076+
}
1077+
c.mutex.Lock()
1078+
delete(c.conns, microServiceID)
1079+
delete(c.watchers, microServiceID)
1080+
c.mutex.Unlock()
1081+
openlog.Info(fmt.Sprintf("conn stop, microServiceID:%s", microServiceID))
1082+
c.startBackOffWithExtraHandle(microServiceID, callback, extraHandle)
1083+
}()
1084+
}
1085+
c.mutex.Unlock()
10881086
return nil
10891087
}
10901088

0 commit comments

Comments
 (0)