@@ -161,62 +161,39 @@ func (nr *nacosRegistry) UnRegister(url *common.URL) error {
161161
162162// Subscribe returns nil if subscribing registry successfully. If not returns an error.
163163func (nr * nacosRegistry ) Subscribe (url * common.URL , notifyListener registry.NotifyListener ) error {
164- // TODO
165164 role , _ := strconv .Atoi (url .GetParam (constant .RegistryRoleKey , "" ))
166165 if role != common .CONSUMER {
167166 return nil
168167 }
169168 serviceName := url .GetParam (constant .InterfaceKey , "" )
170- if serviceName == "*" {
171- // Subscribe to all services
172- for {
173- if ! nr .IsAvailable () {
174- logger .Warnf ("event listener game over." )
175- return perrors .New ("nacosRegistry is not available." )
176- }
177-
178- services , err := nr .getAllSubscribeServiceNames (url )
179- if err != nil {
180- if ! nr .IsAvailable () {
181- logger .Warnf ("event listener game over." )
182- return err
183- }
184- logger .Warnf ("getAllServices() = err:%v" , perrors .WithStack (err ))
185- time .Sleep (time .Duration (RegistryConnDelay ) * time .Second )
186- continue
187- }
188-
189- for _ , service := range services {
190- listener , err := NewNacosListenerWithServiceName (service , nr .URL , nr .namingClient )
191- metrics .Publish (metricsRegistry .NewSubscribeEvent (err == nil ))
192- if err != nil {
193- logger .Warnf ("Failed to subscribe to service '%s': %v" , service , err )
194- continue
195- }
196-
197- nr .handleServiceEvents (listener , notifyListener )
198- }
199- }
169+ var serviceNames []string
170+ var err error
171+ if serviceName == constant .AnyValue {
172+ serviceNames , err = nr .getAllSubscribeServiceNames (url )
200173 } else {
201- // Subscribe to a specific service
202- for {
203- if ! nr .IsAvailable () {
204- logger .Warnf ("event listener game over." )
205- return perrors .New ("nacosRegistry is not available." )
206- }
174+ serviceNames = []string {getSubscribeName (url )}
175+ }
176+ err = nr .subscribe (serviceNames , notifyListener )
177+ return err
178+ }
207179
208- listener , err := NewNacosListenerWithServiceName (getSubscribeName (url ), nr .URL , nr .namingClient )
180+ // subscribe subscribe services
181+ func (nr * nacosRegistry ) subscribe (serviceNames []string , notifyListener registry.NotifyListener ) error {
182+ for {
183+ // Subscribe to a specific service
184+ if ! nr .IsAvailable () {
185+ logger .Warnf ("event listener game over." )
186+ return perrors .New ("nacosRegistry is not available." )
187+ }
188+ for _ , serviceName := range serviceNames {
189+ listener , err := NewNacosListenerWithServiceName (serviceName , nr .URL , nr .namingClient )
209190 metrics .Publish (metricsRegistry .NewSubscribeEvent (err == nil ))
210191 if err != nil {
211- if ! nr .IsAvailable () {
212- logger .Warnf ("event listener game over." )
213- return err
214- }
215- logger .Warnf ("getListener() = err:%v" , perrors .WithStack (err ))
192+ logger .Warnf ("getAllServices() = err:%v" , perrors .WithStack (err ))
216193 time .Sleep (time .Duration (RegistryConnDelay ) * time .Second )
217- continue
194+ break
218195 }
219- nr .handleServiceEvents (listener , notifyListener )
196+ go nr .handleServiceEvents (listener , notifyListener )
220197 }
221198 }
222199}
0 commit comments