@@ -90,7 +90,8 @@ type Plugin struct {
9090 fields []parsedField
9191 logger * zap.Logger
9292
93- cardinalityApplyCounter * prometheus.CounterVec
93+ cardinalityApplyCounter * prometheus.CounterVec
94+ cardinalityUniqueValuesGauge * prometheus.GaugeVec
9495}
9596
9697type parsedField struct {
@@ -102,6 +103,7 @@ type parsedField struct {
102103const (
103104 actionDiscard = "discard"
104105 actionRemoveFields = "remove_fields"
106+ actionNothing = "nothing"
105107)
106108
107109// ! config-params
@@ -205,6 +207,17 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) {
205207 "Total number of events applied due to cardinality limits" ,
206208 keyMetricLabels (p .keys )... ,
207209 )
210+
211+ if prefix == "" {
212+ metricName = "cardinality_unique_values_count"
213+ } else {
214+ metricName = fmt .Sprintf (`cardinality_unique_values_%s_count` , prefix )
215+ }
216+ p .cardinalityUniqueValuesGauge = ctl .RegisterGaugeVec (
217+ metricName ,
218+ "Count of unique values" ,
219+ keyMetricLabels (p .keys )... ,
220+ )
208221}
209222
210223func keyMetricLabels (fields []parsedField ) []string {
@@ -232,18 +245,20 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
232245 cacheKey .Set (key .name , value )
233246 }
234247
248+ labelsValues := getLabelsValues (len (p .keys ))
249+ defer labelsValuesPool .Put (labelsValues )
250+ for _ , key := range p .keys {
251+ if val , exists := cacheKey .Get (key .name ); exists {
252+ labelsValues = append (labelsValues , val )
253+ }
254+ }
255+
235256 prefixKey := mapToKey (cacheKey )
236257 keysCount := p .cache .CountPrefix (prefixKey )
237258
238- if p .config .Limit >= 0 && keysCount >= p .config .Limit {
239- labelsValues := getLabelsValues (len (p .keys ))
240- defer labelsValuesPool .Put (labelsValues )
259+ shouldUpdateCache := false
241260
242- for _ , key := range p .keys {
243- if val , exists := cacheKey .Get (key .name ); exists {
244- labelsValues = append (labelsValues , val )
245- }
246- }
261+ if p .config .Limit >= 0 && keysCount >= p .config .Limit {
247262 p .cardinalityApplyCounter .WithLabelValues (labelsValues ... ).Inc ()
248263 switch p .config .Action {
249264 case actionDiscard :
@@ -252,16 +267,30 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
252267 for _ , key := range p .fields {
253268 event .Root .Dig (key .value ... ).Suicide ()
254269 }
270+ case actionNothing :
271+ shouldUpdateCache = true
255272 }
256273 } else {
274+ shouldUpdateCache = true
275+ }
276+
277+ if shouldUpdateCache {
257278 cacheValue := orderedmap .NewOrderedMap [string , string ]()
258279 for _ , key := range p .fields {
259280 value := pipeline .CloneString (event .Root .Dig (key .value ... ).AsString ())
260281 cacheValue .Set (key .name , value )
261282 }
262283 value := mapToStringSorted (cacheKey , cacheValue )
263- p .cache .Set (value )
284+ isOldValue := p .cache .Set (value )
285+ if ! isOldValue {
286+ // is new value
287+ if p .config .Action != actionNothing {
288+ keysCount ++
289+ }
290+ }
291+ p .cardinalityUniqueValuesGauge .WithLabelValues (labelsValues ... ).Set (float64 (keysCount ))
264292 }
293+
265294 return pipeline .ActionPass
266295}
267296
0 commit comments