Skip to content

Commit 8712278

Browse files
committed
wip
1 parent bee7e1b commit 8712278

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

cshared_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,7 @@ func TestInputCallbackCtrlC(t *testing.T) {
8080

8181
ptr := unsafe.Pointer(nil)
8282

83-
// prepare channel for input explicitly.
84-
//prepareInputCollector(false)
85-
//require.NoError(t, inst.inputPreRun())
86-
8783
go func() {
88-
//FLBPluginInputCallback(&ptr, nil)
8984
inst.callback(&ptr, nil)
9085
close(cdone)
9186
}()
@@ -135,7 +130,6 @@ func TestInputCallbackDangle(t *testing.T) {
135130
ticker := time.NewTicker(collectInterval)
136131
defer ticker.Stop()
137132

138-
//FLBPluginInputCallback(&ptr, nil)
139133
inst.callback(&ptr, nil)
140134
for {
141135
select {
@@ -205,7 +199,6 @@ func TestInputCallbackInfinite(t *testing.T) {
205199
for {
206200
select {
207201
case <-ticker.C:
208-
//FLBPluginInputCallback(&ptr, nil)
209202
inst.callback(&ptr, nil)
210203
if ptr != nil {
211204
close(cdone)

plugin.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ func (p *pluginInstance) withConfigLoaderProvider(provider configLoaderProvider)
214214
}
215215

216216
// init initializes a newly created plugin instance.
217+
// This returns an error if the plugin if not in a new state or cannot initialize.
218+
// For input and output plugins, this moves the plugin to an initialized state unless there is an init error.
219+
// For custom plugins, this moves the plugin to a runnable state unless there is an init error.
217220
func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
218221
p.mu.Lock()
219222
defer p.mu.Unlock()
@@ -296,6 +299,7 @@ func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
296299
return nil
297300
}
298301

302+
// inputPreRun transitions an initialized input plugin into runnable state.
299303
func (p *pluginInstance) inputPreRun() error {
300304
p.mu.Lock()
301305
defer p.mu.Unlock()
@@ -331,6 +335,7 @@ func (p *pluginInstance) inputPreRun() error {
331335
return nil
332336
}
333337

338+
// outputPreRun transitions an output plugin into runnable state.
334339
func (p *pluginInstance) outputPreRun() error {
335340
p.mu.Lock()
336341
defer p.mu.Unlock()
@@ -365,6 +370,10 @@ func (p *pluginInstance) outputPreRun() error {
365370
return nil
366371
}
367372

373+
// callback consumes up to maxBufferedMessages message from the plugin's message channel,
374+
// returning early if the plugin is shutdown.
375+
// Consumed messages are marshaled into msgpack bytes and the contents and length of contents
376+
// set in the respective data and csize input variables.
368377
func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
369378
p.mu.Lock()
370379
if p.state != instanceStateRunnable {
@@ -402,9 +411,9 @@ func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
402411
// fluent-bit to kick in before any remaining data has not been GC'ed
403412
// causing a sigsegv.
404413
defer runtime.GC()
405-
loop = 0
414+
loop = 0 // Exit the for loop on plugin shutdown
406415
default:
407-
loop = 0
416+
loop = 0 // Exit the for loop if there are no messages to consume
408417
}
409418
}
410419

@@ -420,6 +429,9 @@ func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
420429
return input.FLB_OK
421430
}
422431

432+
// stop stops the plugin, freeing resources and returning it to initialized state.
433+
// Calling stop will signal callbacks to return via the plugin's context, then wait for
434+
// callbacks to finish and return before freeing resources and returning.
423435
func (p *pluginInstance) stop() error {
424436
p.mu.Lock()
425437
defer p.mu.Unlock()
@@ -446,6 +458,7 @@ func (p *pluginInstance) stop() error {
446458
return nil
447459
}
448460

461+
// resume restarts plugins, running pre-run functions for input and output plugins.
449462
func (p *pluginInstance) resume() error {
450463
p.mu.Lock()
451464
defer p.mu.Unlock()
@@ -454,14 +467,24 @@ func (p *pluginInstance) resume() error {
454467
return fmt.Errorf("cannot resume plugin in state %q", p.state)
455468
}
456469

457-
if err := p.inputPreRun(); err != nil {
458-
return err
470+
if p.meta.input != nil {
471+
if err := p.inputPreRun(); err != nil {
472+
return err
473+
}
474+
} else if p.meta.output != nil {
475+
if err := p.outputPreRun(); err != nil {
476+
return err
477+
}
459478
}
460479

461-
p.state = instanceStateInitialized
480+
p.state = instanceStateRunnable
481+
462482
return nil
463483
}
464484

485+
// stop prepares to stop an output plugin.
486+
// Calling outputPreExit will signal callbacks to return via the plugin's context, then wait for
487+
// callbacks to finish and return before freeing resources and returning.
465488
func (p *pluginInstance) outputPreExit() error {
466489
p.mu.Lock()
467490
defer p.mu.Unlock()
@@ -484,6 +507,7 @@ func (p *pluginInstance) outputPreExit() error {
484507
p.state = instanceStatePreExit
485508
p.runCtx = nil
486509
p.runCancel = nil
510+
p.msgChannel = nil
487511

488512
return nil
489513
}

0 commit comments

Comments
 (0)