Skip to content

Commit 362f942

Browse files
authored
fix: remove from tracker after windows are created and fix concurrency issues with rater (#3051)
Signed-off-by: Yashash H L <[email protected]>
1 parent 367d212 commit 362f942

File tree

12 files changed

+151
-296
lines changed

12 files changed

+151
-296
lines changed

pkg/daemon/server/service/rater/helper.go

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -31,48 +31,6 @@ const (
3131
rateNotAvailable = float64(math.MinInt)
3232
)
3333

34-
// UpdatePendingCount updates the pending count for a pod at a given time
35-
func UpdatePendingCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podPendingCounts *PodPendingCount) {
36-
if podPendingCounts == nil {
37-
return
38-
}
39-
40-
items := q.Items()
41-
// find the element matching the input timestamp and update it
42-
for _, i := range items {
43-
if i.timestamp == time {
44-
i.UpdatePending(podPendingCounts)
45-
return
46-
}
47-
}
48-
49-
// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
50-
tc := NewTimestampedCounts(time)
51-
tc.UpdatePending(podPendingCounts)
52-
q.Append(tc)
53-
}
54-
55-
// UpdateCount updates the count of processed messages for a pod at a given time
56-
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount) {
57-
if podReadCounts == nil {
58-
return
59-
}
60-
61-
items := q.Items()
62-
// find the element matching the input timestamp and update it
63-
for _, i := range items {
64-
if i.timestamp == time {
65-
i.Update(podReadCounts)
66-
return
67-
}
68-
}
69-
70-
// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
71-
tc := NewTimestampedCounts(time)
72-
tc.Update(podReadCounts)
73-
q.Append(tc)
74-
}
75-
7634
// CalculatePending calculates the pending messages for a given partition in the last lookback seconds
7735
func CalculatePending(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) int64 {
7836
counts := q.Items()

pkg/daemon/server/service/rater/helper_test.go

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -27,84 +27,6 @@ import (
2727

2828
const TestTime = 1620000000
2929

30-
func TestUpdateCount(t *testing.T) {
31-
t.Run("givenTimeExistsPodExistsPartitionExistsCountAvailable_whenUpdate_thenUpdatePodPartitionCount", func(t *testing.T) {
32-
q := sharedqueue.New[*TimestampedCounts](1800)
33-
tc := NewTimestampedCounts(TestTime)
34-
tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}})
35-
q.Append(tc)
36-
37-
UpdateCount(q, TestTime, &PodReadCount{"pod1", map[string]float64{"partition1": 20.0}})
38-
39-
assert.Equal(t, 1, q.Length())
40-
assert.Equal(t, 20.0, q.Items()[0].podPartitionCount["pod1"]["partition1"])
41-
})
42-
43-
t.Run("givenTimeExistsPodExistsPartitionNotExistsCountAvailable_whenUpdate_thenAddPodPartitionCount", func(t *testing.T) {
44-
q := sharedqueue.New[*TimestampedCounts](1800)
45-
tc := NewTimestampedCounts(TestTime)
46-
tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}})
47-
q.Append(tc)
48-
49-
UpdateCount(q, TestTime, &PodReadCount{"pod1", map[string]float64{"partition1": 20.0, "partition2": 30.0}})
50-
51-
assert.Equal(t, 1, q.Length())
52-
assert.Equal(t, 20.0, q.Items()[0].podPartitionCount["pod1"]["partition1"])
53-
assert.Equal(t, 30.0, q.Items()[0].podPartitionCount["pod1"]["partition2"])
54-
})
55-
56-
t.Run("givenTimeExistsPodNotExistsCountAvailable_whenUpdate_thenAddPodCount", func(t *testing.T) {
57-
q := sharedqueue.New[*TimestampedCounts](1800)
58-
tc := NewTimestampedCounts(TestTime)
59-
tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}})
60-
q.Append(tc)
61-
62-
UpdateCount(q, TestTime, &PodReadCount{"pod2", map[string]float64{"partition1": 10.0}})
63-
64-
assert.Equal(t, 1, q.Length())
65-
assert.Equal(t, 20.0, q.Items()[0].podPartitionCount["pod1"]["partition1"])
66-
assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod2"]["partition1"])
67-
})
68-
69-
t.Run("givenTimeExistsPodExistsCountNotAvailable_whenUpdate_thenNotUpdatePod", func(t *testing.T) {
70-
q := sharedqueue.New[*TimestampedCounts](1800)
71-
tc := NewTimestampedCounts(TestTime)
72-
tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}})
73-
q.Append(tc)
74-
75-
UpdateCount(q, TestTime, nil)
76-
77-
assert.Equal(t, 1, q.Length())
78-
assert.Equal(t, 1, len(q.Items()[0].podPartitionCount))
79-
assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"])
80-
})
81-
82-
t.Run("givenTimeExistsPodNotExistsCountNotAvailable_whenUpdate_thenNoUpdate", func(t *testing.T) {
83-
q := sharedqueue.New[*TimestampedCounts](1800)
84-
tc := NewTimestampedCounts(TestTime)
85-
tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}})
86-
q.Append(tc)
87-
88-
UpdateCount(q, TestTime, nil)
89-
90-
assert.Equal(t, 1, q.Length())
91-
assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"])
92-
})
93-
94-
t.Run("givenTimeNotExistsCountAvailable_whenUpdate_thenAddNewItem", func(t *testing.T) {
95-
q := sharedqueue.New[*TimestampedCounts](1800)
96-
tc := NewTimestampedCounts(TestTime)
97-
tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}})
98-
q.Append(tc)
99-
100-
UpdateCount(q, TestTime+1, &PodReadCount{"pod1", map[string]float64{"partition1": 20.0}})
101-
102-
assert.Equal(t, 2, q.Length())
103-
assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"])
104-
assert.Equal(t, 20.0, q.Items()[1].podPartitionCount["pod1"]["partition1"])
105-
})
106-
}
107-
10830
func TestCalculateRate(t *testing.T) {
10931
t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnRateNotAvailable", func(t *testing.T) {
11032
q := sharedqueue.New[*TimestampedCounts](1800)

pkg/daemon/server/service/rater/options.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@ limitations under the License.
1616

1717
package rater
1818

19+
import "time"
20+
1921
type options struct {
2022
// Number of workers working on collecting counts of processed messages.
2123
workers int
22-
// Time in milliseconds, each element in the work queue will be picked up in an interval of this period of time.
23-
taskInterval int
24+
// Interval (a time.Duration) at which each element in the work queue will be picked up; defaults to 5 seconds.
25+
taskInterval time.Duration
2426
}
2527

2628
type Option func(*options)
@@ -34,7 +36,7 @@ func defaultOptions() *options {
3436
return &options{
3537
workers: 50, // default max replicas is 50
3638
// we execute the rater metrics fetching every 5 seconds
37-
taskInterval: 5,
39+
taskInterval: 5 * time.Second,
3840
}
3941
}
4042

@@ -44,8 +46,8 @@ func WithWorkers(n int) Option {
4446
}
4547
}
4648

47-
func WithTaskInterval(n int) Option {
49+
func WithTaskInterval(duration time.Duration) Option {
4850
return func(o *options) {
49-
o.taskInterval = n
51+
o.taskInterval = duration
5052
}
5153
}

pkg/daemon/server/service/rater/rater.go

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"maps"
2525
"math"
2626
"net/http"
27+
"sync"
2728
"time"
2829

2930
dto "github.com/prometheus/client_model/go"
@@ -69,6 +70,7 @@ type Rater struct {
6970
timestampedPendingCount map[string]*sharedqueue.OverflowQueue[*TimestampedCounts]
7071
// this can be updated dynamically, defaults to user-specified value in the spec
7172
lookBackSeconds map[string]*atomic.Float64
73+
mu sync.RWMutex
7274
options *options
7375
}
7476

@@ -184,15 +186,65 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int, times
184186
if podPendingCount == nil {
185187
log.Debugf("Failed retrieving pending counts for pod %s vertex %s", podInfo.podName, podInfo.vertexName)
186188
}
187-
UpdatePendingCount(r.timestampedPendingCount[podInfo.vertexName], timestamp, podPendingCount)
189+
r.updatePendingCount(podInfo.vertexName, timestamp, podPendingCount)
188190
}
189191
} else {
190192
log.Debugf("Pod %s does not exist, updating it with nil...", podInfo.podName)
191193
}
192-
UpdateCount(r.timestampedPodCounts[podInfo.vertexName], timestamp, podReadCount)
194+
r.updateCount(podInfo.vertexName, timestamp, podReadCount)
193195
return nil
194196
}
195197

198+
func (r *Rater) updateCount(vertexName string, timestamp int64, podReadCount *PodReadCount) {
199+
r.mu.Lock()
200+
defer r.mu.Unlock()
201+
202+
if podReadCount == nil {
203+
return
204+
}
205+
206+
q := r.timestampedPodCounts[vertexName]
207+
// Peek the last element
208+
items := q.Items()
209+
if len(items) > 0 {
210+
last := items[len(items)-1]
211+
// Merge if same timestamp
212+
if last.timestamp == timestamp {
213+
last.Update(podReadCount)
214+
return
215+
}
216+
}
217+
218+
tc := NewTimestampedCounts(timestamp)
219+
tc.Update(podReadCount)
220+
q.Append(tc)
221+
}
222+
223+
func (r *Rater) updatePendingCount(vertexName string, timestamp int64, podPendingCounts *PodPendingCount) {
224+
r.mu.Lock()
225+
defer r.mu.Unlock()
226+
227+
if podPendingCounts == nil {
228+
return
229+
}
230+
q := r.timestampedPendingCount[vertexName]
231+
232+
items := q.Items()
233+
if len(items) > 0 {
234+
last := items[len(items)-1]
235+
// Merge if same timestamp
236+
if last.timestamp == timestamp {
237+
last.UpdatePending(podPendingCounts)
238+
return
239+
}
240+
}
241+
242+
// the queue is empty or its last element has a different timestamp, so we need to add a new timestamped count to the queue
243+
tc := NewTimestampedCounts(timestamp)
244+
tc.UpdatePending(podPendingCounts)
245+
q.Append(tc)
246+
}
247+
196248
func (r *Rater) Start(ctx context.Context) error {
197249
r.log.Infof("Starting rater...")
198250
taskCh := make(chan *podTask)
@@ -225,7 +277,7 @@ func (r *Rater) Start(ctx context.Context) error {
225277
}
226278
}
227279

228-
ticker := time.NewTicker(time.Duration(r.options.taskInterval) * time.Millisecond)
280+
ticker := time.NewTicker(r.options.taskInterval)
229281
defer ticker.Stop()
230282

231283
// Following for loop keeps calling assign() function to assign monitoring tasks to the workers.

pkg/daemon/server/service/rater/rater_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func TestRater_Start(t *testing.T) {
108108
},
109109
},
110110
}
111-
r := NewRater(ctx, pipeline, WithTaskInterval(1000))
111+
r := NewRater(ctx, pipeline, WithTaskInterval(time.Second))
112112
podTracker := NewPodTracker(ctx, pipeline, WithRefreshInterval(time.Second*1))
113113
podTracker.httpClient = &raterMockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}}
114114
r.httpClient = &raterMockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}}

pkg/mvtxdaemon/server/service/rater/helper.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,27 +32,6 @@ const (
3232
rateNotAvailable = float64(math.MinInt)
3333
)
3434

35-
// UpdateCount updates the count for a given timestamp in the queue.
36-
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodMetricsCount) {
37-
if podReadCounts == nil {
38-
return
39-
}
40-
41-
items := q.Items()
42-
// find the element matching the input timestamp and update it
43-
for _, i := range items {
44-
if i.timestamp == time {
45-
i.Update(podReadCounts)
46-
return
47-
}
48-
}
49-
50-
// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
51-
tc := NewTimestampedCounts(time)
52-
tc.Update(podReadCounts)
53-
q.Append(tc)
54-
}
55-
5635
// CalculateRate calculates the rate of a MonoVertex for a given lookback period.
5736
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 {
5837
counts := q.Items()

pkg/mvtxdaemon/server/service/rater/helper_test.go

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -30,71 +30,6 @@ import (
3030

3131
const TestTime = 1620000000
3232

33-
func TestUpdateCount(t *testing.T) {
34-
t.Run("givenTimeExistsPodExistsCountAvailable_whenUpdate_thenUpdatePodPartitionCount", func(t *testing.T) {
35-
q := sharedqueue.New[*TimestampedCounts](1800)
36-
tc := NewTimestampedCounts(TestTime)
37-
tc.Update(&PodMetricsCount{"pod1", 10.0})
38-
q.Append(tc)
39-
40-
UpdateCount(q, TestTime, &PodMetricsCount{"pod1", 20.0})
41-
42-
assert.Equal(t, 1, q.Length())
43-
assert.Equal(t, 20.0, q.Items()[0].podReadCounts["pod1"])
44-
})
45-
46-
t.Run("givenTimeExistsPodNotExistsCountAvailable_whenUpdate_thenAddPodCount", func(t *testing.T) {
47-
q := sharedqueue.New[*TimestampedCounts](1800)
48-
tc := NewTimestampedCounts(TestTime)
49-
tc.Update(&PodMetricsCount{"pod1", 20.0})
50-
q.Append(tc)
51-
52-
UpdateCount(q, TestTime, &PodMetricsCount{"pod2", 10.0})
53-
54-
assert.Equal(t, 1, q.Length())
55-
assert.Equal(t, 20.0, q.Items()[0].podReadCounts["pod1"])
56-
assert.Equal(t, 10.0, q.Items()[0].podReadCounts["pod2"])
57-
})
58-
59-
t.Run("givenTimeExistsPodExistsCountNotAvailable_whenUpdate_thenNotUpdatePod", func(t *testing.T) {
60-
q := sharedqueue.New[*TimestampedCounts](1800)
61-
tc := NewTimestampedCounts(TestTime)
62-
tc.Update(&PodMetricsCount{"pod1", 10.0})
63-
q.Append(tc)
64-
65-
UpdateCount(q, TestTime, nil)
66-
67-
assert.Equal(t, 1, q.Length())
68-
assert.Equal(t, 1, len(q.Items()[0].podReadCounts))
69-
assert.Equal(t, 10.0, q.Items()[0].podReadCounts["pod1"])
70-
})
71-
72-
t.Run("givenTimeExistsPodNotExistsCountNotAvailable_whenUpdate_thenNoUpdate", func(t *testing.T) {
73-
q := sharedqueue.New[*TimestampedCounts](1800)
74-
tc := NewTimestampedCounts(TestTime)
75-
tc.Update(&PodMetricsCount{"pod1", 10.0})
76-
q.Append(tc)
77-
78-
UpdateCount(q, TestTime, nil)
79-
80-
assert.Equal(t, 1, q.Length())
81-
assert.Equal(t, 10.0, q.Items()[0].podReadCounts["pod1"])
82-
})
83-
84-
t.Run("givenTimeNotExistsCountAvailable_whenUpdate_thenAddNewItem", func(t *testing.T) {
85-
q := sharedqueue.New[*TimestampedCounts](1800)
86-
tc := NewTimestampedCounts(TestTime)
87-
tc.Update(&PodMetricsCount{"pod1", 10.0})
88-
q.Append(tc)
89-
90-
UpdateCount(q, TestTime+1, &PodMetricsCount{"pod1", 20.0})
91-
92-
assert.Equal(t, 2, q.Length())
93-
assert.Equal(t, 10.0, q.Items()[0].podReadCounts["pod1"])
94-
assert.Equal(t, 20.0, q.Items()[1].podReadCounts["pod1"])
95-
})
96-
}
97-
9833
func TestCalculatePending(t *testing.T) {
9934
t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnPendingNotAvailable", func(t *testing.T) {
10035
q := sharedqueue.New[*TimestampedCounts](1800)

0 commit comments

Comments
 (0)