Skip to content

Commit ae70432

Browse files
fix: properly handling timeout when revoking ETCD lease (#419)
* fix: properly handling timeout when revoking ETCD lease * fix intermittent test issues Co-authored-by: Rafael da Fonseca <rafael.fonseca@wildlifestudios.com>
1 parent 9e69f33 commit ae70432

File tree

3 files changed

+30
-25
lines changed

3 files changed

+30
-25
lines changed

pkg/cluster/etcd_service_discovery.go

Lines changed: 7 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -549,7 +549,7 @@ func (sd *etcdServiceDiscovery) SyncServers(firstSync bool) error {
549549
}
550550

551551
// delete invalid servers (local ones that are not in etcd)
552-
var allIds = make([]string, 0)
552+
allIds := make([]string, 0)
553553

554554
// Spawn worker goroutines that will work in parallel
555555
parallelGetter := newParallelGetter(sd.cli, sd.syncServersParallelism)
@@ -616,21 +616,19 @@ func (sd *etcdServiceDiscovery) revoke() error {
616616
go func() {
617617
defer close(c)
618618
logger.Log.Debug("waiting for etcd revoke")
619+
619620
if sd.cli != nil {
620-
_, err := sd.cli.Revoke(context.TODO(), sd.leaseID)
621+
ctx, cancel := context.WithTimeout(context.Background(), sd.revokeTimeout)
622+
_, err := sd.cli.Revoke(ctx, sd.leaseID)
623+
cancel()
621624
c <- err
622625
} else {
623626
c <- nil
624627
}
625628
logger.Log.Debug("finished waiting for etcd revoke")
626629
}()
627-
select {
628-
case err := <-c:
629-
return err // completed normally
630-
case <-time.After(sd.revokeTimeout):
631-
logger.Log.Warn("timed out waiting for etcd revoke")
632-
return nil // timed out
633-
}
630+
err := <-c
631+
return err
634632
}
635633

636634
func (sd *etcdServiceDiscovery) addServer(sv *Server) {
@@ -712,7 +710,6 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
712710
case <-sd.stopChan:
713711
return
714712
}
715-
716713
}
717714
}(w)
718715
}

pkg/cluster/nats_rpc_server.go

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -214,20 +214,26 @@ func (ns *NatsRPCServer) handleMessages() {
214214
select {
215215
case msg := <-ns.subChan:
216216
ns.reportMetrics()
217-
dropped, err := ns.sub.Dropped()
218-
if err != nil {
219-
logger.Log.Errorf("error getting number of dropped messages: %s", err.Error())
220-
}
221-
if dropped > ns.dropped {
222-
logger.Log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
223-
ns.dropped = dropped
217+
// Check if subscription is still valid before accessing it
218+
// This can happen during connection replacement when the old subscription becomes invalid
219+
var dropped int
220+
if ns.sub != nil && ns.sub.IsValid() {
221+
var err error
222+
dropped, err = ns.sub.Dropped()
223+
if err != nil {
224+
logger.Log.Errorf("error getting number of dropped messages: %s", err.Error())
225+
}
226+
if dropped > ns.dropped {
227+
logger.Log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
228+
ns.dropped = dropped
229+
}
224230
}
225231
subsChanLen := float64(len(ns.subChan))
226232
maxPending = math.Max(float64(maxPending), subsChanLen)
227233
logger.Log.Debugf("subs channel size: %v, max: %v, dropped: %v", subsChanLen, maxPending, dropped)
228234
req := &protos.Request{}
229235
// TODO: Add tracing here to report delay to start processing message in spans
230-
err = proto.Unmarshal(msg.Data, req)
236+
err := proto.Unmarshal(msg.Data, req)
231237
if err != nil {
232238
// should answer rpc with an error
233239
logger.Log.Error("error unmarshalling rpc message:", err.Error())

pkg/groups/memory_group_service.go

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ var (
1616
globalCtx context.Context
1717
globalCancel context.CancelFunc
1818
cleanupWG sync.WaitGroup
19+
cleanupOnce sync.Once
1920
)
2021

2122
// MemoryGroupService base in server memory solution
@@ -230,11 +231,12 @@ func (c *MemoryGroupService) GroupRenewTTL(ctx context.Context, groupName string
230231
}
231232

232233
func (c *MemoryGroupService) Close() {
233-
// Only cancel if this is the last service (we can't easily track refs, so we cancel anyway)
234-
// The goroutine will exit when the context is cancelled
235-
if globalCancel != nil {
236-
globalCancel()
237-
// Wait for the goroutine to exit
238-
cleanupWG.Wait()
239-
}
234+
// Only cancel once, even if Close() is called multiple times
235+
cleanupOnce.Do(func() {
236+
if globalCancel != nil {
237+
globalCancel()
238+
// Wait for the goroutine to exit
239+
cleanupWG.Wait()
240+
}
241+
})
240242
}

0 commit comments

Comments (0)