Skip to content

Commit 4e14c14

Browse files
committed
wip
1 parent c3eb925 commit 4e14c14

File tree

3 files changed

+79
-49
lines changed

3 files changed

+79
-49
lines changed

cshared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func FLBPluginInputPreRun(useHotReload C.int) int {
135135
return input.FLB_ERROR
136136
}
137137

138-
if err := instance.inputPreRun(); err != nil {
138+
if err := instance.resume(); err != nil {
139139
fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err)
140140
return flbReturnCode(err)
141141
}
@@ -211,7 +211,7 @@ func FLBPluginOutputPreRun(useHotReload C.int) int {
211211
panic("plugin not initialized")
212212
}
213213

214-
if err := instance.outputPreRun(); err != nil {
214+
if err := instance.resume(); err != nil {
215215
fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err)
216216
return flbReturnCode(err)
217217
}

cshared_test.go

Lines changed: 72 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,6 @@ import (
2222
"github.com/calyptia/plugin/metric"
2323
)
2424

25-
type testPluginInputCallbackCtrlC struct{}
26-
27-
func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error {
28-
return nil
29-
}
30-
31-
func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error {
32-
return nil
33-
}
34-
35-
type fakeConfigLoader map[string]string
36-
37-
var _ ConfigLoader = (fakeConfigLoader)(nil)
38-
39-
func (f fakeConfigLoader) String(key string) string {
40-
return f[key]
41-
}
42-
4325
func newTestInputInstance(t testing.TB, input InputPlugin) *pluginInstance {
4426
inst := pluginInstanceWithFakes(newPluginInstance(pluginMetadata{
4527
name: "test-plugin",
@@ -100,29 +82,56 @@ func TestInputCallbackLifecycle(t *testing.T) {
10082
require.Equal(t, input.FLB_RETRY, callbackResp, "pre-run must be called before callback")
10183

10284
// Pre-run
103-
require.NoError(t, inst.inputPreRun())
85+
require.NoError(t, inst.resume())
10486
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
10587
"collect background loop should have started running")
10688
m1 := testMessage(map[string]any{"name": "m1"})
10789
m2 := testMessage(map[string]any{"name": "m2"})
10890
plugin.enqueue(m1)()
10991
plugin.enqueue(m2)()
11092

111-
require.ErrorContains(t, inst.inputPreRun(), `invalid plugin state "runnable"`)
93+
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
11294

11395
// Callback
114-
time.Sleep(time.Second)
11596
callbackBytes, callbackResp := testCallback(inst)
11697
require.Equal(t, input.FLB_OK, callbackResp)
11798
require.Equal(t, []Message{m1, m2}, decodeMessages(t, callbackBytes))
11899
require.True(t, plugin.collectRunning.Load())
119100

120-
// Stop
101+
// Stop (ensuring collect loop exits cleanly)
102+
plugin.onCollectDone = func(ch chan<- Message) {
103+
// Keep enqueueing after stop to ensure the plugin message channel wasn't closed early
104+
time.Sleep(10 * time.Millisecond)
105+
ch <- testMessage(map[string]any{"name": "m3"})
106+
}
121107
require.NoError(t, inst.stop())
122108
require.False(t, plugin.collectRunning.Load())
109+
require.NoError(t, inst.stop(), "stop should be idempotent")
110+
111+
callbackBytes, callbackResp = testCallback(inst)
112+
require.Equal(t, input.FLB_RETRY, callbackResp)
113+
assert.Empty(t, callbackBytes)
114+
115+
// Resume stopped pipeline
116+
require.NoError(t, inst.resume())
117+
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
118+
callbackBytes, callbackResp = testCallback(inst)
119+
require.Equal(t, input.FLB_OK, callbackResp)
120+
assert.Empty(t, callbackBytes, "m3 message from earlier not dequeued")
121+
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
122+
"collect background loop should have started running")
123+
m4 := testMessage(map[string]any{"name": "m4"})
124+
plugin.enqueue(m4)()
125+
callbackBytes, callbackResp = testCallback(inst)
126+
require.Equal(t, input.FLB_OK, callbackResp)
127+
require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes))
128+
129+
// Stop again
123130
require.NoError(t, inst.stop())
131+
require.False(t, plugin.collectRunning.Load())
124132
}
125133

134+
// testMessage returns a Message with the given record map and current timestamp.
126135
func testMessage(record map[string]any) Message {
127136
tag := ""
128137
return Message{
@@ -132,21 +141,19 @@ func testMessage(record map[string]any) Message {
132141
}
133142
}
134143

135-
type collectMessage struct {
136-
msg Message
137-
collectedWG sync.WaitGroup
138-
}
139-
140144
func newTestInputPlugin() *testInputPlugin {
141145
return &testInputPlugin{
142146
inputs: make(chan *collectMessage),
143147
}
144148
}
145149

150+
// testInputPlugin is an InputPlugin used to help test plugin callback and concurrency behavior.
146151
type testInputPlugin struct {
147-
initCount atomic.Int64
148-
collectRunning atomic.Bool
149-
inputs chan *collectMessage
152+
initCount atomic.Int64 // Count of calls to Init method.
153+
collectRunning atomic.Bool // Indicates whether the Collect method is running.
154+
onCollectDone func(ch chan<- Message) // Settable callback invoked when Collect is about to return.
155+
156+
inputs chan *collectMessage
150157
}
151158

152159
var _ InputPlugin = (*testInputPlugin)(nil)
@@ -166,11 +173,16 @@ func (t *testInputPlugin) Collect(ctx context.Context, ch chan<- Message) error
166173
ch <- m.msg
167174
m.collectedWG.Done()
168175
case <-ctx.Done():
176+
if t.onCollectDone != nil {
177+
t.onCollectDone(ch)
178+
}
169179
return nil
170180
}
171181
}
172182
}
173183

184+
// enqueue the message m to be processed by Collect. When called, the returned function
185+
// blocks until a running Collect puts m on the plugin's input channel.
174186
func (t *testInputPlugin) enqueue(m Message) (waitForCollected func()) {
175187
cm := &collectMessage{msg: m}
176188
cm.collectedWG.Add(1)
@@ -179,6 +191,12 @@ func (t *testInputPlugin) enqueue(m Message) (waitForCollected func()) {
179191
return cm.collectedWG.Wait
180192
}
181193

194+
// collectMessage is a helper wrapper used by testInputPlugin that wraps a Message.
195+
type collectMessage struct {
196+
msg Message
197+
collectedWG sync.WaitGroup // Decremented to 0 when testInputPlugin Collect processes the message.
198+
}
199+
182200
func decodeMessages(t testing.TB, msgpackBytes []byte) []Message {
183201
var messages []Message
184202

@@ -194,11 +212,21 @@ func decodeMessages(t testing.TB, msgpackBytes []byte) []Message {
194212
}
195213
}
196214

215+
type testPluginInputCallbackCtrlC struct{}
216+
217+
func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error {
218+
return nil
219+
}
220+
221+
func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error {
222+
return nil
223+
}
224+
197225
func TestInputCallbackCtrlC(t *testing.T) {
198226
inst := newTestInputInstance(t, testPluginInputCallbackCtrlC{})
199227

200228
require.NoError(t, inst.init(nil))
201-
require.NoError(t, inst.inputPreRun())
229+
require.NoError(t, inst.resume())
202230

203231
cdone := make(chan struct{})
204232
timeout := time.After(1 * time.Second)
@@ -247,7 +275,7 @@ func TestInputCallbackDangle(t *testing.T) {
247275

248276
// prepare channel for input explicitly.
249277
require.NoError(t, inst.init(ptr))
250-
require.NoError(t, inst.inputPreRun())
278+
require.NoError(t, inst.resume())
251279

252280
go func() {
253281
ticker := time.NewTicker(collectInterval)
@@ -312,7 +340,7 @@ func TestInputCallbackInfinite(t *testing.T) {
312340

313341
// prepare channel for input explicitly.
314342
require.NoError(t, inst.init(ptr))
315-
require.NoError(t, inst.inputPreRun())
343+
require.NoError(t, inst.resume())
316344

317345
go func() {
318346
ticker := time.NewTicker(collectInterval)
@@ -387,7 +415,7 @@ func TestInputCallbackLatency(t *testing.T) {
387415

388416
// prepare channel for input explicitly.
389417
require.NoError(t, inst.init(nil))
390-
require.NoError(t, inst.inputPreRun())
418+
require.NoError(t, inst.resume())
391419

392420
go func() {
393421
ticker := time.NewTicker(collectInterval)
@@ -503,7 +531,7 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
503531

504532
// prepare channel for input explicitly.
505533
require.NoError(t, inst.init(ptr))
506-
require.NoError(t, inst.inputPreRun())
534+
require.NoError(t, inst.resume())
507535

508536
go func() {
509537
ticker := time.NewTicker(time.Second * 1)
@@ -716,7 +744,7 @@ func TestOutputFlush(t *testing.T) {
716744
}
717745
inst := newTestOutputInstance(t, out)
718746
require.NoError(t, inst.init(nil))
719-
require.NoError(t, inst.outputPreRun())
747+
require.NoError(t, inst.resume())
720748

721749
msg := Message{
722750
Time: now,
@@ -737,3 +765,11 @@ func TestOutputFlush(t *testing.T) {
737765
assert.NoError(t, pluginFlush(inst, "foobar", b))
738766
wg.Wait()
739767
}
768+
769+
type fakeConfigLoader map[string]string
770+
771+
var _ ConfigLoader = (fakeConfigLoader)(nil)
772+
773+
func (f fakeConfigLoader) String(key string) string {
774+
return f[key]
775+
}

plugin.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,7 @@ func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
300300
}
301301

302302
// inputPreRun transitions an initialized input plugin into runnable state.
303-
func (p *pluginInstance) inputPreRun() error {
304-
p.mu.Lock()
305-
defer p.mu.Unlock()
306-
303+
func (p *pluginInstance) inputPreRunWithLock() error {
307304
// Only input plugins have a pre-run step
308305
if p.meta.input == nil {
309306
return nil
@@ -339,10 +336,7 @@ func (p *pluginInstance) inputPreRun() error {
339336
}
340337

341338
// outputPreRun transitions an output plugin into runnable state.
342-
func (p *pluginInstance) outputPreRun() error {
343-
p.mu.Lock()
344-
defer p.mu.Unlock()
345-
339+
func (p *pluginInstance) outputPreRunWithLock() error {
346340
// Only input plugins have a pre-run step
347341
if p.meta.output == nil {
348342
return fmt.Errorf("plugin is not an output plugin")
@@ -468,15 +462,15 @@ func (p *pluginInstance) resume() error {
468462
defer p.mu.Unlock()
469463

470464
if p.state != instanceStateInitialized {
471-
return fmt.Errorf("cannot resume plugin in state %q", p.state)
465+
return fmt.Errorf("invalid plugin state %q", p.state)
472466
}
473467

474468
if p.meta.input != nil {
475-
if err := p.inputPreRun(); err != nil {
469+
if err := p.inputPreRunWithLock(); err != nil {
476470
return err
477471
}
478472
} else if p.meta.output != nil {
479-
if err := p.outputPreRun(); err != nil {
473+
if err := p.outputPreRunWithLock(); err != nil {
480474
return err
481475
}
482476
}

0 commit comments

Comments
 (0)