Skip to content

Commit 0c8806e

Browse files
committed
add cardinality action plugin
1 parent 8c5de82 commit 0c8806e

File tree

13 files changed

+837
-1
lines changed

13 files changed

+837
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ TBD: throughput on production servers.
4444

4545
**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md), [socket](plugin/input/socket/README.md)
4646

47-
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [decode](plugin/action/decode/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [hash](plugin/action/hash/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [json_extract](plugin/action/json_extract/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
47+
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [cardinality](plugin/action/cardinality/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [decode](plugin/action/decode/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [hash](plugin/action/hash/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [json_extract](plugin/action/json_extract/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
4848

4949
**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [loki](plugin/output/loki/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)
5050

_sidebar.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
- Action
2626
- [add_file_name](plugin/action/add_file_name/README.md)
2727
- [add_host](plugin/action/add_host/README.md)
28+
- [cardinality](plugin/action/cardinality/README.md)
2829
- [convert_date](plugin/action/convert_date/README.md)
2930
- [convert_log_level](plugin/action/convert_log_level/README.md)
3031
- [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md)

cmd/file.d/file.d.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/ozontech/file.d/pipeline"
2020
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
2121
_ "github.com/ozontech/file.d/plugin/action/add_host"
22+
_ "github.com/ozontech/file.d/plugin/action/cardinality"
2223
_ "github.com/ozontech/file.d/plugin/action/convert_date"
2324
_ "github.com/ozontech/file.d/plugin/action/convert_log_level"
2425
_ "github.com/ozontech/file.d/plugin/action/convert_utf8_bytes"

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/alecthomas/kingpin v2.2.6+incompatible
1212
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
1313
github.com/alicebob/miniredis/v2 v2.30.5
14+
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310
1415
github.com/bitly/go-simplejson v0.5.1
1516
github.com/bmatcuk/doublestar/v4 v4.0.2
1617
github.com/bufbuild/protocompile v0.13.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ github.com/alicebob/miniredis/v2 v2.30.5 h1:3r6kTHdKnuP4fkS8k2IrvSfxpxUTcW1SOL0w
1818
github.com/alicebob/miniredis/v2 v2.30.5/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
1919
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
2020
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
21+
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310 h1:BUAU3CGlLvorLI26FmByPp2eC2qla6E1Tw+scpcg/to=
22+
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
2123
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
2224
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
2325
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

plugin/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,26 @@ It is only applicable for input plugins k8s and file.
188188
It adds field containing hostname to an event.
189189

190190
[More details...](plugin/action/add_host/README.md)
191+
## cardinality
192+
Limits the cardinality of fields on events or drops events.
193+
194+
**An example for discarding events with high cardinality field:**
195+
```yaml
196+
pipelines:
197+
example_pipeline:
198+
...
199+
- type: cardinality
200+
limit: 10
201+
action: discard
202+
metric_prefix: service_zone
203+
key:
204+
- service
205+
fields:
206+
- zone
207+
...
208+
```
209+
210+
[More details...](plugin/action/cardinality/README.md)
191211
## convert_date
192212
It converts field date/time data to different format.
193213

plugin/action/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,26 @@ It is only applicable for input plugins k8s and file.
99
It adds field containing hostname to an event.
1010

1111
[More details...](plugin/action/add_host/README.md)
12+
## cardinality
13+
Limits the cardinality of fields on events or drops events.
14+
15+
**An example for discarding events with high cardinality field:**
16+
```yaml
17+
pipelines:
18+
example_pipeline:
19+
...
20+
- type: cardinality
21+
limit: 10
22+
action: discard
23+
metric_prefix: service_zone
24+
key:
25+
- service
26+
fields:
27+
- zone
28+
...
29+
```
30+
31+
[More details...](plugin/action/cardinality/README.md)
1232
## convert_date
1333
It converts field date/time data to different format.
1434

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Cardinality limit plugin
2+
@introduction
3+
4+
## Config params
5+
@config-params|description
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Cardinality limit plugin
2+
Limits the cardinality of fields on events or drops events.
3+
4+
**An example for discarding events with high cardinality field:**
5+
```yaml
6+
pipelines:
7+
example_pipeline:
8+
...
9+
- type: cardinality
10+
limit: 10
11+
action: discard
12+
metric_prefix: service_zone
13+
key:
14+
- service
15+
fields:
16+
- zone
17+
...
18+
```
19+
20+
## Config params
21+
**`key`** *`[]cfg.FieldSelector`* *`required`*
22+
23+
Fields used to group events before calculating cardinality.
24+
Events with the same key values are aggregated together.
25+
Required for proper cardinality tracking per logical group.
26+
27+
<br>
28+
29+
**`fields`** *`[]cfg.FieldSelector`* *`required`*
30+
31+
Target fields whose unique values are counted within each key group.
32+
The plugin monitors how many distinct values these fields contain.
33+
Required to define what constitutes high cardinality.
34+
35+
<br>
36+
37+
**`action`** *`string`* *`default=nothing`* *`options=discard|remove_fields|nothing`*
38+
39+
Action to perform when cardinality limit is exceeded.
40+
Determines whether to discard events, remove fields, or just monitor.
41+
Choose based on whether you need to preserve other event data.
42+
43+
<br>
44+
45+
**`metric_prefix`** *`string`*
46+
47+
Prefix added to metric names for better organization.
48+
Useful when running multiple instances to avoid metric name collisions.
49+
Leave empty for default metric naming.
50+
51+
<br>
52+
53+
**`limit`** *`int`* *`default=10000`*
54+
55+
Maximum allowed number of unique values for monitored fields.
56+
When exceeded within a key group, the configured action triggers.
57+
Set based on expected diversity and system capacity.
58+
59+
<br>
60+
61+
**`ttl`** *`cfg.Duration`* *`default=1h`*
62+
63+
Time-to-live for cardinality tracking cache entries.
64+
Prevents unbounded memory growth by forgetting old unique values.
65+
Should align with typical patterns of field value changes.
66+
67+
<br>
68+
69+
70+
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*

plugin/action/cardinality/cache.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package discard
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
radix "github.com/armon/go-radix"
8+
"github.com/ozontech/file.d/xtime"
9+
)
10+
11+
type Cache struct {
12+
mu *sync.RWMutex
13+
tree *radix.Tree
14+
ttl int64
15+
}
16+
17+
func NewCache(ttl time.Duration) (*Cache, error) {
18+
return &Cache{
19+
tree: radix.New(),
20+
ttl: ttl.Nanoseconds(),
21+
mu: &sync.RWMutex{},
22+
}, nil
23+
}
24+
25+
func (c *Cache) Set(key string) bool {
26+
c.mu.Lock()
27+
defer c.mu.Unlock()
28+
29+
c.tree.Insert(key, xtime.GetInaccurateUnixNano())
30+
return true
31+
}
32+
33+
func (c *Cache) IsExists(key string) bool {
34+
c.mu.RLock()
35+
timeValue, found := c.tree.Get(key)
36+
c.mu.RUnlock()
37+
38+
if found {
39+
now := xtime.GetInaccurateUnixNano()
40+
isExpire := c.isExpire(now, timeValue.(int64))
41+
if isExpire {
42+
c.delete(key)
43+
return false
44+
}
45+
}
46+
return found
47+
}
48+
49+
func (c *Cache) isExpire(now, value int64) bool {
50+
diff := now - value
51+
return diff > c.ttl
52+
}
53+
54+
func (c *Cache) delete(key string) {
55+
c.mu.Lock()
56+
defer c.mu.Unlock()
57+
58+
c.tree.Delete(key)
59+
}
60+
61+
func (c *Cache) CountPrefix(prefix string) (count int) {
62+
var keysToDelete []string
63+
c.mu.RLock()
64+
now := xtime.GetInaccurateUnixNano()
65+
c.tree.WalkPrefix(prefix, func(s string, v any) bool {
66+
timeValue := v.(int64)
67+
if c.isExpire(now, timeValue) {
68+
keysToDelete = append(keysToDelete, s)
69+
} else {
70+
count++
71+
}
72+
return false
73+
})
74+
c.mu.RUnlock()
75+
76+
if len(keysToDelete) > 0 {
77+
for _, key := range keysToDelete {
78+
c.delete(key)
79+
}
80+
}
81+
return
82+
}

0 commit comments

Comments
 (0)