Skip to content

Commit b50c4ad

Browse files
committed
feat: Add structured metadata support for Loki output plugin
Signed-off-by: ProCodec <[email protected]>
1 parent a3ca4c9 commit b50c4ad

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

apis/fluentbit/v1alpha2/clusteroutput_types_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,3 +414,109 @@ func TestClusterOutputList_Load_As_Yaml(t *testing.T) {
414414
i++
415415
}
416416
}
417+
418+
func TestLokiOutputWithStructuredMetadata_Load(t *testing.T) {
419+
g := NewGomegaWithT(t)
420+
sl := plugins.NewSecretLoader(nil, "testnamespace")
421+
422+
lokiOutput := ClusterOutput{
423+
TypeMeta: metav1.TypeMeta{
424+
APIVersion: "fluentbit.fluent.io/v1alpha2",
425+
Kind: "ClusterOutput",
426+
},
427+
ObjectMeta: metav1.ObjectMeta{
428+
Name: "loki_output_with_metadata",
429+
},
430+
Spec: OutputSpec{
431+
Match: "kube.*",
432+
Loki: &output.Loki{
433+
Host: "loki-gateway",
434+
Port: ptrInt32(int32(3100)),
435+
Labels: []string{
436+
"job=fluentbit",
437+
"environment=production",
438+
},
439+
StructuredMetadata: map[string]string{
440+
"pod": "${record['kubernetes']['pod_name']}",
441+
"container": "${record['kubernetes']['container_name']}",
442+
"trace_id": "${record['trace_id']}",
443+
},
444+
StructuredMetadataKeys: []string{
445+
"level",
446+
"caller",
447+
},
448+
},
449+
},
450+
}
451+
452+
outputs := ClusterOutputList{
453+
Items: []ClusterOutput{lokiOutput},
454+
}
455+
456+
expected := `[Output]
457+
Name loki
458+
Match kube.*
459+
host loki-gateway
460+
port 3100
461+
labels environment=production,job=fluentbit
462+
structured_metadata container=${record['kubernetes']['container_name']},pod=${record['kubernetes']['pod_name']},trace_id=${record['trace_id']}
463+
structured_metadata_keys level,caller
464+
`
465+
466+
result, err := outputs.Load(sl)
467+
g.Expect(err).NotTo(HaveOccurred())
468+
g.Expect(result).To(Equal(expected))
469+
}
470+
471+
func TestLokiOutputWithStructuredMetadata_LoadAsYaml(t *testing.T) {
472+
g := NewGomegaWithT(t)
473+
sl := plugins.NewSecretLoader(nil, "testnamespace")
474+
475+
lokiOutput := ClusterOutput{
476+
TypeMeta: metav1.TypeMeta{
477+
APIVersion: "fluentbit.fluent.io/v1alpha2",
478+
Kind: "ClusterOutput",
479+
},
480+
ObjectMeta: metav1.ObjectMeta{
481+
Name: "loki_output_with_metadata",
482+
},
483+
Spec: OutputSpec{
484+
Match: "kube.*",
485+
Loki: &output.Loki{
486+
Host: "loki-gateway",
487+
Port: ptrInt32(int32(3100)),
488+
Labels: []string{
489+
"job=fluentbit",
490+
"environment=production",
491+
},
492+
StructuredMetadata: map[string]string{
493+
"pod": "${record['kubernetes']['pod_name']}",
494+
"container": "${record['kubernetes']['container_name']}",
495+
"trace_id": "${record['trace_id']}",
496+
},
497+
StructuredMetadataKeys: []string{
498+
"level",
499+
"caller",
500+
},
501+
},
502+
},
503+
}
504+
505+
outputs := ClusterOutputList{
506+
Items: []ClusterOutput{lokiOutput},
507+
}
508+
509+
expected := `outputs:
510+
- name: loki
511+
match: "kube.*"
512+
host: loki-gateway
513+
port: 3100
514+
labels: environment=production,job=fluentbit
515+
structured_metadata: container=${record['kubernetes']['container_name']},pod=${record['kubernetes']['pod_name']},trace_id=${record['trace_id']}
516+
structured_metadata_keys: level,caller
517+
`
518+
519+
result, err := outputs.LoadAsYaml(sl, 0)
520+
g.Expect(err).NotTo(HaveOccurred())
521+
g.Expect(result).To(Equal(expected))
522+
}

apis/fluentbit/v1alpha2/plugins/output/loki_types.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package output
33
import (
44
"fmt"
55
"strings"
6+
"sort"
67

78
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
89
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params"
@@ -56,6 +57,13 @@ type Loki struct {
5657
// Specify the name of the key from the original record that contains the Tenant ID.
5758
// The value of the key is set as X-Scope-OrgID of HTTP header. It is useful to set Tenant ID dynamically.
5859
TenantIDKey string `json:"tenantIDKey,omitempty"`
60+
// Stream structured metadata for API request. It can be multiple comma separated key=value pairs.
61+
// This is used for high cardinality data that isn't suited for using labels.
62+
// Only supported in Loki 3.0+ with schema v13 and TSDB storage.
63+
StructuredMetadata map[string]string `json:"structuredMetadata,omitempty"`
64+
// Optional list of record keys that will be placed as structured metadata.
65+
// This allows using record accessor patterns (e.g. $kubernetes['pod_name']) to reference record keys.
66+
StructuredMetadataKeys []string `json:"structuredMetadataKeys,omitempty"`
5967
*plugins.TLS `json:"tls,omitempty"`
6068
// Include fluentbit networking options for this output-plugin
6169
*plugins.Networking `json:"networking,omitempty"`
@@ -134,6 +142,21 @@ func (l *Loki) Params(sl plugins.SecretLoader) (*params.KVs, error) {
134142
if l.TenantIDKey != "" {
135143
kvs.Insert("tenant_id_key", l.TenantIDKey)
136144
}
145+
// Handle structured metadata
146+
if l.StructuredMetadata != nil && len(l.StructuredMetadata) > 0 {
147+
var metadataPairs []string
148+
for k, v := range l.StructuredMetadata {
149+
metadataPairs = append(metadataPairs, fmt.Sprintf("%s=%s", k, v))
150+
}
151+
if len(metadataPairs) > 0 {
152+
sort.Strings(metadataPairs)
153+
kvs.Insert("structured_metadata", strings.Join(metadataPairs, ","))
154+
}
155+
}
156+
// Handle structured metadata keys
157+
if l.StructuredMetadataKeys != nil && len(l.StructuredMetadataKeys) > 0 {
158+
kvs.Insert("structured_metadata_keys", strings.Join(l.StructuredMetadataKeys, ","))
159+
}
137160
if l.TLS != nil {
138161
tls, err := l.TLS.Params(sl)
139162
if err != nil {

0 commit comments

Comments
 (0)