Skip to content

Commit fd266fc

Browse files
committed
wip
1 parent 0f2bef1 commit fd266fc

File tree

4 files changed

+55
-74
lines changed

4 files changed

+55
-74
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ cmake-build-*/
176176

177177
# IntelliJ
178178
out/
179+
/.idea/
179180

180181
# mpeltonen/sbt-idea plugin
181182
.idea_modules/

cshared.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,13 @@ func FLBPluginPreRegister(hotReloading C.int) int {
5757
// can be provided.
5858
//
5959
//export FLBPluginRegister
60-
func FLBPluginRegister(def unsafe.Pointer) int {
61-
defer registerWG.Done()
60+
func FLBPluginRegister(def unsafe.Pointer) (returnCode int) {
61+
defer func() {
62+
// Only unblock waiters on registerWG if registration succeeds
63+
if returnCode == input.FLB_OK {
64+
registerWG.Done()
65+
}
66+
}()
6267

6368
meta := pluginMeta.Load()
6469
if meta == nil {
@@ -130,7 +135,7 @@ func (p *pluginInstance) inputPreRun() error {
130135
return nil
131136
}
132137

133-
if p.state != instanceStateInitialized && p.state != instanceStatePaused {
138+
if p.state != instanceStateInitialized {
134139
return fmt.Errorf("invalid plugin state %q", p.state)
135140
}
136141

@@ -145,6 +150,7 @@ func (p *pluginInstance) inputPreRun() error {
145150
p.runCtx = runCtx
146151
p.runCancel = runCancel
147152
p.msgChannel = make(chan Message, maxBufferedMessages)
153+
p.state = instanceStateRunnable
148154

149155
go func() {
150156
err := p.meta.input.Collect(runCtx, p.msgChannel)
@@ -162,8 +168,8 @@ func (p *pluginInstance) inputPreRun() error {
162168
return nil
163169
}
164170

165-
// FLBPluginInputPreRun this method gets invoked by the fluent-bit runtime, once the plugin has been
166-
// initialized, the plugin invoked only once before executing the input callbacks.
171+
// FLBPluginInputPreRun is invoked by the fluent-bit runtime after the plugin has been
172+
// initialized using FLBPluginRegister but before executing plugin callback functions.
167173
//
168174
//export FLBPluginInputPreRun
169175
func FLBPluginInputPreRun(useHotReload C.int) int {
@@ -208,14 +214,14 @@ func (p *pluginInstance) pause() error {
208214
p.mu.Lock()
209215
defer p.mu.Unlock()
210216

211-
if !(p.state == instanceStateInitialized) {
217+
if p.state != instanceStateRunnable {
212218
return fmt.Errorf("cannot pause plugin in state %q", p.state)
213219
}
214220

215221
p.runCancel()
216222
close(p.msgChannel)
217223

218-
p.state = instanceStatePaused
224+
p.state = instanceStateInitialized
219225
p.runCtx = nil
220226
p.runCancel = nil
221227
p.msgChannel = nil
@@ -227,7 +233,7 @@ func (p *pluginInstance) resume() error {
227233
p.mu.Lock()
228234
defer p.mu.Unlock()
229235

230-
if p.state != instanceStatePaused {
236+
if p.state != instanceStateInitialized {
231237
return fmt.Errorf("cannot resume plugin in state %q", p.state)
232238
}
233239

@@ -310,19 +316,30 @@ func FLBPluginOutputPreRun(useHotReload C.int) int {
310316
panic("plugin not initialized")
311317
}
312318

313-
instance.outputPreRun()
319+
if err := instance.outputPreRun(); err != nil {
320+
fmt.Fprintf(os.Stderr, "plugin pre-run error: %v\n", err)
321+
return flbReturnCode(err)
322+
}
314323

315324
return input.FLB_OK
316325
}
317326

318-
func (p *pluginInstance) outputPreRun() {
327+
func (p *pluginInstance) outputPreRun() error {
319328
p.mu.Lock()
320329
defer p.mu.Unlock()
321330

322-
// TODO: state check
331+
// Only input plugins have a pre-run step
332+
if p.meta.output == nil {
333+
return fmt.Errorf("plugin is not an output plugin")
334+
}
335+
336+
if p.state != instanceStateInitialized {
337+
return fmt.Errorf("invalid plugin state %q", p.state)
338+
}
323339

324340
p.runCtx, p.runCancel = context.WithCancel(context.Background())
325341
p.msgChannel = make(chan Message)
342+
p.state = instanceStateRunnable
326343

327344
go func() {
328345
if err := p.meta.output.Flush(p.runCtx, p.msgChannel); err != nil {
@@ -335,6 +352,8 @@ func (p *pluginInstance) outputPreRun() {
335352

336353
log.Printf("goroutine will be stopping: name=%q\n", p.meta.name)
337354
}()
355+
356+
return nil
338357
}
339358

340359
// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
@@ -362,7 +381,7 @@ func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
362381
p.mu.RLock()
363382
defer p.mu.RUnlock()
364383

365-
if p.state != instanceStateInitialized {
384+
if p.state != instanceStateRunnable {
366385
return input.FLB_RETRY
367386
}
368387

@@ -428,7 +447,7 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
428447
instance := currInstance
429448
currInstanceMu.Unlock()
430449

431-
if instance == nil || instance.state != instanceStateInitialized { // TODO: ok?
450+
if instance == nil || instance.state != instanceStateRunnable {
432451
return output.FLB_RETRY
433452
}
434453

@@ -437,14 +456,9 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
437456
return output.FLB_RETRY
438457
}
439458

440-
if instance == nil || instance.runCtx == nil {
441-
return output.FLB_RETRY
442-
}
443-
444-
var err error
445459
select {
446460
case <-instance.runCtx.Done():
447-
err = instance.runCtx.Err()
461+
err := instance.runCtx.Err()
448462
if err != nil && !errors.Is(err, context.Canceled) {
449463
fmt.Fprintf(os.Stderr, "run: %s\n", err)
450464
return output.FLB_ERROR
@@ -465,10 +479,6 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
465479
}
466480

467481
func pluginFlush(instance *pluginInstance, tag string, b []byte) error {
468-
if instance.msgChannel == nil {
469-
return fmt.Errorf("no instance or channel available")
470-
}
471-
472482
dec := msgpack.NewDecoder(bytes.NewReader(b))
473483
for {
474484
select {

cshared_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ func TestOutputFlush(t *testing.T) {
605605
}
606606
inst := newTestOutputInstance(t, out)
607607
require.NoError(t, inst.init(nil))
608-
inst.outputPreRun()
608+
require.NoError(t, inst.outputPreRun())
609609

610610
msg := Message{
611611
Time: now,

plugin.go

Lines changed: 20 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@ import (
1717
"github.com/calyptia/plugin/metric"
1818
)
1919

20+
func init() {
21+
registerWG.Add(1)
22+
}
23+
2024
var (
25+
// registerWG blocks until FLBPluginRegister has successfully run.
26+
registerWG sync.WaitGroup
27+
2128
// Current plugin metadata. This is nil if the plugin has not been registered yet.
29+
// This is set by the RegisterInput, RegisterOutput, or RegisterCustom functions.
2230
pluginMeta atomic.Pointer[pluginMetadata]
2331
)
2432

33+
// pluginMeta describes a plugin and exposes hooks into its implementation.
2534
type pluginMetadata struct {
2635
name string
2736
desc string
@@ -31,9 +40,6 @@ type pluginMetadata struct {
3140
}
3241

3342
var (
34-
// registerWG gates access to pluginMeta before the plugin has been registered.
35-
registerWG sync.WaitGroup
36-
3743
currInstanceMu sync.Mutex // Guards access to currInstance
3844
currInstance *pluginInstance
3945
)
@@ -42,10 +48,9 @@ type instanceState string
4248

4349
const (
4450
instanceStateInitialized instanceState = "initialized"
45-
//instanceStatePreRun instanceState = "preRun"
46-
instanceStatePaused instanceState = "paused"
47-
instanceStatePreExit instanceState = "preExit"
48-
instanceStateStopped instanceState = "stopped"
51+
instanceStateRunnable instanceState = "runnable"
52+
instanceStatePreExit instanceState = "preExit"
53+
instanceStateStopped instanceState = "stopped"
4954
)
5055

5156
type retryableError error
@@ -103,13 +108,17 @@ func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
103108
return fmt.Errorf("unexpected plugin state %q", p.state)
104109
}
105110

111+
newState := instanceStateInitialized
106112
ctx, cancel := context.WithCancel(context.Background())
107113
defer func() {
108114
if initErr != nil {
109115
cancel()
110116
p.runCtx = nil
111117
p.runCancel = nil
118+
119+
return
112120
}
121+
p.state = newState
113122
}()
114123

115124
if p.meta.input != nil {
@@ -141,7 +150,6 @@ func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
141150
}
142151

143152
// TODO: other branches
144-
p.state = instanceStateInitialized
145153

146154
return nil
147155
}
@@ -150,49 +158,11 @@ func (p *pluginInstance) stop() {
150158
p.mu.Lock()
151159
defer p.mu.Unlock()
152160

153-
if p.state == instanceStateStopped {
154-
return
155-
}
156-
157-
if p.runCancel != nil {
158-
p.runCancel()
159-
}
160-
if p.msgChannel != nil {
161-
close(p.msgChannel)
161+
if p.state == instanceStateRunnable {
162+
if err := p.pause(); err != nil {
163+
panic(err)
164+
}
162165
}
163-
164-
p.state = instanceStatePaused
165-
}
166-
167-
//// getCurrInstance safely gets the current plugin instance
168-
//func getCurrInstance() *pluginInstance {
169-
// currInstanceMu.Lock()
170-
// defer currInstanceMu.Unlock()
171-
//
172-
// return currInstance
173-
//}
174-
//
175-
//// setCurrInstance safely sets the current plugin instance
176-
//func setCurrInstance(instance *pluginInstance) {
177-
// currInstanceMu.Lock()
178-
// defer currInstanceMu.Unlock()
179-
//
180-
// currInstance = instance
181-
//}
182-
183-
//// getOrCreateCurrInstance safely gets or creates the current plugin instance
184-
//func getOrCreateCurrInstance() *pluginInstance {
185-
// currInstanceMu.Lock()
186-
// defer currInstanceMu.Unlock()
187-
//
188-
// if currInstance == nil {
189-
// currInstance = &pluginInstance{}
190-
// }
191-
// return currInstance
192-
//}
193-
194-
func init() {
195-
registerWG.Add(1)
196166
}
197167

198168
type Fluentbit struct {

0 commit comments

Comments
 (0)