Skip to content

Commit d9607df

Browse files
committed
wip
1 parent c3eb925 commit d9607df

File tree

3 files changed

+48
-23
lines changed

3 files changed

+48
-23
lines changed

cshared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func FLBPluginInputPreRun(useHotReload C.int) int {
135135
return input.FLB_ERROR
136136
}
137137

138-
if err := instance.inputPreRun(); err != nil {
138+
if err := instance.resume(); err != nil {
139139
fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err)
140140
return flbReturnCode(err)
141141
}
@@ -211,7 +211,7 @@ func FLBPluginOutputPreRun(useHotReload C.int) int {
211211
panic("plugin not initialized")
212212
}
213213

214-
if err := instance.outputPreRun(); err != nil {
214+
if err := instance.resume(); err != nil {
215215
fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err)
216216
return flbReturnCode(err)
217217
}

cshared_test.go

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,27 +100,54 @@ func TestInputCallbackLifecycle(t *testing.T) {
100100
require.Equal(t, input.FLB_RETRY, callbackResp, "pre-run must be called before callback")
101101

102102
// Pre-run
103-
require.NoError(t, inst.inputPreRun())
103+
require.NoError(t, inst.resume())
104104
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
105105
"collect background loop should have started running")
106106
m1 := testMessage(map[string]any{"name": "m1"})
107107
m2 := testMessage(map[string]any{"name": "m2"})
108108
plugin.enqueue(m1)()
109109
plugin.enqueue(m2)()
110110

111-
require.ErrorContains(t, inst.inputPreRun(), `invalid plugin state "runnable"`)
111+
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
112112

113113
// Callback
114-
time.Sleep(time.Second)
114+
//time.Sleep(time.Second)
115115
callbackBytes, callbackResp := testCallback(inst)
116116
require.Equal(t, input.FLB_OK, callbackResp)
117117
require.Equal(t, []Message{m1, m2}, decodeMessages(t, callbackBytes))
118118
require.True(t, plugin.collectRunning.Load())
119119

120-
// Stop
120+
// Stop (ensuring collect loop exits cleanly)
121+
plugin.onCollectDone = func(ch chan<- Message) {
122+
// Keep enqueueing after stop to ensure the plugin message channel wasn't closed early
123+
time.Sleep(10 * time.Millisecond)
124+
ch <- testMessage(map[string]any{"name": "m3"})
125+
}
121126
require.NoError(t, inst.stop())
122127
require.False(t, plugin.collectRunning.Load())
128+
require.NoError(t, inst.stop(), "stop should be idempotent")
129+
130+
callbackBytes, callbackResp = testCallback(inst)
131+
require.Equal(t, input.FLB_RETRY, callbackResp)
132+
assert.Empty(t, callbackBytes)
133+
134+
// Resume stopped pipeline
135+
require.NoError(t, inst.resume())
136+
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
137+
callbackBytes, callbackResp = testCallback(inst)
138+
require.Equal(t, input.FLB_OK, callbackResp)
139+
assert.Empty(t, callbackBytes, "m3 message from earlier not dequeued")
140+
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
141+
"collect background loop should have started running")
142+
m4 := testMessage(map[string]any{"name": "m4"})
143+
plugin.enqueue(m4)()
144+
callbackBytes, callbackResp = testCallback(inst)
145+
require.Equal(t, input.FLB_OK, callbackResp)
146+
require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes))
147+
148+
// Stop again
123149
require.NoError(t, inst.stop())
150+
require.False(t, plugin.collectRunning.Load())
124151
}
125152

126153
func testMessage(record map[string]any) Message {
@@ -147,6 +174,7 @@ type testInputPlugin struct {
147174
initCount atomic.Int64
148175
collectRunning atomic.Bool
149176
inputs chan *collectMessage
177+
onCollectDone func(ch chan<- Message)
150178
}
151179

152180
var _ InputPlugin = (*testInputPlugin)(nil)
@@ -166,6 +194,9 @@ func (t *testInputPlugin) Collect(ctx context.Context, ch chan<- Message) error
166194
ch <- m.msg
167195
m.collectedWG.Done()
168196
case <-ctx.Done():
197+
if t.onCollectDone != nil {
198+
t.onCollectDone(ch)
199+
}
169200
return nil
170201
}
171202
}
@@ -198,7 +229,7 @@ func TestInputCallbackCtrlC(t *testing.T) {
198229
inst := newTestInputInstance(t, testPluginInputCallbackCtrlC{})
199230

200231
require.NoError(t, inst.init(nil))
201-
require.NoError(t, inst.inputPreRun())
232+
require.NoError(t, inst.resume())
202233

203234
cdone := make(chan struct{})
204235
timeout := time.After(1 * time.Second)
@@ -247,7 +278,7 @@ func TestInputCallbackDangle(t *testing.T) {
247278

248279
// prepare channel for input explicitly.
249280
require.NoError(t, inst.init(ptr))
250-
require.NoError(t, inst.inputPreRun())
281+
require.NoError(t, inst.resume())
251282

252283
go func() {
253284
ticker := time.NewTicker(collectInterval)
@@ -312,7 +343,7 @@ func TestInputCallbackInfinite(t *testing.T) {
312343

313344
// prepare channel for input explicitly.
314345
require.NoError(t, inst.init(ptr))
315-
require.NoError(t, inst.inputPreRun())
346+
require.NoError(t, inst.resume())
316347

317348
go func() {
318349
ticker := time.NewTicker(collectInterval)
@@ -387,7 +418,7 @@ func TestInputCallbackLatency(t *testing.T) {
387418

388419
// prepare channel for input explicitly.
389420
require.NoError(t, inst.init(nil))
390-
require.NoError(t, inst.inputPreRun())
421+
require.NoError(t, inst.resume())
391422

392423
go func() {
393424
ticker := time.NewTicker(collectInterval)
@@ -503,7 +534,7 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
503534

504535
// prepare channel for input explicitly.
505536
require.NoError(t, inst.init(ptr))
506-
require.NoError(t, inst.inputPreRun())
537+
require.NoError(t, inst.resume())
507538

508539
go func() {
509540
ticker := time.NewTicker(time.Second * 1)
@@ -716,7 +747,7 @@ func TestOutputFlush(t *testing.T) {
716747
}
717748
inst := newTestOutputInstance(t, out)
718749
require.NoError(t, inst.init(nil))
719-
require.NoError(t, inst.outputPreRun())
750+
require.NoError(t, inst.resume())
720751

721752
msg := Message{
722753
Time: now,

plugin.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,7 @@ func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
300300
}
301301

302302
// inputPreRun transitions an initialized input plugin into runnable state.
303-
func (p *pluginInstance) inputPreRun() error {
304-
p.mu.Lock()
305-
defer p.mu.Unlock()
306-
303+
func (p *pluginInstance) inputPreRunWithLock() error {
307304
// Only input plugins have a pre-run step
308305
if p.meta.input == nil {
309306
return nil
@@ -339,10 +336,7 @@ func (p *pluginInstance) inputPreRun() error {
339336
}
340337

341338
// outputPreRun transitions an output plugin into runnable state.
342-
func (p *pluginInstance) outputPreRun() error {
343-
p.mu.Lock()
344-
defer p.mu.Unlock()
345-
339+
func (p *pluginInstance) outputPreRunWithLock() error {
346340
// Only input plugins have a pre-run step
347341
if p.meta.output == nil {
348342
return fmt.Errorf("plugin is not an output plugin")
@@ -468,15 +462,15 @@ func (p *pluginInstance) resume() error {
468462
defer p.mu.Unlock()
469463

470464
if p.state != instanceStateInitialized {
471-
return fmt.Errorf("cannot resume plugin in state %q", p.state)
465+
return fmt.Errorf("invalid plugin state %q", p.state)
472466
}
473467

474468
if p.meta.input != nil {
475-
if err := p.inputPreRun(); err != nil {
469+
if err := p.inputPreRunWithLock(); err != nil {
476470
return err
477471
}
478472
} else if p.meta.output != nil {
479-
if err := p.outputPreRun(); err != nil {
473+
if err := p.outputPreRunWithLock(); err != nil {
480474
return err
481475
}
482476
}

0 commit comments

Comments
 (0)