Skip to content

Commit 03ba2b0

Browse files
cosmo0920edsiper
authored andcommitted
input: Implement Golang Input plugin interface and added input example
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 780004b commit 03ba2b0

File tree

10 files changed

+401
-0
lines changed

10 files changed

+401
-0
lines changed

examples/in_gdummy/Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
all:
2+
go build -buildmode=c-shared -o in_gdummy.so .
3+
4+
fast:
5+
go build in_gdummy.go
6+
7+
clean:
8+
rm -rf *.so *.h *~

examples/in_gdummy/README.md

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Example: in_gdummy
2+
3+
The following example code implements an input plugin that works with
4+
separated input collecting threads that is introduced in Fluent Bit 1.9.
5+
It describes how to share context from the
6+
specified instance configuration to the input callback.
7+
8+
Every output plugin go through four callbacks associated to different phases:
9+
10+
| Plugin Phase | Callback |
11+
|---------------------|----------------------------|
12+
| Registration | FLBPluginRegister() |
13+
| Initialization | FLBPluginInit() |
14+
| Input Callback | FLBPluginInputCallback() |
15+
| Exit | FLBPluginExit() |
16+
17+
## Plugin Registration
18+
19+
When Fluent Bit loads a Golang input plugin, it looks up and loads the registration
20+
callback that aims to populate the internal structure with plugin name and
21+
description:
22+
23+
```go
24+
//export FLBPluginRegister
25+
func FLBPluginRegister(def unsafe.Pointer) int {
26+
return input.FLBPluginRegister(def, "gdummy", "dummy Go!")
27+
}
28+
```
29+
30+
## Plugin Initialization
31+
32+
Before the engine starts, it initialize all plugins that were requested to start.
33+
Upon initialization a configuration context already exists,
34+
so the plugin can ask for configuration parameters or do any other internal checks. E.g:
35+
36+
```go
37+
//export FLBPluginInit
38+
func FLBPluginInit(ctx unsafe.Pointer) int {
39+
return input.FLB_OK
40+
}
41+
```
42+
43+
The function must return FLB\_OK when it initialized properly or FLB\_ERROR if something went wrong. If the plugin reports an error, the engine will _not_ load the instance.
44+
45+
## Input Callback
46+
47+
When Fluent Bit wants to collect logs from Golang input plugin, the input callback will be triggered.
48+
49+
The callback will send a raw buffer of msgpack data with it proper bytes length into Fluent Bit core.
50+
51+
```go
52+
//export FLBPluginInputCallback
53+
func FLBPluginInputCallback(data *unsafe.Pointer, size *C.size_t) int {
54+
now := time.Now()
55+
// To handle nanosecond precision on Golang input plugin, you must wrap up time instances with input.FLBTime type.
56+
flb_time := input.FLBTime{now}
57+
message := map[string]string{"message": "dummy"}
58+
59+
entry := []interface{}{flb_time, message}
60+
61+
// Some encoding logs to msgpack payload stuffs.
62+
// It needs to Wait for some period on Golang input plugin side, until the new records are emitted.
63+
64+
*data = unsafe.Pointer(&packed[0])
65+
*size = C.size_t(len(packed))
66+
return input.FLB_OK
67+
}
68+
```
69+
70+
> for more details about how to process the sending msgpack data into Fluent Bit core, please refer to the [in_gdummy.go](in_gdummy.go) file.
71+
72+
When done, there are three returning values available:
73+
74+
| Return value | Description |
75+
|---------------|------------------------------------------------|
76+
| FLB\_OK | The data have been processed normally. |
77+
| FLB\_ERROR | An internal error have ocurred, the plugin will not handle the set of records/data again. |
78+
| FLB\_RETRY | A recoverable error have ocurred, the engine can try to flush the records/data later.|
79+
80+
## Plugin Exit
81+
82+
When Fluent Bit will stop using the instance of the plugin, it will trigger the exit callback. e.g:
83+
84+
```go
85+
//export FLBPluginExit
86+
func FLBPluginExit() int {
87+
return input.FLB_OK
88+
}
89+
```

examples/in_gdummy/go.mod

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module github.com/fluent/fluent-bit-go/examples/gdummy
2+
3+
go 1.17
4+
5+
require github.com/fluent/fluent-bit-go v0.0.0-20211213025035-0be1ffb0c49b
6+
7+
require github.com/ugorji/go/codec v1.1.7 // indirect
8+
9+
replace github.com/fluent/fluent-bit-go => ../..

examples/in_gdummy/go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
github.com/fluent/fluent-bit-go v0.0.0-20211213025035-0be1ffb0c49b h1:IbixhsdiEE03BnEAWop5eeShptYKfru5Dg9HaPBm9ms=
2+
github.com/fluent/fluent-bit-go v0.0.0-20211213025035-0be1ffb0c49b/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ=
3+
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
4+
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
5+
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
6+
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=

examples/in_gdummy/in_gdummy.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package main
2+
3+
import "C"
4+
import (
5+
"fmt"
6+
"time"
7+
"unsafe"
8+
9+
"github.com/fluent/fluent-bit-go/input"
10+
)
11+
12+
//export FLBPluginRegister
13+
func FLBPluginRegister(def unsafe.Pointer) int {
14+
return input.FLBPluginRegister(def, "gdummy", "dummy GO!")
15+
}
16+
17+
//export FLBPluginInit
18+
// (fluentbit will call this)
19+
// plugin (context) pointer to fluentbit context (state/ c code)
20+
func FLBPluginInit(plugin unsafe.Pointer) int {
21+
// Example to retrieve an optional configuration parameter
22+
param := input.FLBPluginConfigKey(plugin, "param")
23+
fmt.Printf("[flb-go] plugin parameter = '%s'\n", param)
24+
return input.FLB_OK
25+
}
26+
27+
//export FLBPluginInputCallback
28+
func FLBPluginInputCallback(data *unsafe.Pointer, size *C.size_t) int {
29+
now := time.Now()
30+
flb_time := input.FLBTime{now}
31+
message := map[string]string{"message": "dummy"}
32+
33+
entry := []interface{}{flb_time, message}
34+
35+
enc := input.NewEncoder()
36+
packed, err := enc.Encode(entry)
37+
if err != nil {
38+
fmt.Println("Can't convert to msgpack:", message, err)
39+
return input.FLB_ERROR
40+
}
41+
42+
*data = unsafe.Pointer(&packed[0])
43+
*size = C.size_t(len(packed))
44+
// For emitting interval adjustment.
45+
time.Sleep(1000 * time.Millisecond)
46+
47+
return input.FLB_OK
48+
}
49+
50+
//export FLBPluginExit
51+
func FLBPluginExit() int {
52+
return input.FLB_OK
53+
}
54+
55+
func main() {
56+
}

input/encoder.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Fluent Bit Go!
2+
// ==============
3+
// Copyright (C) 2022 The Fluent Bit Go Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//
17+
18+
19+
package input
20+
21+
import (
22+
"C"
23+
"encoding/binary"
24+
"reflect"
25+
"time"
26+
27+
"github.com/ugorji/go/codec"
28+
)
29+
30+
type FLBEncoder struct {
31+
handle *codec.MsgpackHandle
32+
mpenc *codec.Encoder
33+
}
34+
35+
type FLBTime struct {
36+
time.Time
37+
}
38+
39+
func (f FLBTime) WriteExt(i interface{}) []byte {
40+
bs := make([]byte, 8)
41+
42+
tm := i.(*FLBTime)
43+
utc := tm.UTC()
44+
45+
sec := utc.Unix()
46+
nsec := utc.Nanosecond()
47+
48+
binary.BigEndian.PutUint32(bs, uint32(sec))
49+
binary.BigEndian.PutUint32(bs[4:], uint32(nsec))
50+
51+
return bs
52+
}
53+
54+
func (f FLBTime) ReadExt(i interface{}, b []byte) {
55+
panic("unsupported")
56+
}
57+
58+
func NewEncoder() *FLBEncoder {
59+
enc := new(FLBEncoder)
60+
enc.handle = new(codec.MsgpackHandle)
61+
enc.handle.WriteExt = true
62+
enc.handle.SetBytesExt(reflect.TypeOf(FLBTime{}), 0, &FLBTime{})
63+
64+
return enc
65+
}
66+
67+
func (enc *FLBEncoder) Encode(val interface{}) (packed []byte, err error) {
68+
enc.mpenc = codec.NewEncoderBytes(&packed, enc.handle)
69+
err = enc.mpenc.Encode(val)
70+
return
71+
}

input/flb_input.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit Go!
4+
* ==============
5+
* Copyright (C) 2022 The Fluent Bit Go Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLBGO_INPUT_H
21+
#define FLBGO_INPUT_H
22+
23+
struct flb_api {
24+
char *_;
25+
char *(*input_get_property) (char *, void *);
26+
};
27+
28+
struct flb_plugin_proxy_context {
29+
void *remote_context;
30+
};
31+
32+
/* This structure is used for initialization.
33+
* It matches the one in proxy/go/go.c in fluent-bit source code.
34+
*/
35+
struct flbgo_input_plugin {
36+
void *_;
37+
struct flb_api *api;
38+
struct flb_input_instance *i_ins;
39+
struct flb_plugin_proxy_context *context;
40+
};
41+
42+
char *input_get_property(char *key, void *plugin)
43+
{
44+
struct flbgo_input_plugin *p = plugin;
45+
return p->api->input_get_property(key, p->i_ins);
46+
}
47+
48+
#endif

input/flb_plugin.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit Go!
4+
* ==============
5+
* Copyright (C) 2022 The Fluent Bit Go Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLBGO_PLUGIN_H
21+
#define FLBGO_PLUGIN_H
22+
23+
/* Return values */
24+
#define FLB_ERROR 0
25+
#define FLB_OK 1
26+
#define FLB_RETRY 2
27+
28+
/* Proxy definition */
29+
#define FLB_PROXY_INPUT_PLUGIN 1
30+
#define FLB_PROXY_GOLANG 11
31+
32+
/* This structure is used for registration.
33+
* It matches the one in flb_plugin_proxy.h in fluent-bit source code.
34+
*/
35+
struct flb_plugin_proxy_def {
36+
int type;
37+
int proxy;
38+
int flags;
39+
char *name;
40+
char *description;
41+
};
42+
43+
#endif

0 commit comments

Comments
 (0)