diff --git a/pkg/cluster/etcd_service_discovery.go b/pkg/cluster/etcd_service_discovery.go
index 6812a400..9185689c 100644
--- a/pkg/cluster/etcd_service_discovery.go
+++ b/pkg/cluster/etcd_service_discovery.go
@@ -549,7 +549,7 @@ func (sd *etcdServiceDiscovery) SyncServers(firstSync bool) error {
 	}
 
 	// delete invalid servers (local ones that are not in etcd)
-	var allIds = make([]string, 0)
+	allIds := make([]string, 0)
 
 	// Spawn worker goroutines that will work in parallel
 	parallelGetter := newParallelGetter(sd.cli, sd.syncServersParallelism)
@@ -616,21 +616,19 @@ func (sd *etcdServiceDiscovery) revoke() error {
 	go func() {
 		defer close(c)
 		logger.Log.Debug("waiting for etcd revoke")
+
 		if sd.cli != nil {
-			_, err := sd.cli.Revoke(context.TODO(), sd.leaseID)
+			ctx, cancel := context.WithTimeout(context.Background(), sd.revokeTimeout)
+			_, err := sd.cli.Revoke(ctx, sd.leaseID)
+			cancel()
 			c <- err
 		} else {
 			c <- nil
 		}
 		logger.Log.Debug("finished waiting for etcd revoke")
 	}()
-	select {
-	case err := <-c:
-		return err // completed normally
-	case <-time.After(sd.revokeTimeout):
-		logger.Log.Warn("timed out waiting for etcd revoke")
-		return nil // timed out
-	}
+	err := <-c
+	return err
 }
 
 func (sd *etcdServiceDiscovery) addServer(sv *Server) {
@@ -712,7 +710,6 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
 			case <-sd.stopChan:
 				return
 			}
-
 		}
 	}(w)
 }
diff --git a/pkg/cluster/nats_rpc_server.go b/pkg/cluster/nats_rpc_server.go
index 41a96295..b2e30123 100644
--- a/pkg/cluster/nats_rpc_server.go
+++ b/pkg/cluster/nats_rpc_server.go
@@ -214,20 +214,26 @@ func (ns *NatsRPCServer) handleMessages() {
 		select {
 		case msg := <-ns.subChan:
 			ns.reportMetrics()
-			dropped, err := ns.sub.Dropped()
-			if err != nil {
-				logger.Log.Errorf("error getting number of dropped messages: %s", err.Error())
-			}
-			if dropped > ns.dropped {
-				logger.Log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
-				ns.dropped = dropped
-			}
+			// Check if subscription is still valid before accessing it
+			// This can happen during connection replacement when the old subscription becomes invalid
+			var dropped int
+			if ns.sub != nil && ns.sub.IsValid() {
+				var err error
+				dropped, err = ns.sub.Dropped()
+				if err != nil {
+					logger.Log.Errorf("error getting number of dropped messages: %s", err.Error())
+				}
+				if dropped > ns.dropped {
+					logger.Log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
+					ns.dropped = dropped
+				}
+			}
 			subsChanLen := float64(len(ns.subChan))
 			maxPending = math.Max(float64(maxPending), subsChanLen)
 			logger.Log.Debugf("subs channel size: %v, max: %v, dropped: %v", subsChanLen, maxPending, dropped)
 			req := &protos.Request{}
 			// TODO: Add tracing here to report delay to start processing message in spans
-			err = proto.Unmarshal(msg.Data, req)
+			err := proto.Unmarshal(msg.Data, req)
 			if err != nil {
 				// should answer rpc with an error
 				logger.Log.Error("error unmarshalling rpc message:", err.Error())
diff --git a/pkg/groups/memory_group_service.go b/pkg/groups/memory_group_service.go
index c7ef1e12..aa51efb3 100644
--- a/pkg/groups/memory_group_service.go
+++ b/pkg/groups/memory_group_service.go
@@ -16,6 +16,7 @@ var (
 	globalCtx    context.Context
 	globalCancel context.CancelFunc
 	cleanupWG    sync.WaitGroup
+	cleanupOnce  sync.Once
 )
 
 // MemoryGroupService base in server memory solution
@@ -230,11 +231,12 @@ func (c *MemoryGroupService) GroupRenewTTL(ctx context.Context, groupName string
 }
 
 func (c *MemoryGroupService) Close() {
-	// Only cancel if this is the last service (we can't easily track refs, so we cancel anyway)
-	// The goroutine will exit when the context is cancelled
-	if globalCancel != nil {
-		globalCancel()
-		// Wait for the goroutine to exit
-		cleanupWG.Wait()
-	}
+	// Only cancel once, even if Close() is called multiple times
+	cleanupOnce.Do(func() {
+		if globalCancel != nil {
+			globalCancel()
+			// Wait for the goroutine to exit
+			cleanupWG.Wait()
+		}
+	})
 }