Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions pkg/cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (sd *etcdServiceDiscovery) SyncServers(firstSync bool) error {
}

// delete invalid servers (local ones that are not in etcd)
var allIds = make([]string, 0)
allIds := make([]string, 0)

// Spawn worker goroutines that will work in parallel
parallelGetter := newParallelGetter(sd.cli, sd.syncServersParallelism)
Expand Down Expand Up @@ -616,21 +616,19 @@ func (sd *etcdServiceDiscovery) revoke() error {
go func() {
defer close(c)
logger.Log.Debug("waiting for etcd revoke")

if sd.cli != nil {
_, err := sd.cli.Revoke(context.TODO(), sd.leaseID)
ctx, cancel := context.WithTimeout(context.Background(), sd.revokeTimeout)
_, err := sd.cli.Revoke(ctx, sd.leaseID)
cancel()
c <- err
} else {
c <- nil
}
logger.Log.Debug("finished waiting for etcd revoke")
}()
select {
case err := <-c:
return err // completed normally
case <-time.After(sd.revokeTimeout):
logger.Log.Warn("timed out waiting for etcd revoke")
return nil // timed out
}
err := <-c
return err
}

func (sd *etcdServiceDiscovery) addServer(sv *Server) {
Expand Down Expand Up @@ -712,7 +710,6 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
case <-sd.stopChan:
return
}

}
}(w)
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,26 @@ func (ns *NatsRPCServer) handleMessages() {
select {
case msg := <-ns.subChan:
ns.reportMetrics()
dropped, err := ns.sub.Dropped()
if err != nil {
logger.Log.Errorf("error getting number of dropped messages: %s", err.Error())
}
if dropped > ns.dropped {
logger.Log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
ns.dropped = dropped
// Check if subscription is still valid before accessing it
// This can happen during connection replacement when the old subscription becomes invalid
var dropped int
if ns.sub != nil && ns.sub.IsValid() {
var err error
dropped, err = ns.sub.Dropped()
if err != nil {
logger.Log.Errorf("error getting number of dropped messages: %s", err.Error())
}
if dropped > ns.dropped {
logger.Log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
ns.dropped = dropped
}
}
subsChanLen := float64(len(ns.subChan))
maxPending = math.Max(float64(maxPending), subsChanLen)
logger.Log.Debugf("subs channel size: %v, max: %v, dropped: %v", subsChanLen, maxPending, dropped)
req := &protos.Request{}
// TODO: Add tracing here to report delay to start processing message in spans
err = proto.Unmarshal(msg.Data, req)
err := proto.Unmarshal(msg.Data, req)
if err != nil {
// should answer rpc with an error
logger.Log.Error("error unmarshalling rpc message:", err.Error())
Expand Down
16 changes: 9 additions & 7 deletions pkg/groups/memory_group_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
globalCtx context.Context
globalCancel context.CancelFunc
cleanupWG sync.WaitGroup
cleanupOnce sync.Once
)

// MemoryGroupService base in server memory solution
Expand Down Expand Up @@ -230,11 +231,12 @@ func (c *MemoryGroupService) GroupRenewTTL(ctx context.Context, groupName string
}

// Close shuts down the package-level cleanup goroutine shared by all
// MemoryGroupService instances by cancelling the global context and
// waiting for the goroutine to exit.
//
// NOTE(review): the pasted diff left both the old unguarded body and the
// new sync.Once-guarded body in place; only the guarded version is kept,
// otherwise the cancel/wait would run before the Once and defeat the guard.
func (c *MemoryGroupService) Close() {
	// Only cancel once, even if Close() is called multiple times
	// (or by multiple service instances sharing the global state).
	cleanupOnce.Do(func() {
		if globalCancel != nil {
			globalCancel()
			// Wait for the cleanup goroutine to observe the
			// cancelled context and return before Close completes.
			cleanupWG.Wait()
		}
	})
}