Skip to content

Commit ec49b40

Browse files
authored
nacos registry support subscribe multi category (#2783)
* nacos registry support subscribe multi category * nacos registry handleServiceEvents is a blocking method, fix multiple service name not subscribe as expect * add error check * fmt * fix lint * fix lint * fix lint
1 parent db2cc25 commit ec49b40

File tree

2 files changed

+61
-127
lines changed

2 files changed

+61
-127
lines changed

registry/nacos/listener.go

Lines changed: 11 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type callback func(services []model.Instance, err error)
5252

5353
type nacosListener struct {
5454
namingClient *nacosClient.NacosNamingClient
55-
listenURL *common.URL
55+
serviceName string
5656
regURL *common.URL
5757
events *gxchan.UnboundedChan
5858
instanceMap map[string]model.Instance
@@ -61,32 +61,16 @@ type nacosListener struct {
6161
subscribeParam *vo.SubscribeParam
6262
}
6363

64-
// NewNacosListener creates a data listener for nacos
65-
func NewNacosListener(url, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
66-
listener := &nacosListener{
64+
// NewNacosListenerWithServiceName creates a data listener for nacos
65+
func NewNacosListenerWithServiceName(serviceName string, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) *nacosListener {
66+
return &nacosListener{
6767
namingClient: namingClient,
68-
listenURL: url,
68+
serviceName: serviceName,
6969
regURL: regURL,
7070
events: gxchan.NewUnboundedChan(32),
7171
instanceMap: map[string]model.Instance{},
7272
done: make(chan struct{}),
7373
}
74-
err := listener.startListen()
75-
return listener, err
76-
}
77-
78-
// NewNacosListener creates a data listener for nacos
79-
func NewNacosListenerWithServiceName(serviceName string, url, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
80-
listener := &nacosListener{
81-
namingClient: namingClient,
82-
listenURL: url,
83-
regURL: regURL,
84-
events: gxchan.NewUnboundedChan(32),
85-
instanceMap: map[string]model.Instance{},
86-
done: make(chan struct{}),
87-
}
88-
err := listener.startListenWithServiceName(serviceName)
89-
return listener, err
9074
}
9175

9276
func generateUrl(instance model.Instance) *common.URL {
@@ -188,37 +172,18 @@ func getSubscribeName(url *common.URL) string {
188172
return buffer.String()
189173
}
190174

191-
func (nl *nacosListener) startListen() error {
175+
func (nl *nacosListener) listenService(serviceName string) error {
192176
if nl.namingClient == nil {
193177
return perrors.New("nacos naming namingClient stopped")
194178
}
195-
nl.subscribeParam = createSubscribeParam(nl.listenURL, nl.regURL, nl.Callback)
179+
nl.subscribeParam = createSubscribeParam(serviceName, nl.regURL, nl.Callback)
196180
if nl.subscribeParam == nil {
197181
return perrors.New("create nacos subscribeParam failed")
198182
}
199-
go func() {
200-
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
201-
if err == nil {
202-
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
203-
}
204-
}()
205-
return nil
206-
}
207-
208-
func (nl *nacosListener) startListenWithServiceName(serviceName string) error {
209-
if nl.namingClient == nil {
210-
return perrors.New("nacos naming namingClient stopped")
211-
}
212-
nl.subscribeParam = createSubscribeParamWithServiceName(serviceName, nl.regURL, nl.Callback)
213-
if nl.subscribeParam == nil {
214-
return perrors.New("create nacos subscribeParam failed")
183+
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
184+
if err == nil {
185+
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
215186
}
216-
go func() {
217-
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
218-
if err == nil {
219-
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
220-
}
221-
}()
222187
return nil
223188
}
224189

@@ -235,7 +200,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
235200
for {
236201
select {
237202
case <-nl.done:
238-
logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenURL)
203+
logger.Warnf("nacos listener is close!service name:%+v", nl.serviceName)
239204
return nil, perrors.New("listener stopped")
240205

241206
case val := <-nl.events.Out():

registry/nacos/registry.go

Lines changed: 50 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package nacos
2020
import (
2121
"bytes"
2222
"fmt"
23+
"math"
2324
"strconv"
2425
"strings"
2526
"time"
@@ -158,94 +159,83 @@ func (nr *nacosRegistry) UnRegister(url *common.URL) error {
158159
return nil
159160
}
160161

161-
func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
162-
return NewNacosListener(conf, nr.URL, nr.namingClient)
163-
}
164-
165162
// Subscribe returns nil if subscribing registry successfully. If not returns an error.
166163
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
167-
// TODO
168164
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
169165
if role != common.CONSUMER {
170166
return nil
171167
}
172168
serviceName := url.GetParam(constant.InterfaceKey, "")
173-
if serviceName == "*" {
174-
// Subscribe to all services
175-
for {
176-
if !nr.IsAvailable() {
177-
logger.Warnf("event listener game over.")
178-
return perrors.New("nacosRegistry is not available.")
179-
}
180-
181-
services, err := nr.getAllSubscribeServiceNames()
182-
if err != nil {
183-
if !nr.IsAvailable() {
184-
logger.Warnf("event listener game over.")
185-
return err
186-
}
187-
logger.Warnf("getAllServices() = err:%v", perrors.WithStack(err))
188-
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
189-
continue
190-
}
191-
192-
for _, service := range services {
193-
listener, err := nr.subscribeToService(url, service)
194-
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
195-
if err != nil {
196-
logger.Warnf("Failed to subscribe to service '%s': %v", service, err)
197-
continue
198-
}
199-
200-
nr.handleServiceEvents(listener, notifyListener)
201-
}
169+
var serviceNames []string
170+
var err error
171+
if serviceName == constant.AnyValue {
172+
serviceNames, err = nr.getAllSubscribeServiceNames(url)
173+
if err != nil {
174+
return err
202175
}
203176
} else {
204-
// Subscribe to a specific service
205-
for {
206-
if !nr.IsAvailable() {
207-
logger.Warnf("event listener game over.")
208-
return perrors.New("nacosRegistry is not available.")
209-
}
177+
serviceNames = []string{getSubscribeName(url)}
178+
}
179+
return nr.subscribe(serviceNames, notifyListener)
180+
}
210181

211-
listener, err := nr.subscribe(url)
182+
// subscribe subscribe services
183+
func (nr *nacosRegistry) subscribe(serviceNames []string, notifyListener registry.NotifyListener) error {
184+
if len(serviceNames) == 0 {
185+
logger.Warnf("No services to listen to.")
186+
return nil
187+
}
188+
for {
189+
if !nr.IsAvailable() {
190+
logger.Warnf("event listener game over.")
191+
return perrors.New("nacosRegistry is not available.")
192+
}
193+
var err error
194+
for _, serviceName := range serviceNames {
195+
listener := NewNacosListenerWithServiceName(serviceName, nr.URL, nr.namingClient)
196+
err = listener.listenService(serviceName)
212197
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
213198
if err != nil {
214-
if !nr.IsAvailable() {
215-
logger.Warnf("event listener game over.")
216-
return err
217-
}
218-
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
199+
logger.Warnf("getAllServices() = err:%v", perrors.WithStack(err))
219200
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
220-
continue
201+
break
221202
}
222-
nr.handleServiceEvents(listener, notifyListener)
203+
go nr.handleServiceEvents(listener, notifyListener)
204+
}
205+
if err == nil {
206+
break
223207
}
224208
}
209+
return nil
225210
}
226211

227212
// getAllServices retrieves the list of all services from the registry
228-
func (nr *nacosRegistry) getAllSubscribeServiceNames() ([]string, error) {
213+
func (nr *nacosRegistry) getAllSubscribeServiceNames(url *common.URL) ([]string, error) {
229214
services, err := nr.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
230215
GroupName: nr.GetParam(constant.RegistryGroupKey, defaultGroup),
231216
PageNo: 1,
232-
PageSize: 10,
217+
PageSize: math.MaxInt32,
233218
})
234-
subScribeServiceNames := []string{}
219+
if err != nil {
220+
logger.Errorf("query services error: %v", err)
221+
return nil, err
222+
}
223+
var subScribeServiceNames []string
224+
categories := strings.Split(url.GetParam(constant.CategoryKey, constant.DefaultCategory), constant.CommaSeparator)
235225
for _, dom := range services.Doms {
236-
if strings.HasPrefix(dom, "providers:") {
237-
subScribeServiceNames = append(subScribeServiceNames, dom)
226+
if strings.Contains(dom, constant.NacosServiceNameSeparator) {
227+
realCategory := strings.Split(dom, constant.NacosServiceNameSeparator)[0]
228+
for _, item := range categories {
229+
if item == realCategory {
230+
subScribeServiceNames = append(subScribeServiceNames, dom)
231+
}
232+
}
238233
}
239234
}
240235

241236
return subScribeServiceNames, err
242237
}
243238

244-
// subscribeToService subscribes to a specific service in the registry
245-
func (nr *nacosRegistry) subscribeToService(url *common.URL, service string) (listener registry.Listener, err error) {
246-
return NewNacosListenerWithServiceName(service, url, nr.URL, nr.namingClient)
247-
}
248-
249239
// handleServiceEvents receives service events from the listener and notifies the notifyListener
250240
func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, notifyListener registry.NotifyListener) {
251241
for {
@@ -262,7 +252,7 @@ func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, notifyL
262252

263253
// UnSubscribe :
264254
func (nr *nacosRegistry) UnSubscribe(url *common.URL, _ registry.NotifyListener) error {
265-
param := createSubscribeParam(url, nr.URL, nil)
255+
param := createSubscribeParam(getSubscribeName(url), nr.URL, nil)
266256
if param == nil {
267257
return nil
268258
}
@@ -294,28 +284,7 @@ func (nr *nacosRegistry) LoadSubscribeInstances(url *common.URL, notify registry
294284
return nil
295285
}
296286

297-
func createSubscribeParam(url, regUrl *common.URL, cb callback) *vo.SubscribeParam {
298-
serviceName := getSubscribeName(url)
299-
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
300-
if cb == nil {
301-
v, ok := listenerCache.Load(serviceName + groupName)
302-
if !ok {
303-
return nil
304-
}
305-
listener, ok := v.(*nacosListener)
306-
if !ok {
307-
return nil
308-
}
309-
cb = listener.Callback
310-
}
311-
return &vo.SubscribeParam{
312-
ServiceName: serviceName,
313-
SubscribeCallback: cb,
314-
GroupName: groupName,
315-
}
316-
}
317-
318-
func createSubscribeParamWithServiceName(serviceName string, regUrl *common.URL, cb callback) *vo.SubscribeParam {
287+
func createSubscribeParam(serviceName string, regUrl *common.URL, cb callback) *vo.SubscribeParam {
319288
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
320289
if cb == nil {
321290
v, ok := listenerCache.Load(serviceName + groupName)

0 commit comments

Comments
 (0)