Skip to content

Commit aaa7855

Browse files
committed
feat:扩展 metricsfilter processor 的 drop action 过滤功能,支持额外条件过滤
1 parent f126c9a commit aaa7855

File tree

4 files changed

+130
-20
lines changed

4 files changed

+130
-20
lines changed

pkg/collector/processor/metricsfilter/config.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,29 @@ func (c *Config) Validate() error {
5959
}
6060

6161
type DropAction struct {
62-
Metrics []string `config:"metrics" mapstructure:"metrics"`
62+
Metrics []string `config:"metrics" mapstructure:"metrics"`
63+
Op Op `config:"op" mapstructure:"op"`
64+
Rules []*DropRule `config:"extra_rules" mapstructure:"extra_rules"`
65+
}
66+
67+
func (d *DropAction) MetricMatch(name string) bool {
68+
switch d.Op {
69+
case OpNotIn:
70+
return !slices.Contains(d.Metrics, name)
71+
72+
default:
73+
return slices.Contains(d.Metrics, name)
74+
}
75+
}
76+
77+
type DropRule struct {
78+
PredicateKey string `config:"predicate_key" mapstructure:"predicate_key"`
79+
MatchConfig MatchConfig `config:"match" mapstructure:"match"`
80+
}
81+
82+
type MatchConfig struct {
83+
Op string `config:"op" mapstructure:"op"`
84+
Value string `config:"value" mapstructure:"value"`
6385
}
6486

6587
type ReplaceAction struct {

pkg/collector/processor/metricsfilter/factory.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ import (
1717

1818
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/confengine"
1919
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/define"
20+
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/internal/fields"
2021
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/internal/foreach"
2122
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/internal/mapstructure"
23+
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/internal/opmatch"
2224
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/internal/promlabels"
2325
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/collector/processor"
2426
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
@@ -140,18 +142,34 @@ func (p *metricsFilter) Process(record *define.Record) (*define.Record, error) {
140142
func (p *metricsFilter) dropAction(record *define.Record, config Config) {
141143
switch record.RecordType {
142144
case define.RecordMetrics:
143-
for _, name := range config.Drop.Metrics {
144-
pdMetrics := record.Data.(pmetric.Metrics)
145-
pdMetrics.ResourceMetrics().RemoveIf(func(resourceMetrics pmetric.ResourceMetrics) bool {
146-
resourceMetrics.ScopeMetrics().RemoveIf(func(scopeMetrics pmetric.ScopeMetrics) bool {
147-
scopeMetrics.Metrics().RemoveIf(func(metric pmetric.Metric) bool {
148-
return metric.Name() == name
149-
})
150-
return scopeMetrics.Metrics().Len() == 0
145+
action := config.Drop
146+
pdMetrics := record.Data.(pmetric.Metrics)
147+
pdMetrics.ResourceMetrics().RemoveIf(func(resourceMetrics pmetric.ResourceMetrics) bool {
148+
rs := resourceMetrics.Resource().Attributes()
149+
resourceMetrics.ScopeMetrics().RemoveIf(func(scopeMetrics pmetric.ScopeMetrics) bool {
150+
scopeMetrics.Metrics().RemoveIf(func(metric pmetric.Metric) bool {
151+
ruleMatch := true
152+
if len(action.Rules) > 0 {
153+
for _, rule := range action.Rules {
154+
ff, pk := fields.DecodeFieldFrom(rule.PredicateKey)
155+
switch ff {
156+
// TODO(aivan): 目前 predicateKey 暂时只支持 resource 后续可能会扩展
157+
case fields.FieldFromResource:
158+
rv, ok := rs.Get(pk)
159+
if !ok || !opmatch.Match(rv.AsString(), rule.MatchConfig.Value, rule.MatchConfig.Op) {
160+
ruleMatch = false
161+
}
162+
default:
163+
logger.Errorf("unsupported field %s", rule.PredicateKey)
164+
}
165+
}
166+
}
167+
return action.MetricMatch(metric.Name()) && ruleMatch
151168
})
152-
return resourceMetrics.ScopeMetrics().Len() == 0
169+
return scopeMetrics.Metrics().Len() == 0
153170
})
154-
}
171+
return resourceMetrics.ScopeMetrics().Len() == 0
172+
})
155173
}
156174
}
157175

pkg/collector/processor/metricsfilter/factory_test.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -138,24 +138,78 @@ func TestMetricsNoAction(t *testing.T) {
138138
}
139139

140140
func TestMetricsDropAction(t *testing.T) {
141-
content := `
141+
t.Run("simple_in", func(t *testing.T) {
142+
content := `
142143
processor:
143144
- name: "metrics_filter/drop"
144145
config:
145146
drop:
146147
metrics:
147148
- "my_metrics"
148149
`
149-
factory := processor.MustCreateFactory(content, NewFactory)
150+
factory := processor.MustCreateFactory(content, NewFactory)
150151

151-
record := define.Record{
152-
RecordType: define.RecordMetrics,
153-
Data: makeMetricsRecord(1),
154-
}
152+
record := define.Record{
153+
RecordType: define.RecordMetrics,
154+
Data: makeMetricsRecord(1),
155+
}
155156

156-
testkits.MustProcess(t, factory, record)
157-
metrics := record.Data.(pmetric.Metrics).ResourceMetrics()
158-
assert.Equal(t, 0, metrics.Len())
157+
testkits.MustProcess(t, factory, record)
158+
metrics := record.Data.(pmetric.Metrics).ResourceMetrics()
159+
assert.Equal(t, 0, metrics.Len())
160+
})
161+
162+
t.Run("simple_notin", func(t *testing.T) {
163+
content := `
164+
processor:
165+
- name: "metrics_filter/drop"
166+
config:
167+
drop:
168+
metrics:
169+
- "my_metrics1"
170+
op: "notin"
171+
`
172+
factory := processor.MustCreateFactory(content, NewFactory)
173+
174+
record := define.Record{
175+
RecordType: define.RecordMetrics,
176+
Data: makeMetricsRecord(1),
177+
}
178+
179+
testkits.MustProcess(t, factory, record)
180+
metrics := record.Data.(pmetric.Metrics).ResourceMetrics()
181+
assert.Equal(t, 0, metrics.Len())
182+
})
183+
184+
t.Run("complex_notin", func(t *testing.T) {
185+
content := `
186+
processor:
187+
- name: "metrics_filter/drop"
188+
config:
189+
drop:
190+
metrics:
191+
- "my_metrics111"
192+
op: "notin"
193+
extra_rules:
194+
- predicate_key: "resource.telemetry.distro.name"
195+
match:
196+
op: "eq"
197+
value: "opentelemetry-java-instrumentation"
198+
`
199+
factory := processor.MustCreateFactory(content, NewFactory)
200+
201+
record := define.Record{
202+
RecordType: define.RecordMetrics,
203+
Data: makeMetricsRecordWith(
204+
"my_metrics", 1, map[string]string{
205+
"telemetry.distro.name": "opentelemetry-java-instrumentation",
206+
}, map[string]string{}),
207+
}
208+
209+
testkits.MustProcess(t, factory, record)
210+
metrics := record.Data.(pmetric.Metrics).ResourceMetrics()
211+
assert.Equal(t, 0, metrics.Len())
212+
})
159213
}
160214

161215
func TestMetricsReplaceAction(t *testing.T) {

pkg/collector/support-files/templates/linux/x86_64/etc/bk-collector-application.conf.tpl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ default:
8686
{% if metrics_filter_config is defined %}
8787
- name: "{{ metrics_filter_config.name }}"
8888
config:
89+
{%- if metrics_filter_config.code_relabel is defined %}
8990
code_relabel:
9091
{%- for item in metrics_filter_config.code_relabel %}
9192
- metrics: {{ item.metrics | tojson }}
@@ -103,6 +104,21 @@ default:
103104
{%- endfor %}
104105
{%- endfor %}
105106
{%- endfor %}
107+
{%- endif %}
108+
{%- if metrics_filter_config.drop is defined %}
109+
drop:
110+
metrics: {{ metrics_filter_config.drop.metrics | tojson }}
111+
op: {{metrics_filter_config.drop.get("op", "in")}}
112+
{%- if metrics_filter_config.drop.extra_rules is defined %}
113+
extra_rules:
114+
{%- for item in metrics_filter_config.drop.get("extra_rules", []) %}
115+
- predicate_key: '{{ item.predicate_key }}'
116+
match:
117+
op: '{{ item.match.op }}'
118+
value: '{{ item.match.value }}'
119+
{%- endfor %}
120+
{%- endif %}
121+
{%- endif %}
106122
{%- endif %}
107123

108124
{% if db_slow_command_config is defined %}

0 commit comments

Comments
 (0)