Skip to content

Commit d641376

Browse files
committed
wip
1 parent f02efa2 commit d641376

File tree

8 files changed

+61
-39
lines changed

8 files changed

+61
-39
lines changed

cshared.go

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package plugin
66
import "C"
77

88
import (
9-
"bytes"
109
"context"
1110
"errors"
1211
"fmt"
@@ -237,7 +236,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
237236
return input.FLB_RETRY
238237
}
239238

240-
return instance.callback(data, csize)
239+
return instance.inputCallback(data, csize)
241240
}
242241

243242
// FLBPluginInputCleanupCallback releases the memory used during the input callback
@@ -280,42 +279,14 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
280279

281280
in := C.GoBytes(data, clength)
282281
tag := C.GoString(ctag)
283-
if err := pluginFlush(instance, tag, in); err != nil {
282+
if err := instance.outputFlush(tag, in); err != nil {
284283
fmt.Fprintf(os.Stderr, "flush: %s\n", err)
285284
return output.FLB_ERROR
286285
}
287286

288287
return output.FLB_OK
289288
}
290289

291-
func pluginFlush(instance *pluginInstance, tag string, b []byte) error {
292-
dec := msgpack.NewDecoder(bytes.NewReader(b))
293-
for {
294-
select {
295-
case <-instance.runCtx.Done():
296-
err := instance.runCtx.Err()
297-
if err != nil && !errors.Is(err, context.Canceled) {
298-
fmt.Fprintf(os.Stderr, "run: %s\n", err)
299-
return fmt.Errorf("run: %w", err)
300-
}
301-
302-
return nil
303-
default:
304-
}
305-
306-
msg, err := decodeMsg(dec, tag)
307-
if errors.Is(err, io.EOF) {
308-
return nil
309-
}
310-
311-
if err != nil {
312-
return err
313-
}
314-
315-
instance.msgChannel <- msg
316-
}
317-
}
318-
319290
// decodeMsg should be called with an already initialized decoder.
320291
func decodeMsg(dec *msgpack.Decoder, tag string) (Message, error) {
321292
var out Message
@@ -512,7 +483,7 @@ func testCallback(inst *pluginInstance) ([]byte, int) {
512483
data := unsafe.Pointer(nil)
513484
var csize C.size_t
514485

515-
retCode := inst.callback(&data, &csize)
486+
retCode := inst.inputCallback(&data, &csize)
516487

517488
if data == nil {
518489
return []byte{}, retCode

cshared_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ func TestOutputFlush(t *testing.T) {
762762
assert.NoError(t, err)
763763

764764
wg.Add(1)
765-
assert.NoError(t, pluginFlush(inst, "foobar", b))
765+
assert.NoError(t, inst.outputFlush("foobar", b))
766766
wg.Wait()
767767
}
768768

examples/in_gdummy/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
module github.com/fluent/fluent-bit-go/examples/gdummy
22

3-
go 1.22.4
3+
go 1.23.0
4+
5+
toolchain go1.24.2
46

57
require github.com/calyptia/plugin v0.1.6
68

examples/in_gdummy/go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6
2828
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
2929
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
3030
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
31+
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
32+
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
33+
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
3134
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
3235
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
3336
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=

examples/out_gstdout/go.mod

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
module github.com/fluent/fluent-bit-go/examples/gstdout
22

3-
go 1.21.0
3+
go 1.23.0
4+
5+
toolchain go1.24.2
46

57
require github.com/calyptia/plugin v0.1.6
68

79
require (
810
github.com/calyptia/cmetrics-go v0.1.7 // indirect
9-
github.com/ugorji/go/codec v1.2.11 // indirect
11+
github.com/ugorji/go/codec v1.2.12 // indirect
1012
)
1113

1214
replace github.com/calyptia/plugin => ../..

examples/out_gstdout/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
719719
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
720720
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
721721
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
722+
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
722723
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
723724
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
724725
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=

examples/out_multiinstance/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
module github.com/fluent/fluent-bit-go/examples/multiinstance
22

3-
go 1.21.0
3+
go 1.23.0
4+
5+
toolchain go1.24.2
46

57
require github.com/fluent/fluent-bit-go v0.0.0-20200420155746-e125cab17963
68

plugin.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"errors"
1111
"fmt"
12+
"io"
1213
"log"
1314
"os"
1415
"runtime"
@@ -367,11 +368,11 @@ func (p *pluginInstance) outputPreRunWithLock() error {
367368
return nil
368369
}
369370

370-
// callback consumes up to maxBufferedMessages message from the plugin's message channel,
371+
// inputCallback consumes up to maxBufferedMessages message from the plugin's message channel,
371372
// returning early if the plugin is shutdown.
372373
// Consumed messages are marshaled into msgpack bytes and the contents and length of contents
373374
// set in the respective data and csize input variables.
374-
func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
375+
func (p *pluginInstance) inputCallback(data *unsafe.Pointer, csize *C.size_t) int {
375376
p.mu.Lock()
376377
if p.state != instanceStateRunnable {
377378
p.mu.Unlock()
@@ -427,6 +428,46 @@ func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
427428
return input.FLB_OK
428429
}
429430

431+
// outputFlush writes the messages in msgpackBytes to the plugin's message channel,
432+
// returning early if the plugin is shutdown.
433+
func (p *pluginInstance) outputFlush(tag string, msgpackBytes []byte) error {
434+
p.mu.Lock()
435+
if p.state != instanceStateRunnable {
436+
p.mu.Unlock()
437+
return fmt.Errorf("invalid plugin state %q", p.state)
438+
}
439+
440+
p.runningWG.Add(1)
441+
defer p.runningWG.Done()
442+
p.mu.Unlock()
443+
444+
dec := msgpack.NewDecoder(bytes.NewReader(msgpackBytes))
445+
for {
446+
select {
447+
case <-p.runCtx.Done():
448+
err := p.runCtx.Err()
449+
if err != nil && !errors.Is(err, context.Canceled) {
450+
fmt.Fprintf(os.Stderr, "run: %s\n", err)
451+
return fmt.Errorf("run: %w", err)
452+
}
453+
454+
return nil
455+
default:
456+
}
457+
458+
msg, err := decodeMsg(dec, tag)
459+
if errors.Is(err, io.EOF) {
460+
return nil
461+
}
462+
463+
if err != nil {
464+
return err
465+
}
466+
467+
p.msgChannel <- msg
468+
}
469+
}
470+
430471
// stop stops the plugin, freeing resources and returning it to initialized state.
431472
// Calling stop will signal callbacks to return via the plugin's context, then wait for
432473
// callbacks to finish and return before freeing resources and returning.

0 commit comments

Comments
 (0)