Skip to content

Commit fd1dd6c

Browse files
authored
feat(window): support statewindow partition by (#3936)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 35ae471 commit fd1dd6c

File tree

10 files changed

+153
-21
lines changed

10 files changed

+153
-21
lines changed

.github/workflows/markdown_config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
"MD045": false,
1111
"MD046": false,
1212
"MD051": false,
13-
"MD059": false
13+
"MD059": false,
14+
"MD060": false
1415
}

docs/en_US/sqls/windows.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,30 @@ Output:
112112
[{"a":2},{"a":2}]
113113
```
114114

115+
#### State Window Partitioning
116+
117+
We can perform partitioning calculations on a state window using the `partition by` clause, as follows:
118+
119+
```sql
120+
SELECT * from demo group by statewindow(a = 1, a = 5) over (partition by b)
121+
```
122+
123+
For the following input:
124+
125+
```txt
126+
{"a":1,"b":1}
127+
{"a":1,"b":2}
128+
{"a":5,"b":1}
129+
```
130+
131+
The output is as follows:
132+
133+
```txt
134+
[{"a":1,"b":1},{"a":5,"b":1}]
135+
```
136+
137+
The partition `b=2` didn't be output due to the partition haven't trigger the condition yet.
138+
115139
## Count window
116140

117141
Please notice that the count window does not concern time, it only concern about events count.

docs/zh_CN/sqls/windows.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,30 @@ SELECT * from demo group by statewindow(a > 1)
114114
[{"a":2},{"a":2}]
115115
```
116116

117+
#### 条件窗口分区
118+
119+
我们可以通过 partition by 子句对条件窗口进行分区计算,如下:
120+
121+
```sql
122+
SELECT * from demo group by statewindow(a = 1, a = 5) over (partition by b)
123+
```
124+
125+
此时对于如下输入:
126+
127+
```txt
128+
{"a":1,"b":1}
129+
{"a":1,"b":2}
130+
{"a":5,"b":1}
131+
```
132+
133+
输出如下:
134+
135+
```txt
136+
[{"a":1,"b":1},{"a":5,"b":1}]
137+
```
138+
139+
`b=2` 的数据并没有被输出,因为该分区窗口并未满足条件。
140+
117141
## 计数窗口
118142

119143
请注意计数窗口不关注时间,只关注事件发生的次数。

internal/topo/node/window_op.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ type WindowConfig struct {
5656
SingleCondition ast.Expr
5757
BeginCondition ast.Expr
5858
EmitCondition ast.Expr
59-
Dimensions ast.Dimensions
59+
60+
PartitionExpr *ast.PartitionExpr
6061
}
6162

6263
type WindowOperator struct {

internal/topo/node/window_v2_op.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func init() {
3939
gob.Register(&WindowScanner{})
4040
gob.Register(time.Time{})
4141
gob.Register(&StateWindowStatus{})
42+
gob.Register(map[string]*StateWindowStatus{})
4243
}
4344

4445
type WindowV2Operator struct {
@@ -106,7 +107,7 @@ type WindowV2Exec interface {
106107
type StateWindowOp struct {
107108
*WindowV2Operator
108109
status map[string]*StateWindowStatus
109-
Dimensions ast.Dimensions
110+
PartitionExpr *ast.PartitionExpr
110111
SingleCondition ast.Expr
111112
BeginCondition ast.Expr
112113
EmitCondition ast.Expr
@@ -127,8 +128,8 @@ func NewStateWindowOp(o *WindowV2Operator) *StateWindowOp {
127128
EmitCondition: o.windowConfig.EmitCondition,
128129
SingleCondition: o.windowConfig.SingleCondition,
129130
stateFuncs: o.windowConfig.StateFuncs,
130-
Dimensions: o.windowConfig.Dimensions,
131131
status: make(map[string]*StateWindowStatus),
132+
PartitionExpr: o.windowConfig.PartitionExpr,
132133
}
133134
}
134135

@@ -145,6 +146,23 @@ func (s *StateWindowOp) emit(ctx api.StreamContext, status *StateWindowStatus) {
145146
s.onSend(ctx, results)
146147
}
147148

149+
func calPartition(fv *xsql.FunctionValuer, partitionExpr *ast.PartitionExpr, row *xsql.Tuple) string {
150+
name := "parKey_"
151+
if partitionExpr == nil {
152+
return name
153+
}
154+
ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(row, fv, &xsql.WildcardValuer{Data: row})}
155+
for _, expr := range partitionExpr.Exprs {
156+
r := ve.Eval(expr)
157+
if _, ok := r.(error); ok {
158+
continue
159+
} else {
160+
name += fmt.Sprintf("%v,", r)
161+
}
162+
}
163+
return name
164+
}
165+
148166
func (s *StateWindowOp) exec(ctx api.StreamContext, errCh chan<- error) {
149167
v, err := ctx.GetState(V2WindowInputsKey)
150168
if err == nil && v != nil {
@@ -166,7 +184,7 @@ func (s *StateWindowOp) exec(ctx api.StreamContext, errCh chan<- error) {
166184
s.onProcessStart(ctx, input)
167185
switch row := data.(type) {
168186
case *xsql.Tuple:
169-
name := calDimension(fv, s.Dimensions, row)
187+
name := calPartition(fv, s.PartitionExpr, row)
170188
status, ok := s.status[name]
171189
if !ok {
172190
status = &StateWindowStatus{

internal/topo/node/window_v2_op_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,65 @@ func init() {
3737
testx.InitEnv("node_test")
3838
}
3939

40+
func TestStateWindowStatePartition(t *testing.T) {
41+
conf.IsTesting = true
42+
ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel()
43+
now := time.Now()
44+
o := &def.RuleOption{
45+
BufferLength: 10,
46+
}
47+
kv, err := store.GetKV("stream")
48+
require.NoError(t, err)
49+
require.NoError(t, prepareStream())
50+
sql := "select count(*) from stream group by statewindow(a =1 , a = 2) over (partition by b)"
51+
stmt, err := xsql.NewParser(strings.NewReader(sql)).Parse()
52+
require.NoError(t, err)
53+
p, err := planner.CreateLogicalPlan(stmt, o, kv)
54+
require.NoError(t, err)
55+
require.NotNil(t, p)
56+
windowPlan := extractWindowPlan(p)
57+
require.NotNil(t, windowPlan)
58+
op, err := node.NewWindowV2Op("window", node.WindowConfig{
59+
Type: windowPlan.WindowType(),
60+
BeginCondition: windowPlan.GetBeginCondition(),
61+
EmitCondition: windowPlan.GetEmitCondition(),
62+
PartitionExpr: windowPlan.GetPartitionExpr(),
63+
}, o)
64+
require.NoError(t, err)
65+
require.NotNil(t, op)
66+
input, _ := op.GetInput()
67+
output := make(chan any, 10)
68+
op.AddOutput(output, "output")
69+
errCh := make(chan error, 10)
70+
op.Exec(ctx, errCh)
71+
waitExecute()
72+
input <- &xsql.Tuple{Message: map[string]any{"a": true}, Timestamp: now}
73+
input <- &xsql.Tuple{Message: map[string]any{"a": false}, Timestamp: now.Add(500 * time.Millisecond)}
74+
waitExecute()
75+
input <- &xsql.Tuple{Message: map[string]any{"a": int64(1), "b": int64(1)}, Timestamp: now}
76+
input <- &xsql.Tuple{Message: map[string]any{"a": int64(1), "b": int64(2)}, Timestamp: now}
77+
input <- &xsql.Tuple{Message: map[string]any{"a": int64(2), "b": int64(1)}, Timestamp: now.Add(500 * time.Millisecond)}
78+
waitExecute()
79+
got := <-output
80+
wt, ok := got.(*xsql.WindowTuples)
81+
require.True(t, ok)
82+
require.NotNil(t, wt)
83+
d := wt.ToMaps()
84+
require.Equal(t, []map[string]any{
85+
{
86+
"a": int64(1),
87+
"b": int64(1),
88+
},
89+
{
90+
"a": int64(2),
91+
"b": int64(1),
92+
},
93+
}, d)
94+
cancel()
95+
waitExecute()
96+
op.Close()
97+
}
98+
4099
func TestStateWindowState(t *testing.T) {
41100
conf.IsTesting = true
42101
ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel()

internal/topo/planner/planner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *def.RuleOption, sources ma
423423
BeginCondition: t.beginCondition,
424424
EmitCondition: t.emitCondition,
425425
SingleCondition: t.singleCondition,
426-
Dimensions: t.dimensions,
426+
PartitionExpr: t.PartitionExpr,
427427
StateFuncs: t.stateFuncs,
428428
}
429429
// state window only support v2 window
@@ -644,7 +644,7 @@ func createLogicalPlanFull(stmt *ast.SelectStatement, opt *def.RuleOption, store
644644
beginCondition: w.BeginCondition,
645645
emitCondition: w.EmitCondition,
646646
singleCondition: w.SingleCondition,
647-
dimensions: dimensions.GetGroups(),
647+
PartitionExpr: w.PartitionExpr,
648648
}.Init()
649649
if w.Length != nil {
650650
wp.length = int(w.Length.Val)

internal/topo/planner/windowPlan.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
type WindowPlan struct {
2626
baseLogicalPlan
27-
dimensions ast.Dimensions
27+
PartitionExpr *ast.PartitionExpr
2828
singleCondition ast.Expr
2929
beginCondition ast.Expr
3030
emitCondition ast.Expr
@@ -67,6 +67,10 @@ func (p *WindowPlan) GetEmitCondition() ast.Expr {
6767
return p.emitCondition
6868
}
6969

70+
func (p *WindowPlan) GetPartitionExpr() *ast.PartitionExpr {
71+
return p.PartitionExpr
72+
}
73+
7074
func (p *WindowPlan) BuildExplainInfo() {
7175
t := p.wtype.String()
7276
info := "{ length:" + strconv.Itoa(p.length) + ", "

internal/xsql/parser.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -972,13 +972,10 @@ func (p *Parser) parseCall(n string) (ast.Expr, error) {
972972
win.Filter = f
973973
}
974974
// parse over when clause
975-
c, err := p.ParseOver4Window()
975+
err = p.ParseOver4Window(win)
976976
if err != nil {
977977
return nil, err
978-
} else if c != nil {
979-
win.TriggerCondition = c
980978
}
981-
982979
return win, nil
983980
}
984981
}
@@ -1616,25 +1613,28 @@ func (p *Parser) parseStreamOptions() (*ast.Options, error) {
16161613
return opts, nil
16171614
}
16181615

1619-
func (p *Parser) ParseOver4Window() (ast.Expr, error) {
1616+
func (p *Parser) ParseOver4Window(win *ast.Window) error {
16201617
if tok, _ := p.scanIgnoreWhitespace(); tok != ast.OVER {
16211618
p.unscan()
1622-
return nil, nil
1619+
return nil
16231620
}
16241621
if tok, lit := p.scanIgnoreWhitespace(); tok != ast.LPAREN {
1625-
return nil, fmt.Errorf("Found %q after OVER, expect parentheses.", lit)
1622+
return fmt.Errorf("Found %q after OVER, expect parentheses.", lit)
16261623
}
1627-
if tok, lit := p.scanIgnoreWhitespace(); tok != ast.WHEN {
1628-
return nil, fmt.Errorf("Found %q after OVER(, expect WHEN.", lit)
1624+
var err error
1625+
win.TriggerCondition, err = p.parseWhen()
1626+
if err != nil {
1627+
return err
16291628
}
1630-
expr, err := p.ParseExpr()
1629+
win.PartitionExpr, err = p.parsePartitionBy()
16311630
if err != nil {
1632-
return nil, err
1631+
return err
16331632
}
1633+
16341634
if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
1635-
return nil, fmt.Errorf("Found %q after OVER, expect right parentheses.", lit)
1635+
return fmt.Errorf("Found %q after OVER, expect right parentheses.", lit)
16361636
}
1637-
return expr, nil
1637+
return nil
16381638
}
16391639

16401640
// Only support filter on window now

pkg/ast/statement.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ func (w WindowType) String() string {
205205
}
206206

207207
type Window struct {
208+
PartitionExpr *PartitionExpr
208209
TriggerCondition Expr
209210
SingleCondition Expr
210211
BeginCondition Expr

0 commit comments

Comments
 (0)