Skip to content

Commit a31d56f

Browse files
Allow using beats receivers for self-monitoring (#8031)
* Handle nil case in monitoring config parsing * Allow using otel runtime for self-monitoring # Conflicts: # internal/pkg/agent/application/monitoring/v1_monitor.go # Conflicts: # internal/pkg/otel/configtranslate/otelconfig.go * Modify e2e test * Make the monitoring e2e test more restrictive * Revert "Handle nil case in monitoring config parsing" This reverts commit bb11a0f. * Check receiver statuses in e2e test * Send data from beats processes and receivers to different namespaces * Check all component statuses in E2E test * Fix typo Co-authored-by: Khushi Jain <[email protected]> --------- Co-authored-by: Khushi Jain <[email protected]>
1 parent b5823bd commit a31d56f

File tree

4 files changed

+246
-276
lines changed

4 files changed

+246
-276
lines changed

internal/pkg/agent/application/monitoring/v1_monitor.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -414,19 +414,19 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo
414414
componentInfo{
415415
ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
416416
BinaryName: metricBeatName,
417-
RuntimeManager: component.DefaultRuntimeManager,
417+
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
418418
},
419419
componentInfo{
420420
ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID),
421421
BinaryName: metricBeatName,
422-
RuntimeManager: component.DefaultRuntimeManager,
422+
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
423423
})
424424
}
425425
if b.config.C.MonitorLogs {
426426
componentInfos = append(componentInfos, componentInfo{
427427
ID: monitoringFilesUnitsID,
428428
BinaryName: fileBeatName,
429-
RuntimeManager: component.DefaultRuntimeManager,
429+
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
430430
})
431431
}
432432
// sort the components to ensure a consistent order of inputs in the configuration
@@ -444,15 +444,19 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfo
444444

445445
streams = append(streams, b.getServiceComponentFilestreamStreams(componentInfos)...)
446446

447-
inputs := []interface{}{
448-
map[string]interface{}{
449-
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
450-
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
451-
"type": "filestream",
452-
useOutputKey: monitoringOutput,
453-
"streams": streams,
454-
},
447+
input := map[string]interface{}{
448+
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
449+
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
450+
"type": "filestream",
451+
useOutputKey: monitoringOutput,
452+
"streams": streams,
453+
}
454+
// Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
455+
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
456+
input["_runtime_experimental"] = b.config.C.RuntimeManager
455457
}
458+
459+
inputs := []any{input}
456460
inputsNode, found := cfg[inputsKey]
457461
if !found {
458462
return fmt.Errorf("no inputs in config")
@@ -518,6 +522,14 @@ func (b *BeatsMonitor) injectMetricsInput(
518522
},
519523
}
520524

525+
// Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
526+
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
527+
for _, input := range inputs {
528+
inputMap := input.(map[string]interface{})
529+
inputMap["_runtime_experimental"] = b.config.C.RuntimeManager
530+
}
531+
}
532+
521533
// add system/process metrics for services that can't be monitored via json/beats metrics
522534
inputs = append(inputs, b.getServiceComponentProcessMetricInputs(
523535
componentInfos, metricsCollectionIntervalString)...)

internal/pkg/agent/application/monitoring/v1_monitor_test.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func TestMonitoringFull(t *testing.T) {
3939
HTTP: &monitoringcfg.MonitoringHTTPConfig{
4040
Enabled: true,
4141
},
42+
RuntimeManager: monitoringcfg.DefaultRuntimeManager,
4243
},
4344
},
4445
agentInfo: agentInfo,
@@ -863,6 +864,7 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
863864
HTTP: &monitoringcfg.MonitoringHTTPConfig{
864865
Enabled: false,
865866
},
867+
RuntimeManager: monitoringcfg.DefaultRuntimeManager,
866868
},
867869
}
868870

@@ -914,7 +916,9 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
914916
// Verify that if we're using filebeat receiver, there's no filebeat input
915917
var monitoringCfg struct {
916918
Inputs []struct {
917-
Streams []struct {
919+
ID string
920+
RuntimeManager string `mapstructure:"_runtime_experimental"`
921+
Streams []struct {
918922
Path string `mapstructure:"path"`
919923
}
920924
}
@@ -931,7 +935,72 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
931935
}
932936
}
933937
}
934-
assert.Len(t, streamsForInputMetrics, 2) // we have two filebeats running: filestream-process and monitoring
938+
assert.Len(t, streamsForInputMetrics, 2)
939+
}
940+
941+
func TestMonitoringWithOtelRuntime(t *testing.T) {
942+
agentInfo, err := info.NewAgentInfo(context.Background(), false)
943+
require.NoError(t, err, "Error creating agent info")
944+
945+
cfg := &monitoringConfig{
946+
C: &monitoringcfg.MonitoringConfig{
947+
Enabled: true,
948+
MonitorLogs: true,
949+
MonitorMetrics: true,
950+
Namespace: "test",
951+
HTTP: &monitoringcfg.MonitoringHTTPConfig{
952+
Enabled: false,
953+
},
954+
RuntimeManager: monitoringcfg.OtelRuntimeManager,
955+
},
956+
}
957+
958+
policy := map[string]any{
959+
"agent": map[string]any{
960+
"monitoring": map[string]any{
961+
"metrics": true,
962+
"logs": false,
963+
},
964+
},
965+
"outputs": map[string]any{
966+
"default": map[string]any{},
967+
},
968+
}
969+
970+
b := &BeatsMonitor{
971+
enabled: true,
972+
config: cfg,
973+
agentInfo: agentInfo,
974+
}
975+
976+
components := []component.Component{
977+
{
978+
ID: "filestream-receiver",
979+
InputSpec: &component.InputRuntimeSpec{
980+
Spec: component.InputSpec{
981+
Command: &component.CommandSpec{
982+
Name: "filebeat",
983+
},
984+
},
985+
},
986+
RuntimeManager: component.OtelRuntimeManager,
987+
},
988+
}
989+
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
990+
require.NoError(t, err)
991+
992+
// Verify that if we're using filebeat receiver, there's no filebeat input
993+
var monitoringCfg struct {
994+
Inputs []struct {
995+
ID string
996+
RuntimeManager string `mapstructure:"_runtime_experimental"`
997+
}
998+
}
999+
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
1000+
require.NoError(t, err)
1001+
for _, input := range monitoringCfg.Inputs {
1002+
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
1003+
}
9351004
}
9361005

9371006
func TestEnrichArgs(t *testing.T) {

internal/pkg/core/monitoring/config/config.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ const (
1616
defaultNamespace = "default"
1717

1818
// DefaultHost is used when host is not defined or empty
19-
DefaultHost = "localhost"
19+
DefaultHost = "localhost"
20+
ProcessRuntimeManager = "process"
21+
OtelRuntimeManager = "otel"
22+
DefaultRuntimeManager = ProcessRuntimeManager
2023
)
2124

2225
// MonitoringConfig describes a configuration of a monitoring
@@ -33,6 +36,7 @@ type MonitoringConfig struct {
3336
MonitorTraces bool `yaml:"traces" config:"traces"`
3437
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
3538
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
39+
RuntimeManager string `yaml:"_runtime_experimental,omitempty" config:"_runtime_experimental,omitempty"`
3640
}
3741

3842
// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent
@@ -118,9 +122,10 @@ func DefaultConfig() *MonitoringConfig {
118122
Host: DefaultHost,
119123
Port: defaultPort,
120124
},
121-
Namespace: defaultNamespace,
122-
APM: defaultAPMConfig(),
123-
Diagnostics: defaultDiagnostics(),
125+
Namespace: defaultNamespace,
126+
APM: defaultAPMConfig(),
127+
Diagnostics: defaultDiagnostics(),
128+
RuntimeManager: DefaultRuntimeManager,
124129
}
125130
}
126131

0 commit comments

Comments
 (0)