Skip to content

Commit 86f5d7a

Browse files
Refactor the monitoring injection interface (elastic#7812) (elastic#7851)
* Refactor the monitoring injection interface * Remove unnecessary comment (cherry picked from commit 0d0e70d) Co-authored-by: Mikołaj Świątek <[email protected]>
1 parent 8a24295 commit 86f5d7a

File tree

6 files changed

+122
-93
lines changed

6 files changed

+122
-93
lines changed

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,8 @@ type MonitorManager interface {
9494
// args:
9595
// - the existing config policy
9696
// - a list of the expected running components
97-
// - a map of component IDs to binary names
9897
// - a map of component IDs to the PIDs of the running components.
99-
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, map[string]uint64) (map[string]interface{}, error)
98+
MonitoringConfig(map[string]interface{}, []component.Component, map[string]uint64) (map[string]interface{}, error)
10099
}
101100

102101
// Runner provides interface to run a manager and receive running errors.

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,12 +1192,12 @@ type testMonitoringManager struct{}
11921192

11931193
func newTestMonitoringMgr() *testMonitoringManager { return &testMonitoringManager{} }
11941194

1195-
func (*testMonitoringManager) EnrichArgs(_ string, _ string, args []string) []string { return args }
1196-
func (*testMonitoringManager) Prepare(_ string) error { return nil }
1197-
func (*testMonitoringManager) Cleanup(string) error { return nil }
1198-
func (*testMonitoringManager) Enabled() bool { return false }
1199-
func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil }
1200-
func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ map[string]uint64) (map[string]interface{}, error) {
1195+
func (*testMonitoringManager) EnrichArgs(_, _ string, args []string) []string { return args }
1196+
func (*testMonitoringManager) Prepare(string) error { return nil }
1197+
func (*testMonitoringManager) Cleanup(string) error { return nil }
1198+
func (*testMonitoringManager) Enabled() bool { return false }
1199+
func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil }
1200+
func (*testMonitoringManager) MonitoringConfig(map[string]interface{}, []component.Component, map[string]uint64) (map[string]interface{}, error) {
12011201
return nil, nil
12021202
}
12031203

internal/pkg/agent/application/monitoring/v1_monitor.go

Lines changed: 89 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package monitoring
77
import (
88
"crypto/sha256"
99
"fmt"
10-
"maps"
1110
"math"
1211
"net"
1312
"net/url"
@@ -56,6 +55,8 @@ const (
5655
monitoringOutput = "monitoring"
5756
defaultMonitoringNamespace = "default"
5857
agentName = "elastic-agent"
58+
metricBeatName = "metricbeat"
59+
fileBeatName = "filebeat"
5960

6061
monitoringMetricsUnitID = "metrics-monitoring"
6162
monitoringFilesUnitsID = "filestream-monitoring"
@@ -86,6 +87,16 @@ type BeatsMonitor struct {
8687
agentInfo info.Agent
8788
}
8889

90+
// componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use
91+
// the Component struct here because we also want to generate configurations for the monitoring components themselves,
92+
// but without generating the full Component for them.
93+
type componentInfo struct {
94+
ID string
95+
BinaryName string
96+
InputSpec *component.InputRuntimeSpec
97+
Pid uint64
98+
}
99+
89100
type monitoringConfig struct {
90101
C *monitoringCfg.MonitoringConfig `config:"agent.monitoring"`
91102
}
@@ -129,7 +140,6 @@ func (b *BeatsMonitor) Reload(rawConfig *config.Config) error {
129140
func (b *BeatsMonitor) MonitoringConfig(
130141
policy map[string]interface{},
131142
components []component.Component,
132-
componentIDToBinary map[string]string,
133143
componentIDPidMap map[string]uint64,
134144
) (map[string]interface{}, error) {
135145
if !b.Enabled() {
@@ -198,6 +208,8 @@ func (b *BeatsMonitor) MonitoringConfig(
198208
}
199209
}
200210

211+
componentInfos := b.getComponentInfos(components, componentIDPidMap)
212+
201213
if err := b.injectMonitoringOutput(policy, cfg, monitoringOutputName); err != nil && !errors.Is(err, errNoOuputPresent) {
202214
return nil, errors.New(err, "failed to inject monitoring output")
203215
} else if errors.Is(err, errNoOuputPresent) {
@@ -209,13 +221,13 @@ func (b *BeatsMonitor) MonitoringConfig(
209221
b.initInputs(cfg)
210222

211223
if b.config.C.MonitorLogs {
212-
if err := b.injectLogsInput(cfg, components, monitoringOutput); err != nil {
224+
if err := b.injectLogsInput(cfg, componentInfos, monitoringOutput); err != nil {
213225
return nil, errors.New(err, "failed to inject monitoring output")
214226
}
215227
}
216228

217229
if b.config.C.MonitorMetrics {
218-
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString, failureThreshold); err != nil {
230+
if err := b.injectMetricsInput(cfg, componentInfos, metricsCollectionIntervalString, failureThreshold); err != nil {
219231
return nil, errors.New(err, "failed to inject monitoring output")
220232
}
221233
}
@@ -355,8 +367,48 @@ func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{
355367
return nil
356368
}
357369

370+
// getComponentInfos returns a slice of componentInfo structs based on the provided components. This slice contains
371+
// all the information needed to generate the monitoring configuration for these components, as well as configuration
372+
// for new components which are going to be doing the monitoring.
373+
func (b *BeatsMonitor) getComponentInfos(components []component.Component, componentIDPidMap map[string]uint64) []componentInfo {
374+
componentInfos := make([]componentInfo, 0, len(components))
375+
for _, comp := range components {
376+
compInfo := componentInfo{
377+
ID: comp.ID,
378+
BinaryName: comp.BinaryName(),
379+
InputSpec: comp.InputSpec,
380+
}
381+
if pid, ok := componentIDPidMap[comp.ID]; ok {
382+
compInfo.Pid = pid
383+
}
384+
componentInfos = append(componentInfos, compInfo)
385+
}
386+
if b.config.C.MonitorMetrics {
387+
componentInfos = append(componentInfos,
388+
componentInfo{
389+
ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
390+
BinaryName: metricBeatName,
391+
},
392+
componentInfo{
393+
ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID),
394+
BinaryName: metricBeatName,
395+
})
396+
}
397+
if b.config.C.MonitorLogs {
398+
componentInfos = append(componentInfos, componentInfo{
399+
ID: monitoringFilesUnitsID,
400+
BinaryName: fileBeatName,
401+
})
402+
}
403+
// sort the components to ensure a consistent order of inputs in the configuration
404+
slices.SortFunc(componentInfos, func(a, b componentInfo) int {
405+
return strings.Compare(a.ID, b.ID)
406+
})
407+
return componentInfos
408+
}
409+
358410
// injectLogsInput adds logging configs for component monitoring to the `cfg` map
359-
func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []component.Component, monitoringOutput string) error {
411+
func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfos []componentInfo, monitoringOutput string) error {
360412
monitoringNamespace := b.monitoringNamespace()
361413
logsDrop := filepath.Dir(loggingPath("unit", b.operatingSystem))
362414

@@ -508,18 +560,18 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
508560
}
509561

510562
// service components that define a log path are monitored using its own stream in the monitor
511-
for _, comp := range components {
512-
if comp.InputSpec == nil || comp.InputSpec.Spec.Service == nil || comp.InputSpec.Spec.Service.Log == nil || comp.InputSpec.Spec.Service.Log.Path == "" {
563+
for _, compInfo := range componentInfos {
564+
if compInfo.InputSpec == nil || compInfo.InputSpec.Spec.Service == nil || compInfo.InputSpec.Spec.Service.Log == nil || compInfo.InputSpec.Spec.Service.Log.Path == "" {
513565
// only monitor service inputs that define a log path
514566
continue
515567
}
516-
fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(comp.BinaryName(), "-", "_"), "/", "_") // conform with index naming policy
568+
fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(compInfo.BinaryName, "-", "_"), "/", "_") // conform with index naming policy
517569
dataset := fmt.Sprintf("elastic_agent.%s", fixedBinaryName)
518570
streams = append(streams, map[string]interface{}{
519-
idKey: fmt.Sprintf("%s-%s", monitoringFilesUnitsID, comp.ID),
571+
idKey: fmt.Sprintf("%s-%s", monitoringFilesUnitsID, compInfo.ID),
520572
"type": "filestream",
521573
"paths": []interface{}{
522-
comp.InputSpec.Spec.Service.Log.Path,
574+
compInfo.InputSpec.Spec.Service.Log.Path,
523575
},
524576
"data_stream": map[string]interface{}{
525577
"type": "logs",
@@ -547,9 +599,9 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
547599
"add_fields": map[string]interface{}{
548600
"target": "component",
549601
"fields": map[string]interface{}{
550-
"id": comp.ID,
551-
"type": comp.InputSpec.InputType,
552-
"binary": comp.BinaryName(),
602+
"id": compInfo.ID,
603+
"type": compInfo.InputSpec.InputType,
604+
"binary": compInfo.BinaryName,
553605
"dataset": dataset,
554606
},
555607
},
@@ -559,7 +611,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
559611
"add_fields": map[string]interface{}{
560612
"target": "log",
561613
"fields": map[string]interface{}{
562-
"source": comp.ID,
614+
"source": compInfo.ID,
563615
},
564616
},
565617
},
@@ -601,9 +653,7 @@ func (b *BeatsMonitor) monitoringNamespace() string {
601653
// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object.
602654
func (b *BeatsMonitor) injectMetricsInput(
603655
cfg map[string]interface{},
604-
componentIDToBinary map[string]string,
605-
componentList []component.Component,
606-
existingStateServicePids map[string]uint64,
656+
componentInfos []componentInfo,
607657
metricsCollectionIntervalString string,
608658
failureThreshold *uint,
609659
) error {
@@ -617,18 +667,8 @@ func (b *BeatsMonitor) injectMetricsInput(
617667
}
618668
monitoringNamespace := b.monitoringNamespace()
619669

620-
//create a new map with the monitoring beats included
621-
componentListWithMonitoring := map[string]string{
622-
fmt.Sprintf("beat/%s", monitoringMetricsUnitID): "metricbeat",
623-
fmt.Sprintf("http/%s", monitoringMetricsUnitID): "metricbeat",
624-
monitoringFilesUnitsID: "filebeat",
625-
}
626-
for k, v := range componentIDToBinary {
627-
componentListWithMonitoring[k] = v
628-
}
629-
630-
beatsStreams := b.getBeatsStreams(componentListWithMonitoring, failureThreshold, metricsCollectionIntervalString)
631-
httpStreams := b.getHttpStreams(componentListWithMonitoring, failureThreshold, metricsCollectionIntervalString)
670+
beatsStreams := b.getBeatsStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
671+
httpStreams := b.getHttpStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
632672

633673
inputs := []interface{}{
634674
map[string]interface{}{
@@ -655,7 +695,7 @@ func (b *BeatsMonitor) injectMetricsInput(
655695

656696
// add system/process metrics for services that can't be monitored via json/beats metrics
657697
inputs = append(inputs, b.getServiceComponentProcessMetricInputs(
658-
componentList, existingStateServicePids, metricsCollectionIntervalString)...)
698+
componentInfos, metricsCollectionIntervalString)...)
659699

660700
inputsNode, found := cfg[inputsKey]
661701
if !found {
@@ -675,15 +715,15 @@ func (b *BeatsMonitor) injectMetricsInput(
675715
// getHttpStreams returns stream definitions for http/metrics inputs.
676716
// Note: The return type must be []any due to protobuf serialization quirks.
677717
func (b *BeatsMonitor) getHttpStreams(
678-
componentIDToBinary map[string]string,
718+
componentInfos []componentInfo,
679719
failureThreshold *uint,
680720
metricsCollectionIntervalString string,
681721
) []any {
682722
monitoringNamespace := b.monitoringNamespace()
683723
sanitizedAgentName := sanitizeName(agentName)
684724
indexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", sanitizedAgentName, monitoringNamespace)
685725
dataset := fmt.Sprintf("elastic_agent.%s", sanitizedAgentName)
686-
httpStreams := make([]any, 0, len(componentIDToBinary))
726+
httpStreams := make([]any, 0, len(componentInfos))
687727

688728
agentStream := map[string]any{
689729
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
@@ -705,15 +745,13 @@ func (b *BeatsMonitor) getHttpStreams(
705745
}
706746
httpStreams = append(httpStreams, agentStream)
707747

708-
// ensure consistent ordering
709-
unitIDs := slices.Sorted(maps.Keys(componentIDToBinary))
710-
for _, unit := range unitIDs {
711-
binaryName := componentIDToBinary[unit]
748+
for _, compInfo := range componentInfos {
749+
binaryName := compInfo.BinaryName
712750
if !isSupportedMetricsBinary(binaryName) {
713751
continue
714752
}
715753

716-
endpoints := []interface{}{prefixedEndpoint(utils.SocketURLWithFallback(unit, paths.TempDir()))}
754+
endpoints := []interface{}{prefixedEndpoint(utils.SocketURLWithFallback(compInfo.ID, paths.TempDir()))}
717755
name := sanitizeName(binaryName)
718756

719757
httpStream := map[string]interface{}{
@@ -729,7 +767,7 @@ func (b *BeatsMonitor) getHttpStreams(
729767
"namespace": "agent",
730768
"period": metricsCollectionIntervalString,
731769
"index": indexName,
732-
"processors": processorsForHttpStream(binaryName, unit, dataset, b.agentInfo),
770+
"processors": processorsForHttpStream(binaryName, compInfo.ID, dataset, b.agentInfo),
733771
}
734772
if failureThreshold != nil {
735773
httpStream[failureThresholdKey] = *failureThreshold
@@ -755,7 +793,7 @@ func (b *BeatsMonitor) getHttpStreams(
755793
"json.is_array": true,
756794
"period": metricsCollectionIntervalString,
757795
"index": fbIndexName,
758-
"processors": processorsForHttpStream(binaryName, unit, fbDataset, b.agentInfo),
796+
"processors": processorsForHttpStream(binaryName, compInfo.ID, fbDataset, b.agentInfo),
759797
}
760798
if failureThreshold != nil {
761799
fbStream[failureThresholdKey] = *failureThreshold
@@ -770,22 +808,21 @@ func (b *BeatsMonitor) getHttpStreams(
770808
// getBeatsStreams returns stream definitions for beats inputs.
771809
// Note: The return type must be []any due to protobuf serialization quirks.
772810
func (b *BeatsMonitor) getBeatsStreams(
773-
componentIDToBinary map[string]string,
811+
componentInfos []componentInfo,
774812
failureThreshold *uint,
775813
metricsCollectionIntervalString string,
776814
) []any {
777815
monitoringNamespace := b.monitoringNamespace()
778-
beatsStreams := make([]any, 0, len(componentIDToBinary))
816+
beatsStreams := make([]any, 0, len(componentInfos))
779817

780818
// ensure consistent ordering
781-
unitIDs := slices.Sorted(maps.Keys(componentIDToBinary))
782-
for _, unit := range unitIDs {
783-
binaryName := componentIDToBinary[unit]
819+
for _, compInfo := range componentInfos {
820+
binaryName := compInfo.BinaryName
784821
if !isSupportedBeatsBinary(binaryName) {
785822
continue
786823
}
787824

788-
endpoints := []interface{}{prefixedEndpoint(utils.SocketURLWithFallback(unit, paths.TempDir()))}
825+
endpoints := []interface{}{prefixedEndpoint(utils.SocketURLWithFallback(compInfo.ID, paths.TempDir()))}
789826
name := sanitizeName(binaryName)
790827
dataset := fmt.Sprintf("elastic_agent.%s", name)
791828
indexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace)
@@ -801,7 +838,7 @@ func (b *BeatsMonitor) getBeatsStreams(
801838
"hosts": endpoints,
802839
"period": metricsCollectionIntervalString,
803840
"index": indexName,
804-
"processors": processorsForBeatsStream(binaryName, unit, monitoringNamespace, dataset, b.agentInfo),
841+
"processors": processorsForBeatsStream(binaryName, compInfo.ID, monitoringNamespace, dataset, b.agentInfo),
805842
}
806843

807844
if failureThreshold != nil {
@@ -818,22 +855,17 @@ func (b *BeatsMonitor) getBeatsStreams(
818855
// running as services.
819856
// Note: The return type must be []any due to protobuf serialization quirks.
820857
func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
821-
components []component.Component,
822-
existingStateServicePids map[string]uint64,
858+
componentInfos []componentInfo,
823859
metricsCollectionIntervalString string,
824860
) []any {
825861
monitoringNamespace := b.monitoringNamespace()
826862
inputs := []any{}
827-
for _, comp := range components {
828-
if comp.InputSpec == nil || comp.InputSpec.Spec.Service == nil {
829-
continue
830-
}
831-
compPid, ok := existingStateServicePids[comp.ID]
832-
if !ok || compPid == 0 {
863+
for _, compInfo := range componentInfos {
864+
if compInfo.InputSpec == nil || compInfo.InputSpec.Spec.Service == nil || compInfo.Pid == 0 {
833865
continue
834866
}
835867
// If there's a checkin PID and the corresponding component has a service spec section, add a system/process config
836-
name := sanitizeName(comp.BinaryName())
868+
name := sanitizeName(compInfo.BinaryName)
837869
dataset := fmt.Sprintf("elastic_agent.%s", name)
838870
input := map[string]interface{}{
839871
idKey: fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name),
@@ -854,9 +886,9 @@ func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
854886
"metricsets": []interface{}{"process"},
855887
"period": metricsCollectionIntervalString,
856888
"index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace),
857-
"process.pid": compPid,
889+
"process.pid": compInfo.Pid,
858890
"process.cgroups.enabled": false,
859-
"processors": processorsForProcessMetrics(name, comp.ID, monitoringNamespace, dataset, b.agentInfo),
891+
"processors": processorsForProcessMetrics(name, compInfo.ID, monitoringNamespace, dataset, b.agentInfo),
860892
},
861893
},
862894
}

0 commit comments

Comments
 (0)