Commit c7cece1

committed
feat: add overflow strategy for window output results
1 parent 09fe631 commit c7cece1

File tree

10 files changed: +705 -252 lines changed

stream/manager_metrics.go

Lines changed: 10 additions & 1 deletion
@@ -42,7 +42,7 @@ func (s *Stream) GetStats() map[string]int64 {
 	dataChanCap := int64(cap(s.dataChan))
 	s.dataChanMux.RUnlock()

-	return map[string]int64{
+	stats := map[string]int64{
 		InputCount:   atomic.LoadInt64(&s.inputCount),
 		OutputCount:  atomic.LoadInt64(&s.outputCount),
 		DroppedCount: atomic.LoadInt64(&s.droppedCount),
@@ -55,6 +55,15 @@ func (s *Stream) GetStats() map[string]int64 {
 		ActiveRetries: int64(atomic.LoadInt32(&s.activeRetries)),
 		Expanding:     int64(atomic.LoadInt32(&s.expanding)),
 	}
+
+	if s.Window != nil {
+		winStats := s.Window.GetStats()
+		for k, v := range winStats {
+			stats[k] = v
+		}
+	}
+
+	return stats
 }

 // GetDetailedStats gets detailed performance statistics
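With the window counters merged into the stream map, callers can read them from the same GetStats call. A minimal sketch, assuming the stream package is importable at github.com/rulego/streamsql/stream; the window keys ("sentCount", "droppedCount", "bufferSize", "bufferUsed") come from CountingWindow.GetStats further down in this diff:

package metricsdemo

import (
	"fmt"

	"github.com/rulego/streamsql/stream"
)

// PrintStats dumps the merged stream + window counters exposed by
// Stream.GetStats after this change.
func PrintStats(s *stream.Stream) {
	stats := s.GetStats()
	fmt.Printf("window sent=%d dropped=%d buffer=%d/%d\n",
		stats["sentCount"], stats["droppedCount"],
		stats["bufferUsed"], stats["bufferSize"])
}

The new test file below reads the same map through the unexported ssql.stream field.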

streamsql_strategy_test.go

Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
+package streamsql
+
+import (
+	"testing"
+	"time"
+
+	"github.com/rulego/streamsql/types"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestSQLIntegration_StrategyBlock tests the block strategy under SQL integration
+func TestSQLIntegration_StrategyBlock(t *testing.T) {
+	// Config: window output buffer of 1, block strategy, 100ms timeout
+	ssql := New(WithCustomPerformance(types.PerformanceConfig{
+		BufferConfig: types.BufferConfig{
+			DataChannelSize:   100,
+			ResultChannelSize: 100,
+			WindowOutputSize:  1,
+		},
+		OverflowConfig: types.OverflowConfig{
+			Strategy:      types.OverflowStrategyBlock,
+			BlockTimeout:  100 * time.Millisecond,
+			AllowDataLoss: true,
+		},
+		WorkerConfig: types.WorkerConfig{
+			SinkPoolSize:    0, // unbuffered task queue
+			SinkWorkerCount: 1, // a single worker
+		},
+	}))
+	defer ssql.Stop()
+
+	// SQL: every record triggers a window
+	rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
+	err := ssql.Execute(rsql)
+	require.NoError(t, err)
+
+	// Add a synchronous sink that blocks Stream processing and back-pressures the Window.
+	// Note: it must be added after Execute, because only Execute creates the stream.
+	ssql.AddSyncSink(func(results []map[string]interface{}) {
+		time.Sleep(500 * time.Millisecond)
+	})
+
+	// Send 5 records. Data path: Emit -> dataChan -> Window.Add -> triggerChan ->
+	// window worker -> sendResult -> outputChan (size 1) -> Stream -> sink.
+	// With the synchronous sink sleeping 500ms per result:
+	// d1: taken by the Stream and handed to the sink, which blocks for 500ms.
+	// d2: fills the size-1 output buffer and is processed once the sink wakes up.
+	// d3, d4, d5: sendResult blocks on the full buffer, hits the 100ms BlockTimeout
+	//   and drops each of them.
+	// Expected totals: sentCount = 2 (d1, d2), droppedCount = 3 (d3, d4, d5).
+
+	for _, id := range []string{"d1", "d2", "d3", "d4", "d5"} {
+		ssql.Emit(map[string]interface{}{"deviceId": id})
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	// Wait long enough for the Stream to wake up and finish processing,
+	// and for the Window's drop logic to run
+	time.Sleep(1000 * time.Millisecond)
+
+	// Check the merged stats: 2 window results sent (d1, d2), 3 dropped (d3, d4, d5)
+	stats := ssql.stream.GetStats()
+	assert.Equal(t, int64(3), stats["droppedCount"], "Should have 3 dropped window results due to overflow")
+	assert.Equal(t, int64(2), stats["sentCount"], "Should have 2 sent window results")
+}
+
+// TestSQLIntegration_StrategyDrop tests the drop strategy under SQL integration
+func TestSQLIntegration_StrategyDrop(t *testing.T) {
+	// Config: window output buffer of 1, drop strategy
+	ssql := New(WithCustomPerformance(types.PerformanceConfig{
+		BufferConfig: types.BufferConfig{
+			DataChannelSize:   100,
+			ResultChannelSize: 100,
+			WindowOutputSize:  1,
+		},
+		OverflowConfig: types.OverflowConfig{
+			Strategy: types.OverflowStrategyDrop,
+		},
+	}))
+	defer ssql.Stop()
+
+	// SQL: every record triggers a window
+	rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
+	err := ssql.Execute(rsql)
+	require.NoError(t, err)
+
+	// Send 3 records back to back
+	ssql.Emit(map[string]interface{}{"deviceId": "d1"})
+	ssql.Emit(map[string]interface{}{"deviceId": "d2"})
+	ssql.Emit(map[string]interface{}{"deviceId": "d3"})
+
+	// Wait for processing to finish
+	time.Sleep(200 * time.Millisecond)
+
+	// With StrategyDrop old results are evicted to make room, so sentCount keeps growing
+	stats := ssql.stream.GetStats()
+	// d1, d2 and d3 are all sent successfully (d1 and d2 may be evicted later, but in
+	// sendResult evicting an old item and then writing the new one counts as a send)
+	assert.Equal(t, int64(3), stats["sentCount"])
+
+	// Verify that the record left in the buffer is the last one (d3).
+	// Note: AddSink would start a worker that drains OutputChan; since no sink was
+	// added here, the result should still be in the Window's OutputChan, so read it directly.
+	select {
+	case result := <-ssql.stream.Window.OutputChan():
+		assert.Equal(t, "d3", result[0].Data.(map[string]interface{})["deviceId"])
+	case <-time.After(100 * time.Millisecond):
+		// Also acceptable if a sink worker had already drained it, but no sink was added
+	}
+}

types/config.go

Lines changed: 9 additions & 0 deletions
@@ -6,6 +6,15 @@ import (
 	"github.com/rulego/streamsql/aggregator"
 )

+const (
+	// OverflowStrategyBlock blocks when buffer is full
+	OverflowStrategyBlock = "block"
+	// OverflowStrategyDrop drops data when buffer is full
+	OverflowStrategyDrop = "drop"
+	// OverflowStrategyExpand expands buffer when full (not implemented for windows yet)
+	OverflowStrategyExpand = "expand"
+)
+
 // Config stream processing configuration
 type Config struct {
 	// SQL processing related configuration
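For context, these constants feed types.OverflowConfig.Strategy. A minimal configuration sketch modeled on the new integration test in this commit; the values are illustrative and the root package is assumed to be importable as github.com/rulego/streamsql:

package main

import (
	"time"

	"github.com/rulego/streamsql"
	"github.com/rulego/streamsql/types"
)

func main() {
	// Block for up to 200ms when the window output buffer fills, then drop.
	ssql := streamsql.New(streamsql.WithCustomPerformance(types.PerformanceConfig{
		BufferConfig: types.BufferConfig{
			DataChannelSize:   100,
			ResultChannelSize: 100,
			WindowOutputSize:  1,
		},
		OverflowConfig: types.OverflowConfig{
			Strategy:      types.OverflowStrategyBlock, // or types.OverflowStrategyDrop
			BlockTimeout:  200 * time.Millisecond,
			AllowDataLoss: true,
		},
	}))
	defer ssql.Stop()
}

The test file above shows the full flow (Execute, Emit, and reading the merged stats).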

window/counting_window.go

Lines changed: 61 additions & 24 deletions
@@ -22,6 +22,8 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
+	"time"

 	"github.com/rulego/streamsql/utils/cast"

@@ -75,12 +77,9 @@ func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error) {
 	}

 	// Use unified performance config to get window output buffer size
-	bufferSize := 100 // Default value
-	if (config.PerformanceConfig != types.PerformanceConfig{}) {
+	bufferSize := 1000 // Default value
+	if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 {
 		bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
-		if bufferSize < 10 {
-			bufferSize = 10 // Minimum value
-		}
 	}

 	cw := &CountingWindow{
@@ -161,18 +160,7 @@ func (cw *CountingWindow) Start() {
 			cw.callback(data)
 		}

-		select {
-		case cw.outputChan <- data:
-			cw.mu.Lock()
-			cw.sentCount++
-			cw.mu.Unlock()
-		case <-cw.ctx.Done():
-			return
-		default:
-			cw.mu.Lock()
-			cw.droppedCount++
-			cw.mu.Unlock()
-		}
+		cw.sendResult(data)
 	} else {
 		cw.mu.Unlock()
 	}
@@ -184,6 +172,58 @@ func (cw *CountingWindow) Start() {
 	}()
 }

+func (cw *CountingWindow) sendResult(data []types.Row) {
+	strategy := cw.config.PerformanceConfig.OverflowConfig.Strategy
+	timeout := cw.config.PerformanceConfig.OverflowConfig.BlockTimeout
+
+	if strategy == types.OverflowStrategyBlock {
+		if timeout <= 0 {
+			timeout = 5 * time.Second
+		}
+		select {
+		case cw.outputChan <- data:
+			atomic.AddInt64(&cw.sentCount, 1)
+		case <-time.After(timeout):
+			// Timeout, check if data loss is allowed
+			if cw.config.PerformanceConfig.OverflowConfig.AllowDataLoss {
+				atomic.AddInt64(&cw.droppedCount, 1)
+				// Drop new data (simplest fallback for block)
+			} else {
+				atomic.AddInt64(&cw.droppedCount, 1)
+			}
+		case <-cw.ctx.Done():
+			return
+		}
+		return
+	}
+
+	// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop).
+	// If the buffer is full, remove the oldest item to make space for the new item.
+	// This ensures that we always keep the most recent data, which is usually preferred in streaming.
+	select {
+	case cw.outputChan <- data:
+		atomic.AddInt64(&cw.sentCount, 1)
+	case <-cw.ctx.Done():
+		return
+	default:
+		// Try to drop oldest data to make room for new data
+		select {
+		case <-cw.outputChan:
+			// Successfully dropped one old item
+			select {
+			case cw.outputChan <- data:
+				atomic.AddInt64(&cw.sentCount, 1)
+			default:
+				// Still failed, drop current
+				atomic.AddInt64(&cw.droppedCount, 1)
+			}
+		default:
+			// Channel empty, try to send again or drop
+			atomic.AddInt64(&cw.droppedCount, 1)
+		}
+	}
+}
+
 func (cw *CountingWindow) Trigger() {
 	// Note: trigger logic has been merged into Start method to avoid data races
 	// This method is kept to satisfy Window interface requirements, but actual triggering is handled in Start method
@@ -211,17 +251,14 @@ func (cw *CountingWindow) Reset() {
 	cw.dataBuffer = nil
 	cw.keyedBuffer = make(map[string][]types.Row)
 	cw.keyedCount = make(map[string]int)
-	cw.sentCount = 0
-	cw.droppedCount = 0
+	atomic.StoreInt64(&cw.sentCount, 0)
+	atomic.StoreInt64(&cw.droppedCount, 0)
 }

 func (cw *CountingWindow) GetStats() map[string]int64 {
-	cw.mu.Lock()
-	defer cw.mu.Unlock()
-
 	return map[string]int64{
-		"sentCount":    cw.sentCount,
-		"droppedCount": cw.droppedCount,
+		"sentCount":    atomic.LoadInt64(&cw.sentCount),
+		"droppedCount": atomic.LoadInt64(&cw.droppedCount),
 		"bufferSize":   int64(cap(cw.outputChan)),
 		"bufferUsed":   int64(len(cw.outputChan)),
 	}
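The drop branch of sendResult is a standard bounded-channel pattern in Go: on a full buffer, evict the oldest element and retry the send once. A self-contained sketch of just that pattern, with generic names rather than the library's types:

package main

import "fmt"

// sendDropOldest mirrors the drop branch of sendResult: try a non-blocking
// send, and when the buffer is full evict the oldest element and retry once.
// It returns true if v ended up in the channel.
func sendDropOldest(ch chan int, v int) bool {
	select {
	case ch <- v: // fast path: buffer has room
		return true
	default:
	}
	select {
	case <-ch: // buffer full: evict the oldest element
		select {
		case ch <- v:
			return true
		default:
			return false // another producer refilled the slot; drop v
		}
	default:
		return false // buffer drained concurrently; v is dropped, as in the diff
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(sendDropOldest(ch, 1)) // true
	fmt.Println(sendDropOldest(ch, 2)) // true: evicts 1
	fmt.Println(<-ch)                  // 2
}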

window/factory.go

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ type Window interface {
 	OutputChan() <-chan []types.Row
 	SetCallback(callback func([]types.Row))
 	Trigger()
+	GetStats() map[string]int64
 }

 func CreateWindow(config types.WindowConfig) (Window, error) {
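Because GetStats is now part of the Window interface, every window implementation must provide it; the other implementations are covered by the rest of this commit's diff, not shown in this excerpt. A standalone sketch of the contract, using a hypothetical slidingWindow type purely for illustration:

package main

import (
	"fmt"
	"sync/atomic"
)

// slidingWindow is a hypothetical window type used only to illustrate the
// GetStats contract; it is not part of the streamsql code base.
type slidingWindow struct {
	outputChan   chan []string
	sentCount    int64
	droppedCount int64
}

// GetStats mirrors CountingWindow.GetStats from this commit: send/drop
// counters plus output-buffer capacity and current usage.
func (sw *slidingWindow) GetStats() map[string]int64 {
	return map[string]int64{
		"sentCount":    atomic.LoadInt64(&sw.sentCount),
		"droppedCount": atomic.LoadInt64(&sw.droppedCount),
		"bufferSize":   int64(cap(sw.outputChan)),
		"bufferUsed":   int64(len(sw.outputChan)),
	}
}

func main() {
	sw := &slidingWindow{outputChan: make(chan []string, 8)}
	fmt.Println(sw.GetStats())
}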
