Skip to content

Commit b795ee2

Browse files
committed
wip
1 parent fd266fc commit b795ee2

File tree

2 files changed

+48
-39
lines changed

2 files changed

+48
-39
lines changed

cshared.go

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ const (
3636
collectInterval = 1000 * time.Nanosecond
3737
)
3838

39-
var (
40-
unregister func()
39+
const (
4140
maxBufferedMessages = defaultMaxBufferedMessages
4241
)
4342

@@ -71,32 +70,32 @@ func FLBPluginRegister(def unsafe.Pointer) (returnCode int) {
7170
return input.FLB_RETRY
7271
}
7372

74-
if meta.input != nil {
75-
out := input.FLBPluginRegister(def, meta.name, meta.desc)
76-
unregister = func() {
77-
input.FLBPluginUnregister(def)
78-
}
79-
return out
73+
if unregisterFunc.Load() != nil {
74+
fmt.Fprintf(os.Stderr, "plugin already registered\n")
75+
return input.FLB_ERROR
8076
}
8177

82-
if meta.output != nil {
83-
out := output.FLBPluginRegister(def, meta.name, meta.desc)
84-
unregister = func() {
85-
output.FLBPluginUnregister(def)
86-
}
87-
return out
88-
}
78+
var newUnregisterFunc func()
8979

90-
if meta.custom != nil {
91-
out := custom.FLBPluginRegister(def, meta.name, meta.desc)
92-
unregister = func() {
93-
custom.FLBPluginUnregister(def)
94-
}
80+
if meta.input != nil {
81+
returnCode = input.FLBPluginRegister(def, meta.name, meta.desc)
82+
newUnregisterFunc = func() { input.FLBPluginUnregister(def) }
83+
} else if meta.output != nil {
84+
returnCode = output.FLBPluginRegister(def, meta.name, meta.desc)
85+
newUnregisterFunc = func() { output.FLBPluginUnregister(def) }
86+
} else if meta.custom != nil {
87+
returnCode = custom.FLBPluginRegister(def, meta.name, meta.desc)
88+
newUnregisterFunc = func() { custom.FLBPluginUnregister(def) }
89+
} else {
90+
fmt.Fprintf(os.Stderr, "no input, output, or custom plugin registered\n")
91+
return input.FLB_RETRY
92+
}
9593

96-
return out
94+
if returnCode == input.FLB_OK {
95+
unregisterFunc.Store(&newUnregisterFunc)
9796
}
9897

99-
return input.FLB_ERROR
98+
return
10099
}
101100

102101
// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
@@ -558,18 +557,18 @@ func decodeMsg(dec *msgpack.Decoder, tag string) (Message, error) {
558557
//
559558
//export FLBPluginExit
560559
func FLBPluginExit() int {
561-
if unregister != nil {
562-
unregister()
563-
unregister = nil
564-
}
565-
566560
currInstanceMu.Lock()
567561
defer currInstanceMu.Unlock()
568562

569563
if currInstance != nil {
570-
currInstance.stop()
564+
currInstance.pause() //nolint:errcheck
565+
currInstance = nil
566+
}
567+
568+
if unregister := unregisterFunc.Load(); unregister != nil {
569+
(*unregister)()
570+
unregisterFunc.Store(nil)
571571
}
572-
currInstance = nil
573572

574573
return input.FLB_OK
575574
}

plugin.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,33 @@ var (
2727

2828
// Current plugin metadata. This is nil if the plugin has not been registered yet.
2929
// This is set by the RegisterInput, RegisterOutput, or RegisterCustom functions.
30-
pluginMeta atomic.Pointer[pluginMetadata]
30+
pluginMeta atomic.Pointer[pluginMetadata]
31+
unregisterFunc atomic.Pointer[func()]
32+
33+
// currInstanceMu guards access to currInstance
34+
currInstanceMu sync.Mutex
35+
// Current instance of the plugin, used by functions called from fluent-bit like FLBPluginInit.
36+
currInstance *pluginInstance
3137
)
3238

3339
// pluginMeta describes a plugin and exposes hooks into its implementation.
3440
type pluginMetadata struct {
35-
name string
36-
desc string
41+
name string
42+
desc string
43+
44+
// Exactly one of the following will be set:
3745
input InputPlugin
3846
output OutputPlugin
3947
custom CustomPlugin
4048
}
4149

42-
var (
43-
currInstanceMu sync.Mutex // Guards access to currInstance
44-
currInstance *pluginInstance
45-
)
46-
4750
type instanceState string
4851

4952
const (
53+
instanceStateCreated instanceState = ""
5054
instanceStateInitialized instanceState = "initialized"
5155
instanceStateRunnable instanceState = "runnable"
5256
instanceStatePreExit instanceState = "preExit"
53-
instanceStateStopped instanceState = "stopped"
5457
)
5558

5659
type retryableError error
@@ -75,31 +78,38 @@ func newPluginInstance(meta pluginMetadata) *pluginInstance {
7578
configLoaderProvider: func(ptr unsafe.Pointer) ConfigLoader {
7679
return &flbInputConfigLoader{ptr: ptr}
7780
},
81+
state: instanceStateCreated,
7882
}
7983
}
8084

85+
// pluginInstance is an instance of a plugin.
8186
type pluginInstance struct {
8287
meta pluginMetadata
8388
cmetricsCtxProvider cmetricsContextProvider
8489
configLoaderProvider configLoaderProvider
8590

86-
mu sync.RWMutex
91+
mu sync.RWMutex // Protects all members below:
8792
state instanceState
8893
runCtx context.Context
8994
runCancel context.CancelFunc
9095
msgChannel chan Message
9196
}
9297

98+
// withCMetricsContextProvider overrides the cmetricsContextProvider.
99+
// This must be called immediately after creating the instance.
93100
func (p *pluginInstance) withCMetricsContextProvider(provider cmetricsContextProvider) *pluginInstance {
94101
p.cmetricsCtxProvider = provider
95102
return p
96103
}
97104

105+
// withConfigLoaderProvider overrides the configLoaderProvider.
106+
// This must be called immediately after creating the instance.
98107
func (p *pluginInstance) withConfigLoaderProvider(provider configLoaderProvider) *pluginInstance {
99108
p.configLoaderProvider = provider
100109
return p
101110
}
102111

112+
// init initializes a newly created plugin instance.
103113
func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
104114
p.mu.Lock()
105115
defer p.mu.Unlock()

0 commit comments

Comments
 (0)