Skip to content

Commit 2b9483f

Browse files
committed
Implement more format on Loki writer
- Implement same formats as stdout writer (printf, json, fields) - Allow reordering json keys - Refactor and add more tests
1 parent 8ded3b0 commit 2b9483f

File tree

5 files changed

+120
-92
lines changed

5 files changed

+120
-92
lines changed

pkg/api/write_loki.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ type WriteLoki struct {
4444
// scales of '1ms' (one millisecond) or just '1' (one nanosecond)
4545
// Default value is '1s'
4646
TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. for UNIX = 1s)"`
47+
Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)"`
48+
Reorder bool `yaml:"reorder,omitempty" json:"reorder,omitempty" doc:"reorder json map keys"`
4749
}
4850

4951
func (w *WriteLoki) SetDefaults() {
@@ -71,6 +73,9 @@ func (w *WriteLoki) SetDefaults() {
7173
if w.TimestampScale == "" {
7274
w.TimestampScale = "1s"
7375
}
76+
if w.Format == "" {
77+
w.Format = "json"
78+
}
7479
}
7580

7681
func (w *WriteLoki) Validate() error {

pkg/pipeline/write/write_loki.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,13 @@ import (
3030
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
3131

3232
logAdapter "github.com/go-kit/kit/log/logrus"
33-
jsonIter "github.com/json-iterator/go"
3433
"github.com/netobserv/loki-client-go/loki"
3534
"github.com/netobserv/loki-client-go/pkg/backoff"
3635
"github.com/netobserv/loki-client-go/pkg/urlutil"
3736
"github.com/prometheus/common/model"
3837
"github.com/sirupsen/logrus"
3938
)
4039

41-
var jsonEncodingConfig = jsonIter.Config{}.Froze()
42-
4340
var (
4441
keyReplacer = strings.NewReplacer("/", "_", ".", "_", "-", "_")
4542
)
@@ -60,6 +57,7 @@ type Loki struct {
6057
timeNow func() time.Time
6158
exitChan <-chan struct{}
6259
metrics *metrics
60+
formatter func(config.GenericMap) string
6361
}
6462

6563
func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
@@ -124,13 +122,10 @@ func (l *Loki) ProcessRecord(in config.GenericMap) error {
124122
delete(out, label)
125123
}
126124

127-
js, err := jsonEncodingConfig.Marshal(out)
128-
if err != nil {
129-
return err
130-
}
125+
logline := l.formatter(out)
131126

132127
timestamp := l.extractTimestamp(out)
133-
err = l.client.Handle(labels, timestamp, string(js))
128+
err := l.client.Handle(labels, timestamp, logline)
134129
if err == nil {
135130
l.metrics.recordsWritten.Inc()
136131
}
@@ -255,6 +250,7 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
255250
}
256251
}
257252

253+
f := formatter(lokiConfigIn.Format, lokiConfigIn.Reorder)
258254
l := &Loki{
259255
lokiConfig: lokiConfig,
260256
apiConfig: lokiConfigIn,
@@ -264,6 +260,7 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
264260
timeNow: time.Now,
265261
exitChan: pUtils.ExitChannel(),
266262
metrics: newMetrics(opMetrics, params.Name),
263+
formatter: f,
267264
}
268265

269266
return l, nil

pkg/pipeline/write/write_loki_test.go

Lines changed: 72 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,24 @@ const timeout = 5 * time.Second
4141

4242
type fakeEmitter struct {
4343
mock.Mock
44+
reorderJSON bool
4445
}
4546

4647
func (f *fakeEmitter) Handle(labels model.LabelSet, timestamp time.Time, record string) error {
47-
// sort alphabetically records just for simplifying testing verification with JSON strings
48-
recordMap := map[string]interface{}{}
49-
if err := json.Unmarshal([]byte(record), &recordMap); err != nil {
50-
panic("expected JSON: " + err.Error())
51-
}
52-
recordBytes, err := json.Marshal(recordMap)
53-
if err != nil {
54-
panic("error unmarshaling: " + err.Error())
48+
if f.reorderJSON {
49+
// sort alphabetically records just for simplifying testing verification with JSON strings
50+
recordMap := map[string]interface{}{}
51+
if err := json.Unmarshal([]byte(record), &recordMap); err != nil {
52+
panic("expected JSON: " + err.Error())
53+
}
54+
recordBytes, err := json.Marshal(recordMap)
55+
if err != nil {
56+
panic("error unmarshaling: " + err.Error())
57+
}
58+
a := f.Mock.Called(labels, timestamp, string(recordBytes))
59+
return a.Error(0)
5560
}
56-
a := f.Mock.Called(labels, timestamp, string(recordBytes))
61+
a := f.Mock.Called(labels, timestamp, record)
5762
return a.Error(0)
5863
}
5964

@@ -122,32 +127,17 @@ parameters:
122127
}
123128

124129
func TestLoki_ProcessRecord(t *testing.T) {
125-
var yamlConfig = `
126-
log-level: debug
127-
pipeline:
128-
- name: write1
129-
parameters:
130-
- name: write1
131-
write:
132-
type: loki
133-
loki:
134-
url: http://loki:3100/
135-
timestampLabel: ts
136-
ignoreList:
137-
- ignored
138-
staticLabels:
139-
static: label
140-
labels:
141-
- foo
142-
- bar
143-
`
144-
v, cfg := test.InitConfig(t, yamlConfig)
145-
require.NotNil(t, v)
146-
147-
loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0])
130+
params := api.WriteLoki{
131+
URL: "http://loki:3100/",
132+
TimestampLabel: "ts",
133+
IgnoreList: []string{"ignored"},
134+
StaticLabels: model.LabelSet{"static": "label"},
135+
Labels: []string{"foo", "bar"},
136+
}
137+
loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: &params}})
148138
require.NoError(t, err)
149139

150-
fe := fakeEmitter{}
140+
fe := fakeEmitter{reorderJSON: true}
151141
fe.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil)
152142
loki.client = &fe
153143

@@ -171,6 +161,49 @@ parameters:
171161
}, time.Unix(124567, 0), `{"other":"val","ts":124567,"value":5678}`)
172162
}
173163

164+
func TestLoki_ProcessRecordOrdered(t *testing.T) {
165+
params := api.WriteLoki{
166+
URL: "http://loki:3100/",
167+
TimestampLabel: "ts",
168+
Reorder: true,
169+
}
170+
loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: &params}})
171+
require.NoError(t, err)
172+
173+
fe := fakeEmitter{reorderJSON: false}
174+
fe.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil)
175+
loki.client = &fe
176+
177+
require.NoError(t, loki.ProcessRecord(map[string]interface{}{"ts": 123456, "c": "c", "e": "e", "a": "a", "d": "d", "b": "b"}))
178+
179+
fe.AssertCalled(t, "Handle", model.LabelSet{}, time.Unix(123456, 0), `{"a":"a","b":"b","c":"c","d":"d","e":"e","ts":123456}`)
180+
}
181+
182+
func TestLoki_ProcessRecordGoFormat(t *testing.T) {
183+
params := api.WriteLoki{
184+
URL: "http://loki:3100/",
185+
TimestampLabel: "ts",
186+
Labels: []string{"foo"},
187+
Format: "printf",
188+
}
189+
loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: &params}})
190+
require.NoError(t, err)
191+
192+
fe := fakeEmitter{}
193+
fe.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil)
194+
loki.client = &fe
195+
196+
// WHEN it processes input records
197+
require.NoError(t, loki.ProcessRecord(map[string]interface{}{
198+
"ts": 123456, "foo": "fooLabel", "bar": "barLabel", "value": 1234}))
199+
require.NoError(t, loki.ProcessRecord(map[string]interface{}{
200+
"ts": 124567, "foo": "fooLabel2", "bar": "barLabel2", "value": 5678, "other": "val"}))
201+
202+
// THEN it forwards the records extracting the timestamp and labels from the configuration
203+
fe.AssertCalled(t, "Handle", model.LabelSet{"foo": "fooLabel"}, time.Unix(123456, 0), `map[bar:barLabel ts:123456 value:1234]`)
204+
fe.AssertCalled(t, "Handle", model.LabelSet{"foo": "fooLabel2"}, time.Unix(124567, 0), `map[bar:barLabel2 other:val ts:124567 value:5678]`)
205+
}
206+
174207
func TestTimestampScale(t *testing.T) {
175208
// verifies that the unix residual time (below 1-second precision) is properly
176209
// incorporated into the timestamp whichever scale it is
@@ -262,26 +295,12 @@ func TestTimestampExtraction_LocalTime(t *testing.T) {
262295
// Tests that labels are sanitized before being sent to loki.
263296
// Labels that are invalid even if sanitized are ignored
264297
func TestSanitizedLabels(t *testing.T) {
265-
var yamlConfig = `
266-
log-level: debug
267-
pipeline:
268-
- name: write1
269-
parameters:
270-
- name: write1
271-
write:
272-
type: loki
273-
loki:
274-
url: http://loki:3100/
275-
labels:
276-
- "fo.o"
277-
- "ba-r"
278-
- "ba/z"
279-
- "ignored?"
280-
`
281-
v, cfg := test.InitConfig(t, yamlConfig)
282-
require.NotNil(t, v)
283-
284-
loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0])
298+
params := api.WriteLoki{
299+
URL: "http://loki:3100/",
300+
TimestampLabel: "ts",
301+
Labels: []string{"fo.o", "ba-r", "ba/z", "ignored?"},
302+
}
303+
loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: &params}})
285304
require.NoError(t, err)
286305

287306
fe := fakeEmitter{}

pkg/pipeline/write/write_stdout.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,50 +18,61 @@
1818
package write
1919

2020
import (
21-
"encoding/json"
2221
"fmt"
23-
"os"
2422
"sort"
23+
"strings"
2524
"text/tabwriter"
2625
"time"
2726

27+
jsonIter "github.com/json-iterator/go"
2828
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2929
"github.com/sirupsen/logrus"
3030
)
3131

3232
type writeStdout struct {
33-
format string
33+
formatter func(config.GenericMap) string
3434
}
3535

3636
// Write writes a flow before being stored
3737
func (t *writeStdout) Write(v config.GenericMap) {
3838
logrus.Tracef("entering writeStdout Write")
39-
if t.format == "json" {
40-
txt, _ := json.Marshal(v)
41-
fmt.Println(string(txt))
42-
} else if t.format == "fields" {
43-
var order sort.StringSlice
44-
for fieldName := range v {
45-
order = append(order, fieldName)
39+
fmt.Println(t.formatter(v))
40+
}
41+
42+
func formatter(format string, reorder bool) func(config.GenericMap) string {
43+
if format == "json" {
44+
jconf := jsonIter.Config{
45+
SortMapKeys: reorder,
46+
}.Froze()
47+
return func(v config.GenericMap) string {
48+
b, _ := jconf.Marshal(v)
49+
return string(b)
4650
}
47-
order.Sort()
48-
w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
49-
fmt.Fprintf(w, "\n\nFlow record at %s:\n", time.Now().Format(time.StampMilli))
50-
for _, field := range order {
51-
fmt.Fprintf(w, "%v\t=\t%v\n", field, v[field])
51+
} else if format == "fields" {
52+
return func(v config.GenericMap) string {
53+
var sb strings.Builder
54+
var order sort.StringSlice
55+
for fieldName := range v {
56+
order = append(order, fieldName)
57+
}
58+
order.Sort()
59+
w := tabwriter.NewWriter(&sb, 0, 0, 1, ' ', 0)
60+
fmt.Fprintf(w, "\n\nFlow record at %s:\n", time.Now().Format(time.StampMilli))
61+
for _, field := range order {
62+
fmt.Fprintf(w, "%v\t=\t%v\n", field, v[field])
63+
}
64+
w.Flush()
65+
return sb.String()
5266
}
53-
w.Flush()
54-
} else {
55-
fmt.Printf("%s: %v\n", time.Now().Format(time.StampMilli), v)
67+
}
68+
return func(v config.GenericMap) string {
69+
return fmt.Sprintf("%v", v)
5670
}
5771
}
5872

5973
// NewWriteStdout create a new write
6074
func NewWriteStdout(params config.StageParam) (Writer, error) {
6175
logrus.Debugf("entering NewWriteStdout")
62-
writeStdout := &writeStdout{}
63-
if params.Write != nil && params.Write.Stdout != nil {
64-
writeStdout.format = params.Write.Stdout.Format
65-
}
66-
return writeStdout, nil
76+
f := formatter(params.Write.Stdout.Format, false)
77+
return &writeStdout{formatter: f}, nil
6778
}

pkg/pipeline/write/write_stdout_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,13 @@ package write
2020
import (
2121
"testing"
2222

23+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2324
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2425
"github.com/stretchr/testify/require"
2526
)
2627

27-
func Test_WriteStdout(_ *testing.T) {
28-
ws := writeStdout{}
29-
ws.Write(config.GenericMap{"key": "test"})
30-
}
31-
3228
func Test_NewWriteStdout(t *testing.T) {
33-
writer, err := NewWriteStdout(config.StageParam{})
34-
require.Nil(t, err)
35-
require.Equal(t, writer, &writeStdout{})
29+
writer, err := NewWriteStdout(config.StageParam{Write: &config.Write{Stdout: &api.WriteStdout{}}})
30+
require.NoError(t, err)
31+
writer.Write(config.GenericMap{"key": "test"})
3632
}

0 commit comments

Comments
 (0)