Skip to content

Commit beb6f18

Browse files
committed
wip
1 parent c3eb925 commit beb6f18

File tree

3 files changed

+62
-31
lines changed

3 files changed

+62
-31
lines changed

cshared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func FLBPluginInputPreRun(useHotReload C.int) int {
135135
return input.FLB_ERROR
136136
}
137137

138-
if err := instance.inputPreRun(); err != nil {
138+
if err := instance.resume(); err != nil {
139139
fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err)
140140
return flbReturnCode(err)
141141
}
@@ -211,7 +211,7 @@ func FLBPluginOutputPreRun(useHotReload C.int) int {
211211
panic("plugin not initialized")
212212
}
213213

214-
if err := instance.outputPreRun(); err != nil {
214+
if err := instance.resume(); err != nil {
215215
fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err)
216216
return flbReturnCode(err)
217217
}

cshared_test.go

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -100,29 +100,57 @@ func TestInputCallbackLifecycle(t *testing.T) {
100100
require.Equal(t, input.FLB_RETRY, callbackResp, "pre-run must be called before callback")
101101

102102
// Pre-run
103-
require.NoError(t, inst.inputPreRun())
103+
require.NoError(t, inst.resume())
104104
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
105105
"collect background loop should have started running")
106106
m1 := testMessage(map[string]any{"name": "m1"})
107107
m2 := testMessage(map[string]any{"name": "m2"})
108108
plugin.enqueue(m1)()
109109
plugin.enqueue(m2)()
110110

111-
require.ErrorContains(t, inst.inputPreRun(), `invalid plugin state "runnable"`)
111+
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
112112

113113
// Callback
114-
time.Sleep(time.Second)
114+
//time.Sleep(time.Second)
115115
callbackBytes, callbackResp := testCallback(inst)
116116
require.Equal(t, input.FLB_OK, callbackResp)
117117
require.Equal(t, []Message{m1, m2}, decodeMessages(t, callbackBytes))
118118
require.True(t, plugin.collectRunning.Load())
119119

120-
// Stop
120+
// Stop (ensuring collect loop exits cleanly)
121+
plugin.onCollectDone = func(ch chan<- Message) {
122+
// Keep enqueueing after stop to ensure the plugin message channel wasn't closed early
123+
time.Sleep(10 * time.Millisecond)
124+
ch <- testMessage(map[string]any{"name": "m3"})
125+
}
121126
require.NoError(t, inst.stop())
122127
require.False(t, plugin.collectRunning.Load())
128+
require.NoError(t, inst.stop(), "stop should be idempotent")
129+
130+
callbackBytes, callbackResp = testCallback(inst)
131+
require.Equal(t, input.FLB_RETRY, callbackResp)
132+
assert.Empty(t, callbackBytes)
133+
134+
// Resume stopped pipeline
135+
require.NoError(t, inst.resume())
136+
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
137+
callbackBytes, callbackResp = testCallback(inst)
138+
require.Equal(t, input.FLB_OK, callbackResp)
139+
assert.Empty(t, callbackBytes, "m3 message from earlier not dequeued")
140+
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
141+
"collect background loop should have started running")
142+
m4 := testMessage(map[string]any{"name": "m4"})
143+
plugin.enqueue(m4)()
144+
callbackBytes, callbackResp = testCallback(inst)
145+
require.Equal(t, input.FLB_OK, callbackResp)
146+
require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes))
147+
148+
// Stop again
123149
require.NoError(t, inst.stop())
150+
require.False(t, plugin.collectRunning.Load())
124151
}
125152

153+
// testMessage returns a Message with the given record map and current timestamp.
126154
func testMessage(record map[string]any) Message {
127155
tag := ""
128156
return Message{
@@ -132,21 +160,19 @@ func testMessage(record map[string]any) Message {
132160
}
133161
}
134162

135-
type collectMessage struct {
136-
msg Message
137-
collectedWG sync.WaitGroup
138-
}
139-
140163
func newTestInputPlugin() *testInputPlugin {
141164
return &testInputPlugin{
142165
inputs: make(chan *collectMessage),
143166
}
144167
}
145168

169+
// testInputPlugin is an InputPlugin used to help test plugin callback and concurrency behavior.
146170
type testInputPlugin struct {
147-
initCount atomic.Int64
148-
collectRunning atomic.Bool
149-
inputs chan *collectMessage
171+
initCount atomic.Int64 // Count of calls to Init method.
172+
collectRunning atomic.Bool // Indicates whether the Collect method is running.
173+
onCollectDone func(ch chan<- Message) // Settable callback invoked when Collect is about to return.
174+
175+
inputs chan *collectMessage
150176
}
151177

152178
var _ InputPlugin = (*testInputPlugin)(nil)
@@ -166,11 +192,16 @@ func (t *testInputPlugin) Collect(ctx context.Context, ch chan<- Message) error
166192
ch <- m.msg
167193
m.collectedWG.Done()
168194
case <-ctx.Done():
195+
if t.onCollectDone != nil {
196+
t.onCollectDone(ch)
197+
}
169198
return nil
170199
}
171200
}
172201
}
173202

203+
// enqueue the message m to be processed by Collect. When called, the returned function
204+
// blocks until a running Collect puts m on the plugin's input channel.
174205
func (t *testInputPlugin) enqueue(m Message) (waitForCollected func()) {
175206
cm := &collectMessage{msg: m}
176207
cm.collectedWG.Add(1)
@@ -179,6 +210,12 @@ func (t *testInputPlugin) enqueue(m Message) (waitForCollected func()) {
179210
return cm.collectedWG.Wait
180211
}
181212

213+
// collectMessage is a helper wrapper used by testInputPlugin that wraps a Message.
214+
type collectMessage struct {
215+
msg Message
216+
collectedWG sync.WaitGroup // Decremented to 0 when testInputPlugin Collect processes the message.
217+
}
218+
182219
func decodeMessages(t testing.TB, msgpackBytes []byte) []Message {
183220
var messages []Message
184221

@@ -198,7 +235,7 @@ func TestInputCallbackCtrlC(t *testing.T) {
198235
inst := newTestInputInstance(t, testPluginInputCallbackCtrlC{})
199236

200237
require.NoError(t, inst.init(nil))
201-
require.NoError(t, inst.inputPreRun())
238+
require.NoError(t, inst.resume())
202239

203240
cdone := make(chan struct{})
204241
timeout := time.After(1 * time.Second)
@@ -247,7 +284,7 @@ func TestInputCallbackDangle(t *testing.T) {
247284

248285
// prepare channel for input explicitly.
249286
require.NoError(t, inst.init(ptr))
250-
require.NoError(t, inst.inputPreRun())
287+
require.NoError(t, inst.resume())
251288

252289
go func() {
253290
ticker := time.NewTicker(collectInterval)
@@ -312,7 +349,7 @@ func TestInputCallbackInfinite(t *testing.T) {
312349

313350
// prepare channel for input explicitly.
314351
require.NoError(t, inst.init(ptr))
315-
require.NoError(t, inst.inputPreRun())
352+
require.NoError(t, inst.resume())
316353

317354
go func() {
318355
ticker := time.NewTicker(collectInterval)
@@ -387,7 +424,7 @@ func TestInputCallbackLatency(t *testing.T) {
387424

388425
// prepare channel for input explicitly.
389426
require.NoError(t, inst.init(nil))
390-
require.NoError(t, inst.inputPreRun())
427+
require.NoError(t, inst.resume())
391428

392429
go func() {
393430
ticker := time.NewTicker(collectInterval)
@@ -503,7 +540,7 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
503540

504541
// prepare channel for input explicitly.
505542
require.NoError(t, inst.init(ptr))
506-
require.NoError(t, inst.inputPreRun())
543+
require.NoError(t, inst.resume())
507544

508545
go func() {
509546
ticker := time.NewTicker(time.Second * 1)
@@ -716,7 +753,7 @@ func TestOutputFlush(t *testing.T) {
716753
}
717754
inst := newTestOutputInstance(t, out)
718755
require.NoError(t, inst.init(nil))
719-
require.NoError(t, inst.outputPreRun())
756+
require.NoError(t, inst.resume())
720757

721758
msg := Message{
722759
Time: now,

plugin.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,7 @@ func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
300300
}
301301

302302
// inputPreRun transitions an initialized input plugin into runnable state.
303-
func (p *pluginInstance) inputPreRun() error {
304-
p.mu.Lock()
305-
defer p.mu.Unlock()
306-
303+
func (p *pluginInstance) inputPreRunWithLock() error {
307304
// Only input plugins have a pre-run step
308305
if p.meta.input == nil {
309306
return nil
@@ -339,10 +336,7 @@ func (p *pluginInstance) inputPreRun() error {
339336
}
340337

341338
// outputPreRun transitions an output plugin into runnable state.
342-
func (p *pluginInstance) outputPreRun() error {
343-
p.mu.Lock()
344-
defer p.mu.Unlock()
345-
339+
func (p *pluginInstance) outputPreRunWithLock() error {
346340
// Only input plugins have a pre-run step
347341
if p.meta.output == nil {
348342
return fmt.Errorf("plugin is not an output plugin")
@@ -468,15 +462,15 @@ func (p *pluginInstance) resume() error {
468462
defer p.mu.Unlock()
469463

470464
if p.state != instanceStateInitialized {
471-
return fmt.Errorf("cannot resume plugin in state %q", p.state)
465+
return fmt.Errorf("invalid plugin state %q", p.state)
472466
}
473467

474468
if p.meta.input != nil {
475-
if err := p.inputPreRun(); err != nil {
469+
if err := p.inputPreRunWithLock(); err != nil {
476470
return err
477471
}
478472
} else if p.meta.output != nil {
479-
if err := p.outputPreRun(); err != nil {
473+
if err := p.outputPreRunWithLock(); err != nil {
480474
return err
481475
}
482476
}

0 commit comments

Comments
 (0)