Skip to content

Commit 680efa0

Browse files
committed
wip
1 parent b795ee2 commit 680efa0

File tree

3 files changed

+70
-58
lines changed

3 files changed

+70
-58
lines changed

cshared.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@ const (
3636
collectInterval = 1000 * time.Nanosecond
3737
)
3838

39-
const (
40-
maxBufferedMessages = defaultMaxBufferedMessages
41-
)
42-
4339
// FLBPluginPreRegister -
4440
//
4541
//export FLBPluginPreRegister
@@ -138,17 +134,10 @@ func (p *pluginInstance) inputPreRun() error {
138134
return fmt.Errorf("invalid plugin state %q", p.state)
139135
}
140136

141-
if p.runCtx != nil || p.runCancel != nil {
142-
return fmt.Errorf("plugin context already set")
143-
}
144-
if p.msgChannel != nil {
145-
return fmt.Errorf("plugin channel already set")
146-
}
147-
148137
runCtx, runCancel := context.WithCancel(context.Background())
149138
p.runCtx = runCtx
150139
p.runCancel = runCancel
151-
p.msgChannel = make(chan Message, maxBufferedMessages)
140+
p.msgChannel = make(chan Message, p.maxBufferedMessages)
152141
p.state = instanceStateRunnable
153142

154143
go func() {
@@ -204,21 +193,24 @@ func FLBPluginInputPause() {
204193
panic(fmt.Errorf("plugin not initialized"))
205194
}
206195

207-
if err := instance.pause(); err != nil {
196+
if err := instance.stop(); err != nil {
208197
panic(err)
209198
}
210199
}
211200

212-
func (p *pluginInstance) pause() error {
201+
func (p *pluginInstance) stop() error {
213202
p.mu.Lock()
203+
p.runningWG.Wait()
214204
defer p.mu.Unlock()
215205

216-
if p.state != instanceStateRunnable {
217-
return fmt.Errorf("cannot pause plugin in state %q", p.state)
206+
if p.state != instanceStateRunnable && p.state != instanceStatePreExit {
207+
return fmt.Errorf("cannot stop plugin in state %q", p.state)
218208
}
219209

220-
p.runCancel()
221-
close(p.msgChannel)
210+
if p.state == instanceStateRunnable {
211+
p.runCancel()
212+
close(p.msgChannel)
213+
}
222214

223215
p.state = instanceStateInitialized
224216
p.runCtx = nil
@@ -286,10 +278,10 @@ func (p *pluginInstance) outputPreExit() error {
286278

287279
// Only output plugins have a pre-exit step
288280
if p.meta.output == nil {
289-
return nil
281+
return fmt.Errorf("plugin is not an output plugin")
290282
}
291283

292-
if p.state != instanceStateInitialized {
284+
if p.state != instanceStateRunnable {
293285
return fmt.Errorf("invalid plugin state %q", p.state)
294286
}
295287

@@ -377,16 +369,18 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
377369
}
378370

379371
func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
380-
p.mu.RLock()
381-
defer p.mu.RUnlock()
382-
372+
p.mu.Lock()
383373
if p.state != instanceStateRunnable {
384374
return input.FLB_RETRY
385375
}
386376

377+
p.runningWG.Add(1)
378+
defer p.runningWG.Done()
379+
p.mu.Unlock()
380+
387381
buf := bytes.NewBuffer([]byte{})
388382

389-
for loop := min(len(p.msgChannel), maxBufferedMessages); loop > 0; loop-- {
383+
for loop := min(len(p.msgChannel), p.maxBufferedMessages); loop > 0; loop-- {
390384
select {
391385
case msg, ok := <-p.msgChannel:
392386
if !ok {
@@ -561,7 +555,7 @@ func FLBPluginExit() int {
561555
defer currInstanceMu.Unlock()
562556

563557
if currInstance != nil {
564-
currInstance.pause() //nolint:errcheck
558+
currInstance.stop() //nolint:errcheck
565559
currInstance = nil
566560
}
567561

cshared_test.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"time"
1414
"unsafe"
1515

16-
"github.com/alecthomas/assert/v2"
16+
"github.com/stretchr/testify/assert"
1717
"github.com/stretchr/testify/require"
1818
"github.com/vmihailenco/msgpack/v5"
1919

@@ -45,7 +45,7 @@ func newTestInputInstance(t testing.TB, input InputPlugin) *pluginInstance {
4545
desc: "test plugin",
4646
input: input,
4747
}))
48-
t.Cleanup(inst.stop)
48+
t.Cleanup(func() { assert.NoError(t, inst.stop()) })
4949
return inst
5050
}
5151

@@ -55,7 +55,7 @@ func newTestOutputInstance(t testing.TB, output OutputPlugin) *pluginInstance {
5555
desc: "test plugin",
5656
output: output,
5757
}))
58-
t.Cleanup(inst.stop)
58+
t.Cleanup(func() { assert.NoError(t, inst.stop()) })
5959
return inst
6060
}
6161

@@ -587,18 +587,16 @@ func TestOutputFlush(t *testing.T) {
587587
Check: func(t *testing.T, msg Message) error {
588588
defer wg.Done()
589589

590-
assert.Equal(t, now, msg.Time)
591-
592-
record := assertType[map[string]any](t, msg.Record)
593-
594-
foo := assertType[string](t, record["foo"])
595-
assert.Equal(t, "bar", foo)
596-
597-
bar := assertType[int8](t, record["bar"])
598-
assert.Equal(t, 3, bar)
599-
600-
foobar := assertType[float64](t, record["foobar"])
601-
assert.Equal(t, 1.337, foobar)
590+
expectTag := "foobar"
591+
assert.Equal(t, Message{
592+
Time: now,
593+
Record: map[string]any{
594+
"foo": "bar",
595+
"bar": int8(3),
596+
"foobar": 1.337,
597+
},
598+
tag: &expectTag,
599+
}, msg)
602600

603601
return nil
604602
},

plugin.go

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ func newPluginInstance(meta pluginMetadata) *pluginInstance {
7878
configLoaderProvider: func(ptr unsafe.Pointer) ConfigLoader {
7979
return &flbInputConfigLoader{ptr: ptr}
8080
},
81-
state: instanceStateCreated,
81+
state: instanceStateCreated,
82+
maxBufferedMessages: defaultMaxBufferedMessages,
8283
}
8384
}
8485

@@ -87,12 +88,14 @@ type pluginInstance struct {
8788
meta pluginMetadata
8889
cmetricsCtxProvider cmetricsContextProvider
8990
configLoaderProvider configLoaderProvider
90-
91-
mu sync.RWMutex // Protects all members below:
92-
state instanceState
93-
runCtx context.Context
94-
runCancel context.CancelFunc
95-
msgChannel chan Message
91+
runningWG sync.WaitGroup
92+
93+
mu sync.Mutex // Protects all members below:
94+
state instanceState
95+
runCtx context.Context
96+
runCancel context.CancelFunc
97+
msgChannel chan Message
98+
maxBufferedMessages int
9699
}
97100

98101
// withCMetricsContextProvider overrides the cmetricsContextProvider.
@@ -151,28 +154,45 @@ func (p *pluginInstance) init(ptr unsafe.Pointer) (initErr error) {
151154
return fmt.Errorf("go.MaxBufferedMessages must be an integer, got %q", maxBufferedStr)
152155
}
153156

154-
maxBufferedMessages = maxBuffered // TODO: put this on plugin?
157+
p.maxBufferedMessages = maxBuffered
155158
}
156159

157160
if err := p.meta.input.Init(ctx, fbit); err != nil {
158161
return fmt.Errorf("initializing plugin %q: %w", p.meta.name, err)
159162
}
160-
}
163+
} else if p.meta.output != nil {
164+
defer cancel()
165+
cmt, err := p.cmetricsCtxProvider(ptr)
166+
if err != nil {
167+
return err
168+
}
161169

162-
// TODO: other branches
170+
fbit := &Fluentbit{
171+
Conf: p.configLoaderProvider(ptr),
172+
Metrics: makeMetrics(cmt),
173+
Logger: &flbOutputLogger{ptr: ptr},
174+
}
163175

164-
return nil
165-
}
176+
if err := p.meta.output.Init(ctx, fbit); err != nil {
177+
return fmt.Errorf("initializing plugin %q: %w", p.meta.name, err)
178+
}
179+
} else {
180+
// Custom plugins don't have preInit functions that set context
181+
p.runCtx, p.runCancel = ctx, cancel
182+
p.state = instanceStateRunnable
166183

167-
func (p *pluginInstance) stop() {
168-
p.mu.Lock()
169-
defer p.mu.Unlock()
184+
fbit := &Fluentbit{
185+
Conf: p.configLoaderProvider(ptr),
186+
Metrics: nil,
187+
Logger: &flbOutputLogger{ptr: ptr},
188+
}
170189

171-
if p.state == instanceStateRunnable {
172-
if err := p.pause(); err != nil {
173-
panic(err)
190+
if err := p.meta.custom.Init(ctx, fbit); err != nil {
191+
return fmt.Errorf("initializing plugin %q: %w", p.meta.name, err)
174192
}
175193
}
194+
195+
return nil
176196
}
177197

178198
type Fluentbit struct {

0 commit comments

Comments
 (0)