diff --git a/.gitignore b/.gitignore index 38b0978..3cffdbd 100644 --- a/.gitignore +++ b/.gitignore @@ -176,6 +176,7 @@ cmake-build-*/ # IntelliJ out/ +/.idea/ # mpeltonen/sbt-idea plugin .idea_modules/ diff --git a/cshared.go b/cshared.go index be01b4d..bd37c9b 100644 --- a/cshared.go +++ b/cshared.go @@ -6,17 +6,12 @@ package plugin import "C" import ( - "bytes" - "context" "errors" "fmt" "io" - "log" "os" - "runtime" "strconv" "strings" - "sync" "time" "unsafe" @@ -37,13 +32,6 @@ const ( collectInterval = 1000 * time.Nanosecond ) -var ( - unregister func() - cmt *cmetrics.Context - logger Logger - maxBufferedMessages = defaultMaxBufferedMessages -) - // FLBPluginPreRegister - // //export FLBPluginPreRegister @@ -60,60 +48,35 @@ func FLBPluginPreRegister(hotReloading C.int) int { // //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { - defer registerWG.Done() - - if theInput == nil && theOutput == nil && theCustom == nil { - fmt.Fprintf(os.Stderr, "no input or output or custom registered\n") + meta := pluginMeta.Load() + if meta == nil { + fmt.Fprintf(os.Stderr, "no input, output, or custom plugin registered\n") return input.FLB_RETRY } - if theInput != nil { - out := input.FLBPluginRegister(def, theName, theDesc) - unregister = func() { - input.FLBPluginUnregister(def) - } - return out - } - - if theOutput != nil { - out := output.FLBPluginRegister(def, theName, theDesc) - unregister = func() { - output.FLBPluginUnregister(def) - } - return out - } - - if theCustom != nil { - out := custom.FLBPluginRegister(def, theName, theDesc) - unregister = func() { - custom.FLBPluginUnregister(def) - } - - return out - } - - return input.FLB_ERROR -} - -func cleanup() int { - if unregister != nil { - unregister() - unregister = nil - } - - if runCancel != nil { - runCancel() - runCancel = nil + var newUnregisterFunc func() + var registerCode int + if meta.input != nil { + registerCode = input.FLBPluginRegister(def, meta.name, meta.desc) + newUnregisterFunc = func() { input.FLBPluginUnregister(def) } + } else if meta.output != nil { + registerCode = output.FLBPluginRegister(def, meta.name, meta.desc) + newUnregisterFunc = func() { output.FLBPluginUnregister(def) } + } else if meta.custom != nil { + registerCode = custom.FLBPluginRegister(def, meta.name, meta.desc) + newUnregisterFunc = func() { custom.FLBPluginUnregister(def) } + } else { + fmt.Fprintf(os.Stderr, "no input, output, or custom plugin registered\n") + return input.FLB_RETRY } - if !theInputLock.TryLock() { - return input.FLB_OK + if registerCode != 0 { + fmt.Fprintf(os.Stderr, "error calling plugin register function\n") + return input.FLB_RETRY } - defer theInputLock.Unlock() - if theChannel != nil { - defer close(theChannel) - } + unregisterFunc.Store(&newUnregisterFunc) + registerWG.Done() return input.FLB_OK } @@ -123,153 +86,51 @@ func cleanup() int { // plugins to execute the collect or flush callback. // //export FLBPluginInit -func FLBPluginInit(ptr unsafe.Pointer) int { - initWG.Add(1) - defer initWG.Done() +func FLBPluginInit(ptr unsafe.Pointer) (respCode int) { + currInstanceMu.Lock() + defer currInstanceMu.Unlock() - if theInput == nil && theOutput == nil && theCustom == nil { - fmt.Fprintf(os.Stderr, "no input, output, or custom registered\n") + meta := pluginMeta.Load() + if meta == nil { + fmt.Fprintf(os.Stderr, "no input, output, or custom plugin registered\n") return input.FLB_RETRY } - ctx, cancel := context.WithCancel(context.Background()) - - var err error - if theInput != nil { - defer cancel() - conf := &flbInputConfigLoader{ptr: ptr} - cmt, err = input.FLBPluginGetCMetricsContext(ptr) - if err != nil { - return input.FLB_ERROR - } - logger = &flbInputLogger{ptr: ptr} - fbit := &Fluentbit{ - Conf: conf, - Metrics: makeMetrics(cmt), - Logger: logger, - } - - err = theInput.Init(ctx, fbit) - if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" { - maxbuffered, err := strconv.Atoi(maxbuffered) - if err != nil { - maxBufferedMessages = maxbuffered - } - } - } else if theOutput != nil { - defer cancel() - conf := &flbOutputConfigLoader{ptr: ptr} - cmt, err = output.FLBPluginGetCMetricsContext(ptr) - if err != nil { - return output.FLB_ERROR - } - logger = &flbOutputLogger{ptr: ptr} - fbit := &Fluentbit{ - Conf: conf, - Metrics: makeMetrics(cmt), - Logger: logger, - } - err = theOutput.Init(ctx, fbit) - } else { - // intended to longer liveness for custom plugin - runCancel = cancel - conf := &flbCustomConfigLoader{ptr: ptr} - logger = &flbCustomLogger{ptr: ptr} - fbit := &Fluentbit{ - Conf: conf, - Metrics: nil, - Logger: logger, - } - err = theCustom.Init(ctx, fbit) - } - if err != nil { - fmt.Fprintf(os.Stderr, "init: %v\n", err) - return input.FLB_ERROR - } - - return input.FLB_OK -} - -// flbPluginReset is meant to reset the plugin between tests. -func flbPluginReset() { - theInputLock.Lock() - defer theInputLock.Unlock() - defer func() { - if ret := recover(); ret != nil { - fmt.Fprintf(os.Stderr, "Channel is already closed") - return + if currInstance == nil { + currInstance = newPluginInstance(*meta) + if setupInstanceForTesting != nil { + setupInstanceForTesting(currInstance) } - }() - - close(theChannel) - theInput = nil -} - -func testFLBPluginInputCallback() ([]byte, error) { - data := unsafe.Pointer(nil) - var csize C.size_t - - FLBPluginInputCallback(&data, &csize) - - if data == nil { - return []byte{}, nil - } - - defer C.free(data) - return C.GoBytes(data, C.int(csize)), nil -} - -// prepareOutputFlush is a testing utility. -func prepareOutputFlush(output OutputPlugin) error { - theOutput = output - FLBPluginOutputPreRun(0) - return nil -} - -// Lock used to synchronize access to theInput variable. -var theInputLock sync.Mutex - -// prepareInputCollector is meant to prepare resources for input collectors -func prepareInputCollector(multiInstance bool) { - runCtx, runCancel = context.WithCancel(context.Background()) - if !multiInstance { - theChannel = make(chan Message, maxBufferedMessages) } - theInputLock.Lock() - if multiInstance { - if theChannel == nil { - theChannel = make(chan Message, maxBufferedMessages) - } - defer theInputLock.Unlock() + if err := currInstance.init(ptr); err != nil { + fmt.Fprintf(os.Stderr, err.Error()) + return flbReturnCode(err) } - go func(theChannel chan<- Message) { - if !multiInstance { - defer theInputLock.Unlock() - } - - go func(theChannel chan<- Message) { - err := theInput.Collect(runCtx, theChannel) - if err != nil { - fmt.Fprintf(os.Stderr, "collect error: %v\n", err) - } - }(theChannel) - - <-runCtx.Done() - - log.Printf("goroutine will be stopping: name=%q\n", theName) - }(theChannel) + return input.FLB_OK } -// FLBPluginInputPreRun this method gets invoked by the fluent-bit runtime, once the plugin has been -// initialized, the plugin invoked only once before executing the input callbacks. +// FLBPluginInputPreRun is invoked by the fluent-bit runtime after the plugin has been +// initialized using FLBPluginRegister but before executing plugin callback functions. // //export FLBPluginInputPreRun func FLBPluginInputPreRun(useHotReload C.int) int { registerWG.Wait() - prepareInputCollector(true) + currInstanceMu.Lock() + instance := currInstance + currInstanceMu.Unlock() + + if instance == nil { + fmt.Fprintf(os.Stderr, "plugin not initialized\n") + return input.FLB_ERROR + } + + if err := instance.resume(); err != nil { + fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err) + return flbReturnCode(err) + } return input.FLB_OK } @@ -279,28 +140,39 @@ func FLBPluginInputPreRun(useHotReload C.int) int { // //export FLBPluginInputPause func FLBPluginInputPause() { - if runCancel != nil { - runCancel() - runCancel = nil + currInstanceMu.Lock() + instance := currInstance + currInstanceMu.Unlock() + + if instance == nil { + panic("plugin not initialized") } - if !theInputLock.TryLock() { - return + if instance.meta.input == nil { + panic("can only pause input plugins") } - defer theInputLock.Unlock() - if theChannel != nil { - close(theChannel) - theChannel = nil + if err := instance.stop(); err != nil { + panic(err) } } // FLBPluginInputResume this method gets invoked by the fluent-bit runtime, once the plugin has been -// resumeed, the plugin invoked this method and re-running state. +// resumed, the plugin invoked this method and re-running state. // //export FLBPluginInputResume func FLBPluginInputResume() { - prepareInputCollector(true) + currInstanceMu.Lock() + instance := currInstance + currInstanceMu.Unlock() + + if instance == nil { + panic("plugin not initialized") + } + + if err := instance.resume(); err != nil { + panic(err) + } } // FLBPluginOutputPreExit this method gets invoked by the fluent-bit runtime, once the plugin has been @@ -308,19 +180,16 @@ func FLBPluginInputResume() { // //export FLBPluginOutputPreExit func FLBPluginOutputPreExit() { - if runCancel != nil { - runCancel() - runCancel = nil - } + currInstanceMu.Lock() + instance := currInstance + currInstanceMu.Unlock() - if !theInputLock.TryLock() { - return + if instance == nil { + panic("plugin not initialized") } - defer theInputLock.Unlock() - if theChannel != nil { - close(theChannel) - theChannel = nil + if err := instance.outputPreExit(); err != nil { + panic(err) } } @@ -330,25 +199,20 @@ func FLBPluginOutputPreExit() { func FLBPluginOutputPreRun(useHotReload C.int) int { registerWG.Wait() - var err error - runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message) - go func(runCtx context.Context) { - go func(runCtx context.Context) { - err = theOutput.Flush(runCtx, theChannel) - }(runCtx) - - <-runCtx.Done() + currInstanceMu.Lock() + instance := currInstance + currInstanceMu.Unlock() - log.Printf("goroutine will be stopping: name=%q\n", theName) - }(runCtx) + if instance == nil { + panic("plugin not initialized") + } - if err != nil { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return output.FLB_ERROR + if err := instance.resume(); err != nil { + fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err) + return flbReturnCode(err) } - return output.FLB_OK + return input.FLB_OK } // FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been @@ -361,56 +225,20 @@ func FLBPluginOutputPreRun(useHotReload C.int) int { // //export FLBPluginInputCallback func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { - initWG.Wait() + currInstanceMu.Lock() + instance := currInstance + currInstanceMu.Unlock() - if theInput == nil { - fmt.Fprintf(os.Stderr, "no input registered\n") + if instance == nil { return input.FLB_RETRY } - buf := bytes.NewBuffer([]byte{}) - - for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- { - select { - case msg, ok := <-theChannel: - if !ok { - return input.FLB_ERROR - } - - b, err := msgpack.Marshal([]any{&EventTime{msg.Time}, msg.Record}) - if err != nil { - fmt.Fprintf(os.Stderr, "msgpack marshal: %s\n", err) - return input.FLB_ERROR - } - - buf.Grow(len(b)) - buf.Write(b) - case <-runCtx.Done(): - err := runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR - } - // enforce a runtime gc, to prevent the thread finalizer on - // fluent-bit to kick in before any remaining data has not been GC'ed - // causing a sigsegv. - defer runtime.GC() - loop = 0 - default: - loop = 0 - } - } - - if buf.Len() > 0 { - b := buf.Bytes() - cdata := C.CBytes(b) - *data = cdata - if csize != nil { - *csize = C.size_t(len(b)) - } + err := instance.inputCallback(data, csize) + if err != nil { + fmt.Fprintf(os.Stderr, "callback error: %v\n", err) } - return input.FLB_OK + return flbReturnCode(err) } // FLBPluginInputCleanupCallback releases the memory used during the input callback @@ -426,62 +254,21 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int { // //export FLBPluginFlush func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { - initWG.Wait() - - if theOutput == nil { - fmt.Fprintf(os.Stderr, "no output registered\n") - return output.FLB_RETRY - } - - var err error - select { - case <-runCtx.Done(): - err = runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return output.FLB_ERROR - } - - return output.FLB_OK - default: - } + currInstanceMu.Lock() + instance := currInstance + currInstanceMu.Unlock() in := C.GoBytes(data, clength) tag := C.GoString(ctag) - if err := pluginFlush(tag, in); err != nil { - fmt.Fprintf(os.Stderr, "flush: %s\n", err) - return output.FLB_ERROR + err := instance.outputFlush(tag, in) + if err != nil { + fmt.Fprintf(os.Stderr, "plugin flush error: %v\n", err) } - - return output.FLB_OK + return flbReturnCode(err) } -func pluginFlush(tag string, b []byte) error { - dec := msgpack.NewDecoder(bytes.NewReader(b)) - for { - select { - case <-runCtx.Done(): - err := runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return fmt.Errorf("run: %w", err) - } - - return nil - default: - } - - msg, err := decodeMsg(dec, tag) - if errors.Is(err, io.EOF) { - return nil - } - - if err != nil { - return err - } - - theChannel <- msg - } +func marshalMsg(m Message) ([]byte, error) { + return msgpack.Marshal([]any{&EventTime{m.Time}, m.Record}) } // decodeMsg should be called with an already initialized decoder. @@ -536,7 +323,20 @@ func decodeMsg(dec *msgpack.Decoder, tag string) (Message, error) { // //export FLBPluginExit func FLBPluginExit() int { - return cleanup() + currInstanceMu.Lock() + defer currInstanceMu.Unlock() + + if currInstance != nil { + currInstance.stop() //nolint:errcheck + currInstance = nil + } + + if unregister := unregisterFunc.Load(); unregister != nil { + (*unregister)() + unregisterFunc.Store(nil) + } + + return input.FLB_OK } type flbInputConfigLoader struct { @@ -660,3 +460,43 @@ func makeMetrics(cmp *cmetrics.Context) Metrics { }, } } + +func testFLBPluginFlush(data []byte, tag string) int { + cdata := C.CBytes(data) + defer C.free(unsafe.Pointer(cdata)) + + ctag := C.CString(tag) + defer C.free(unsafe.Pointer(ctag)) + + return FLBPluginFlush(cdata, C.int(len(data)), ctag) +} + +// testInputCallback invokes inputCallback and returns the bytes outputted from it. +// This cannot be in the test file since test files can't use CGO. +func testInputCallback(inst *pluginInstance) ([]byte, error) { + data := unsafe.Pointer(nil) + var csize C.size_t + + err := inst.inputCallback(&data, &csize) + + if data == nil { + return []byte{}, err + } + + defer C.free(data) + return C.GoBytes(data, C.int(csize)), err +} + +func testFLBPluginInputCallback() ([]byte, int) { + data := unsafe.Pointer(nil) + var csize C.size_t + + retVal := FLBPluginInputCallback(&data, &csize) + + if data == nil { + return []byte{}, retVal + } + + defer C.free(data) + return C.GoBytes(data, C.int(csize)), retVal +} diff --git a/cshared_test.go b/cshared_test.go index 50e0e6c..127a0cc 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -13,69 +13,457 @@ import ( "time" "unsafe" - "github.com/alecthomas/assert/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/vmihailenco/msgpack/v5" + "github.com/calyptia/cmetrics-go" + "github.com/calyptia/plugin/input" "github.com/calyptia/plugin/metric" ) -type testPluginInputCallbackCtrlC struct{} +func newTestInputInstance(t testing.TB, input InputPlugin) *pluginInstance { + inst := pluginInstanceWithFakes(newPluginInstance(pluginMetadata{ + name: "test-plugin", + desc: "test plugin", + input: input, + })) + t.Cleanup(func() { + stopErr := make(chan error) + go func() { + stopErr <- inst.stop() + }() -func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error { + select { + case err := <-stopErr: + assert.NoError(t, err) + return + case <-time.After(time.Second): + require.Fail(t, "timed out waiting for instance to stop") + } + }) + + return inst +} + +func newTestOutputInstance(t testing.TB, output OutputPlugin) *pluginInstance { + inst := pluginInstanceWithFakes(newPluginInstance(pluginMetadata{ + name: "test-plugin", + desc: "test plugin", + output: output, + })) + t.Cleanup(func() { + stopErr := make(chan error, 2) + go func() { + stopErr <- inst.stop() + }() + + select { + case err := <-stopErr: + assert.NoError(t, err) + return + case <-time.After(time.Second): + require.Fail(t, "timed out waiting for instance to stop") + } + }) + + return inst +} + +func pluginInstanceWithFakes(inst *pluginInstance) *pluginInstance { + return inst. + withCMetricsContextProvider(func(_ unsafe.Pointer) (*cmetrics.Context, error) { + return cmetrics.NewContext() + }). + withConfigLoaderProvider(func(_ unsafe.Pointer) ConfigLoader { + return fakeConfigLoader{} + }) +} + +func TestInputCallbackLifecycle(t *testing.T) { + plugin := newTestInputPlugin() + inst := newTestInputInstance(t, plugin) + + // Initialization + require.NoError(t, inst.init(nil)) + require.Equal(t, int64(1), plugin.initCount.Load()) + + require.ErrorContains(t, inst.init(nil), `unexpected plugin state "initialized"`) + require.Equal(t, int64(1), plugin.initCount.Load(), "initialization should only run once") + + // Early attempt to callback + _, err := testInputCallback(inst) + require.ErrorIs(t, err, retryableError{}) + require.ErrorContains(t, err, `unexpected plugin state "initialized"`) + + // Pre-run + require.NoError(t, inst.resume()) + require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond, + "collect background loop should have started running") + m1 := testMessage(map[string]any{"name": "m1"}) + m2 := testMessage(map[string]any{"name": "m2"}) + plugin.enqueue(m1)() + plugin.enqueue(m2)() + + require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`) + + // Callback + callbackBytes, err := testInputCallback(inst) + require.NoError(t, err) + require.Equal(t, []Message{m1, m2}, decodeMessages(t, callbackBytes)) + require.True(t, plugin.collectRunning.Load()) + + // Stop (ensuring collect loop exits cleanly) + plugin.onCollectDone = func(ch chan<- Message) { + // Keep enqueueing after stop to ensure the plugin message channel wasn't closed early + time.Sleep(10 * time.Millisecond) + ch <- testMessage(map[string]any{"name": "m3"}) + } + require.NoError(t, inst.stop()) + require.False(t, plugin.collectRunning.Load()) + require.NoError(t, inst.stop(), "stop should be idempotent") + + callbackBytes, err = testInputCallback(inst) + require.ErrorIs(t, err, retryableError{}) + require.ErrorContains(t, err, `unexpected plugin state "initialized"`) + assert.Empty(t, callbackBytes) + + // Resume stopped pipeline + require.NoError(t, inst.resume()) + require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`) + callbackBytes, err = testInputCallback(inst) + require.NoError(t, err) + assert.Empty(t, callbackBytes, "m3 message from earlier not dequeued") + require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond, + "collect background loop should have started running") + m4 := testMessage(map[string]any{"name": "m4"}) + plugin.enqueue(m4)() + callbackBytes, err = testInputCallback(inst) + require.NoError(t, err) + require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes)) + + // Stop again + require.NoError(t, inst.stop()) + require.False(t, plugin.collectRunning.Load()) +} + +// TestInputGlobalCallbacks is a simplified variant of TestInputCallbackLifecycle that uses the +// C callback functions invoked by fluent-bit. +func TestInputGlobalCallbacks(t *testing.T) { + t.Cleanup(resetGlobalState) + + plugin := newTestInputPlugin() + + // Registration + RegisterInput("test-name", "test-desc", plugin) + FLBPluginRegister(unsafe.Pointer(&input.FLBPluginProxyDef{})) + + require.Equal(t, &pluginMetadata{ + name: "test-name", + desc: "test-desc", + input: plugin, + }, pluginMeta.Load()) + + // Initialization + setupInstanceForTesting = func(inst *pluginInstance) { + pluginInstanceWithFakes(inst) + } + FLBPluginInit(nil) + require.Equal(t, int64(1), plugin.initCount.Load()) + + currInstanceMu.Lock() + inst := currInstance + currInstanceMu.Unlock() + require.NotNil(t, inst) + + // Pre-run + FLBPluginInputPreRun(0) + require.Eventually(t, plugin.collectRunning.Load, time.Second, time.Millisecond, + "collect background loop should have started running") + m1 := testMessage(map[string]any{"name": "m1"}) + plugin.enqueue(m1)() + + // Callback + callbackBytes, callbackResp := testFLBPluginInputCallback() + require.Equal(t, input.FLB_OK, callbackResp) + require.Equal(t, []Message{m1}, decodeMessages(t, callbackBytes)) + require.True(t, plugin.collectRunning.Load()) + + // Pause + FLBPluginInputPause() + require.False(t, plugin.collectRunning.Load()) + FLBPluginInputPause() // Idempotent + + callbackBytes, callbackResp = testFLBPluginInputCallback() + require.Equal(t, input.FLB_RETRY, callbackResp) + assert.Empty(t, callbackBytes) + + // Resume stopped pipeline + FLBPluginInputResume() + callbackBytes, callbackResp = testFLBPluginInputCallback() + require.Equal(t, input.FLB_OK, callbackResp) + m4 := testMessage(map[string]any{"name": "m4"}) + plugin.enqueue(m4)() + callbackBytes, callbackResp = testFLBPluginInputCallback() + require.Equal(t, input.FLB_OK, callbackResp) + require.Equal(t, []Message{m4}, decodeMessages(t, callbackBytes)) + + // Stop again + FLBPluginExit() + require.False(t, plugin.collectRunning.Load()) + require.Equal(t, input.FLB_OK, FLBPluginExit()) // Idempotent +} + +// TestOutputCallbackLifecycle is a simplified variant of TestOutputCallbackLifecycle that uses the +// C callback functions invoked by fluent-bit. +func TestOutputCallbackLifecycle(t *testing.T) { + plugin := newTestOutputPlugin() + inst := newTestOutputInstance(t, plugin) + + // Initialization + require.NoError(t, inst.init(nil)) + require.Equal(t, int64(1), plugin.initCount.Load()) + + require.ErrorContains(t, inst.init(nil), `unexpected plugin state "initialized"`) + require.Equal(t, int64(1), plugin.initCount.Load(), "initialization should only run once") + + // Early attempt to flush + require.ErrorContains(t, inst.outputFlush("", nil), `invalid plugin state "initialized"`) + + // Pre-run + require.NoError(t, inst.resume()) + require.Eventually(t, plugin.flushRunning.Load, time.Second, time.Millisecond, + "flush background loop should have started running") + + // Flush + m1 := testMessage(map[string]any{"name": "m1"}) + m2 := testMessage(map[string]any{"name": "m2"}) + require.NoError(t, inst.outputFlush("", mustMarshalMessages(t, []Message{m1, m2}))) + require.Eventually(t, func() bool { return len(plugin.flushedMessages) == 2 }, time.Second, time.Millisecond) + require.Equal(t, []Message{m1, m2}, []Message{<-plugin.flushedMessages, <-plugin.flushedMessages}) + + m3 := testMessage(map[string]any{"name": "m3"}) + require.NoError(t, inst.outputFlush("", mustMarshalMessages(t, []Message{m3}))) + require.Eventually(t, func() bool { return len(plugin.flushedMessages) == 1 }, time.Second, time.Millisecond) + require.Equal(t, []Message{m3}, []Message{<-plugin.flushedMessages}) + + require.ErrorContains(t, inst.resume(), `invalid plugin state "runnable"`) + + // Pre-exit + require.NoError(t, inst.outputPreExit()) + require.False(t, plugin.flushRunning.Load()) + require.NoError(t, inst.outputPreExit(), "outputPreExit should be idempotent") + + // Exit + require.NoError(t, inst.stop()) + require.NoError(t, inst.stop(), "stop should be idempotent") +} + +func TestOutputGlobalCallbacks(t *testing.T) { + t.Cleanup(resetGlobalState) + + plugin := newTestOutputPlugin() + + // Registration + RegisterOutput("test-name", "test-desc", plugin) + FLBPluginRegister(unsafe.Pointer(&input.FLBPluginProxyDef{})) + + require.Equal(t, &pluginMetadata{ + name: "test-name", + desc: "test-desc", + output: plugin, + }, pluginMeta.Load()) + + // Initialization + setupInstanceForTesting = func(inst *pluginInstance) { + pluginInstanceWithFakes(inst) + } + FLBPluginInit(nil) + require.Equal(t, int64(1), plugin.initCount.Load()) + + currInstanceMu.Lock() + inst := currInstance + currInstanceMu.Unlock() + require.NotNil(t, inst) + + // Pre-run + FLBPluginOutputPreRun(0) + require.Eventually(t, plugin.flushRunning.Load, time.Second, time.Millisecond, + "flush background loop should have started running") + + // Flush + m1 := testMessage(map[string]any{"name": "m1"}) + m2 := testMessage(map[string]any{"name": "m2"}) + require.Equal(t, input.FLB_OK, testFLBPluginFlush(mustMarshalMessages(t, []Message{m1, m2}), "")) + require.Eventually(t, func() bool { return len(plugin.flushedMessages) == 2 }, time.Second, time.Millisecond) + require.Equal(t, []Message{m1, m2}, []Message{<-plugin.flushedMessages, <-plugin.flushedMessages}) + + // Pre-exit + FLBPluginOutputPreExit() + require.False(t, plugin.flushRunning.Load()) + FLBPluginOutputPreExit() // Idempotent + + // Exit + FLBPluginExit() + require.Equal(t, input.FLB_OK, FLBPluginExit()) // Idempotent +} + +// testMessage returns a Message with the given record map and current timestamp. +func testMessage(record map[string]any) Message { + tag := "" + return Message{ + Time: time.Now().UTC(), + Record: record, + tag: &tag, + } +} + +func newTestInputPlugin() *testInputPlugin { + return &testInputPlugin{ + inputs: make(chan *collectMessage), + } +} + +// testInputPlugin is an InputPlugin used to help test plugin callback and concurrency behavior. +type testInputPlugin struct { + initCount atomic.Int64 // Count of calls to Init method. + collectRunning atomic.Bool // Indicates whether the Collect method is running. + onCollectDone func(ch chan<- Message) // Settable callback invoked when Collect is about to return. + + inputs chan *collectMessage +} + +var _ InputPlugin = (*testInputPlugin)(nil) + +func (t *testInputPlugin) Init(ctx context.Context, fbit *Fluentbit) error { + t.initCount.Add(1) return nil } -func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error { +func (t *testInputPlugin) Collect(ctx context.Context, ch chan<- Message) error { + t.collectRunning.Store(true) + defer t.collectRunning.Store(false) + + for { + select { + case m := <-t.inputs: + ch <- m.msg + m.collectedWG.Done() + case <-ctx.Done(): + if t.onCollectDone != nil { + t.onCollectDone(ch) + } + return nil + } + } +} + +// enqueue the message m to be processed by Collect. When called, the returned function +// blocks until a running Collect puts m on the plugin's input channel. +func (t *testInputPlugin) enqueue(m Message) (waitForCollected func()) { + cm := &collectMessage{msg: m} + cm.collectedWG.Add(1) + t.inputs <- cm + + return cm.collectedWG.Wait +} + +// collectMessage is a helper wrapper used by testInputPlugin that wraps a Message. +type collectMessage struct { + msg Message + collectedWG sync.WaitGroup // Decremented to 0 when testInputPlugin Collect processes the message. +} + +func decodeMessages(t testing.TB, msgpackBytes []byte) []Message { + var messages []Message + + dec := msgpack.NewDecoder(bytes.NewReader(msgpackBytes)) + for { + msg, err := decodeMsg(dec, "") + if errors.Is(err, io.EOF) { + return messages + } + require.NoError(t, err) + + messages = append(messages, msg) + } +} + +func newTestOutputPlugin() *testOutputPlugin { + return &testOutputPlugin{ + flushedMessages: make(chan Message, 100), + } +} + +type testOutputPlugin struct { + initCount atomic.Int64 // Count of calls to Init method. + flushRunning atomic.Bool // Indicates whether the Flush method is running. + flushedMessages chan Message +} + +var _ OutputPlugin = (*testOutputPlugin)(nil) + +func (t *testOutputPlugin) Init(ctx context.Context, fbit *Fluentbit) error { + t.initCount.Add(1) return nil } -func init() { - registerWG.Done() +func (t *testOutputPlugin) Flush(ctx context.Context, ch <-chan Message) error { + t.flushRunning.Store(true) + defer t.flushRunning.Store(false) + + for { + select { + case m := <-ch: + t.flushedMessages <- m + case <-ctx.Done(): + return nil + } + } } -func TestMain(m *testing.M) { - defer flbPluginReset() - m.Run() +type testPluginInputCallbackCtrlC struct{} + +func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error { + return nil } -func TestInputCallbackCtrlC(t *testing.T) { - theInputLock.Lock() - theInput = testPluginInputCallbackCtrlC{} - theInputLock.Unlock() +func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error { + return nil +} - cdone := make(chan bool) - timeout := time.NewTimer(1 * time.Second) - defer timeout.Stop() +func TestInputCallbackCtrlC(t *testing.T) { + inst := newTestInputInstance(t, testPluginInputCallbackCtrlC{}) - ptr := unsafe.Pointer(nil) + require.NoError(t, inst.init(nil)) + require.NoError(t, inst.resume()) - // prepare channel for input explicitly. - prepareInputCollector(false) + cdone := make(chan struct{}) + timeout := time.After(1 * time.Second) go func() { - FLBPluginInputCallback(&ptr, nil) - cdone <- true + testInputCallback(inst) + close(cdone) }() select { case <-cdone: - timeout.Stop() - runCancel() - case <-timeout.C: + inst.runCancel() + case <-timeout: t.Fatalf("timed out ...") } } -var testPluginInputCallbackDangleFuncs atomic.Int64 - -type testPluginInputCallbackDangle struct{} +type testPluginInputCallbackDangle struct { + calls atomic.Int64 +} -func (t testPluginInputCallbackDangle) Init(ctx context.Context, fbit *Fluentbit) error { +func (t *testPluginInputCallbackDangle) Init(ctx context.Context, fbit *Fluentbit) error { return nil } -func (t testPluginInputCallbackDangle) Collect(ctx context.Context, ch chan<- Message) error { - testPluginInputCallbackDangleFuncs.Add(1) +func (t *testPluginInputCallbackDangle) Collect(ctx context.Context, ch chan<- Message) error { + t.calls.Add(1) ch <- Message{ Time: time.Now(), Record: map[string]string{ @@ -89,56 +477,50 @@ func (t testPluginInputCallbackDangle) Collect(ctx context.Context, ch chan<- Me // Collect multiple times. This is inline with backward-compatible // behavior. func TestInputCallbackDangle(t *testing.T) { - theInputLock.Lock() - theInput = testPluginInputCallbackDangle{} - theInputLock.Unlock() + input := &testPluginInputCallbackDangle{} + inst := newTestInputInstance(t, input) - cdone := make(chan bool) + cdone := make(chan struct{}) ptr := unsafe.Pointer(nil) // prepare channel for input explicitly. - prepareInputCollector(false) + require.NoError(t, inst.init(ptr)) + require.NoError(t, inst.resume()) go func() { - t := time.NewTicker(collectInterval) - defer t.Stop() + ticker := time.NewTicker(collectInterval) + defer ticker.Stop() - FLBPluginInputCallback(&ptr, nil) + testInputCallback(inst) for { select { - case <-t.C: - FLBPluginInputCallback(&ptr, nil) + case <-ticker.C: + testInputCallback(inst) case <-cdone: return } } }() - timeout := time.NewTimer(5 * time.Second) + time.Sleep(5 * time.Second) - <-timeout.C - timeout.Stop() - runCancel() - cdone <- true + inst.runCancel() + close(cdone) - // Test the assumption that only a single goroutine is - // ingesting records. - if testPluginInputCallbackDangleFuncs.Load() != 1 { - t.Fatalf("Too many callbacks: %d", - testPluginInputCallbackDangleFuncs.Load()) - } + // Test the assumption that only a single goroutine is ingesting records. + require.EqualValues(t, 1, input.calls.Load()) } -var testPluginInputCallbackInfiniteFuncs atomic.Int64 - -type testPluginInputCallbackInfinite struct{} +type testPluginInputCallbackInfinite struct { + calls atomic.Int64 +} -func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error { +func (t *testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error { return nil } -func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { - testPluginInputCallbackInfiniteFuncs.Add(1) +func (t *testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { + t.calls.Add(1) for { select { default: @@ -159,27 +541,25 @@ func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- // TestInputCallbackInfinite is a test for the main method most plugins // use where they do not return from the first invocation of collect. func TestInputCallbackInfinite(t *testing.T) { - theInputLock.Lock() - theInput = testPluginInputCallbackInfinite{} - theInputLock.Unlock() + input := &testPluginInputCallbackInfinite{} + inst := newTestInputInstance(t, input) - cdone := make(chan bool) - cshutdown := make(chan bool) - ptr := unsafe.Pointer(nil) + cdone := make(chan struct{}) + cshutdown := make(chan struct{}) // prepare channel for input explicitly. - prepareInputCollector(false) + require.NoError(t, inst.init(nil)) + require.NoError(t, inst.resume()) go func() { - t := time.NewTicker(collectInterval) - defer t.Stop() + ticker := time.NewTicker(collectInterval) + defer ticker.Stop() for { select { - case <-t.C: - FLBPluginInputCallback(&ptr, nil) - if ptr != nil { - cdone <- true + case <-ticker.C: + if out, _ := testInputCallback(inst); len(out) > 0 { + close(cdone) return } case <-cshutdown: @@ -188,25 +568,19 @@ func TestInputCallbackInfinite(t *testing.T) { } }() - timeout := time.NewTimer(10 * time.Second) - defer timeout.Stop() + timeout := time.After(10 * time.Second) select { case <-cdone: - runCancel() + inst.runCancel() // make sure Collect is not being invoked after Done(). time.Sleep(collectInterval * 10) - // Test the assumption that only a single goroutine is - // ingesting records. - if testPluginInputCallbackInfiniteFuncs.Load() != 1 { - t.Fatalf("Too many callbacks: %d", - testPluginInputCallbackInfiniteFuncs.Load()) - } - return - case <-timeout.C: - runCancel() - cshutdown <- true - // This test seems to fail some what frequently because the Collect goroutine + // Test the assumption that only a single goroutine is ingesting records. + require.EqualValues(t, 1, input.calls.Load()) + case <-timeout: + inst.runCancel() + close(cshutdown) + // This test seems to fail somewhat frequently because the Collect goroutine // inside cshared is never being scheduled. t.Fatalf("timed out ...") } @@ -231,7 +605,6 @@ func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- M }, } } - tick.Reset(time.Second * 1) case <-ctx.Done(): return nil } @@ -241,34 +614,34 @@ func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- M // TestInputCallbackInfiniteLatency is a test of the latency between // messages. func TestInputCallbackLatency(t *testing.T) { - theInputLock.Lock() - theInput = testPluginInputCallbackLatency{} - theInputLock.Unlock() + input := &testPluginInputCallbackLatency{} + inst := newTestInputInstance(t, input) - cdone := make(chan bool) - cstarted := make(chan bool) + cdone := make(chan struct{}) + cstarted := make(chan struct{}) cmsg := make(chan []byte) // prepare channel for input explicitly. - prepareInputCollector(false) + require.NoError(t, inst.init(nil)) + require.NoError(t, inst.resume()) go func() { - t := time.NewTicker(collectInterval) - defer t.Stop() + ticker := time.NewTicker(collectInterval) + defer ticker.Stop() - buf, _ := testFLBPluginInputCallback() + buf, _ := testInputCallback(inst) if len(buf) > 0 { cmsg <- buf } - cstarted <- true + close(cstarted) for { select { case <-cdone: - fmt.Println("---- collect done") + t.Log("---- collect done") return - case <-t.C: - buf, _ := testFLBPluginInputCallback() + case <-ticker.C: + buf, _ := testInputCallback(inst) if len(buf) > 0 { cmsg <- buf } @@ -277,10 +650,8 @@ func TestInputCallbackLatency(t *testing.T) { }() <-cstarted - fmt.Println("---- started") - timeout := time.NewTimer(5 * time.Second) - defer timeout.Stop() - + t.Log("---- started") + timeout := time.After(5 * time.Second) msgs := 0 for { @@ -304,9 +675,9 @@ func TestInputCallbackLatency(t *testing.T) { float64(time.Since(msg.Time)/time.Millisecond)) } } - case <-timeout.C: - runCancel() - cdone <- true + case <-timeout: + inst.runCancel() + close(cdone) if msgs < 128 { t.Fatalf("too few messages: %d", msgs) @@ -357,49 +728,49 @@ func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch cha // TestInputCallbackInfiniteConcurrent is meant to make sure we do not // break anythin with respect to concurrent ingest. func TestInputCallbackInfiniteConcurrent(t *testing.T) { - theInputLock.Lock() - theInput = testInputCallbackInfiniteConcurrent{} - theInputLock.Unlock() + input := &testInputCallbackInfiniteConcurrent{} + inst := newTestInputInstance(t, input) - cdone := make(chan bool) - cstarted := make(chan bool) + cdone := make(chan struct{}) + cstarted := make(chan struct{}) ptr := unsafe.Pointer(nil) concurrentWait.Add(64) // prepare channel for input explicitly. - prepareInputCollector(false) + require.NoError(t, inst.init(ptr)) + require.NoError(t, inst.resume()) - go func(cstarted chan bool) { + go func() { ticker := time.NewTicker(time.Second * 1) defer ticker.Stop() - FLBPluginInputCallback(&ptr, nil) - cstarted <- true + testInputCallback(inst) + close(cstarted) for { select { case <-ticker.C: - FLBPluginInputCallback(&ptr, nil) - case <-runCtx.Done(): + testInputCallback(inst) + case <-inst.runCtx.Done(): return } } - }(cstarted) + }() go func() { concurrentWait.Wait() - cdone <- true + close(cdone) }() <-cstarted - timeout := time.NewTimer(10 * time.Second) + timeout := time.After(10 * time.Second) select { case <-cdone: - runCancel() - case <-timeout.C: - runCancel() + inst.runCancel() + case <-timeout: + inst.runCancel() // this test seems to timeout semi-frequently... need to get to // the bottom of it... t.Fatalf("---- timed out: %d/%d ...", @@ -560,28 +931,28 @@ func TestOutputFlush(t *testing.T) { now := time.Now().UTC() - out := testOutputHandlerReflect{ + out := &testOutputHandlerReflect{ Test: t, Check: func(t *testing.T, msg Message) error { defer wg.Done() - assert.Equal(t, now, msg.Time) - - record := assertType[map[string]any](t, msg.Record) - - foo := assertType[string](t, record["foo"]) - assert.Equal(t, "bar", foo) - - bar := assertType[int8](t, record["bar"]) - assert.Equal(t, 3, bar) - - foobar := assertType[float64](t, record["foobar"]) - assert.Equal(t, 1.337, foobar) + expectTag := "foobar" + assert.Equal(t, Message{ + Time: now, + Record: map[string]any{ + "foo": "bar", + "bar": int8(3), + "foobar": 1.337, + }, + tag: &expectTag, + }, msg) return nil }, } - _ = prepareOutputFlush(&out) + inst := newTestOutputInstance(t, out) + require.NoError(t, inst.init(nil)) + require.NoError(t, inst.resume()) msg := Message{ Time: now, @@ -599,17 +970,55 @@ func TestOutputFlush(t *testing.T) { assert.NoError(t, err) wg.Add(1) - assert.NoError(t, pluginFlush("foobar", b)) + assert.NoError(t, inst.outputFlush("foobar", b)) wg.Wait() } -func assertType[T any](tb testing.TB, got any) T { - tb.Helper() +type fakeConfigLoader map[string]string + +var _ ConfigLoader = (fakeConfigLoader)(nil) + +func (f fakeConfigLoader) String(key string) string { + return f[key] +} + +func mustMarshalMessages(t testing.TB, msgs []Message) []byte { + var buf bytes.Buffer + for _, msg := range msgs { + b, err := marshalMsg(msg) + require.NoError(t, err) + buf.Write(b) + } + return buf.Bytes() +} + +// resetGlobalState resets global plugin state. Intended for use by tests that call stateless FLB* functions. +func resetGlobalState() { + cleanupDone := make(chan struct{}) + go func() { + defer close(cleanupDone) + + setupInstanceForTesting = nil - var want T + currInstanceMu.Lock() + defer currInstanceMu.Unlock() - v, ok := got.(T) - assert.True(tb, ok, "Expected types to be equal:\n-%T\n+%T", want, got) + if currInstance != nil { + if currInstance.meta.output != nil { + FLBPluginOutputPreExit() + } + FLBPluginExit() + } + if pluginMeta.Load() != nil { + registerWG.Add(1) + pluginMeta.Store(nil) + } + }() - return v + // Ensure cleanup finished + select { + case <-cleanupDone: + case <-time.After(2 * time.Second): + panic("timed out cleaning up global plugin instance") + } } diff --git a/plugin.go b/plugin.go index 5260fed..bb10ad0 100644 --- a/plugin.go +++ b/plugin.go @@ -2,39 +2,63 @@ // also, the interfaces for input and output plugins. package plugin +import "C" + import ( + "bytes" "context" + "errors" + "fmt" + "io" + "log" + "os" + "runtime" + "strconv" "sync" "sync/atomic" "time" + "unsafe" - "github.com/calyptia/plugin/metric" -) + "github.com/vmihailenco/msgpack/v5" -// atomicUint32 is used to atomically check if the plugin has been registered. -var atomicUint32 uint32 - -var ( - theName string - theDesc string - theInput InputPlugin - theOutput OutputPlugin - theCustom CustomPlugin + "github.com/calyptia/cmetrics-go" + "github.com/calyptia/plugin/input" + "github.com/calyptia/plugin/metric" ) var ( + // registerWG blocks until FLBPluginRegister has successfully run. registerWG sync.WaitGroup - initWG sync.WaitGroup - runCtx context.Context - runCancel context.CancelFunc - theChannel chan Message + + // Current plugin metadata. This is nil if the plugin has not been registered yet. + // This is set by the RegisterInput, RegisterOutput, or RegisterCustom functions. + pluginMeta atomic.Pointer[pluginMetadata] + unregisterFunc atomic.Pointer[func()] + + // currInstanceMu guards access to currInstance. + currInstanceMu sync.Mutex + // Current instance of the plugin, used by functions called from fluent-bit like FLBPluginInit. + currInstance *pluginInstance + + // setupInstanceForTesting is only for unit testing, and is used to set up newly created + // global pluginInstance when testing calls to FLBPluginInit. + setupInstanceForTesting func(*pluginInstance) ) func init() { + // Require FLBPluginRegister to be called before any other fluent-bit-facing functions execute. registerWG.Add(1) - theChannel = nil } +type instanceState string + +const ( + instanceStateCreated instanceState = "" + instanceStateInitialized instanceState = "initialized" + instanceStateRunnable instanceState = "runnable" + instanceStatePreExit instanceState = "preExit" +) + type Fluentbit struct { Conf ConfigLoader Metrics Metrics @@ -94,39 +118,472 @@ func (m Message) Tag() string { return *m.tag } -// mustOnce allows to be called only once otherwise it panics. -// This is used to register a single plugin per file. -func mustOnce() { - if atomic.LoadUint32(&atomicUint32) == 1 { - panic("plugin already registered") - } - - atomic.StoreUint32(&atomicUint32, 1) -} - // RegisterInput plugin. // This function must be called only once per file. func RegisterInput(name, desc string, in InputPlugin) { - mustOnce() - theName = name - theDesc = desc - theInput = in + if !pluginMeta.CompareAndSwap(nil, &pluginMetadata{ + name: name, + desc: desc, + input: in, + }) { + panic("plugin already registered") + } } // RegisterOutput plugin. // This function must be called only once per file. func RegisterOutput(name, desc string, out OutputPlugin) { - mustOnce() - theName = name - theDesc = desc - theOutput = out + if !pluginMeta.CompareAndSwap(nil, &pluginMetadata{ + name: name, + desc: desc, + output: out, + }) { + panic("plugin already registered") + } } // RegisterCustom plugin. // This function must be called only once per file. func RegisterCustom(name, desc string, custom CustomPlugin) { - mustOnce() - theName = name - theDesc = desc - theCustom = custom + if !pluginMeta.CompareAndSwap(nil, &pluginMetadata{ + name: name, + desc: desc, + custom: custom, + }) { + panic("plugin already registered") + } +} + +// pluginMeta describes a plugin and exposes hooks into its implementation. +type pluginMetadata struct { + name string + desc string + + // Exactly one of the following will be set: + input InputPlugin + output OutputPlugin + custom CustomPlugin +} + +type cmetricsContextProvider func(plugin unsafe.Pointer) (*cmetrics.Context, error) +type configLoaderProvider func(plugin unsafe.Pointer) ConfigLoader + +func newPluginInstance(meta pluginMetadata) *pluginInstance { + return &pluginInstance{ + meta: meta, + cmetricsCtxProvider: input.FLBPluginGetCMetricsContext, + configLoaderProvider: func(ptr unsafe.Pointer) ConfigLoader { + if meta.input != nil { + return &flbInputConfigLoader{ptr: ptr} + } else if meta.output != nil { + return &flbOutputConfigLoader{ptr: ptr} + } else if meta.custom != nil { + return &flbCustomConfigLoader{ptr: ptr} + } + return nil + }, + state: instanceStateCreated, + maxBufferedMessages: defaultMaxBufferedMessages, + } +} + +// pluginInstance is an instance of a plugin. +type pluginInstance struct { + meta pluginMetadata + cmetricsCtxProvider cmetricsContextProvider + configLoaderProvider configLoaderProvider + runningWG sync.WaitGroup // Number of running preRun and callback methods. + + // mu protects all members below. + // It is generally held during state checks and transitions but not during long-running callbacks. + mu sync.Mutex + state instanceState + runCtx context.Context + runCancel context.CancelFunc + msgChannel chan Message + maxBufferedMessages int +} + +// withCMetricsContextProvider overrides the cmetricsContextProvider. +// This must be called immediately after creating the instance. +func (p *pluginInstance) withCMetricsContextProvider(provider cmetricsContextProvider) *pluginInstance { + p.cmetricsCtxProvider = provider + return p +} + +// withConfigLoaderProvider overrides the configLoaderProvider. +// This must be called immediately after creating the instance. +func (p *pluginInstance) withConfigLoaderProvider(provider configLoaderProvider) *pluginInstance { + p.configLoaderProvider = provider + return p +} + +// init initializes a newly created plugin instance. +// This returns an error if the plugin if not in a new state or cannot initialize. +// For input and output plugins, this moves the plugin to an initialized state unless there is an init error. +// For custom plugins, this moves the plugin to a runnable state unless there is an init error. +func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.state != instanceStateCreated { + return fmt.Errorf("unexpected plugin state %q", p.state) + } + + newState := instanceStateInitialized + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + if initErr != nil { + cancel() + p.runCtx = nil + p.runCancel = nil + + return + } + p.state = newState + }() + + if p.meta.input != nil { + defer cancel() + cmt, err := p.cmetricsCtxProvider(ptr) + if err != nil { + return err + } + + cfgLoader := p.configLoaderProvider(ptr) + fbit := &Fluentbit{ + Conf: cfgLoader, + Metrics: makeMetrics(cmt), + Logger: &flbInputLogger{ptr: ptr}, + } + + if maxBufferedStr := cfgLoader.String("go.MaxBufferedMessages"); maxBufferedStr != "" { + maxBuffered, err := strconv.Atoi(maxBufferedStr) + if err != nil { + return fmt.Errorf("go.MaxBufferedMessages must be an integer, got %q", maxBufferedStr) + } + + p.maxBufferedMessages = maxBuffered + } + + if err := p.meta.input.Init(ctx, fbit); err != nil { + return fmt.Errorf("initializing plugin %q: %w", p.meta.name, err) + } + } else if p.meta.output != nil { + defer cancel() + cmt, err := p.cmetricsCtxProvider(ptr) + if err != nil { + return err + } + + fbit := &Fluentbit{ + Conf: p.configLoaderProvider(ptr), + Metrics: makeMetrics(cmt), + Logger: &flbOutputLogger{ptr: ptr}, + } + + if err := p.meta.output.Init(ctx, fbit); err != nil { + return fmt.Errorf("initializing plugin %q: %w", p.meta.name, err) + } + } else { + // Custom plugins don't have preInit functions that set context, so they are immediately runnable + p.runCtx, p.runCancel = ctx, cancel + p.state = instanceStateRunnable + + fbit := &Fluentbit{ + Conf: p.configLoaderProvider(ptr), + Metrics: nil, + Logger: &flbOutputLogger{ptr: ptr}, + } + + if err := p.meta.custom.Init(ctx, fbit); err != nil { + return fmt.Errorf("initializing plugin %q: %w", p.meta.name, err) + } + } + + return nil +} + +// inputPreRun transitions an initialized input plugin into runnable state. +func (p *pluginInstance) inputPreRunWithLock() error { + // Only input plugins have a pre-run step + if p.meta.input == nil { + return nil + } + + if p.state != instanceStateInitialized { + return fmt.Errorf("invalid plugin state %q", p.state) + } + + runCtx, runCancel := context.WithCancel(context.Background()) + p.runCtx = runCtx + p.runCancel = runCancel + p.msgChannel = make(chan Message, p.maxBufferedMessages) + p.state = instanceStateRunnable + + p.runningWG.Add(1) + go func() { + defer p.runningWG.Done() + + err := p.meta.input.Collect(runCtx, p.msgChannel) + if err != nil { + fmt.Fprintf(os.Stderr, "collect error: %v\n", err) + } + }() + + go func() { + <-runCtx.Done() + + log.Printf("goroutine will be stopping: name=%q\n", p.meta.name) + }() + + return nil +} + +// outputPreRun transitions an output plugin into runnable state. +func (p *pluginInstance) outputPreRunWithLock() error { + // Only input plugins have a pre-run step + if p.meta.output == nil { + return fmt.Errorf("plugin is not an output plugin") + } + + if p.state != instanceStateInitialized { + return fmt.Errorf("invalid plugin state %q", p.state) + } + + p.runCtx, p.runCancel = context.WithCancel(context.Background()) + p.msgChannel = make(chan Message) + p.state = instanceStateRunnable + + p.runningWG.Add(1) + go func() { + defer p.runningWG.Done() + if err := p.meta.output.Flush(p.runCtx, p.msgChannel); err != nil { + fmt.Fprintf(os.Stderr, "FLBPluginOutputPreRun error: %v\n", err) + } + }() + + go func() { + <-p.runCtx.Done() + + log.Printf("goroutine will be stopping: name=%q\n", p.meta.name) + }() + + return nil +} + +// inputCallback consumes up to maxBufferedMessages message from the plugin's message channel, +// returning early if the plugin is shutdown. +// Consumed messages are marshaled into msgpack bytes and the contents and length of contents +// set in the respective data and csize input variables. +func (p *pluginInstance) inputCallback(data *unsafe.Pointer, csize *C.size_t) error { + if p.meta.input == nil { + return retryableError{fmt.Errorf("no input registered")} + } + + p.mu.Lock() + if p.state != instanceStateRunnable { + p.mu.Unlock() + return retryableError{fmt.Errorf("unexpected plugin state %q", p.state)} + } + + p.runningWG.Add(1) + defer p.runningWG.Done() + p.mu.Unlock() + + buf := bytes.NewBuffer([]byte{}) + + for loop := min(len(p.msgChannel), p.maxBufferedMessages); loop > 0; loop-- { + select { + case msg, ok := <-p.msgChannel: + if !ok { + return fmt.Errorf("cannot consume from message channel") + } + + b, err := marshalMsg(msg) + if err != nil { + return fmt.Errorf("msgpack marshal error: %w", err) + } + + buf.Grow(len(b)) + buf.Write(b) + case <-p.runCtx.Done(): + err := p.runCtx.Err() + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + // enforce a runtime gc, to prevent the thread finalizer on + // fluent-bit to kick in before any remaining data has not been GC'ed + // causing a sigsegv. + defer runtime.GC() + loop = 0 // Exit the for loop on plugin shutdown + default: + loop = 0 // Exit the for loop if there are no messages to consume + } + } + + if buf.Len() > 0 { + b := buf.Bytes() + cdata := C.CBytes(b) + *data = cdata + if csize != nil { + *csize = C.size_t(len(b)) + } + } + + return nil +} + +// outputFlush writes the messages in msgpackBytes to the plugin's message channel, +// returning early if the plugin is shutdown. +func (p *pluginInstance) outputFlush(tag string, msgpackBytes []byte) error { + if p.meta.output == nil { + return retryableError{fmt.Errorf("no output plugin registered")} + } + + p.mu.Lock() + if p.state != instanceStateRunnable { + p.mu.Unlock() + return fmt.Errorf("invalid plugin state %q", p.state) + } + + p.runningWG.Add(1) + defer p.runningWG.Done() + p.mu.Unlock() + + dec := msgpack.NewDecoder(bytes.NewReader(msgpackBytes)) + for { + select { + case <-p.runCtx.Done(): + err := p.runCtx.Err() + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("run: %w", err) + } + + return nil + default: + } + + msg, err := decodeMsg(dec, tag) + if errors.Is(err, io.EOF) { + return nil + } + + if err != nil { + return err + } + + p.msgChannel <- msg + } +} + +// stop stops the plugin, freeing resources and returning it to initialized state. +// Calling stop will signal callbacks to return via the plugin's context, then wait for +// callbacks to finish and return before freeing resources and returning. +func (p *pluginInstance) stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.state != instanceStateRunnable && p.state != instanceStatePreExit { + return nil + } + + if p.runCancel != nil { + p.runCancel() + } + + p.runningWG.Wait() + + if p.msgChannel != nil { + close(p.msgChannel) + } + + p.state = instanceStateInitialized + p.runCtx = nil + p.runCancel = nil + p.msgChannel = nil + + return nil +} + +// resume restarts plugins, running pre-run functions for input and output plugins. +func (p *pluginInstance) resume() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.state != instanceStateInitialized { + return fmt.Errorf("invalid plugin state %q", p.state) + } + + if p.meta.input != nil { + if err := p.inputPreRunWithLock(); err != nil { + return err + } + } else if p.meta.output != nil { + if err := p.outputPreRunWithLock(); err != nil { + return err + } + } + + p.state = instanceStateRunnable + + return nil +} + +// stop prepares to stop an output plugin. +// Calling outputPreExit will signal callbacks to return via the plugin's context, then wait for +// callbacks to finish and return before freeing resources and returning. +func (p *pluginInstance) outputPreExit() error { + p.mu.Lock() + defer p.mu.Unlock() + + // Only output plugins have a pre-exit step + if p.meta.output == nil { + return fmt.Errorf("plugin is not an output plugin") + } + + if p.state == instanceStatePreExit { + return nil + } + if p.state != instanceStateRunnable { + return fmt.Errorf("invalid plugin state %q", p.state) + } + + p.runCancel() + + // Wait for any running callback/flush to finish before closing the message channel + p.runningWG.Wait() + close(p.msgChannel) + + p.state = instanceStatePreExit + p.runCtx = nil + p.runCancel = nil + p.msgChannel = nil + + return nil +} + +// flbReturnCode returns a fluent-bit C int enum value indicating function success. +func flbReturnCode(err error) int { + if err == nil { + return input.FLB_OK + } + if errors.Is(err, retryableError{}) { + return input.FLB_RETRY + } + + return input.FLB_ERROR +} + +type retryableError struct { + error +} + +func (r retryableError) Unwrap() error { + return r.error +} + +func (r retryableError) Is(target error) bool { + _, ok := target.(retryableError) + return ok } diff --git a/plugin_test.go b/plugin_test.go index c11d646..c70887a 100644 --- a/plugin_test.go +++ b/plugin_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "flag" + "fmt" "io" "os" "path/filepath" @@ -16,6 +17,9 @@ import ( "github.com/alecthomas/assert/v2" "github.com/ory/dockertest/v3" dc "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/require" + + "github.com/calyptia/plugin/input" ) func TestPlugin(t *testing.T) { @@ -175,3 +179,9 @@ func testPlugin(t *testing.T, pool *dockertest.Pool) { t.Fatal("timeout exceeded") } } + +func TestFlbReturnCode(t *testing.T) { + require.Equal(t, input.FLB_OK, flbReturnCode(nil)) + require.Equal(t, input.FLB_ERROR, flbReturnCode(fmt.Errorf("hello"))) + require.Equal(t, input.FLB_RETRY, flbReturnCode(retryableError{fmt.Errorf("hello")})) +}