Skip to content

Commit c3eb925

Browse files
committed
wip
1 parent 8712278 commit c3eb925

File tree

3 files changed

+147
-22
lines changed

3 files changed

+147
-22
lines changed

cshared.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -506,17 +506,18 @@ func makeMetrics(cmp *cmetrics.Context) Metrics {
506506
}
507507
}
508508

509-
// testFLBPluginInputCallback cannot be in a test file since test files can't use CGO.
510-
func testFLBPluginInputCallback(inst *pluginInstance) ([]byte, error) {
509+
// testCallback invokes the callback and returns the bytes outputted from it.
510+
// This cannot be in the test file since test files can't use CGO.
511+
func testCallback(inst *pluginInstance) ([]byte, int) {
511512
data := unsafe.Pointer(nil)
512513
var csize C.size_t
513514

514-
inst.callback(&data, &csize)
515+
retCode := inst.callback(&data, &csize)
515516

516517
if data == nil {
517-
return []byte{}, nil
518+
return []byte{}, retCode
518519
}
519520

520521
defer C.free(data)
521-
return C.GoBytes(data, C.int(csize)), nil
522+
return C.GoBytes(data, C.int(csize)), retCode
522523
}

cshared_test.go

Lines changed: 134 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/vmihailenco/msgpack/v5"
1919

2020
"github.com/calyptia/cmetrics-go"
21+
"github.com/calyptia/plugin/input"
2122
"github.com/calyptia/plugin/metric"
2223
)
2324

@@ -45,7 +46,21 @@ func newTestInputInstance(t testing.TB, input InputPlugin) *pluginInstance {
4546
desc: "test plugin",
4647
input: input,
4748
}))
48-
t.Cleanup(func() { assert.NoError(t, inst.stop()) })
49+
t.Cleanup(func() {
50+
stopErr := make(chan error)
51+
go func() {
52+
stopErr <- inst.stop()
53+
}()
54+
55+
select {
56+
case err := <-stopErr:
57+
assert.NoError(t, err)
58+
return
59+
case <-time.After(time.Second):
60+
require.Fail(t, "timed out waiting for instance to stop")
61+
}
62+
})
63+
4964
return inst
5065
}
5166

@@ -69,6 +84,116 @@ func pluginInstanceWithFakes(inst *pluginInstance) *pluginInstance {
6984
})
7085
}
7186

87+
func TestInputCallbackLifecycle(t *testing.T) {
88+
plugin := newTestInputPlugin()
89+
inst := newTestInputInstance(t, plugin)
90+
91+
// Initialization
92+
require.NoError(t, inst.init(nil))
93+
require.Equal(t, int64(1), plugin.initCount.Load())
94+
95+
require.ErrorContains(t, inst.init(nil), `unexpected plugin state "initialized"`)
96+
require.Equal(t, int64(1), plugin.initCount.Load(), "initialization should only run once")
97+
98+
// Early attempt to callback
99+
_, callbackResp := testCallback(inst)
100+
require.Equal(t, input.FLB_RETRY, callbackResp, "pre-run must be called before callback")
101+
102+
// Pre-run
103+
require.NoError(t, inst.inputPreRun())
104+
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
105+
"collect background loop should have started running")
106+
m1 := testMessage(map[string]any{"name": "m1"})
107+
m2 := testMessage(map[string]any{"name": "m2"})
108+
plugin.enqueue(m1)()
109+
plugin.enqueue(m2)()
110+
111+
require.ErrorContains(t, inst.inputPreRun(), `invalid plugin state "runnable"`)
112+
113+
// Callback
114+
time.Sleep(time.Second)
115+
callbackBytes, callbackResp := testCallback(inst)
116+
require.Equal(t, input.FLB_OK, callbackResp)
117+
require.Equal(t, []Message{m1, m2}, decodeMessages(t, callbackBytes))
118+
require.True(t, plugin.collectRunning.Load())
119+
120+
// Stop
121+
require.NoError(t, inst.stop())
122+
require.False(t, plugin.collectRunning.Load())
123+
require.NoError(t, inst.stop())
124+
}
125+
126+
func testMessage(record map[string]any) Message {
127+
tag := ""
128+
return Message{
129+
Time: time.Now().UTC(),
130+
Record: record,
131+
tag: &tag,
132+
}
133+
}
134+
135+
type collectMessage struct {
136+
msg Message
137+
collectedWG sync.WaitGroup
138+
}
139+
140+
func newTestInputPlugin() *testInputPlugin {
141+
return &testInputPlugin{
142+
inputs: make(chan *collectMessage),
143+
}
144+
}
145+
146+
type testInputPlugin struct {
147+
initCount atomic.Int64
148+
collectRunning atomic.Bool
149+
inputs chan *collectMessage
150+
}
151+
152+
var _ InputPlugin = (*testInputPlugin)(nil)
153+
154+
func (t *testInputPlugin) Init(ctx context.Context, fbit *Fluentbit) error {
155+
t.initCount.Add(1)
156+
return nil
157+
}
158+
159+
func (t *testInputPlugin) Collect(ctx context.Context, ch chan<- Message) error {
160+
t.collectRunning.Store(true)
161+
defer t.collectRunning.Store(false)
162+
163+
for {
164+
select {
165+
case m := <-t.inputs:
166+
ch <- m.msg
167+
m.collectedWG.Done()
168+
case <-ctx.Done():
169+
return nil
170+
}
171+
}
172+
}
173+
174+
func (t *testInputPlugin) enqueue(m Message) (waitForCollected func()) {
175+
cm := &collectMessage{msg: m}
176+
cm.collectedWG.Add(1)
177+
t.inputs <- cm
178+
179+
return cm.collectedWG.Wait
180+
}
181+
182+
func decodeMessages(t testing.TB, msgpackBytes []byte) []Message {
183+
var messages []Message
184+
185+
dec := msgpack.NewDecoder(bytes.NewReader(msgpackBytes))
186+
for {
187+
msg, err := decodeMsg(dec, "")
188+
if errors.Is(err, io.EOF) {
189+
return messages
190+
}
191+
require.NoError(t, err)
192+
193+
messages = append(messages, msg)
194+
}
195+
}
196+
72197
func TestInputCallbackCtrlC(t *testing.T) {
73198
inst := newTestInputInstance(t, testPluginInputCallbackCtrlC{})
74199

@@ -78,10 +203,8 @@ func TestInputCallbackCtrlC(t *testing.T) {
78203
cdone := make(chan struct{})
79204
timeout := time.After(1 * time.Second)
80205

81-
ptr := unsafe.Pointer(nil)
82-
83206
go func() {
84-
inst.callback(&ptr, nil)
207+
testCallback(inst)
85208
close(cdone)
86209
}()
87210

@@ -130,12 +253,11 @@ func TestInputCallbackDangle(t *testing.T) {
130253
ticker := time.NewTicker(collectInterval)
131254
defer ticker.Stop()
132255

133-
inst.callback(&ptr, nil)
256+
testCallback(inst)
134257
for {
135258
select {
136259
case <-ticker.C:
137-
//FLBPluginInputCallback(&ptr, nil)
138-
inst.callback(&ptr, nil)
260+
testCallback(inst)
139261
case <-cdone:
140262
return
141263
}
@@ -199,7 +321,7 @@ func TestInputCallbackInfinite(t *testing.T) {
199321
for {
200322
select {
201323
case <-ticker.C:
202-
inst.callback(&ptr, nil)
324+
testCallback(inst)
203325
if ptr != nil {
204326
close(cdone)
205327
return
@@ -271,7 +393,7 @@ func TestInputCallbackLatency(t *testing.T) {
271393
ticker := time.NewTicker(collectInterval)
272394
defer ticker.Stop()
273395

274-
buf, _ := testFLBPluginInputCallback(inst)
396+
buf, _ := testCallback(inst)
275397
if len(buf) > 0 {
276398
cmsg <- buf
277399
}
@@ -283,7 +405,7 @@ func TestInputCallbackLatency(t *testing.T) {
283405
t.Log("---- collect done")
284406
return
285407
case <-ticker.C:
286-
buf, _ := testFLBPluginInputCallback(inst)
408+
buf, _ := testCallback(inst)
287409
if len(buf) > 0 {
288410
cmsg <- buf
289411
}
@@ -387,15 +509,13 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
387509
ticker := time.NewTicker(time.Second * 1)
388510
defer ticker.Stop()
389511

390-
//FLBPluginInputCallback(&ptr, nil)
391-
inst.callback(&ptr, nil)
512+
testCallback(inst)
392513
close(cstarted)
393514

394515
for {
395516
select {
396517
case <-ticker.C:
397-
//FLBPluginInputCallback(&ptr, nil)
398-
inst.callback(&ptr, nil)
518+
testCallback(inst)
399519
case <-inst.runCtx.Done():
400520
return
401521
}

plugin.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ type pluginInstance struct {
187187
meta pluginMetadata
188188
cmetricsCtxProvider cmetricsContextProvider
189189
configLoaderProvider configLoaderProvider
190-
runningWG sync.WaitGroup
190+
runningWG sync.WaitGroup // Number of running preRun and callback methods.
191191

192192
// mu protects all members below.
193193
// It is generally held during state checks and transitions but not during long-running callbacks.
@@ -319,7 +319,10 @@ func (p *pluginInstance) inputPreRun() error {
319319
p.msgChannel = make(chan Message, p.maxBufferedMessages)
320320
p.state = instanceStateRunnable
321321

322+
p.runningWG.Add(1)
322323
go func() {
324+
defer p.runningWG.Done()
325+
323326
err := p.meta.input.Collect(runCtx, p.msgChannel)
324327
if err != nil {
325328
fmt.Fprintf(os.Stderr, "collect error: %v\n", err)
@@ -353,8 +356,8 @@ func (p *pluginInstance) outputPreRun() error {
353356
p.msgChannel = make(chan Message)
354357
p.state = instanceStateRunnable
355358

359+
p.runningWG.Add(1)
356360
go func() {
357-
p.runningWG.Add(1)
358361
defer p.runningWG.Done()
359362
if err := p.meta.output.Flush(p.runCtx, p.msgChannel); err != nil {
360363
fmt.Fprintf(os.Stderr, "FLBPluginOutputPreRun error: %v\n", err)
@@ -377,6 +380,7 @@ func (p *pluginInstance) outputPreRun() error {
377380
func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
378381
p.mu.Lock()
379382
if p.state != instanceStateRunnable {
383+
p.mu.Unlock()
380384
return input.FLB_RETRY
381385
}
382386

@@ -437,7 +441,7 @@ func (p *pluginInstance) stop() error {
437441
defer p.mu.Unlock()
438442

439443
if p.state != instanceStateRunnable && p.state != instanceStatePreExit {
440-
return fmt.Errorf("cannot stop plugin in state %q", p.state)
444+
return nil
441445
}
442446

443447
if p.runCancel != nil {

0 commit comments

Comments
 (0)