-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathwatcher_etcd.go
More file actions
116 lines (108 loc) · 2.92 KB
/
watcher_etcd.go
File metadata and controls
116 lines (108 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package discovery
import (
"encoding/json"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
)
// EtcdWatcher watches an etcd key prefix and reports changes as naming.Update
// values. It is modelled after grpc's naming.Watcher (Next/Close).
type EtcdWatcher struct {
// key is the etcd prefix "<registryDir>/<serviceName>" that Next reads and watches.
key string
// endpoints are the etcd endpoints the client was configured with.
endpoints []string
// cli is the etcd client created by NewEtcdWatcher; Next uses it for the initial read.
cli *clientv3.Client
// watcher is a dedicated watcher created from cli; Next uses it to wait for changes.
watcher clientv3.Watcher
// revision is the etcd revision taken from the last response header.
// It is zero until the first Next call has completed its initial read.
revision int64
// ctx is passed to the etcd Get and Watch requests issued by Next.
ctx context.Context
// cancel cancels ctx; it is invoked by Close.
cancel context.CancelFunc
}
// NewEtcdWatcher builds an EtcdWatcher for the key prefix
// "<registryDir>/<serviceName>" using an etcd client connected to endpoints.
//
// The returned watcher carries its own cancellable context derived from
// context.Background(). If the etcd client cannot be created, the watcher is
// nil and the error from clientv3.New is returned wrapped.
func NewEtcdWatcher(registryDir, serviceName string, endpoints []string) (w *EtcdWatcher, err error) {
	client, newErr := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if newErr != nil {
		return nil, errors.Wrap(newErr, "")
	}

	watchCtx, stop := context.WithCancel(context.Background())
	return &EtcdWatcher{
		key:       fmt.Sprintf("%s/%s", registryDir, serviceName),
		endpoints: endpoints,
		cli:       client,
		watcher:   clientv3.NewWatcher(client),
		ctx:       watchCtx,
		cancel:    stop,
	}, nil
}
// Close stops the watcher and releases the etcd resources it owns.
//
// It first cancels the watcher's context, which aborts any in-flight etcd
// request issued by Next, and then closes the dedicated watcher and the etcd
// client that NewEtcdWatcher created. Without those two Close calls the
// client's connection and background goroutines would outlive the watcher.
func (w *EtcdWatcher) Close() {
	w.cancel()

	// Close has no error result (it mirrors naming.Watcher), so shutdown
	// errors cannot be reported to the caller and are deliberately dropped.
	_ = w.watcher.Close()
	_ = w.cli.Close()
}
// Next returns the next batch of address updates for the watched prefix.
//
// On the first call (no revision recorded yet) it reads every key under the
// prefix and reports each as a naming.Add, then records the revision from the
// response header. Later calls block until etcd delivers events newer than the
// recorded revision: deletes become naming.Delete, creates and modifications
// become naming.Add carrying the decoded NodeData as metadata.
//
// An error is returned when the etcd request fails, when a stored value cannot
// be decoded as JSON into NodeData, when the watch response reports an error,
// or when the watch ends without a response (for example after Close). On
// error the returned slice is nil.
func (w *EtcdWatcher) Next() (updates []*naming.Update, err error) {
	// addrOf strips "<key>" plus the one-byte separator from an etcd key.
	// Keys too short to carry an address are rejected rather than sliced out
	// of range, which would panic.
	// NOTE(review): the prefix match also returns keys of sibling services
	// whose name merely starts with this one — confirm the registry layout.
	addrOf := func(k []byte) (string, bool) {
		skip := len(w.key) + 1
		if len(k) <= skip {
			return "", false
		}
		return string(k[skip:]), true
	}

	updates = []*naming.Update{}

	if w.revision == 0 {
		var resp *clientv3.GetResponse
		resp, err = clientv3.NewKV(w.cli).Get(w.ctx, w.key, clientv3.WithPrefix())
		if err != nil {
			return nil, errors.Wrap(err, "")
		}
		for _, item := range resp.Kvs {
			addr, hasAddr := addrOf(item.Key)
			if !hasAddr {
				continue
			}
			nodeData := NodeData{}
			if err = json.Unmarshal(item.Value, &nodeData); err != nil {
				return nil, errors.Wrap(err, "")
			}
			updates = append(updates, &naming.Update{
				Op:       naming.Add,
				Addr:     addr,
				Metadata: &nodeData,
			})
		}
		w.revision = resp.Header.Revision
		return updates, nil
	}

	// Scope the watch to this call. Watching directly on w.ctx would leave one
	// more live watch (and its buffered events) behind on every call until
	// Close; resuming from w.revision+1 next time means nothing is missed.
	watchCtx, stopWatch := context.WithCancel(w.ctx)
	defer stopWatch()

	ch := w.watcher.Watch(watchCtx, w.key, clientv3.WithPrefix(), clientv3.WithRev(w.revision+1))
	resp, open := <-ch
	if !open {
		// A closed channel yields a zero response. Treating it as an update
		// would silently reset w.revision to 0, so report it instead.
		if ctxErr := w.ctx.Err(); ctxErr != nil {
			return nil, errors.Wrap(ctxErr, "etcd watch closed")
		}
		return nil, errors.New("etcd watch channel closed unexpectedly")
	}
	if err = resp.Err(); err != nil {
		return nil, err
	}

	w.revision = resp.Header.Revision
	for _, e := range resp.Events {
		addr, hasAddr := addrOf(e.Kv.Key)
		if !hasAddr {
			continue
		}
		switch {
		case e.Type == clientv3.EventTypeDelete:
			// Delete events carry no value, so there is no metadata to decode.
			updates = append(updates, &naming.Update{
				Op:   naming.Delete,
				Addr: addr,
			})
		case e.IsCreate() || e.IsModify():
			nodeData := NodeData{}
			if err = json.Unmarshal(e.Kv.Value, &nodeData); err != nil {
				return nil, errors.Wrap(err, "")
			}
			updates = append(updates, &naming.Update{
				Op:       naming.Add,
				Addr:     addr,
				Metadata: &nodeData,
			})
		}
	}
	return updates, nil
}