Skip to content
This repository was archived by the owner on Feb 14, 2023. It is now read-only.

Commit ed9aea6

Browse files
committed
Return entities as endpoints
1 parent 0d5af00 commit ed9aea6

File tree

2 files changed

+22
-11
lines changed

2 files changed

+22
-11
lines changed

serverset.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ func (ss *ServerSet) createFullPath(connection *zk.Conn) error {
121121

122122
// structure of the data in each member znode
123123
// Mimics finagle serverset structure.
124-
type entity struct {
124+
type Entity struct {
125125
ServiceEndpoint endpoint `json:"serviceEndpoint"`
126-
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"` // unused
126+
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"`
127127
Status string `json:"status"`
128128
}
129129

@@ -132,8 +132,8 @@ type endpoint struct {
132132
Port int `json:"port"`
133133
}
134134

135-
func newEntity(host string, port int) *entity {
136-
return &entity{
135+
func newEntity(host string, port int) *Entity {
136+
return &Entity{
137137
ServiceEndpoint: endpoint{host, port},
138138
AdditionalEndpoints: make(map[string]endpoint),
139139
Status: statusAlive,

watch.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type Watch struct {
3333

3434
// lock for read/writing the endpoints slice
3535
lock sync.RWMutex
36-
endpoints []string
36+
endpoints []Entity
3737
}
3838

3939
// Watch creates a new watch on this server set. Changes to the set will
@@ -121,6 +121,18 @@ func (ss *ServerSet) Watch() (*Watch, error) {
121121
func (w *Watch) Endpoints() []string {
122122
w.lock.RLock()
123123
defer w.lock.RUnlock()
124+
endpoints := make([]string, 0, len(w.endpoints))
125+
for _, e := range w.endpoints {
126+
endpoints = append(endpoints, net.JoinHostPort(e.ServiceEndpoint.Host, strconv.Itoa(e.ServiceEndpoint.Port)))
127+
}
128+
129+
sort.Strings(endpoints)
130+
return endpoints
131+
}
132+
133+
func (w *Watch) EndpointEntities() []Entity {
134+
w.lock.RLock()
135+
defer w.lock.RUnlock()
124136

125137
return w.endpoints
126138
}
@@ -172,8 +184,8 @@ func (w *Watch) watch(connection *zk.Conn) ([]string, <-chan zk.Event, error) {
172184
return children, events, err
173185
}
174186

175-
func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]string, error) {
176-
endpoints := make([]string, 0, len(keys))
187+
func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]Entity, error) {
188+
endpoints := make([]Entity, 0, len(keys))
177189

178190
for _, k := range keys {
179191
if !strings.HasPrefix(k, MemberPrefix) {
@@ -191,16 +203,15 @@ func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]string, e
191203
}
192204

193205
if e.Status == statusAlive {
194-
endpoints = append(endpoints, net.JoinHostPort(e.ServiceEndpoint.Host, strconv.Itoa(e.ServiceEndpoint.Port)))
206+
endpoints = append(endpoints, *e)
195207
}
196208
}
197209

198-
sort.Strings(endpoints)
199210
return endpoints, nil
200211

201212
}
202213

203-
func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*entity, error) {
214+
func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*Entity, error) {
204215

205216
data, _, err := connection.Get(w.serverSet.directoryPath() + "/" + key)
206217
if err == zk.ErrNoNode {
@@ -221,7 +232,7 @@ func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*entity, error) {
221232
return w.getEndpoint(connection, key)
222233
}
223234

224-
e := &entity{}
235+
e := &Entity{}
225236
err = json.Unmarshal(data, &e)
226237
if err != nil {
227238
return nil, err

0 commit comments

Comments
 (0)