Skip to content

Commit 76c5c53

Browse files
authored
fix(sink): fix tuple collector batcher (#3898)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 1737237 commit 76c5c53

File tree

4 files changed

+331
-6
lines changed

4 files changed

+331
-6
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2025 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package node
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/lf-edge/ekuiper/contract/v2/api"
21+
22+
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
23+
"github.com/lf-edge/ekuiper/v2/internal/xsql"
24+
"github.com/lf-edge/ekuiper/v2/pkg/infra"
25+
)
26+
27+
// BatchMergerOp accumulates incoming rows into a single *xsql.WindowTuples
// and emits the accumulated batch downstream when a BatchEOFTuple arrives.
// It is the batching operator used for sinks that collect tuples (as opposed
// to the byte-oriented batch writer).
type BatchMergerOp struct {
	*defaultSinkNode
	// lastRow holds the most recently merged item so that sink properties
	// can later be derived from it ("save lastRow to get the props").
	lastRow any
	// wt is the batch under construction; nil until the first row of a
	// batch arrives, and reset to nil after the batch is broadcast.
	wt *xsql.WindowTuples
}
33+
34+
func NewBatchMergerOp(name string, rOpt *def.RuleOption) (*BatchMergerOp, error) {
35+
return &BatchMergerOp{
36+
defaultSinkNode: newDefaultSinkNode(name, rOpt),
37+
}, nil
38+
}
39+
40+
// Exec runs the merge loop: rows received on the input channel are buffered
// into a single *xsql.WindowTuples, and the accumulated batch is broadcast
// downstream when a BatchEOFTuple is received. Errors, watermarks and EOF
// tuples are forwarded immediately by ingest and are never buffered.
func (o *BatchMergerOp) Exec(ctx api.StreamContext, errCh chan<- error) {
	o.prepareExec(ctx, errCh, "op")
	go func() {
		defer func() {
			o.Close()
		}()
		err := infra.SafeRun(func() error {
			// count tracks how many items are in the current batch; a
			// BatchEOFTuple only triggers an emit when the batch is non-empty.
			count := 0
			for {
				select {
				case <-ctx.Done():
					ctx.GetLogger().Infof("batch writer node %s is finished", o.name)
					return nil
				case item := <-o.input:
					data, processed := o.ingest(ctx, item)
					if processed {
						// Control tuples/errors were already handled by ingest;
						// break leaves the select and continues the loop.
						break
					}
					switch dt := data.(type) {
					case xsql.BatchEOFTuple:
						if count > 0 {
							o.Broadcast(o.wt)
							o.onSend(ctx, o.wt)
							// Reset batching state for the next batch window.
							count = 0
							o.lastRow = nil
							o.wt = nil
						}
					case *xsql.SliceTuple:
						o.onProcessStart(ctx, data)
						o.appendWindowTuples(dt)
						o.onProcessEnd(ctx)
						count++
					case xsql.Row:
						o.onProcessStart(ctx, data)
						o.appendWindowTuples(dt)
						o.onProcessEnd(ctx)
						o.lastRow = dt
						count++
					case api.MessageTupleList:
						o.onProcessStart(ctx, data)
						// TODO: find a way to avoid using ToMaps
						for _, m := range dt.ToMaps() {
							row := &xsql.Tuple{
								Message: m,
							}
							o.appendWindowTuples(row)
						}
						o.onProcessEnd(ctx)
						// The whole list (not its last row) is kept as lastRow;
						// count increases once per list, not per contained row.
						o.lastRow = dt
						count++
					default:
						o.onError(ctx, fmt.Errorf("unknown data type: %T", data))
					}
				}
			}
		})
		if err != nil {
			infra.DrainError(ctx, err, errCh)
		}
	}()
}
102+
103+
func (o *BatchMergerOp) ingest(ctx api.StreamContext, item any) (any, bool) {
104+
ctx.GetLogger().Debugf("receive %v", item)
105+
item, processed := o.preprocess(ctx, item)
106+
if processed {
107+
return item, processed
108+
}
109+
switch d := item.(type) {
110+
case error:
111+
if o.sendError {
112+
o.Broadcast(d)
113+
}
114+
return nil, true
115+
case *xsql.WatermarkTuple, xsql.EOFTuple:
116+
o.Broadcast(d)
117+
return nil, true
118+
}
119+
return item, false
120+
}
121+
122+
func (o *BatchMergerOp) appendWindowTuples(row xsql.Row) {
123+
if o.wt == nil {
124+
o.wt = &xsql.WindowTuples{
125+
Content: make([]xsql.Row, 0),
126+
}
127+
}
128+
o.wt.Content = append(o.wt.Content, row)
129+
o.lastRow = row
130+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright 2025 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package node
16+
17+
import (
18+
"testing"
19+
"time"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
24+
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
25+
"github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock"
26+
"github.com/lf-edge/ekuiper/v2/internal/xsql"
27+
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
28+
"github.com/lf-edge/ekuiper/v2/pkg/model"
29+
)
30+
31+
func TestNewBatchMergerOp(t *testing.T) {
32+
op, err := NewBatchMergerOp("op1", &def.RuleOption{BufferLength: 10, SendError: true})
33+
require.NoError(t, err)
34+
assert.NotNil(t, op)
35+
assert.Equal(t, "op1", op.name)
36+
}
37+
38+
// TestBatchMergerOpRun feeds different row shapes (single tuple, several
// tuples, slice tuples, a transformed tuple list) into the merger, then sends
// a BatchEOFTuple and asserts that the emitted *xsql.WindowTuples contains
// the expected number of rows.
func TestBatchMergerOpRun(t *testing.T) {
	testcases := []struct {
		name  string
		input []any
		// err is the expected error message.
		// NOTE(review): no case below sets err, so the error branch in the
		// assertion section is currently dead — confirm whether an error
		// case is still planned or the field should be removed.
		err    string
		expect int
	}{
		{
			name: "single tuple",
			input: []any{
				&xsql.Tuple{
					Emitter: "test",
					Message: map[string]any{
						"b": 12,
					},
				},
			},
			expect: 1,
		},
		{
			name: "multiple tuples",
			input: []any{
				&xsql.Tuple{
					Emitter: "test",
					Message: map[string]any{
						"b": 12,
					},
				},
				&xsql.Tuple{
					Emitter: "test",
					Message: map[string]any{
						"a": "a",
						"b": 20,
						"c": "hello",
					},
				},
			},
			expect: 2,
		},
		{
			name: "slice tuple",
			input: []any{
				&xsql.SliceTuple{
					SourceContent: model.SliceVal{nil, 12},
				},
				&xsql.SliceTuple{
					SourceContent: model.SliceVal{"a", 20},
				},
			},
			expect: 2,
		},
		{
			// A single TransformedTupleList whose maps expand into one
			// batched row each.
			name: "transformed tuple list",
			input: []any{
				&xsql.TransformedTupleList{
					Maps: []map[string]any{
						{
							"a": "a",
							"b": 20,
							"c": "hello",
						},
						{
							"a": "a2",
						},
					},
				},
			},
			expect: 2,
		},
	}
	mc := mockclock.GetMockClock()
	ctx := mockContext.NewMockContext("testBatchMergerOpRun", "op1")
	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			op, err := NewBatchMergerOp("test", &def.RuleOption{BufferLength: 10, SendError: true})
			require.NoError(t, err)
			out := make(chan any, 100)
			err = op.AddOutput(out, "test")
			require.NoError(t, err)
			errCh := make(chan error)
			op.Exec(ctx, errCh)
			for _, item := range tc.input {
				op.input <- item
				// Advance the mock clock so time-dependent bookkeeping runs.
				mc.Add(30 * time.Millisecond)
			}
			// EOF flushes the accumulated batch downstream.
			op.input <- xsql.BatchEOFTuple(time.Now())
			result := <-out
			if tc.err != "" {
				e, ok := result.(error)
				if ok {
					assert.EqualError(t, e, tc.err)
				} else {
					assert.Fail(t, "expected error", tc.err)
				}
			} else {
				wt, ok := result.(*xsql.WindowTuples)
				assert.True(t, ok, "expected WindowTuples, got %T", result)
				if tc.expect > 0 {
					assert.NotNil(t, wt)
					assert.Equal(t, tc.expect, len(wt.Content))
				} else {
					assert.Nil(t, result)
				}
			}
		})
	}
}
145+
146+
func TestBatchMergerOpErrorHandling(t *testing.T) {
147+
ctx := mockContext.NewMockContext("testBatchMergerOpErrorHandling", "op1")
148+
op, err := NewBatchMergerOp("test", &def.RuleOption{BufferLength: 10, SendError: true})
149+
require.NoError(t, err)
150+
out := make(chan any, 100)
151+
err = op.AddOutput(out, "test")
152+
require.NoError(t, err)
153+
errCh := make(chan error)
154+
op.Exec(ctx, errCh)
155+
testErr := assert.AnError
156+
op.input <- testErr
157+
result := <-out
158+
e, ok := result.(error)
159+
assert.True(t, ok)
160+
assert.Equal(t, testErr, e)
161+
}
162+
163+
func TestBatchMergerOpWatermarkAndEOF(t *testing.T) {
164+
ctx := mockContext.NewMockContext("testBatchMergerOpWatermarkAndEOF", "op1")
165+
op, err := NewBatchMergerOp("test", &def.RuleOption{BufferLength: 10, SendError: true})
166+
require.NoError(t, err)
167+
out := make(chan any, 100)
168+
err = op.AddOutput(out, "test")
169+
require.NoError(t, err)
170+
errCh := make(chan error)
171+
op.Exec(ctx, errCh)
172+
watermark := &xsql.WatermarkTuple{Timestamp: time.Now()}
173+
op.input <- watermark
174+
result := <-out
175+
assert.Equal(t, watermark, result)
176+
eof := xsql.EOFTuple("")
177+
op.input <- eof
178+
result = <-out
179+
assert.Equal(t, eof, result)
180+
}

internal/topo/node/sink_node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ func tupleCollect(ctx api.StreamContext, sink api.Sink, data any) (err error) {
235235
case api.MessageTuple:
236236
err = sink.(api.TupleCollector).Collect(ctx, d)
237237
case *xsql.RawTuple: // may receive raw tuple from data template
238+
// TODO: whether needs to consider json list?
238239
var message map[string]any
239240
err = json.Unmarshal(d.Rawdata, &message)
240241
if err != nil {

internal/topo/planner/planner_sink.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -210,16 +210,30 @@ func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOpti
210210
}
211211
index++
212212
result = append(result, transformOp)
213+
_, isTupleCollector := s.(api.TupleCollector)
214+
_, isBytesCollector := s.(api.BytesCollector)
215+
if !isBytesCollector && !isTupleCollector {
216+
return nil, fmt.Errorf("sink %s does not implement any collector", sinkName)
217+
}
213218
if batchEnabled {
214-
batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, schema, sc)
215-
if err != nil {
216-
return nil, err
219+
if isBytesCollector {
220+
batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, schema, sc)
221+
if err != nil {
222+
return nil, err
223+
}
224+
index++
225+
result = append(result, batchWriterOp)
226+
} else if isTupleCollector {
227+
batchMergerOp, err := node.NewBatchMergerOp(fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options)
228+
if err != nil {
229+
return nil, err
230+
}
231+
index++
232+
result = append(result, batchMergerOp)
217233
}
218-
index++
219-
result = append(result, batchWriterOp)
220234
}
221235
// Encode will convert the result to []byte
222-
if _, ok := s.(api.BytesCollector); ok {
236+
if isBytesCollector {
223237
if !batchEnabled {
224238
encodeOp, err := node.NewEncodeOp(tp.GetContext(), fmt.Sprintf("%s_%d_encode", sinkName, index), options, schema, sc)
225239
if err != nil {

0 commit comments

Comments
 (0)