Skip to content

Commit b69db5e

Browse files
authored
chore: refactor etcd discov (#5046)
1 parent ee6b7ce commit b69db5e

File tree

1 file changed

+22
-16
lines changed

1 file changed

+22
-16
lines changed

core/discov/publisher.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,12 @@ func (p *Publisher) doKeepAlive() error {
9292
default:
9393
cli, err := p.doRegister()
9494
if err != nil {
95-
logc.Errorf(cli.Ctx(), "etcd publisher doRegister: %s", err.Error())
95+
logc.Errorf(cli.Ctx(), "etcd publisher doRegister: %v", err)
9696
break
9797
}
9898

9999
if err := p.keepAliveAsync(cli); err != nil {
100-
logc.Errorf(cli.Ctx(), "etcd publisher keepAliveAsync: %s", err.Error())
100+
logc.Errorf(cli.Ctx(), "etcd publisher keepAliveAsync: %v", err)
101101
break
102102
}
103103

@@ -125,33 +125,39 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
125125
}
126126

127127
threading.GoSafe(func() {
128-
watchChan := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut())
128+
wch := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut())
129+
129130
for {
130131
select {
131132
case _, ok := <-ch:
132133
if !ok {
133134
p.revoke(cli)
134135
if err := p.doKeepAlive(); err != nil {
135-
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error())
136+
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
136137
}
137138
return
138139
}
139140

140-
case c := <-watchChan:
141+
case c := <-wch:
141142
if c.Err() != nil {
142-
logc.Errorf(cli.Ctx(), "etcd publisher watch: %s", c.Err().Error())
143+
logc.Errorf(cli.Ctx(), "etcd publisher watch: %v", c.Err())
143144
if err := p.doKeepAlive(); err != nil {
144-
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error())
145+
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
145146
}
146147
return
147148
}
148-
if c.Events[0].Type == clientv3.EventTypeDelete {
149-
logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v", c.Events[0].Kv.Key, c.Events[0].Type)
150-
_, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease))
151-
if err != nil {
152-
logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %s", err.Error())
153-
} else {
154-
logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s", p.fullKey, p.value)
149+
150+
for _, evt := range c.Events {
151+
if evt.Type == clientv3.EventTypeDelete {
152+
logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v",
153+
evt.Kv.Key, evt.Type)
154+
_, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease))
155+
if err != nil {
156+
logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %v", err)
157+
} else {
158+
logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s",
159+
p.fullKey, p.value)
160+
}
155161
}
156162
}
157163
case <-p.pauseChan:
@@ -160,7 +166,7 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
160166
select {
161167
case <-p.resumeChan:
162168
if err := p.doKeepAlive(); err != nil {
163-
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error())
169+
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
164170
}
165171
return
166172
case <-p.quit.Done():
@@ -195,7 +201,7 @@ func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, erro
195201

196202
func (p *Publisher) revoke(cli internal.EtcdClient) {
197203
if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
198-
logc.Errorf(cli.Ctx(), "etcd publisher revoke: %s", err.Error())
204+
logc.Errorf(cli.Ctx(), "etcd publisher revoke: %v", err)
199205
}
200206
}
201207

0 commit comments

Comments
 (0)