Skip to content
This repository was archived by the owner on Feb 14, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions serverset.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func (ss *ServerSet) createFullPath(connection *zk.Conn) error {

// structure of the data in each member znode
// Mimics finagle serverset structure.
type entity struct {
type Entity struct {
ServiceEndpoint endpoint `json:"serviceEndpoint"`
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"` // unused
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"`
Status string `json:"status"`
}

Expand All @@ -132,8 +132,8 @@ type endpoint struct {
Port int `json:"port"`
}

func newEntity(host string, port int) *entity {
return &entity{
func newEntity(host string, port int) *Entity {
return &Entity{
ServiceEndpoint: endpoint{host, port},
AdditionalEndpoints: make(map[string]endpoint),
Status: statusAlive,
Expand Down
26 changes: 19 additions & 7 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Watch struct {

// lock for read/writing the endpoints slice
lock sync.RWMutex
endpoints []string
endpoints []Entity
}

// Watch creates a new watch on this server set. Changes to the set will
Expand Down Expand Up @@ -121,6 +121,19 @@ func (ss *ServerSet) Watch() (*Watch, error) {
func (w *Watch) Endpoints() []string {
w.lock.RLock()
defer w.lock.RUnlock()
endpoints := make([]string, 0, len(w.endpoints))
for _, e := range w.endpoints {
endpoints = append(endpoints, net.JoinHostPort(e.ServiceEndpoint.Host, strconv.Itoa(e.ServiceEndpoint.Port)))
}

sort.Strings(endpoints)
return endpoints
}

// EndpointEntities returns a slice of the current list of Entites associated with this watch, collected at the last event.
func (w *Watch) EndpointEntities() []Entity {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[]Entity or []*Entity Not really sure which one is better here. But I think just using Entity everywhere would keep it consistent.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! thank you! Maybe add a comment here and then rebase off the current master so I can merge.

w.lock.RLock()
defer w.lock.RUnlock()

return w.endpoints
}
Expand Down Expand Up @@ -172,8 +185,8 @@ func (w *Watch) watch(connection *zk.Conn) ([]string, <-chan zk.Event, error) {
return children, events, err
}

func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]string, error) {
endpoints := make([]string, 0, len(keys))
func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]Entity, error) {
endpoints := make([]Entity, 0, len(keys))

for _, k := range keys {
if !strings.HasPrefix(k, MemberPrefix) {
Expand All @@ -191,16 +204,15 @@ func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]string, e
}

if e.Status == statusAlive {
endpoints = append(endpoints, net.JoinHostPort(e.ServiceEndpoint.Host, strconv.Itoa(e.ServiceEndpoint.Port)))
endpoints = append(endpoints, *e)
}
}

sort.Strings(endpoints)
return endpoints, nil

}

func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*entity, error) {
func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*Entity, error) {

data, _, err := connection.Get(w.serverSet.directoryPath() + "/" + key)
if err == zk.ErrNoNode {
Expand All @@ -221,7 +233,7 @@ func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*entity, error) {
return w.getEndpoint(connection, key)
}

e := &entity{}
e := &Entity{}
err = json.Unmarshal(data, &e)
if err != nil {
return nil, err
Expand Down