Skip to content

Commit 50c78d4

Browse files
committed
Create ResourceTracker
Signed-off-by: Justin Jung <[email protected]>
1 parent a7592b6 commit 50c78d4

File tree

1 file changed

+147
-0
lines changed

1 file changed

+147
-0
lines changed

pkg/util/resource/tracker.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package resource
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
type memoryBuckets struct {
9+
buckets [3]uint64 // 3 buckets for 3 seconds
10+
lastUpdate time.Time
11+
currentIdx int
12+
}
13+
14+
type ResourceTracker struct {
15+
cpuData map[string]time.Duration
16+
memoryData map[string]*memoryBuckets
17+
lastUpdate map[string]time.Time // Track last update per requestID
18+
19+
mu sync.RWMutex
20+
}
21+
22+
type IResourceTracker interface {
23+
AddDuration(requestID string, duration time.Duration)
24+
AddBytes(requestID string, bytes uint64)
25+
GetSlowestQuery() (requestID string, duration time.Duration)
26+
GetHeaviestQuery() (requestID string, bytes uint64)
27+
}
28+
29+
func NewResourceTracker() *ResourceTracker {
30+
rt := &ResourceTracker{
31+
cpuData: make(map[string]time.Duration),
32+
memoryData: make(map[string]*memoryBuckets),
33+
lastUpdate: make(map[string]time.Time),
34+
}
35+
36+
// Start cleanup goroutine
37+
go rt.cleanupLoop()
38+
39+
return rt
40+
}
41+
42+
func (rt *ResourceTracker) AddDuration(requestID string, duration time.Duration) {
43+
rt.mu.Lock()
44+
now := time.Now()
45+
rt.cpuData[requestID] += duration
46+
rt.lastUpdate[requestID] = now
47+
rt.mu.Unlock()
48+
}
49+
50+
func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) {
51+
rt.mu.Lock()
52+
now := time.Now().Truncate(time.Second)
53+
54+
buckets, exists := rt.memoryData[requestID]
55+
if !exists {
56+
buckets = &memoryBuckets{
57+
lastUpdate: now,
58+
currentIdx: 0,
59+
}
60+
rt.memoryData[requestID] = buckets
61+
}
62+
63+
// Calculate seconds drift and rotate buckets if needed
64+
secondsDrift := int(now.Sub(buckets.lastUpdate).Seconds())
65+
if secondsDrift > 0 {
66+
// Clear old buckets
67+
for i := 0; i < min(secondsDrift, 3); i++ {
68+
nextIdx := (buckets.currentIdx + 1 + i) % 3
69+
buckets.buckets[nextIdx] = 0
70+
}
71+
// Update current index
72+
buckets.currentIdx = (buckets.currentIdx + secondsDrift) % 3
73+
buckets.lastUpdate = now
74+
}
75+
76+
// Add bytes to current bucket
77+
buckets.buckets[buckets.currentIdx] += bytes
78+
rt.lastUpdate[requestID] = time.Now()
79+
rt.mu.Unlock()
80+
}
81+
82+
func (rt *ResourceTracker) GetSlowestQuery() (string, time.Duration) {
83+
rt.mu.RLock()
84+
defer rt.mu.RUnlock()
85+
86+
var maxID string
87+
var maxDuration time.Duration
88+
89+
for id, duration := range rt.cpuData {
90+
if duration > maxDuration {
91+
maxDuration = duration
92+
maxID = id
93+
}
94+
}
95+
96+
return maxID, maxDuration
97+
}
98+
99+
func (rt *ResourceTracker) GetHeaviestQuery() (string, uint64) {
100+
rt.mu.RLock()
101+
defer rt.mu.RUnlock()
102+
103+
var maxID string
104+
var maxBytes uint64
105+
106+
for id, buckets := range rt.memoryData {
107+
// Sum all buckets (represents last 3 seconds)
108+
var totalBytes uint64
109+
for _, bytes := range buckets.buckets {
110+
totalBytes += bytes
111+
}
112+
if totalBytes > maxBytes {
113+
maxBytes = totalBytes
114+
maxID = id
115+
}
116+
}
117+
118+
return maxID, maxBytes
119+
}
120+
121+
func (rt *ResourceTracker) cleanupLoop() {
122+
ticker := time.NewTicker(time.Second)
123+
defer ticker.Stop()
124+
125+
for range ticker.C {
126+
rt.cleanup()
127+
}
128+
}
129+
130+
func (rt *ResourceTracker) cleanup() {
131+
rt.mu.Lock()
132+
defer rt.mu.Unlock()
133+
134+
now := time.Now()
135+
cutoff := now.Add(-5 * time.Second)
136+
137+
// Remove stale requestIDs
138+
for requestID, lastUpdate := range rt.lastUpdate {
139+
if lastUpdate.Before(cutoff) {
140+
delete(rt.cpuData, requestID)
141+
delete(rt.memoryData, requestID)
142+
delete(rt.lastUpdate, requestID)
143+
}
144+
}
145+
146+
// Memory buckets are self-cleaning via rotation, no additional cleanup needed
147+
}

0 commit comments

Comments
 (0)