Skip to content

Commit cbb2a94

Browse files
committed
Global callback tests
1 parent 5add7d9 commit cbb2a94

File tree

8 files changed

+123
-29
lines changed

8 files changed

+123
-29
lines changed

cshared.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ func FLBPluginInit(ptr unsafe.Pointer) (respCode int) {
9999

100100
if currInstance == nil {
101101
currInstance = newPluginInstance(*meta)
102+
if setupInstanceForTesting != nil {
103+
setupInstanceForTesting(currInstance)
104+
}
102105
}
103106

104107
if err := currInstance.init(ptr); err != nil {
@@ -470,11 +473,11 @@ func makeMetrics(cmp *cmetrics.Context) Metrics {
470473

471474
// testCallback invokes the callback and returns the bytes outputted from it.
472475
// This cannot be in the test file since test files can't use CGO.
473-
func testCallback(inst *pluginInstance) ([]byte, int) {
476+
func testCallback(callbackFunc func(data *unsafe.Pointer, csize *C.size_t) int) ([]byte, int) {
474477
data := unsafe.Pointer(nil)
475478
var csize C.size_t
476479

477-
retCode := inst.inputCallback(&data, &csize)
480+
retCode := callbackFunc(&data, &csize)
478481

479482
if data == nil {
480483
return []byte{}, retCode

cshared_test.go

Lines changed: 110 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,40 @@ import (
2222
"github.com/calyptia/plugin/metric"
2323
)
2424

25+
// TestMain wraps all tests to clean up any calls that mutated globals via
26+
// public FLB* functions.
27+
func TestMain(m *testing.M) {
28+
cleanupDone := make(chan struct{})
29+
defer func() {
30+
go func() {
31+
defer close(cleanupDone)
32+
33+
setupInstanceForTesting = nil
34+
35+
currInstanceMu.Lock()
36+
defer currInstanceMu.Unlock()
37+
38+
if currInstance != nil {
39+
if currInstance.meta.output != nil {
40+
FLBPluginOutputPreExit()
41+
}
42+
FLBPluginExit()
43+
}
44+
if pluginMeta.Load() != nil {
45+
pluginMeta.Store(nil)
46+
}
47+
}()
48+
49+
select {
50+
case <-cleanupDone:
51+
case <-time.After(2 * time.Second):
52+
panic("timed out cleaning up global plugin instance")
53+
}
54+
}()
55+
56+
m.Run()
57+
}
58+
2559
func newTestInputInstance(t testing.TB, input InputPlugin) *pluginInstance {
2660
inst := pluginInstanceWithFakes(newPluginInstance(pluginMetadata{
2761
name: "test-plugin",
@@ -66,6 +100,69 @@ func pluginInstanceWithFakes(inst *pluginInstance) *pluginInstance {
66100
})
67101
}
68102

103+
func TestGlobalCallbacks(t *testing.T) {
104+
plugin := newTestInputPlugin()
105+
//inst := newTestInputInstance(t, plugin)
106+
107+
// Registration
108+
RegisterInput("test-name", "test-desc", plugin)
109+
FLBPluginRegister(unsafe.Pointer(&input.FLBPluginProxyDef{}))
110+
111+
require.Equal(t, &pluginMetadata{
112+
name: "test-name",
113+
desc: "test-desc",
114+
input: plugin,
115+
}, pluginMeta.Load())
116+
117+
// Initialization
118+
setupInstanceForTesting = func(inst *pluginInstance) {
119+
pluginInstanceWithFakes(inst)
120+
}
121+
FLBPluginInit(nil)
122+
require.Equal(t, int64(1), plugin.initCount.Load())
123+
124+
currInstanceMu.Lock()
125+
inst := currInstance
126+
currInstanceMu.Unlock()
127+
require.NotNil(t, inst)
128+
129+
// Pre-run
130+
FLBPluginInputPreRun(0)
131+
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
132+
"collect background loop should have started running")
133+
m1 := testMessage(map[string]any{"name": "m1"})
134+
plugin.enqueue(m1)()
135+
136+
// Callback
137+
callbackBytes, callbackResp := testCallback(FLBPluginInputCallback)
138+
require.Equal(t, input.FLB_OK, callbackResp)
139+
require.Equal(t, []Message{m1}, decodeMessages(t, callbackBytes))
140+
require.True(t, plugin.collectRunning.Load())
141+
142+
// Pause
143+
FLBPluginInputPause()
144+
require.False(t, plugin.collectRunning.Load())
145+
FLBPluginInputPause() // Idempotent
146+
147+
callbackBytes, callbackResp = testCallback(FLBPluginInputCallback)
148+
require.Equal(t, input.FLB_RETRY, callbackResp)
149+
assert.Empty(t, callbackBytes)
150+
151+
// Resume stopped pipeline
152+
FLBPluginInputResume()
153+
callbackBytes, callbackResp = testCallback(FLBPluginInputCallback)
154+
require.Equal(t, input.FLB_OK, callbackResp)
155+
m4 := testMessage(map[string]any{"name": "m4"})
156+
plugin.enqueue(m4)()
157+
callbackBytes, callbackResp = testCallback(FLBPluginInputCallback)
158+
require.Equal(t, input.FLB_OK, callbackResp)
159+
require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes))
160+
161+
// Stop again
162+
FLBPluginExit()
163+
require.False(t, plugin.collectRunning.Load())
164+
}
165+
69166
func TestInputCallbackLifecycle(t *testing.T) {
70167
plugin := newTestInputPlugin()
71168
inst := newTestInputInstance(t, plugin)
@@ -78,7 +175,7 @@ func TestInputCallbackLifecycle(t *testing.T) {
78175
require.Equal(t, int64(1), plugin.initCount.Load(), "initialization should only run once")
79176

80177
// Early attempt to callback
81-
_, callbackResp := testCallback(inst)
178+
_, callbackResp := testCallback(inst.inputCallback)
82179
require.Equal(t, input.FLB_RETRY, callbackResp, "pre-run must be called before callback")
83180

84181
// Pre-run
@@ -93,7 +190,7 @@ func TestInputCallbackLifecycle(t *testing.T) {
93190
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
94191

95192
// Callback
96-
callbackBytes, callbackResp := testCallback(inst)
193+
callbackBytes, callbackResp := testCallback(inst.inputCallback)
97194
require.Equal(t, input.FLB_OK, callbackResp)
98195
require.Equal(t, []Message{m1, m2}, decodeMessages(t, callbackBytes))
99196
require.True(t, plugin.collectRunning.Load())
@@ -108,21 +205,21 @@ func TestInputCallbackLifecycle(t *testing.T) {
108205
require.False(t, plugin.collectRunning.Load())
109206
require.NoError(t, inst.stop(), "stop should be idempotent")
110207

111-
callbackBytes, callbackResp = testCallback(inst)
208+
callbackBytes, callbackResp = testCallback(inst.inputCallback)
112209
require.Equal(t, input.FLB_RETRY, callbackResp)
113210
assert.Empty(t, callbackBytes)
114211

115212
// Resume stopped pipeline
116213
require.NoError(t, inst.resume())
117214
require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`)
118-
callbackBytes, callbackResp = testCallback(inst)
215+
callbackBytes, callbackResp = testCallback(inst.inputCallback)
119216
require.Equal(t, input.FLB_OK, callbackResp)
120217
assert.Empty(t, callbackBytes, "m3 message from earlier not dequeued")
121218
require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond,
122219
"collect background loop should have started running")
123220
m4 := testMessage(map[string]any{"name": "m4"})
124221
plugin.enqueue(m4)()
125-
callbackBytes, callbackResp = testCallback(inst)
222+
callbackBytes, callbackResp = testCallback(inst.inputCallback)
126223
require.Equal(t, input.FLB_OK, callbackResp)
127224
require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes))
128225

@@ -232,7 +329,7 @@ func TestInputCallbackCtrlC(t *testing.T) {
232329
timeout := time.After(1 * time.Second)
233330

234331
go func() {
235-
testCallback(inst)
332+
testCallback(inst.inputCallback)
236333
close(cdone)
237334
}()
238335

@@ -281,11 +378,11 @@ func TestInputCallbackDangle(t *testing.T) {
281378
ticker := time.NewTicker(collectInterval)
282379
defer ticker.Stop()
283380

284-
testCallback(inst)
381+
testCallback(inst.inputCallback)
285382
for {
286383
select {
287384
case <-ticker.C:
288-
testCallback(inst)
385+
testCallback(inst.inputCallback)
289386
case <-cdone:
290387
return
291388
}
@@ -349,7 +446,7 @@ func TestInputCallbackInfinite(t *testing.T) {
349446
for {
350447
select {
351448
case <-ticker.C:
352-
testCallback(inst)
449+
testCallback(inst.inputCallback)
353450
if ptr != nil {
354451
close(cdone)
355452
return
@@ -421,7 +518,7 @@ func TestInputCallbackLatency(t *testing.T) {
421518
ticker := time.NewTicker(collectInterval)
422519
defer ticker.Stop()
423520

424-
buf, _ := testCallback(inst)
521+
buf, _ := testCallback(inst.inputCallback)
425522
if len(buf) > 0 {
426523
cmsg <- buf
427524
}
@@ -433,7 +530,7 @@ func TestInputCallbackLatency(t *testing.T) {
433530
t.Log("---- collect done")
434531
return
435532
case <-ticker.C:
436-
buf, _ := testCallback(inst)
533+
buf, _ := testCallback(inst.inputCallback)
437534
if len(buf) > 0 {
438535
cmsg <- buf
439536
}
@@ -537,13 +634,13 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
537634
ticker := time.NewTicker(time.Second * 1)
538635
defer ticker.Stop()
539636

540-
testCallback(inst)
637+
testCallback(inst.inputCallback)
541638
close(cstarted)
542639

543640
for {
544641
select {
545642
case <-ticker.C:
546-
testCallback(inst)
643+
testCallback(inst.inputCallback)
547644
case <-inst.runCtx.Done():
548645
return
549646
}

examples/in_gdummy/go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/fluent/fluent-bit-go/examples/gdummy
22

3-
go 1.23.0
4-
5-
toolchain go1.24.2
3+
go 1.22.4
64

75
require github.com/calyptia/plugin v0.1.6
86

examples/in_gdummy/go.sum

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6
2828
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
2929
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
3030
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
31-
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
32-
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
33-
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
3431
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
3532
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
3633
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=

examples/out_gstdout/go.mod

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
module github.com/fluent/fluent-bit-go/examples/gstdout
22

3-
go 1.23.0
4-
5-
toolchain go1.24.2
3+
go 1.21.0
64

75
require github.com/calyptia/plugin v0.1.6
86

97
require (
108
github.com/calyptia/cmetrics-go v0.1.7 // indirect
11-
github.com/ugorji/go/codec v1.2.12 // indirect
9+
github.com/ugorji/go/codec v1.2.11 // indirect
1210
)
1311

1412
replace github.com/calyptia/plugin => ../..

examples/out_gstdout/go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,6 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
719719
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
720720
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
721721
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
722-
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
723722
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
724723
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
725724
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=

examples/out_multiinstance/go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/fluent/fluent-bit-go/examples/multiinstance
22

3-
go 1.23.0
4-
5-
toolchain go1.24.2
3+
go 1.21.0
64

75
require github.com/fluent/fluent-bit-go v0.0.0-20200420155746-e125cab17963
86

plugin.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ var (
3939
currInstanceMu sync.Mutex
4040
// Current instance of the plugin, used by functions called from fluent-bit like FLBPluginInit.
4141
currInstance *pluginInstance
42+
43+
// setupInstanceForTesting is only for unit testing, and is used to set up newly created
44+
// global pluginInstance when testing calls to FLBPluginInit.
45+
setupInstanceForTesting func(*pluginInstance)
4246
)
4347

4448
func init() {

0 commit comments

Comments
 (0)