Skip to content

Commit 9071339

Browse files
committed
fix after review
1 parent 0c8806e commit 9071339

File tree

6 files changed

+31
-24
lines changed

6 files changed

+31
-24
lines changed

e2e/start_work_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/ozontech/file.d/fd"
2525
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
2626
_ "github.com/ozontech/file.d/plugin/action/add_host"
27+
_ "github.com/ozontech/file.d/plugin/action/cardinality"
2728
_ "github.com/ozontech/file.d/plugin/action/convert_date"
2829
_ "github.com/ozontech/file.d/plugin/action/convert_log_level"
2930
_ "github.com/ozontech/file.d/plugin/action/convert_utf8_bytes"

plugin/action/cardinality/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ Leave empty for default metric naming.
5050

5151
<br>
5252

53+
**`metric_label_value`** *`string`* *`default=unknown`*
54+
55+
Value assigned to the metric label when cardinality limit is exceeded.
56+
57+
<br>
58+
5359
**`limit`** *`int`* *`default=10000`*
5460

5561
Maximum allowed number of unique values for monitored fields.

plugin/action/cardinality/cache.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ type Cache struct {
1414
ttl int64
1515
}
1616

17-
func NewCache(ttl time.Duration) (*Cache, error) {
17+
func NewCache(ttl time.Duration) *Cache {
1818
return &Cache{
1919
tree: radix.New(),
2020
ttl: ttl.Nanoseconds(),
2121
mu: &sync.RWMutex{},
22-
}, nil
22+
}
2323
}
2424

2525
func (c *Cache) Set(key string) bool {
@@ -60,8 +60,8 @@ func (c *Cache) delete(key string) {
6060

6161
func (c *Cache) CountPrefix(prefix string) (count int) {
6262
var keysToDelete []string
63-
c.mu.RLock()
6463
now := xtime.GetInaccurateUnixNano()
64+
c.mu.RLock()
6565
c.tree.WalkPrefix(prefix, func(s string, v any) bool {
6666
timeValue := v.(int64)
6767
if c.isExpire(now, timeValue) {

plugin/action/cardinality/cache_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@ import (
99
)
1010

1111
func TestNewCache(t *testing.T) {
12-
cache, err := NewCache(time.Minute)
13-
assert.NoError(t, err)
12+
cache := NewCache(time.Minute)
1413
assert.NotNil(t, cache)
1514
}
1615

1716
func TestSetAndExists(t *testing.T) {
18-
cache, _ := NewCache(time.Minute)
17+
cache := NewCache(time.Minute)
1918

2019
t.Run("basic set and get", func(t *testing.T) {
2120
key := "test-key"
@@ -32,7 +31,7 @@ func TestSetAndExists(t *testing.T) {
3231
}
3332

3433
func TestDelete(t *testing.T) {
35-
cache, _ := NewCache(time.Minute)
34+
cache := NewCache(time.Minute)
3635

3736
key := "to-delete"
3837
cache.Set(key)
@@ -50,7 +49,7 @@ func TestDelete(t *testing.T) {
5049
}
5150

5251
func TestCountPrefix(t *testing.T) {
53-
cache, _ := NewCache(time.Minute)
52+
cache := NewCache(time.Minute)
5453

5554
keys := []string{
5655
"key1_subkey1",
@@ -86,7 +85,7 @@ func TestCountPrefix(t *testing.T) {
8685
}
8786

8887
func TestConcurrentOperations(t *testing.T) {
89-
cache, _ := NewCache(time.Minute)
88+
cache := NewCache(time.Minute)
9089

9190
var wg sync.WaitGroup
9291
keys := []string{"key1", "key2", "key3"}
@@ -154,7 +153,7 @@ func TestConcurrentOperations(t *testing.T) {
154153
}
155154

156155
func TestTTL(t *testing.T) {
157-
cache, _ := NewCache(100 * time.Millisecond)
156+
cache := NewCache(100 * time.Millisecond)
158157

159158
key := "ttl-key"
160159
cache.Set(key)

plugin/action/cardinality/cardinality.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Plugin struct {
4141
fields []string
4242
logger *zap.Logger
4343

44-
cardinalityDiscardCounter *prometheus.CounterVec
44+
cardinalityApplyCounter *prometheus.CounterVec
4545
}
4646

4747
const (
@@ -82,6 +82,11 @@ type Config struct {
8282
// > Leave empty for default metric naming.
8383
MetricPrefix string `json:"metric_prefix" default:""` // *
8484

85+
// > @3@4@5@6
86+
// >
87+
// > Value assigned to the metric label when cardinality limit is exceeded.
88+
MetricLabelValue string `json:"metric_label_value" default:"unknown"` // *
89+
8590
// > @3@4@5@6
8691
// >
8792
// > Maximum allowed number of unique values for monitored fields.
@@ -134,13 +139,13 @@ func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string
134139
func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) {
135140
var metricName string
136141
if prefix == "" {
137-
metricName = "cardinality_discard_total"
142+
metricName = "cardinality_applied_total"
138143
} else {
139-
metricName = fmt.Sprintf(`cardinality_discard_%s_total`, prefix)
144+
metricName = fmt.Sprintf(`cardinality_applied_%s_total`, prefix)
140145
}
141-
p.cardinalityDiscardCounter = p.makeMetric(ctl,
146+
p.cardinalityApplyCounter = p.makeMetric(ctl,
142147
metricName,
143-
"Total number of events discarded due to cardinality limits",
148+
"Total number of events applied due to cardinality limits",
144149
p.keys...,
145150
)
146151
}
@@ -149,11 +154,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
149154
p.config = config.(*Config)
150155
p.logger = params.Logger.Desugar()
151156

152-
var err error
153-
p.cache, err = NewCache(p.config.TTL_)
154-
if err != nil {
155-
panic(err)
156-
}
157+
p.cache = NewCache(p.config.TTL_)
157158

158159
p.keys = make([]string, 0, len(p.config.KeyFields))
159160
for _, fs := range p.config.KeyFields {
@@ -198,16 +199,16 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
198199
value := mapToStringSorted(cacheKey, cacheValue)
199200
keysCount := p.cache.CountPrefix(key)
200201

201-
if p.config.Limit > 0 && keysCount >= p.config.Limit {
202+
if p.config.Limit >= 0 && keysCount >= p.config.Limit {
202203
labelsValues := make([]string, 0, len(p.keys))
203204
for _, key := range p.keys {
204205
if val, exists := cacheKey[key]; exists {
205206
labelsValues = append(labelsValues, val)
206207
} else {
207-
labelsValues = append(labelsValues, "unknown")
208+
labelsValues = append(labelsValues, p.config.MetricLabelValue)
208209
}
209210
}
210-
p.cardinalityDiscardCounter.WithLabelValues(labelsValues...).Inc()
211+
p.cardinalityApplyCounter.WithLabelValues(labelsValues...).Inc()
211212
switch p.config.Action {
212213
case actionDiscard:
213214
return pipeline.ActionDiscard

plugin/action/cardinality/cardinality_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func TestCardinalityLimitDiscardIfNoSetKeyFields(t *testing.T) {
183183
}
184184

185185
func TestSetAndCountPrefix(t *testing.T) {
186-
cache, _ := NewCache(time.Minute)
186+
cache := NewCache(time.Minute)
187187

188188
cacheKey := map[string]string{
189189
"host": "localhost",

0 commit comments

Comments
 (0)