Skip to content

Commit 3d1e88a

Browse files
committed
cardinality: dig sub-fields
1 parent 6a96138 commit 3d1e88a

File tree

3 files changed

+52
-30
lines changed

3 files changed

+52
-30
lines changed

plugin/action/cardinality/README.md

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,6 @@ Leave empty for default metric naming.
5050

5151
<br>
5252

53-
**`metric_label_value`** *`string`* *`default=unknown`*
54-
55-
Value assigned to the metric label when cardinality limit is exceeded.
56-
57-
<br>
58-
5953
**`limit`** *`int`* *`default=10000`*
6054

6155
Maximum allowed number of unique values for monitored fields.

plugin/action/cardinality/cardinality.go

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,18 @@ pipelines:
3737
type Plugin struct {
3838
cache *Cache
3939
config *Config
40-
keys []string
41-
fields []string
40+
keys []parsedField
41+
fields []parsedField
4242
logger *zap.Logger
4343

4444
cardinalityApplyCounter *prometheus.CounterVec
4545
}
4646

47+
type parsedField struct {
48+
name string
49+
value []string
50+
}
51+
4752
const (
4853
actionDiscard = "discard"
4954
actionRemoveFields = "remove_fields"
@@ -82,11 +87,6 @@ type Config struct {
8287
// > Leave empty for default metric naming.
8388
MetricPrefix string `json:"metric_prefix" default:""` // *
8489

85-
// > @3@4@5@6
86-
// >
87-
// > Value assigned to the metric label when cardinality limit is exceeded.
88-
MetricLabelValue string `json:"metric_label_value" default:"unknown"` // *
89-
9090
// > @3@4@5@6
9191
// >
9292
// > Maximum allowed number of unique values for monitored fields.
@@ -136,6 +136,14 @@ func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string
136136
return ctl.RegisterCounterVec(name, help, labelNames...)
137137
}
138138

139+
func keyMetricLabels(fields []parsedField) []string {
140+
result := make([]string, 0, len(fields))
141+
for i := range fields {
142+
result = append(result, fields[i].name)
143+
}
144+
return result
145+
}
146+
139147
func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) {
140148
var metricName string
141149
if prefix == "" {
@@ -146,7 +154,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) {
146154
p.cardinalityApplyCounter = p.makeMetric(ctl,
147155
metricName,
148156
"Total number of events applied due to cardinality limits",
149-
p.keys...,
157+
keyMetricLabels(p.keys)...,
150158
)
151159
}
152160

@@ -156,21 +164,35 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
156164

157165
p.cache = NewCache(p.config.TTL_)
158166

159-
p.keys = make([]string, 0, len(p.config.KeyFields))
167+
p.keys = make([]parsedField, 0, len(p.config.KeyFields))
160168
for _, fs := range p.config.KeyFields {
161169
if fs != "" {
162-
p.keys = append(p.keys, cfg.ParseFieldSelector(string(fs))[0])
170+
parsedFields := cfg.ParseFieldSelector(string(fs))
171+
p.keys = append(p.keys, struct {
172+
name string
173+
value []string
174+
}{
175+
name: strings.Join(parsedFields, "_"),
176+
value: parsedFields,
177+
})
163178
}
164179
}
165180

166181
if len(p.config.Fields) == 0 {
167182
p.logger.Fatal("you have to set key fields")
168183
}
169184

170-
p.fields = make([]string, 0, len(p.config.Fields))
185+
p.fields = make([]parsedField, 0, len(p.config.Fields))
171186
for _, fs := range p.config.Fields {
172187
if fs != "" {
173-
p.fields = append(p.fields, cfg.ParseFieldSelector(string(fs))[0])
188+
parsedFields := cfg.ParseFieldSelector(string(fs))
189+
p.fields = append(p.fields, struct {
190+
name string
191+
value []string
192+
}{
193+
name: strings.Join(parsedFields, "_"),
194+
value: parsedFields,
195+
})
174196
}
175197
}
176198

@@ -185,27 +207,26 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
185207
cacheKey := make(map[string]string, len(p.keys))
186208

187209
for _, key := range p.keys {
188-
value := pipeline.CloneString(event.Root.Dig(key).AsString())
189-
cacheKey[key] = value
210+
value := pipeline.CloneString(event.Root.Dig(key.value...).AsString())
211+
cacheKey[key.name] = value
190212
}
191213

192214
cacheValue := make(map[string]string, len(p.fields))
193215
for _, key := range p.fields {
194-
value := pipeline.CloneString(event.Root.Dig(key).AsString())
195-
cacheValue[key] = value
216+
value := pipeline.CloneString(event.Root.Dig(key.value...).AsString())
217+
cacheValue[key.name] = value
196218
}
197219

198220
key := mapToKey(cacheKey)
199221
value := mapToStringSorted(cacheKey, cacheValue)
200222
keysCount := p.cache.CountPrefix(key)
201223

202224
if p.config.Limit >= 0 && keysCount >= p.config.Limit {
203-
labelsValues := make([]string, 0, len(p.keys))
225+
var labelsValues []string
226+
labelsValues = make([]string, 0, len(p.keys))
204227
for _, key := range p.keys {
205-
if val, exists := cacheKey[key]; exists {
228+
if val, exists := cacheKey[key.name]; exists {
206229
labelsValues = append(labelsValues, val)
207-
} else {
208-
labelsValues = append(labelsValues, p.config.MetricLabelValue)
209230
}
210231
}
211232
p.cardinalityApplyCounter.WithLabelValues(labelsValues...).Inc()
@@ -214,7 +235,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
214235
return pipeline.ActionDiscard
215236
case actionRemoveFields:
216237
for _, key := range p.fields {
217-
event.Root.Dig(key).Suicide()
238+
event.Root.Dig(key.value...).Suicide()
218239
}
219240
}
220241
} else {

plugin/action/cardinality/cardinality_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ func TestMapToStringSorted(t *testing.T) {
5959
func TestCardinalityLimitDiscard(t *testing.T) {
6060
limit := 10
6161
config := &Config{
62-
KeyFields: []cfg.FieldSelector{"host"},
63-
Fields: []cfg.FieldSelector{"i"},
62+
KeyFields: []cfg.FieldSelector{"info.host", "not_exists_fields"},
63+
Fields: []cfg.FieldSelector{"value.i"},
6464
Limit: limit,
6565
Action: actionDiscard,
6666
TTL_: 1 * time.Hour,
@@ -84,7 +84,7 @@ func TestCardinalityLimitDiscard(t *testing.T) {
8484
})
8585

8686
for i := 0; i < genEventsCnt; i++ {
87-
json := fmt.Sprintf(`{"host":"localhost","i":"%d"}`, i)
87+
json := fmt.Sprintf(`{"info": {"host":"localhost"},"value":{"i":"%d"}}`, i)
8888
input.In(10, "test", test.NewOffset(0), []byte(json))
8989
}
9090
inWg.Wait()
@@ -118,17 +118,23 @@ func TestCardinalityLimitRemoveFields(t *testing.T) {
118118
})
119119

120120
outEventsCnt := 0
121+
wrongEventsCnt := 0
121122
outWg := sync.WaitGroup{}
122123
outWg.Add(genEventsCnt)
123124
output.SetOutFn(func(e *pipeline.Event) {
124125
defer outWg.Done()
125126

126127
// check exists field
127128
value := pipeline.CloneString(e.Root.Dig(string(config.Fields[0])).AsString())
129+
keyValue := pipeline.CloneString(e.Root.Dig(string(config.KeyFields[0])).AsString())
128130

129131
if value != "" {
130132
outEventsCnt++
131133
}
134+
135+
if keyValue == "" {
136+
wrongEventsCnt++
137+
}
132138
})
133139

134140
for i := 0; i < genEventsCnt; i++ {
@@ -140,6 +146,7 @@ func TestCardinalityLimitRemoveFields(t *testing.T) {
140146

141147
p.Stop()
142148
assert.Equal(t, inEventsCnt, genEventsCnt, "wrong in events count")
149+
assert.Equal(t, 0, wrongEventsCnt)
143150
assert.Equal(t, limit, outEventsCnt, "wrong out events count")
144151
}
145152

0 commit comments

Comments
 (0)