Skip to content

Commit 06cf1dc

Browse files
committed
Global callback tests
1 parent 5add7d9 commit 06cf1dc

File tree

8 files changed

+125
-29
lines changed

8 files changed

+125
-29
lines changed

cshared.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ func FLBPluginInit(ptr unsafe.Pointer) (respCode int) {
9999

100100
if currInstance == nil {
101101
currInstance = newPluginInstance(*meta)
102+
if setupInstanceForTesting != nil {
103+
setupInstanceForTesting(currInstance)
104+
}
102105
}
103106

104107
if err := currInstance.init(ptr); err != nil {
@@ -470,11 +473,11 @@ func makeMetrics(cmp *cmetrics.Context) Metrics {
470473

471474
// testCallback invokes the callback and returns the bytes outputted from it.
472475
// This cannot be in the test file since test files can't use CGO.
473-
func testCallback(inst *pluginInstance) ([]byte, int) {
476+
func testCallback(callbackFunc func(data *unsafe.Pointer, csize *C.size_t) int) ([]byte, int) {
474477
data := unsafe.Pointer(nil)
475478
var csize C.size_t
476479

477-
retCode := inst.inputCallback(&data, &csize)
480+
retCode := callbackFunc(&data, &csize)
478481

479482
if data == nil {
480483
return []byte{}, retCode

cshared_test.go

Lines changed: 112 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,41 @@ import (
2222
"github.com/calyptia/plugin/metric"
2323
)
2424

25+
// TestMain wraps all tests to clean up any calls that mutated globals via public FLB* functions.
26+
func TestMain(m *testing.M) {
27+
cleanupDone := make(chan struct{})
28+
defer func() {
29+
// Cleanup in a goroutine since buggy plugins or code might indefinitely block
30+
go func() {
31+
defer close(cleanupDone)
32+
33+
setupInstanceForTesting = nil
34+
35+
currInstanceMu.Lock()
36+
defer currInstanceMu.Unlock()
37+
38+
if currInstance != nil {
39+
if currInstance.meta.output != nil {
40+
FLBPluginOutputPreExit()
41+
}
42+
FLBPluginExit()
43+
}
44+
if pluginMeta.Load() != nil {
45+
pluginMeta.Store(nil)
46+
}
47+
}()
48+
49+
// Ensure cleanup finished
50+
select {
51+
case <-cleanupDone:
52+
case <-time.After(2 * time.Second):
53+
panic("timed out cleaning up global plugin instance")
54+
}
55+
}()
56+
57+
m.Run()
58+
}
59+
2560
func newTestInputInstance(t testing.TB, input InputPlugin) *pluginInstance {
2661
inst := pluginInstanceWithFakes(newPluginInstance(pluginMetadata{
2762
name: "test-plugin",
@@ -78,7 +113,7 @@ func TestInputCallbackLifecycle(t *testing.T) {
78113
require.Equal(t, int64(1), plugin.initCount.Load(), "initialization should only run once")
79114

80115
// Early attempt to callback
81-
_, callbackResp := testCallback(inst)
116+
_, callbackResp := testCallback(inst.inputCallback)
82117
require.Equal(t, input.FLB_RETRY, callbackResp, "pre-run must be called before callback")
83118

84119
// Pre-run
@@ -93,7 +128,7 @@ func TestInputCallbackLifecycle(t *testing.T) {
93128
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
94129

95130
// Callback
96-
callbackBytes, callbackResp := testCallback(inst)
131+
callbackBytes, callbackResp := testCallback(inst.inputCallback)
97132
require.Equal(t, input.FLB_OK, callbackResp)
98133
require.Equal(t, []Message{m1, m2}, decodeMessages(t, callbackBytes))
99134
require.True(t, plugin.collectRunning.Load())
@@ -108,21 +143,21 @@ func TestInputCallbackLifecycle(t *testing.T) {
108143
require.False(t, plugin.collectRunning.Load())
109144
require.NoError(t, inst.stop(), "stop should be idempotent")
110145

111-
callbackBytes, callbackResp = testCallback(inst)
146+
callbackBytes, callbackResp = testCallback(inst.inputCallback)
112147
require.Equal(t, input.FLB_RETRY, callbackResp)
113148
assert.Empty(t, callbackBytes)
114149

115150
// Resume stopped pipeline
116151
require.NoError(t, inst.resume())
117152
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
118-
callbackBytes, callbackResp = testCallback(inst)
153+
callbackBytes, callbackResp = testCallback(inst.inputCallback)
119154
require.Equal(t, input.FLB_OK, callbackResp)
120155
assert.Empty(t, callbackBytes, "m3 message from earlier not dequeued")
121156
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
122157
"collect background loop should have started running")
123158
m4 := testMessage(map[string]any{"name": "m4"})
124159
plugin.enqueue(m4)()
125-
callbackBytes, callbackResp = testCallback(inst)
160+
callbackBytes, callbackResp = testCallback(inst.inputCallback)
126161
require.Equal(t, input.FLB_OK, callbackResp)
127162
require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes))
128163

@@ -131,6 +166,70 @@ func TestInputCallbackLifecycle(t *testing.T) {
131166
require.False(t, plugin.collectRunning.Load())
132167
}
133168

169+
// TestGlobalCallbacks is a simplified variant of TestInputCallbackLifecycle that uses the
170+
// C callback functions invoked by fluent-bit.
171+
func TestGlobalCallbacks(t *testing.T) {
172+
plugin := newTestInputPlugin()
173+
174+
// Registration
175+
RegisterInput("test-name", "test-desc", plugin)
176+
FLBPluginRegister(unsafe.Pointer(&input.FLBPluginProxyDef{}))
177+
178+
require.Equal(t, &pluginMetadata{
179+
name: "test-name",
180+
desc: "test-desc",
181+
input: plugin,
182+
}, pluginMeta.Load())
183+
184+
// Initialization
185+
setupInstanceForTesting = func(inst *pluginInstance) {
186+
pluginInstanceWithFakes(inst)
187+
}
188+
FLBPluginInit(nil)
189+
require.Equal(t, int64(1), plugin.initCount.Load())
190+
191+
currInstanceMu.Lock()
192+
inst := currInstance
193+
currInstanceMu.Unlock()
194+
require.NotNil(t, inst)
195+
196+
// Pre-run
197+
FLBPluginInputPreRun(0)
198+
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
199+
"collect background loop should have started running")
200+
m1 := testMessage(map[string]any{"name": "m1"})
201+
plugin.enqueue(m1)()
202+
203+
// Callback
204+
callbackBytes, callbackResp := testCallback(FLBPluginInputCallback)
205+
require.Equal(t, input.FLB_OK, callbackResp)
206+
require.Equal(t, []Message{m1}, decodeMessages(t, callbackBytes))
207+
require.True(t, plugin.collectRunning.Load())
208+
209+
// Pause
210+
FLBPluginInputPause()
211+
require.False(t, plugin.collectRunning.Load())
212+
FLBPluginInputPause() // Idempotent
213+
214+
callbackBytes, callbackResp = testCallback(FLBPluginInputCallback)
215+
require.Equal(t, input.FLB_RETRY, callbackResp)
216+
assert.Empty(t, callbackBytes)
217+
218+
// Resume stopped pipeline
219+
FLBPluginInputResume()
220+
callbackBytes, callbackResp = testCallback(FLBPluginInputCallback)
221+
require.Equal(t, input.FLB_OK, callbackResp)
222+
m4 := testMessage(map[string]any{"name": "m4"})
223+
plugin.enqueue(m4)()
224+
callbackBytes, callbackResp = testCallback(FLBPluginInputCallback)
225+
require.Equal(t, input.FLB_OK, callbackResp)
226+
require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes))
227+
228+
// Stop again
229+
FLBPluginExit()
230+
require.False(t, plugin.collectRunning.Load())
231+
}
232+
134233
// testMessage returns a Message with the given record map and current timestamp.
135234
func testMessage(record map[string]any) Message {
136235
tag := ""
@@ -232,7 +331,7 @@ func TestInputCallbackCtrlC(t *testing.T) {
232331
timeout := time.After(1 * time.Second)
233332

234333
go func() {
235-
testCallback(inst)
334+
testCallback(inst.inputCallback)
236335
close(cdone)
237336
}()
238337

@@ -281,11 +380,11 @@ func TestInputCallbackDangle(t *testing.T) {
281380
ticker := time.NewTicker(collectInterval)
282381
defer ticker.Stop()
283382

284-
testCallback(inst)
383+
testCallback(inst.inputCallback)
285384
for {
286385
select {
287386
case <-ticker.C:
288-
testCallback(inst)
387+
testCallback(inst.inputCallback)
289388
case <-cdone:
290389
return
291390
}
@@ -349,7 +448,7 @@ func TestInputCallbackInfinite(t *testing.T) {
349448
for {
350449
select {
351450
case <-ticker.C:
352-
testCallback(inst)
451+
testCallback(inst.inputCallback)
353452
if ptr != nil {
354453
close(cdone)
355454
return
@@ -421,7 +520,7 @@ func TestInputCallbackLatency(t *testing.T) {
421520
ticker := time.NewTicker(collectInterval)
422521
defer ticker.Stop()
423522

424-
buf, _ := testCallback(inst)
523+
buf, _ := testCallback(inst.inputCallback)
425524
if len(buf) > 0 {
426525
cmsg <- buf
427526
}
@@ -433,7 +532,7 @@ func TestInputCallbackLatency(t *testing.T) {
433532
t.Log("---- collect done")
434533
return
435534
case <-ticker.C:
436-
buf, _ := testCallback(inst)
535+
buf, _ := testCallback(inst.inputCallback)
437536
if len(buf) > 0 {
438537
cmsg <- buf
439538
}
@@ -537,13 +636,13 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
537636
ticker := time.NewTicker(time.Second * 1)
538637
defer ticker.Stop()
539638

540-
testCallback(inst)
639+
testCallback(inst.inputCallback)
541640
close(cstarted)
542641

543642
for {
544643
select {
545644
case <-ticker.C:
546-
testCallback(inst)
645+
testCallback(inst.inputCallback)
547646
case <-inst.runCtx.Done():
548647
return
549648
}

examples/in_gdummy/go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/fluent/fluent-bit-go/examples/gdummy
22

3-
go 1.23.0
4-
5-
toolchain go1.24.2
3+
go 1.22.4
64

75
require github.com/calyptia/plugin v0.1.6
86

examples/in_gdummy/go.sum

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6
2828
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
2929
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
3030
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
31-
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
32-
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
33-
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
3431
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
3532
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
3633
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=

examples/out_gstdout/go.mod

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
module github.com/fluent/fluent-bit-go/examples/gstdout
22

3-
go 1.23.0
4-
5-
toolchain go1.24.2
3+
go 1.21.0
64

75
require github.com/calyptia/plugin v0.1.6
86

97
require (
108
github.com/calyptia/cmetrics-go v0.1.7 // indirect
11-
github.com/ugorji/go/codec v1.2.12 // indirect
9+
github.com/ugorji/go/codec v1.2.11 // indirect
1210
)
1311

1412
replace github.com/calyptia/plugin => ../..

examples/out_gstdout/go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,6 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
719719
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
720720
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
721721
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
722-
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
723722
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
724723
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
725724
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=

examples/out_multiinstance/go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/fluent/fluent-bit-go/examples/multiinstance
22

3-
go 1.23.0
4-
5-
toolchain go1.24.2
3+
go 1.21.0
64

75
require github.com/fluent/fluent-bit-go v0.0.0-20200420155746-e125cab17963
86

plugin.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ var (
3939
currInstanceMu sync.Mutex
4040
// Current instance of the plugin, used by functions called from fluent-bit like FLBPluginInit.
4141
currInstance *pluginInstance
42+
43+
// setupInstanceForTesting is only for unit testing, and is used to set up newly created
44+
// global pluginInstance when testing calls to FLBPluginInit.
45+
setupInstanceForTesting func(*pluginInstance)
4246
)
4347

4448
func init() {

0 commit comments

Comments
 (0)