Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 9 additions & 32 deletions apis/fluentbit/v1alpha2/clustermultilineparser_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ limitations under the License.
package v1alpha2

import (
"bytes"
"fmt"
"reflect"
"sort"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
Expand All @@ -46,6 +43,14 @@ type ClusterMultilineParser struct {
Spec MultilineParserSpec `json:"spec,omitempty"`
}

func (a ClusterMultilineParser) name() string {
return a.Name
}

func (a ClusterMultilineParser) spec() MultilineParserSpec {
return a.Spec
}

// +kubebuilder:object:root=true

// ClusterMultilineParserList contains a list of ClusterMultilineParser
Expand Down Expand Up @@ -73,37 +78,9 @@ func (a ClusterMultilineParserByName) Less(i, j int) bool {
}

func (list ClusterMultilineParserList) Load(sl plugins.SecretLoader) (string, error) {
var buf bytes.Buffer

sort.Sort(ClusterMultilineParserByName(list.Items))

for _, item := range list.Items {
merge := func(p plugins.Plugin) error {
if p == nil || reflect.ValueOf(p).IsNil() {
return nil
}

buf.WriteString("[MULTILINE_PARSER]\n")
buf.WriteString(fmt.Sprintf(" Name %s\n", item.Name))

kvs, err := p.Params(sl)
if err != nil {
return err
}
buf.WriteString(kvs.String())

return nil
}

for i := 0; i < reflect.ValueOf(item.Spec).NumField(); i++ {
p, _ := reflect.ValueOf(item.Spec).Field(i).Interface().(plugins.Plugin)
if err := merge(p); err != nil {
return "", err
}
}
}

return buf.String(), nil
return load(list.Items, sl)
}

func init() {
Expand Down
51 changes: 27 additions & 24 deletions apis/fluentbit/v1alpha2/clustermultilineparser_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@ var clusterMultilineParserExpected = `[MULTILINE_PARSER]
Rule "start_state" "/([a-zA-Z]+ \d+ \d+\:\d+\:\d+)(.*)/" "cont"
Rule "cont" "/^\s+at.*/" "cont"
`
var spec = MultilineParserSpec{
MultilineParser: &multilineparser.MultilineParser{
Type: "regex",
Rules: []multilineparser.Rule{
{
Start: "start_state",
Regex: `/([a-zA-Z]+ \d+ \d+\:\d+\:\d+)(.*)/`,
Next: "cont",
},
{
Start: "cont",
Regex: `/^\s+at.*/`,
Next: "cont",
},
},
},
}

var multilineParserSpec = MultilineParserSpec{
MultilineParser: &multilineparser.MultilineParser{
Type: "regex",
Parser: "go",
KeyContent: "log",
},
}

func TestClusterMultilineParserList_Load(t *testing.T) {
g := NewGomegaWithT(t)
Expand All @@ -39,13 +64,7 @@ func TestClusterMultilineParserList_Load(t *testing.T) {
Name: "clustermultilineparser_test0",
Labels: labels,
},
Spec: MultilineParserSpec{
MultilineParser: &multilineparser.MultilineParser{
Type: "regex",
Parser: "go",
KeyContent: "log",
},
},
Spec: multilineParserSpec,
}

customMultilineParser := &ClusterMultilineParser{
Expand All @@ -57,23 +76,7 @@ func TestClusterMultilineParserList_Load(t *testing.T) {
Name: "clustermultilineparser_test1",
Labels: labels,
},
Spec: MultilineParserSpec{
MultilineParser: &multilineparser.MultilineParser{
Type: "regex",
Rules: []multilineparser.Rule{
{
Start: "start_state",
Regex: `/([a-zA-Z]+ \d+ \d+\:\d+\:\d+)(.*)/`,
Next: "cont",
},
{
Start: "cont",
Regex: `/^\s+at.*/`,
Next: "cont",
},
},
},
},
Spec: spec,
}

clustermultilineparsers := ClusterMultilineParserList{
Expand Down
32 changes: 25 additions & 7 deletions apis/fluentbit/v1alpha2/multilineparser_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type MultilineParser struct {
Spec MultilineParserSpec `json:"spec,omitempty"`
}

func (a MultilineParser) name() string {
return a.Name
}

func (a MultilineParser) spec() MultilineParserSpec {
return a.Spec
}

// +kubebuilder:object:root=true

// MultilineParserList contains a list of MultilineParser
Expand All @@ -63,19 +71,23 @@ func (a MultilineParserByName) Less(i, j int) bool {
return a[i].Name < a[j].Name
}

func (list MultilineParserList) Load(sl plugins.SecretLoader) (string, error) {
var buf bytes.Buffer
type multilineParserInterface interface {
MultilineParser | ClusterMultilineParser
name() string
spec() MultilineParserSpec
}

sort.Sort(MultilineParserByName(list.Items))
func load[T multilineParserInterface](items []T, sl plugins.SecretLoader) (string, error) {
var buf bytes.Buffer

for _, item := range list.Items {
for _, item := range items {
merge := func(p plugins.Plugin) error {
if p == nil || reflect.ValueOf(p).IsNil() {
return nil
}

buf.WriteString("[MULTILINE_PARSER]\n")
buf.WriteString(fmt.Sprintf(" Name %s\n", item.Name))
buf.WriteString(fmt.Sprintf(" Name %s\n", item.name()))

kvs, err := p.Params(sl)
if err != nil {
Expand All @@ -86,8 +98,8 @@ func (list MultilineParserList) Load(sl plugins.SecretLoader) (string, error) {
return nil
}

for i := 0; i < reflect.ValueOf(item.Spec).NumField(); i++ {
p, _ := reflect.ValueOf(item.Spec).Field(i).Interface().(plugins.Plugin)
for i := 0; i < reflect.ValueOf(item.spec()).NumField(); i++ {
p, _ := reflect.ValueOf(item.spec()).Field(i).Interface().(plugins.Plugin)
if err := merge(p); err != nil {
return "", err
}
Expand All @@ -97,6 +109,12 @@ func (list MultilineParserList) Load(sl plugins.SecretLoader) (string, error) {
return buf.String(), nil
}

func (list MultilineParserList) Load(sl plugins.SecretLoader) (string, error) {
sort.Sort(MultilineParserByName(list.Items))

return load(list.Items, sl)
}

func init() {
SchemeBuilder.Register(&MultilineParser{}, &MultilineParserList{})
}
27 changes: 2 additions & 25 deletions apis/fluentbit/v1alpha2/multilineparser_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/multilineparser"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -39,13 +38,7 @@ func TestMultilineParserList_Load(t *testing.T) {
Name: "multilineparser_test0",
Labels: labels,
},
Spec: MultilineParserSpec{
MultilineParser: &multilineparser.MultilineParser{
Type: "regex",
Parser: "go",
KeyContent: "log",
},
},
Spec: multilineParserSpec,
}

customMultilineParser := &MultilineParser{
Expand All @@ -57,23 +50,7 @@ func TestMultilineParserList_Load(t *testing.T) {
Name: "multilineparser_test1",
Labels: labels,
},
Spec: MultilineParserSpec{
MultilineParser: &multilineparser.MultilineParser{
Type: "regex",
Rules: []multilineparser.Rule{
{
Start: "start_state",
Regex: `/([a-zA-Z]+ \d+ \d+\:\d+\:\d+)(.*)/`,
Next: "cont",
},
{
Start: "cont",
Regex: `/^\s+at.*/`,
Next: "cont",
},
},
},
},
Spec: spec,
}

multilineparsers := MultilineParserList{
Expand Down
33 changes: 3 additions & 30 deletions apis/fluentd/v1alpha1/tests/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ func Test_Cfg2ES(t *testing.T) {

func Test_ClusterCfgInputTail(t *testing.T) {
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})
testClusterConfigWithGlobalInputs(t, sl, FluentdInputTail, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-tail.cfg")
testNamespacedConfig(t, sl, FluentdInputTail, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-tail.cfg")
}

func Test_ClusterCfgInputSample(t *testing.T) {
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})
testClusterConfigWithGlobalInputs(t, sl, FluentdInputSample, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-sample.cfg")
testNamespacedConfig(t, sl, FluentdInputSample, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-sample.cfg")
}

func Test_ClusterCfgInputMonitorAgent(t *testing.T) {
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})
testClusterConfigWithGlobalInputs(t, sl, FluentdInputMonitorAgent, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-monitorAgent.cfg")
testNamespacedConfig(t, sl, FluentdInputMonitorAgent, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-monitorAgent.cfg")
}

func Test_ClusterCfgOutput2ES(t *testing.T) {
Expand Down Expand Up @@ -530,33 +530,6 @@ func testNamespacedConfig(
}
}

// testClusterConfigWithGlobalInputs tests a cluster config with custom global inputs
func testClusterConfigWithGlobalInputs(
t *testing.T,
sl plugins.SecretLoader,
fluentd fluentdv1alpha1.Fluentd,
config *fluentdv1alpha1.FluentdConfig,
clusterOutputs []fluentdv1alpha1.ClusterOutput,
expectedCfgPath string,
) {
g := NewGomegaWithT(t)

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, fluentd.Spec.GlobalInputs)

clustercfgRouter, err := psr.BuildCfgRouter(config)
g.Expect(err).NotTo(HaveOccurred())
clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, config.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs)
err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources)
g.Expect(err).NotTo(HaveOccurred())

for i := 0; i < maxRuntimes; i++ {
config, errs := psr.RenderMainConfig(false)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg(expectedCfgPath))).To(Equal(config))
}
}

// testClusterConfigWithFiltersAndOutputs tests a cluster config with filters and outputs
func testClusterConfigWithFiltersAndOutputs(
t *testing.T,
Expand Down
Loading