-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathregistry_etcd.go
More file actions
108 lines (94 loc) · 2.25 KB
/
registry_etcd.go
File metadata and controls
108 lines (94 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package discovery
import (
"encoding/json"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
//EtcdRegistry registers to etcd
type EtcdRegistry struct {
endpoints []string
key string
value string
ttl int64
ctx context.Context
cancel context.CancelFunc
err error
}
//Option is the configuration of EtcdRegistry
type Option struct {
Endpoints []string //etcd urls
RegistryDir string
ServiceName string
NodeID string //key is fmt.Sprintf("%s/%s/%s", option.RegistryDir, option.ServiceName, option.NodeID)
NData NodeData //value
TTL int64 //TTL of key
}
//NodeData is the struct of value part of registered KV
type NodeData map[string]string
//NewEtcdRegistry creates EtcdRegistry
func NewEtcdRegistry(option Option) (r *EtcdRegistry, err error) {
var val []byte
val, err = json.Marshal(option.NData)
if err != nil {
err = errors.Wrap(err, "")
return
}
ctx, cancel := context.WithCancel(context.Background())
r = &EtcdRegistry{
endpoints: option.Endpoints,
key: fmt.Sprintf("%s/%s/%s", option.RegistryDir, option.ServiceName, option.NodeID),
value: string(val),
ttl: option.TTL,
ctx: ctx,
cancel: cancel,
}
return
}
//Close cancel RegisterLoop
func (r *EtcdRegistry) Close() {
r.cancel()
}
//Err returns the error occurred during RegisterLoop
func (r *EtcdRegistry) Err() error {
return r.err
}
//RegisterLoop keepalives the key in a loop until an error occur or EtcdRegistry be closed
func (r *EtcdRegistry) RegisterLoop() {
r.err = nil
cli, err := clientv3.New(clientv3.Config{
Endpoints: r.endpoints,
})
if err != nil {
r.err = errors.Wrap(err, "")
return
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), r.ttl)
if err != nil {
r.err = errors.Wrap(err, "")
return
}
_, err = cli.Put(context.TODO(), r.key, r.value, clientv3.WithLease(resp.ID))
if err != nil {
r.err = errors.Wrap(err, "")
return
}
// the key will be kept forever
ch, kaerr := cli.KeepAlive(r.ctx, resp.ID)
if kaerr != nil {
r.err = errors.Wrap(err, "")
return
}
for {
select {
case ka := <-ch:
if ka == nil {
//context canceled
return
}
//log.Printf("e.key: %+v, ka: %+v", r.key, ka)
}
}
}