Skip to content

Commit 040cb6b

Browse files
committed
fix:window data race
1 parent b3ea1fb commit 040cb6b

File tree

3 files changed

+32
-6
lines changed

3 files changed

+32
-6
lines changed

window/counting_window.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,12 @@ func (cw *CountingWindow) Start() {
7777
if shouldTrigger {
7878
// 在持有锁的情况下立即处理
7979
slot := cw.createSlot(cw.dataBuffer[:cw.threshold])
80-
for i := range cw.dataBuffer[:cw.threshold] {
81-
// 由于Row是值类型,这里需要通过指针来修改Slot字段
82-
cw.dataBuffer[i].Slot = slot
83-
}
8480
data := make([]types.Row, cw.threshold)
8581
copy(data, cw.dataBuffer[:cw.threshold])
82+
// 设置Slot字段到复制的数据中,避免修改原始dataBuffer
83+
for i := range data {
84+
data[i].Slot = slot
85+
}
8686

8787
if len(cw.dataBuffer) > cw.threshold {
8888
remaining := len(cw.dataBuffer) - cw.threshold

window/counting_window_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,35 @@ func TestCountingWindow(t *testing.T) {
5151
case <-time.After(2 * time.Second):
5252
t.Error("No results received within timeout")
5353
}
54-
assert.Len(t, cw.dataBuffer, 1)
54+
// 验证窗口状态:添加第4个数据后,第一个窗口已触发,剩余1个数据(值为3)
55+
// 继续添加2个数据,应该再次触发
56+
cw.Add(4) // 添加第5个数据
57+
cw.Add(5) // 添加第6个数据,应该再次触发(3,4,5)
58+
59+
// 等待第二次触发
60+
select {
61+
case res := <-resultsChan:
62+
assert.Len(t, res, 3)
63+
assert.Equal(t, 3, res[0].Data, "第二批第一个元素应该是3")
64+
assert.Equal(t, 4, res[1].Data, "第二批第二个元素应该是4")
65+
assert.Equal(t, 5, res[2].Data, "第二批第三个元素应该是5")
66+
case <-time.After(2 * time.Second):
67+
t.Error("No second results received within timeout")
68+
}
69+
5570
// Test case 2: Reset
5671
cw.Reset()
57-
assert.Len(t, cw.dataBuffer, 0)
72+
// Reset后添加数据验证重置是否成功
73+
cw.Add(100)
74+
cw.Add(101)
75+
cw.Add(102)
76+
select {
77+
case res := <-resultsChan:
78+
assert.Len(t, res, 3)
79+
assert.Equal(t, 100, res[0].Data, "重置后第一个元素应该是100")
80+
case <-time.After(2 * time.Second):
81+
t.Error("No results after reset received within timeout")
82+
}
5883
}
5984

6085
func TestCountingWindowBadThreshold(t *testing.T) {

window/tumbling_window.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ func (tw *TumblingWindow) Start() {
115115
select {
116116
// 当定时器到期时,触发窗口。
117117
case <-tw.timer.C:
118+
// 在调用Trigger前不需要额外加锁,因为Trigger方法内部已经有锁保护
118119
tw.Trigger()
119120
// 当上下文被取消时,停止定时器并退出循环。
120121
case <-tw.ctx.Done():

0 commit comments

Comments
 (0)