Skip to content

Commit ee6b7ce

Browse files
authored
fix: Issue with etcd key disappearing and unable to auto-re-register (#4960)
1 parent d150248 commit ee6b7ce

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed

core/discov/publisher.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
125125
}
126126

127127
threading.GoSafe(func() {
128+
watchChan := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut())
128129
for {
129130
select {
130131
case _, ok := <-ch:
@@ -135,6 +136,24 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
135136
}
136137
return
137138
}
139+
140+
case c := <-watchChan:
141+
if c.Err() != nil {
142+
logc.Errorf(cli.Ctx(), "etcd publisher watch: %s", c.Err().Error())
143+
if err := p.doKeepAlive(); err != nil {
144+
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error())
145+
}
146+
return
147+
}
148+
if c.Events[0].Type == clientv3.EventTypeDelete {
149+
logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v", c.Events[0].Kv.Key, c.Events[0].Type)
150+
_, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease))
151+
if err != nil {
152+
logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %s", err.Error())
153+
} else {
154+
logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s", p.fullKey, p.value)
155+
}
156+
}
138157
case <-p.pauseChan:
139158
logc.Infof(cli.Ctx(), "paused etcd renew, key: %s, value: %s", p.key, p.value)
140159
p.revoke(cli)

core/discov/publisher_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/zeromicro/go-zero/core/lang"
1616
"github.com/zeromicro/go-zero/core/logx"
1717
"github.com/zeromicro/go-zero/core/stringx"
18+
"go.etcd.io/etcd/api/v3/mvccpb"
1819
clientv3 "go.etcd.io/etcd/client/v3"
1920
"golang.org/x/net/http2"
2021
"google.golang.org/grpc"
@@ -211,6 +212,9 @@ func TestPublisher_keepAliveAsyncQuit(t *testing.T) {
211212
defer restore()
212213
cli.EXPECT().Ctx().AnyTimes()
213214
cli.EXPECT().KeepAlive(gomock.Any(), id)
215+
// Add Watch expectation for the new watch mechanism
216+
watchChan := make(<-chan clientv3.WatchResponse)
217+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(watchChan)
214218
var wg sync.WaitGroup
215219
wg.Add(1)
216220
cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
@@ -232,6 +236,9 @@ func TestPublisher_keepAliveAsyncPause(t *testing.T) {
232236
defer restore()
233237
cli.EXPECT().Ctx().AnyTimes()
234238
cli.EXPECT().KeepAlive(gomock.Any(), id)
239+
// Add Watch expectation for the new watch mechanism
240+
watchChan := make(<-chan clientv3.WatchResponse)
241+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(watchChan)
235242
pub := NewPublisher(nil, "thekey", "thevalue")
236243
var wg sync.WaitGroup
237244
wg.Add(1)
@@ -245,6 +252,112 @@ func TestPublisher_keepAliveAsyncPause(t *testing.T) {
245252
wg.Wait()
246253
}
247254

255+
// TestPublisher_keepAliveAsyncKeyDeletion verifies that keepAliveAsync re-puts
// the publisher's key when the watch channel delivers a delete event (the
// EventTypeDelete branch, lines 148-155 of publisher.go in this commit).
// The mocked Watch hands back a channel preloaded with a single delete event;
// the test expects one Put of "thekey"/"thevalue" and then, after Stop, one
// Revoke of the lease.
256+
func TestPublisher_keepAliveAsyncKeyDeletion(t *testing.T) {
257+
ctrl := gomock.NewController(t)
258+
defer ctrl.Finish()
259+
const id clientv3.LeaseID = 1
260+
cli := internal.NewMockEtcdClient(ctrl)
261+
restore := setMockClient(cli)
262+
defer restore()
263+
cli.EXPECT().Ctx().AnyTimes()
264+
cli.EXPECT().KeepAlive(gomock.Any(), id)
265+
266+
// Create a watch channel that will send a delete event.
// It is buffered (capacity 1) and filled before keepAliveAsync is called, so
// the send below does not block and the event is already queued.
267+
watchChan := make(chan clientv3.WatchResponse, 1)
268+
watchResp := clientv3.WatchResponse{
269+
Events: []*clientv3.Event{{
270+
Type: clientv3.EventTypeDelete,
271+
Kv: &mvccpb.KeyValue{
272+
Key: []byte("thekey"),
273+
},
274+
}},
275+
}
276+
watchChan <- watchResp
277+
278+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return((<-chan clientv3.WatchResponse)(watchChan))
279+
280+
var wg sync.WaitGroup
281+
wg.Add(1) // Only wait for Revoke call; released by the Revoke expectation below
282+
283+
// Use a channel to signal when Put has been called
284+
putCalled := make(chan struct{})
285+
286+
// Expect the re-put operation when key is deleted; the mocked Put succeeds
// (nil error).
287+
cli.EXPECT().Put(gomock.Any(), "thekey", "thevalue", gomock.Any()).Do(func(_, _, _, _ any) {
288+
close(putCalled) // Signal that Put has been called
289+
}).Return(nil, nil)
290+
291+
// Expect revoke when Stop is called
292+
cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
293+
wg.Done()
294+
})
295+
296+
pub := NewPublisher(nil, "thekey", "thevalue")
297+
pub.lease = id // set by hand so the id-based expectations above can match
298+
pub.fullKey = "thekey" // must equal the key the Put expectation requires
299+
300+
assert.Nil(t, pub.keepAliveAsync(cli))
301+
302+
// Wait for Put to be called, then stop.
// NOTE(review): this receive has no timeout; if the re-put never happens the
// test blocks here until the overall `go test` timeout fires.
303+
<-putCalled
304+
pub.Stop()
305+
wg.Wait()
306+
}
307+
308+
// TestPublisher_keepAliveAsyncKeyDeletionPutError covers the error branch of
// the re-put after a delete event (lines 151-152 of publisher.go in this
// commit). The setup matches the successful key-deletion test except that the
// mocked Put returns an error; the test then still expects Stop to lead to a
// Revoke of the lease.
309+
func TestPublisher_keepAliveAsyncKeyDeletionPutError(t *testing.T) {
310+
ctrl := gomock.NewController(t)
311+
defer ctrl.Finish()
312+
const id clientv3.LeaseID = 1
313+
cli := internal.NewMockEtcdClient(ctrl)
314+
restore := setMockClient(cli)
315+
defer restore()
316+
cli.EXPECT().Ctx().AnyTimes()
317+
cli.EXPECT().KeepAlive(gomock.Any(), id)
318+
319+
// Create a watch channel that will send a delete event.
// It is buffered (capacity 1) and filled before keepAliveAsync is called, so
// the send below does not block and the event is already queued.
320+
watchChan := make(chan clientv3.WatchResponse, 1)
321+
watchResp := clientv3.WatchResponse{
322+
Events: []*clientv3.Event{{
323+
Type: clientv3.EventTypeDelete,
324+
Kv: &mvccpb.KeyValue{
325+
Key: []byte("thekey"),
326+
},
327+
}},
328+
}
329+
watchChan <- watchResp
330+
331+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return((<-chan clientv3.WatchResponse)(watchChan))
332+
333+
var wg sync.WaitGroup
334+
wg.Add(1) // Only wait for Revoke call; released by the Revoke expectation below
335+
336+
// Use a channel to signal when Put has been called
337+
putCalled := make(chan struct{})
338+
339+
// Expect the re-put operation to fail: the mocked Put returns "put error".
340+
cli.EXPECT().Put(gomock.Any(), "thekey", "thevalue", gomock.Any()).Do(func(_, _, _, _ any) {
341+
close(putCalled) // Signal that Put has been called
342+
}).Return(nil, errors.New("put error"))
343+
344+
// Expect revoke when Stop is called, even though the re-put failed
345+
cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
346+
wg.Done()
347+
})
348+
349+
pub := NewPublisher(nil, "thekey", "thevalue")
350+
pub.lease = id // set by hand so the id-based expectations above can match
351+
pub.fullKey = "thekey" // must equal the key the Put expectation requires
352+
353+
assert.Nil(t, pub.keepAliveAsync(cli))
354+
355+
// Wait for Put to be called, then stop.
// NOTE(review): this receive has no timeout; if the re-put never happens the
// test blocks here until the overall `go test` timeout fires.
356+
<-putCalled
357+
pub.Stop()
358+
wg.Wait()
359+
}
360+
248361
func TestPublisher_Resume(t *testing.T) {
249362
publisher := new(Publisher)
250363
publisher.resumeChan = make(chan lang.PlaceholderType)
@@ -273,6 +386,9 @@ func TestPublisher_keepAliveAsync(t *testing.T) {
273386
defer restore()
274387
cli.EXPECT().Ctx().AnyTimes()
275388
cli.EXPECT().KeepAlive(gomock.Any(), id)
389+
// Add Watch expectation for the new watch mechanism
390+
watchChan := make(<-chan clientv3.WatchResponse)
391+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(watchChan)
276392
cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
277393
ID: 1,
278394
}, nil)

0 commit comments

Comments
 (0)