Skip to content

Commit 15431f4

Browse files
committed
nacos registry handleServiceEvents is a blocking method, fix multiple service name not subscribe as expect
1 parent 9cdc15e commit 15431f4

File tree

1 file changed

+25
-43
lines changed

1 file changed

+25
-43
lines changed

registry/nacos/registry.go

Lines changed: 25 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -161,64 +161,46 @@ func (nr *nacosRegistry) UnRegister(url *common.URL) error {
161161

162162
// Subscribe returns nil if subscribing registry successfully. If not returns an error.
163163
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
164-
// TODO
165164
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
166165
if role != common.CONSUMER {
167166
return nil
168167
}
169168
serviceName := url.GetParam(constant.InterfaceKey, "")
170-
if serviceName == "*" {
171-
// Subscribe to all services
169+
var serviceNames []string
170+
var err error
171+
if serviceName == constant.AnyValue {
172+
serviceNames, err = nr.getAllSubscribeServiceNames(url)
173+
} else {
174+
serviceNames = []string{getSubscribeName(url)}
175+
}
176+
err = nr.subscribe(serviceNames, notifyListener)
177+
return err
178+
}
179+
180+
// subscribe subscribe services
181+
func (nr *nacosRegistry) subscribe(serviceNames []string, notifyListener registry.NotifyListener) error {
182+
if len(serviceNames) == 0 {
183+
logger.Warnf("No services to listen to.")
184+
return nil
185+
}
186+
outerLoop:
172187
for {
173188
if !nr.IsAvailable() {
174189
logger.Warnf("event listener game over.")
175190
return perrors.New("nacosRegistry is not available.")
176191
}
177-
178-
services, err := nr.getAllSubscribeServiceNames(url)
179-
if err != nil {
180-
if !nr.IsAvailable() {
181-
logger.Warnf("event listener game over.")
182-
return err
183-
}
184-
logger.Warnf("getAllServices() = err:%v", perrors.WithStack(err))
185-
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
186-
continue
187-
}
188-
189-
for _, service := range services {
190-
listener, err := NewNacosListenerWithServiceName(service, nr.URL, nr.namingClient)
192+
for _, serviceName := range serviceNames {
193+
listener, err := NewNacosListenerWithServiceName(serviceName, nr.URL, nr.namingClient)
191194
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
192195
if err != nil {
193-
logger.Warnf("Failed to subscribe to service '%s': %v", service, err)
194-
continue
195-
}
196-
197-
nr.handleServiceEvents(listener, notifyListener)
198-
}
199-
}
200-
} else {
201-
// Subscribe to a specific service
202-
for {
203-
if !nr.IsAvailable() {
204-
logger.Warnf("event listener game over.")
205-
return perrors.New("nacosRegistry is not available.")
206-
}
207-
208-
listener, err := NewNacosListenerWithServiceName(getSubscribeName(url), nr.URL, nr.namingClient)
209-
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
210-
if err != nil {
211-
if !nr.IsAvailable() {
212-
logger.Warnf("event listener game over.")
213-
return err
196+
logger.Warnf("getAllServices() = err:%v", perrors.WithStack(err))
197+
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
198+
break outerLoop
214199
}
215-
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
216-
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
217-
continue
200+
go nr.handleServiceEvents(listener, notifyListener)
218201
}
219-
nr.handleServiceEvents(listener, notifyListener)
220202
}
221-
}
203+
return nil
222204
}
223205

224206
// getAllServices retrieves the list of all services from the registry

0 commit comments

Comments
 (0)