Skip to content

Commit c4a158a

Browse files
committed
Merge branch 'api-0.12'
2 parents d6b075e + 6f50b08 commit c4a158a

File tree

8 files changed

+242
-30
lines changed

8 files changed

+242
-30
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
This repository contains Go packages that allows to create [Fluent Bit](http://fluentbit.io) plugins. At the moment it only supports the creation of _Output_ plugins.
44

5+
## Requirements
6+
7+
The code of this package is intended to be used with [Fluent Bit 0.12](https://github.com/fluent/fluent-bit/tree/0.12) branch.
8+
9+
> Fluent Bit on GIT master (0.12) uses a different format to set records timestamps, this package is not backward compatible with Fluent Bit 0.11
10+
511
## Usage
612

713
Fluent Bit Go packages are exposed on this repository:
@@ -16,7 +22,7 @@ import "github.com/fluent/fluent-bit-go/output"
1622

1723
for a more practical example please refer to the _out\_gstdout_ plugin implementation located at:
1824

19-
https://github.com/fluent/fluent-bit-go/blob/master/examples/out_gstdout/out_gstdout.go
25+
https://github.com/fluent/fluent-bit-go/blob/api-0.12/examples/out_gstdout/out_gstdout.go
2026

2127
## Contact
2228

examples/out_gstdout/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
all:
22
go build -buildmode=c-shared -o out_gstdout.so .
33

4+
fast:
5+
go build out_gstdout.go
6+
47
clean:
58
rm -rf *.so *.h *~

examples/out_gstdout/README.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Example: out_gstdout
2+
3+
The following example code implements a simple output plugin that prints the records to the standard output interface (STDOUT).
4+
5+
Every output plugin go through four callbacks associated to different phases:
6+
7+
| Plugin Phase | Callback |
8+
|---------------------|----------------------------|
9+
| Registration | FLBPluginRegister() |
10+
| Initialization | FLBPluginInit() |
11+
| Runtime Flush | FLBPluginFlush() |
12+
| Exit | FLBPluginExit() |
13+
14+
## Plugin Registration
15+
16+
When Fluent Bit loads a Golang plugin, it lookup and load the registration callback that aims to populate the internal structure with plugin name and description among others:
17+
18+
```go
19+
//export FLBPluginRegister
20+
func FLBPluginRegister(ctx unsafe.Pointer) int {
21+
return output.FLBPluginRegister(ctx, "gstdout", "Stdout GO!")
22+
}
23+
```
24+
25+
This function is invoked at start time _before_ any configuration is done inside the engine.
26+
27+
## Plugin Initialization
28+
29+
Before the engine starts, it initialize all plugins that were requested to start. Upon initialization a configuration context already exists, so the plugin can ask for configuration parameters or do any other internal checks. E.g:
30+
31+
```go
32+
//export FLBPluginInit
33+
func FLBPluginInit(ctx unsafe.Pointer) int {
34+
return output.FLB_OK
35+
}
36+
```
37+
38+
The function must return FLB\_OK when it initialized properly or FLB\_ERROR if something went wrong. If the plugin reports an error, the engine will _not_ load the instance.
39+
40+
## Runtime Flush
41+
42+
Upon flush time, when Fluent Bit want's to flush it buffers, the runtime flush callback will be triggered.
43+
44+
The callback will receive a raw buffer of msgpack data with it proper bytes length and the tag associated.
45+
46+
```go
47+
//export FLBPluginFlush
48+
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
49+
return output.FLB_OK
50+
}
51+
```
52+
53+
> for more details about how to process the incoming msgpack data, refer to the [out_gstdout.go](out_gstdout.go) file.
54+
55+
When done, there are three returning values available:
56+
57+
| Return value | Description |
58+
|---------------|------------------------------------------------|
59+
| FLB\_OK | The data have been processed normally. |
60+
| FLB\_ERROR | An internal error have ocurred, the plugin will not handle the set of records/data again. |
61+
| FLB\_RETRY | A recoverable error have ocurred, the engine can try to flush the records/data later.|
62+
63+
## Plugin Exit
64+
65+
When Fluent Bit will stop using the instance of the plugin, it will trigger the exit callback. e.g:
66+
67+
```go
68+
//export FLBPluginExit
69+
func FLBPluginExit() int {
70+
return output.FLB_OK
71+
}
72+
```

examples/out_gstdout/out_gstdout.go

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,50 @@ package main
22

33
import "github.com/fluent/fluent-bit-go/output"
44
import (
5-
"github.com/ugorji/go/codec"
65
"fmt"
76
"unsafe"
87
"C"
9-
"reflect"
108
)
119

1210
//export FLBPluginRegister
13-
// ctx (context) pointer to fluentbit context (state/ c code)
1411
func FLBPluginRegister(ctx unsafe.Pointer) int {
15-
// roll call for the specifics of the plugin
1612
return output.FLBPluginRegister(ctx, "gstdout", "Stdout GO!")
1713
}
1814

1915
//export FLBPluginInit
2016
// (fluentbit will call this)
2117
// ctx (context) pointer to fluentbit context (state/ c code)
2218
func FLBPluginInit(ctx unsafe.Pointer) int {
19+
// Example to retrieve an optional configuration parameter
20+
param := output.FLBPluginConfigKey(ctx, "param")
21+
fmt.Printf("[flb-go] plugin parameter = '%s'\n", param)
2322
return output.FLB_OK
2423
}
2524

2625
//export FLBPluginFlush
2726
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
2827
var count int
29-
var h codec.Handle = new(codec.MsgpackHandle)
30-
var b []byte
31-
var m interface{}
32-
var err error
28+
var ret int
29+
var ts interface{}
30+
var record map[interface{}]interface{}
3331

34-
b = C.GoBytes(data, length)
35-
dec := codec.NewDecoderBytes(b, h)
32+
// Create Fluent Bit decoder
33+
dec := output.NewDecoder(data, int(length))
3634

37-
// Iterate the original MessagePack array
35+
// Iterate Records
3836
count = 0
3937
for {
40-
// Decode the entry
41-
err = dec.Decode(&m)
42-
if err != nil {
38+
// Extract Record
39+
ret, ts, record = output.GetRecord(dec)
40+
if ret != 0 {
4341
break
4442
}
4543

46-
// Get a slice and their two entries: timestamp and map
47-
slice := reflect.ValueOf(m)
48-
timestamp := slice.Index(0)
49-
data := slice.Index(1)
50-
51-
// Convert slice data to a real map and iterate
52-
map_data := data.Interface().(map[interface{}] interface{})
53-
fmt.Printf("[%d] %s: [%d, {", count, C.GoString(tag), timestamp)
54-
for k, v := range map_data {
44+
// Print record keys and values
45+
timestamp := ts.(output.FLBTime)
46+
fmt.Printf("[%d] %s: [%s, {", count, C.GoString(tag),
47+
timestamp.String())
48+
for k, v := range record {
5549
fmt.Printf("\"%s\": %v, ", k, v)
5650
}
5751
fmt.Printf("}\n")
@@ -68,7 +62,7 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
6862

6963
//export FLBPluginExit
7064
func FLBPluginExit() int {
71-
return 0
65+
return output.FLB_OK
7266
}
7367

7468
func main() {

output/decoder.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Fluent Bit Go!
2+
// ==============
3+
// Copyright (C) 2015-2017 Treasure Data Inc.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//
17+
18+
package output
19+
20+
import (
21+
"unsafe"
22+
"reflect"
23+
"encoding/binary"
24+
"github.com/ugorji/go/codec"
25+
"time"
26+
"C"
27+
)
28+
29+
type FLBDecoder struct {
30+
handle *codec.MsgpackHandle
31+
mpdec *codec.Decoder
32+
}
33+
34+
type FLBTime struct {
35+
time.Time
36+
}
37+
38+
func (f FLBTime) WriteExt(interface{}) []byte {
39+
panic("unsupported")
40+
}
41+
42+
func (f FLBTime) ReadExt(i interface{}, b []byte) {
43+
out := i.(*FLBTime)
44+
sec := binary.BigEndian.Uint32(b)
45+
usec := binary.BigEndian.Uint32(b[4:])
46+
out.Time = time.Unix(int64(sec), int64(usec))
47+
}
48+
49+
func (f FLBTime) ConvertExt(v interface{}) interface{} {
50+
return nil
51+
}
52+
53+
func (f FLBTime) UpdateExt(dest interface{}, v interface{}) {
54+
panic("unsupported")
55+
}
56+
57+
func NewDecoder(data unsafe.Pointer, length int) (*FLBDecoder) {
58+
var b []byte
59+
60+
dec := new(FLBDecoder)
61+
dec.handle = new(codec.MsgpackHandle)
62+
dec.handle.SetExt(reflect.TypeOf(FLBTime{}), 0, &FLBTime{})
63+
64+
b = C.GoBytes(data, C.int(length))
65+
dec.mpdec = codec.NewDecoderBytes(b, dec.handle)
66+
67+
return dec
68+
}
69+
70+
func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{}) {
71+
var check error
72+
var m interface{}
73+
74+
check = dec.mpdec.Decode(&m)
75+
if check != nil {
76+
return -1, 0, nil
77+
}
78+
79+
slice := reflect.ValueOf(m)
80+
t := slice.Index(0).Interface()
81+
data := slice.Index(1)
82+
83+
map_data := data.Interface().(map[interface{}] interface{})
84+
85+
return 0, t, map_data
86+
}

output/flb_output.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit Go!
4+
* ==============
5+
* Copyright (C) 2015-2017 Treasure Data Inc.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLBGO_OUTPUT_H
21+
#define FLBGO_OUTPUT_H
22+
23+
#include <stdio.h>
24+
25+
struct flb_api {
26+
char *(*output_get_property) (char *, void *);
27+
};
28+
29+
struct flbgo_output_plugin {
30+
char *name;
31+
struct flb_api *api;
32+
void *o_ins;
33+
int (*cb_init)();
34+
int (*cb_flush)(void *, size_t, char *);
35+
int (*cb_exit)(void *);
36+
};
37+
38+
char *output_get_property(char *key, void *ctx)
39+
{
40+
struct flbgo_output_plugin *plugin = ctx;
41+
return plugin->api->output_get_property(key, plugin->o_ins);
42+
}
43+
44+
#endif

output/flb_plugin.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
22

3-
/* Fluent Bit
4-
* ==========
5-
* Copyright (C) 2015-2016 Treasure Data Inc.
3+
/* Fluent Bit Go!
4+
* ==============
5+
* Copyright (C) 2015-2017 Treasure Data Inc.
66
*
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
#define FLB_PROXY_OUTPUT_PLUGIN 2
3030
#define FLB_PROXY_GOLANG 11
3131

32+
/* Structure used for registration, it match the one on flb_plugin_proxy.h */
3233
struct flb_plugin_proxy {
3334
int type;
3435
int proxy;

output/output.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Fluent Bit Go!
22
// ==============
3-
// Copyright (C) 2015-2016 Treasure Data Inc.
3+
// Copyright (C) 2015-2017 Treasure Data Inc.
44
//
55
// Licensed under the Apache License, Version 2.0 (the "License");
66
// you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ package output
2020
/*
2121
#include <stdlib.h>
2222
#include "flb_plugin.h"
23+
#include "flb_output.h"
2324
*/
2425
import "C"
2526
import "fmt"
@@ -35,6 +36,7 @@ const FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG
3536

3637
// Local type to define a plugin definition
3738
type FLBPlugin C.struct_flb_plugin_proxy
39+
type FLBOutPlugin C.struct_flbgo_output_plugin
3840

3941
// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
4042
// is passed and the next step is to invoke this FLBPluginRegister() function
@@ -53,8 +55,12 @@ func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
5355
// Release resources allocated by the plugin initialization
5456
func FLBPluginUnregister(ctx unsafe.Pointer) {
5557
p := (*FLBPlugin) (unsafe.Pointer(ctx))
56-
5758
fmt.Printf("[flbgo] unregistering %v\n", p)
5859
C.free(unsafe.Pointer(p.name))
5960
C.free(unsafe.Pointer(p.description))
6061
}
62+
63+
func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
64+
_key := C.CString(key)
65+
return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
66+
}

0 commit comments

Comments
 (0)