Skip to content

Commit a07cc9a

Browse files
authored
fix: delete missing workers (#3273)
1 parent dc739a7 commit a07cc9a

File tree

1 file changed

+36
-14
lines changed

1 file changed

+36
-14
lines changed

pkg/scheduling/v1/prometheus_extension.go

Lines changed: 36 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -10,13 +10,15 @@ import (
1010
)
1111

1212
type PrometheusExtension struct {
13-
mu sync.RWMutex
14-
tenants map[string]*sqlcv1.Tenant
13+
mu sync.RWMutex
14+
tenants map[uuid.UUID]*sqlcv1.Tenant
15+
tenantIdToWorkerLabels map[uuid.UUID]map[WorkerPromLabels]struct{}
1516
}
1617

1718
func NewPrometheusExtension() *PrometheusExtension {
1819
return &PrometheusExtension{
19-
tenants: make(map[string]*sqlcv1.Tenant),
20+
tenants: make(map[uuid.UUID]*sqlcv1.Tenant),
21+
tenantIdToWorkerLabels: make(map[uuid.UUID]map[WorkerPromLabels]struct{}),
2022
}
2123
}
2224

@@ -25,7 +27,7 @@ func (p *PrometheusExtension) SetTenants(tenants []*sqlcv1.Tenant) {
2527
defer p.mu.Unlock()
2628

2729
for _, tenant := range tenants {
28-
p.tenants[tenant.ID.String()] = tenant
30+
p.tenants[tenant.ID] = tenant
2931
}
3032
}
3133

@@ -35,18 +37,20 @@ type WorkerPromLabels struct {
3537
}
3638

3739
func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *SnapshotInput) {
38-
p.mu.RLock()
39-
defer p.mu.RUnlock()
40+
p.mu.Lock()
41+
defer p.mu.Unlock()
4042

41-
workerPromLabelsToSlotData := make(map[*WorkerPromLabels]*SlotUtilization)
43+
tenantIdStr := tenantId.String()
44+
45+
workerPromLabelsToSlotData := make(map[WorkerPromLabels]*SlotUtilization)
4246

4347
for workerId, utilization := range input.WorkerSlotUtilization {
4448
worker, ok := input.Workers[workerId]
4549
if !ok {
4650
continue
4751
}
4852

49-
promLabels := &WorkerPromLabels{
53+
promLabels := WorkerPromLabels{
5054
ID: worker.WorkerId,
5155
Name: worker.Name,
5256
}
@@ -55,20 +59,37 @@ func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *Snapshot
5559
if ok {
5660
data.UtilizedSlots += utilization.UtilizedSlots
5761
data.NonUtilizedSlots += utilization.NonUtilizedSlots
58-
workerPromLabelsToSlotData[promLabels] = data
5962
} else {
60-
workerPromLabelsToSlotData[promLabels] = utilization
63+
workerPromLabelsToSlotData[promLabels] = &SlotUtilization{
64+
UtilizedSlots: utilization.UtilizedSlots,
65+
NonUtilizedSlots: utilization.NonUtilizedSlots,
66+
}
67+
}
68+
}
69+
70+
if known, ok := p.tenantIdToWorkerLabels[tenantId]; ok {
71+
for labels := range known {
72+
if _, stillActive := workerPromLabelsToSlotData[labels]; !stillActive {
73+
prometheus.TenantWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
74+
prometheus.TenantUsedWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
75+
prometheus.TenantAvailableWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
76+
}
6177
}
6278
}
6379

80+
currentWorkers := make(map[WorkerPromLabels]struct{}, len(workerPromLabelsToSlotData))
6481
for promLabels, utilization := range workerPromLabelsToSlotData {
6582
usedSlots := float64(utilization.UtilizedSlots)
6683
availableSlots := float64(utilization.NonUtilizedSlots)
6784

68-
prometheus.TenantWorkerSlots.WithLabelValues(tenantId.String(), promLabels.ID.String(), promLabels.Name).Set(usedSlots + availableSlots)
69-
prometheus.TenantUsedWorkerSlots.WithLabelValues(tenantId.String(), promLabels.ID.String(), promLabels.Name).Set(usedSlots)
70-
prometheus.TenantAvailableWorkerSlots.WithLabelValues(tenantId.String(), promLabels.ID.String(), promLabels.Name).Set(availableSlots)
85+
prometheus.TenantWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(usedSlots + availableSlots)
86+
prometheus.TenantUsedWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(usedSlots)
87+
prometheus.TenantAvailableWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(availableSlots)
88+
89+
currentWorkers[promLabels] = struct{}{}
7190
}
91+
92+
p.tenantIdToWorkerLabels[tenantId] = currentWorkers
7293
}
7394

7495
func (p *PrometheusExtension) PostAssign(tenantId uuid.UUID, input *PostAssignInput) {}
@@ -77,6 +98,7 @@ func (p *PrometheusExtension) Cleanup() error {
7798
p.mu.Lock()
7899
defer p.mu.Unlock()
79100

80-
p.tenants = make(map[string]*sqlcv1.Tenant)
101+
p.tenants = make(map[uuid.UUID]*sqlcv1.Tenant)
102+
p.tenantIdToWorkerLabels = make(map[uuid.UUID]map[WorkerPromLabels]struct{})
81103
return nil
82104
}

0 commit comments

Comments
 (0)