diff --git a/internal/pkg/otel/manager/diagnostics.go b/internal/pkg/otel/manager/diagnostics.go index 2e8b8e137d6..099d3f2c780 100644 --- a/internal/pkg/otel/manager/diagnostics.go +++ b/internal/pkg/otel/manager/diagnostics.go @@ -5,39 +5,19 @@ package manager import ( - "archive/tar" - "bytes" - "compress/gzip" "context" - "encoding/json" "errors" "fmt" - "io" - "io/fs" - "os" - "path/filepath" - "regexp" "strings" "syscall" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" - componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" "github.com/elastic/elastic-agent/internal/pkg/otel" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/elastic/elastic-agent-client/v7/pkg/proto" - - "github.com/elastic/elastic-agent/internal/pkg/otel/translate" - "github.com/elastic/elastic-agent/pkg/core/logger" - "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" ) -var fileBeatRegistryPathRegExps = getRegexpsForRegistryFiles() - // PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then // it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning // is logged. @@ -138,285 +118,17 @@ func (m *OTelManager) PerformComponentDiagnostics( } for idx, diag := range diagnostics { + found := false for _, extDiag := range extDiagnostics.ComponentDiagnostics { if strings.Contains(extDiag.Name, diag.Component.ID) { - diagnostics[idx].Results = append(diag.Results, extDiag) + found = true + diagnostics[idx].Results = append(diagnostics[idx].Results, extDiag) } } - } - - for idx, diag := range diagnostics { - var results []*proto.ActionDiagnosticUnitResult - var errs []error - jsonMetricDiagnostic, err := GetBeatJsonMetricsDiagnostics(ctx, diag.Component.ID) - errs = append(errs, err) - if jsonMetricDiagnostic != nil { - results = append(results, jsonMetricDiagnostic) - } - - inputMetricsDiagnostic, err := GetBeatInputMetricsDiagnostics(ctx, diag.Component.ID) - errs = append(errs, err) - if inputMetricsDiagnostic != nil { - results = append(results, inputMetricsDiagnostic) - } - - if translate.GetBeatNameForComponent(&diag.Component) == "filebeat" { - // include filebeat registry, reimplementation of a filebeat diagnostic hook - registryTarGzBytes, err := FileBeatRegistryTarGz(m.logger, diag.Component.ID) - if err != nil { - errs = append(errs, fmt.Errorf("failed to get filebeat registry archive: %w", err)) - } - if registryTarGzBytes != nil { - m.logger.Debugf("created registry tar.gz, size %d", len(registryTarGzBytes)) - results = append(results, &proto.ActionDiagnosticUnitResult{ - Name: "registry", - Description: "Filebeat's registry", - Filename: "registry.tar.gz", - ContentType: "application/octet-stream", - Content: registryTarGzBytes, - Generated: timestamppb.Now(), - }) - } - + if !found { + diagnostics[idx].Err = fmt.Errorf("failed to get diagnostics for %s", diag.Component.ID) } - - diagnostics[idx].Results = append(diagnostics[idx].Results, results...) - diagnostics[idx].Err = errors.Join(errs...) } return diagnostics, nil } - -func GetBeatJsonMetricsDiagnostics(ctx context.Context, componentID string) (*proto.ActionDiagnosticUnitResult, error) { - beatMetrics, err := GetBeatMetricsPayload(ctx, componentID, "/stats") - if err != nil { - return nil, fmt.Errorf("failed to get stats beat metrics: %w", err) - } - - beatMetrics, err = formatJSON(beatMetrics) - if err != nil { - return nil, fmt.Errorf("failed to format stats beat metrics: %w", err) - } - - result := &proto.ActionDiagnosticUnitResult{ - Name: "beat_metrics", - Description: "Metrics from the default monitoring namespace and expvar.", - Filename: "beat_metrics.json", - ContentType: "application/json", - Content: beatMetrics, - Generated: timestamppb.Now(), - } - return result, nil -} - -func GetBeatInputMetricsDiagnostics(ctx context.Context, componentID string) (*proto.ActionDiagnosticUnitResult, error) { - inputMetrics, err := GetBeatMetricsPayload(ctx, componentID, "/inputs/") - if err != nil { - return nil, fmt.Errorf("failed to get input beat metrics: %w", err) - } - - inputMetrics, err = formatJSON(inputMetrics) - if err != nil { - return nil, fmt.Errorf("failed to format input beat metrics: %w", err) - } - - result := &proto.ActionDiagnosticUnitResult{ - Name: "input_metrics", - Description: "Metrics from active inputs.", - Filename: "input_metrics.json", - ContentType: "application/json", - Content: inputMetrics, - Generated: timestamppb.Now(), - } - return result, nil -} - -func GetBeatMetricsPayload(ctx context.Context, componentID string, path string) ([]byte, error) { - endpoint := componentmonitoring.PrefixedEndpoint(componentmonitoring.BeatsMonitoringEndpoint(componentID)) - metricBytes, statusCode, err := monitoring.GetProcessMetrics(ctx, endpoint, path) - if err != nil { - return nil, err - } - if statusCode >= 300 { - return nil, fmt.Errorf("unexpected status code %d", statusCode) - } - return metricBytes, nil -} - -func formatJSON(jsonBytes []byte) ([]byte, error) { - // remarshal the metrics to produce nicely formatted json - var data any - if err := json.Unmarshal(jsonBytes, &data); err != nil { - return nil, err - } - - formattedData, err := json.MarshalIndent(data, "", " ") - if err != nil { - return nil, err - } - return formattedData, nil -} - -func FileBeatRegistryPath(componentID string) string { - dataPath := translate.BeatDataPath(componentID) - return filepath.Join(dataPath, "registry") -} - -// FileBeatRegistryTarGz creates a tar.gz file containing the filebeat registry and returns its contents as bytes. -func FileBeatRegistryTarGz(logger *logger.Logger, componentID string) ([]byte, error) { - registryPath := FileBeatRegistryPath(componentID) - - tempFile, err := os.CreateTemp("", "temp-registry.tar.gz") - if err != nil { - return nil, err - } - - defer func() { - if closeErr := tempFile.Close(); closeErr != nil { - logger.Warn("error closing temporary registry archive", "error", closeErr) - } - if removeErr := os.Remove(tempFile.Name()); removeErr != nil { - logger.Warnf("cannot remove temporary registry archive '%s': '%s'", tempFile.Name(), removeErr) - } - }() - - gzWriter := gzip.NewWriter(tempFile) - defer func() { - if closeErr := gzWriter.Close(); closeErr != nil { - logger.Warnf("error closing gzip writer: %v", closeErr) - } - }() - - err = tarFolder(logger, gzWriter, registryPath) - if err != nil { - return nil, err - } - if closeErr := gzWriter.Close(); closeErr != nil { - return nil, closeErr - } - - stat, err := tempFile.Stat() - if err != nil { - return nil, err - } - - if stat.Size() > 20_000_000 { - return nil, fmt.Errorf("registry is too large for diagnostics, %d > 20mb", stat.Size()/1_000_000) - } - - var output bytes.Buffer - _, err = tempFile.Seek(0, 0) - if err != nil { - return nil, err - } - _, err = io.Copy(&output, tempFile) - if err != nil { - return nil, err - } - - return output.Bytes(), nil -} - -// getRegexpsForRegistryFiles returns a list of regexps to match filebeat registry files. -func getRegexpsForRegistryFiles() []*regexp.Regexp { - var registryFileRegExps []*regexp.Regexp - preFilesList := [][]string{ - {"^registry$"}, - {"^registry", "filebeat$"}, - {"^registry", "filebeat", "meta\\.json$"}, - {"^registry", "filebeat", "log\\.json$"}, - {"^registry", "filebeat", "active\\.dat$"}, - {"^registry", "filebeat", "[[:digit:]]*\\.json$"}, - } - - for _, lst := range preFilesList { - // On windows, we need to ensure we escape the path separator, because backslash has a special meaning - separator := regexp.QuoteMeta(string(filepath.Separator)) - pathRe := strings.Join(lst, separator) - re := regexp.MustCompile(pathRe) - registryFileRegExps = append(registryFileRegExps, re) - } - - return registryFileRegExps -} - -// tarFolder creates a tar archive from the folder src and stores it at dst. -// -// dst must be the full path with extension, e.g: /tmp/foo.tar -// If src is not a folder an error is returned -func tarFolder(logger *logger.Logger, dst io.Writer, srcPath string) error { - fullPath, err := filepath.Abs(srcPath) - if err != nil { - return fmt.Errorf("cannot get full path from '%s': '%w'", srcPath, err) - } - - tarWriter := tar.NewWriter(dst) - defer func() { - if err := tarWriter.Close(); err != nil { - logger.Warnf("cannot close tar writer: '%s'", err) - } - }() - - info, err := os.Stat(fullPath) - if err != nil { - return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err) - } - - if !info.IsDir() { - return fmt.Errorf("'%s' is not a directory", fullPath) - } - baseDir := filepath.Base(srcPath) - - logger.Debugf("starting to walk '%s'", fullPath) - - return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error { - // Stop if there is any errors - if prevErr != nil { - return prevErr - } - - pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, srcPath)) - if !matchRegistryFiles(fileBeatRegistryPathRegExps, pathInTar) { - return nil - } - header, err := tar.FileInfoHeader(info, info.Name()) - if err != nil { - return fmt.Errorf("cannot create tar info header: '%w'", err) - } - header.Name = pathInTar - - if err := tarWriter.WriteHeader(header); err != nil { - return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) - } - - if info.IsDir() { - return nil - } - - file, err := os.Open(path) - if err != nil { - return fmt.Errorf("cannot open '%s' for reading: '%w", path, err) - } - defer func() { - if closeErr := file.Close(); closeErr != nil { - logger.Warnf("cannot close file '%s': '%s'", path, closeErr) - } - }() - - logger.Debugf("adding '%s' to the tar archive", file.Name()) - if _, err := io.Copy(tarWriter, file); err != nil { - return fmt.Errorf("cannot read '%s': '%w'", path, err) - } - - return nil - }) -} - -func matchRegistryFiles(registryFileRegExps []*regexp.Regexp, path string) bool { - for _, regExp := range registryFileRegExps { - if regExp.MatchString(path) { - return true - } - } - return false -} diff --git a/internal/pkg/otel/manager/diagnostics_test.go b/internal/pkg/otel/manager/diagnostics_test.go index 27e55a2365b..8ce94720965 100644 --- a/internal/pkg/otel/manager/diagnostics_test.go +++ b/internal/pkg/otel/manager/diagnostics_test.go @@ -5,28 +5,16 @@ package manager import ( - "archive/tar" - "bytes" - "compress/gzip" - "crypto/rand" "encoding/json" "fmt" - "io" - "net" - "net/http" - "os" - "path/filepath" - "regexp" "runtime" - "strings" "testing" - "time" - componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" - "github.com/elastic/elastic-agent/internal/pkg/otel/translate" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent/internal/pkg/otel/extension/elasticdiagnostics" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" - "github.com/elastic/elastic-agent/pkg/utils" componentruntime "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" @@ -65,13 +53,9 @@ func TestPerformComponentDiagnostics(t *testing.T) { require.NoError(t, err) for i, d := range diags { assert.Equal(t, expectedDiags[i].Component.ID, d.Component.ID) - // we should have errors set about not being able to connect to monitoring endpoints + // we should have errors set about not being able to connect to diagnostics extension require.NotNil(t, d.Err) - assert.ErrorContains(t, d.Err, "failed to get stats beat metrics") - assert.ErrorContains(t, d.Err, "failed to get input beat metrics") - if translate.GetBeatNameForComponent(&d.Component) == "filebeat" { - assert.ErrorContains(t, d.Err, "failed to get filebeat registry archive") - } + assert.ErrorContains(t, d.Err, fmt.Sprintf("failed to get diagnostics for %s", d.Component.ID)) } } @@ -150,22 +134,27 @@ func TestBeatMetrics(t *testing.T) { expectedMetricData, err := json.MarshalIndent(map[string]any{"test": "test"}, "", " ") require.NoError(t, err) - fileName := strings.TrimPrefix(componentmonitoring.BeatsMonitoringEndpoint(compID), fmt.Sprintf("%s://", utils.SocketScheme)) - err = os.MkdirAll(filepath.Dir(fileName), 0o755) - require.NoError(t, err) + expectedResponse := elasticdiagnostics.Response{ + ComponentDiagnostics: []*proto.ActionDiagnosticUnitResult{ + { + Name: compID, + Filename: "beat_metrics.json", + ContentType: "application/json", + Description: "Metrics from the default monitoring namespace and expvar.", + Content: expectedMetricData, + }, + { + Name: compID, + Filename: "input_metrics.json", + ContentType: "application/json", + Description: "Metrics from active inputs.", + Content: expectedMetricData, + }, + }, + } - listener, err := net.Listen("unix", fileName) - require.NoError(t, err) - server := http.Server{ - ReadHeaderTimeout: time.Second, // needed to silence gosec - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, wErr := w.Write(expectedMetricData) - require.NoError(t, wErr) - })} - go func() { - sErr := server.Serve(listener) - assert.ErrorIs(t, sErr, http.ErrServerClosed) - }() + called := false + server := handlers.NewMockServer(t, paths.DiagnosticsExtensionSocket(), &called, &expectedResponse) t.Cleanup(func() { cErr := server.Close() assert.NoError(t, cErr) @@ -175,6 +164,7 @@ func TestBeatMetrics(t *testing.T) { require.NoError(t, err) assert.Len(t, obs.All(), 1) require.Len(t, diags, 1) + require.True(t, called) diag := diags[0] assert.Equal(t, filebeatComp, diag.Component) @@ -183,7 +173,7 @@ func TestBeatMetrics(t *testing.T) { t.Run("stats beat metrics", func(t *testing.T) { beatMetrics := diag.Results[0] - assert.Equal(t, "beat_metrics", beatMetrics.Name) + assert.Equal(t, compID, beatMetrics.Name) assert.Equal(t, "Metrics from the default monitoring namespace and expvar.", beatMetrics.Description) assert.Equal(t, "beat_metrics.json", beatMetrics.Filename) assert.Equal(t, "application/json", beatMetrics.ContentType) @@ -192,7 +182,7 @@ func TestBeatMetrics(t *testing.T) { t.Run("input beat metrics", func(t *testing.T) { inputMetrics := diag.Results[1] - assert.Equal(t, "input_metrics", inputMetrics.Name) + assert.Equal(t, compID, inputMetrics.Name) assert.Equal(t, "Metrics from active inputs.", inputMetrics.Description) assert.Equal(t, "input_metrics.json", inputMetrics.Filename) assert.Equal(t, "application/json", inputMetrics.ContentType) @@ -200,128 +190,6 @@ func TestBeatMetrics(t *testing.T) { }) } -func TestMatchRegistryFiles(t *testing.T) { - regexps := getRegexpsForRegistryFiles() - testCases := []struct { - path string - expected bool - }{ - {"registry", true}, - {filepath.Join("registry", "filebeat"), true}, - {filepath.Join("registry", "filebeat", "meta.json"), true}, - {filepath.Join("registry", "filebeat", "log.json"), true}, - {filepath.Join("registry", "filebeat", "active.dat"), true}, - {filepath.Join("registry", "filebeat", "12345.json"), true}, - {filepath.Join("registry", "filebeat", "other.txt"), false}, - {"not_registry", false}, - } - - for _, tc := range testCases { - t.Run(tc.path, func(t *testing.T) { - assert.Equal(t, tc.expected, matchRegistryFiles(regexps, tc.path)) - }) - } -} - -func TestTarFolder(t *testing.T) { - logger, _ := loggertest.New("test") - - // Create a temporary source directory - srcDir, err := os.MkdirTemp("", "src") - require.NoError(t, err) - defer os.RemoveAll(srcDir) - - // Create registry structure - registryDir := filepath.Join(srcDir, "registry") - filebeatDir := filepath.Join(registryDir, "filebeat") - require.NoError(t, os.MkdirAll(filebeatDir, 0755)) - - // Create files - filesToCreate := []string{ - filepath.Join(filebeatDir, "meta.json"), - filepath.Join(filebeatDir, "log.json"), - filepath.Join(filebeatDir, "123.json"), - filepath.Join(filebeatDir, "should_be_ignored.txt"), - } - for _, f := range filesToCreate { - require.NoError(t, os.WriteFile(f, []byte("test data"), 0644)) - } - - // Tar the folder - var buf bytes.Buffer - err = tarFolder(logger, &buf, registryDir) - require.NoError(t, err) - - // Verify the tar contents - tarReader := tar.NewReader(&buf) - foundFiles := make(map[string]bool) - for { - hdr, err := tarReader.Next() - if err == io.EOF { - break - } - require.NoError(t, err) - foundFiles[hdr.Name] = true - } - - assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "meta.json")]) - assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "log.json")]) - assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "123.json")]) - assert.False(t, foundFiles[filepath.Join("registry", "filebeat", "should_be_ignored.txt")]) -} - -func TestFileBeatRegistryPath(t *testing.T) { - compID := "test-component" - expectedPath := filepath.Join(paths.Run(), compID, "registry") - assert.Equal(t, expectedPath, FileBeatRegistryPath(compID)) -} - -func TestFileBeatRegistryTarGz(t *testing.T) { - logger, _ := loggertest.New("test") - compID := "filebeat-comp-1" - - setTemporaryAgentPath(t) - registryPath := FileBeatRegistryPath(compID) - require.NoError(t, os.MkdirAll(filepath.Join(registryPath, "filebeat"), 0755)) - require.NoError(t, os.WriteFile(filepath.Join(registryPath, "filebeat", "meta.json"), []byte("test"), 0644)) - - t.Run("creates a valid tar.gz", func(t *testing.T) { - data, err := FileBeatRegistryTarGz(logger, compID) - require.NoError(t, err) - - gzReader, err := gzip.NewReader(bytes.NewReader(data)) - require.NoError(t, err) - tarReader := tar.NewReader(gzReader) - hdr, err := tarReader.Next() - require.NoError(t, err) - assert.Equal(t, "registry", hdr.Name) - hdr, err = tarReader.Next() - require.NoError(t, err) - assert.Equal(t, filepath.Join("registry", "filebeat"), hdr.Name) - hdr, err = tarReader.Next() - require.NoError(t, err) - assert.Equal(t, filepath.Join("registry", "filebeat", "meta.json"), hdr.Name) - }) - - t.Run("returns error if registry is too large", func(t *testing.T) { - // Temporarily change the regex to include a large file - originalRegexps := fileBeatRegistryPathRegExps - fileBeatRegistryPathRegExps = []*regexp.Regexp{regexp.MustCompile(".*")} - defer func() { fileBeatRegistryPathRegExps = originalRegexps }() - - largeFilePath := filepath.Join(registryPath, "largefile.log") - largeData := make([]byte, 21*1024*1024) // 21MB - _, err := rand.Read(largeData) - require.NoError(t, err) - require.NoError(t, os.WriteFile(largeFilePath, largeData, 0644)) - defer os.Remove(largeFilePath) - - _, err = FileBeatRegistryTarGz(logger, compID) - require.Error(t, err) - assert.Contains(t, err.Error(), "registry is too large for diagnostics") - }) -} - func setTemporaryAgentPath(t *testing.T) { topPath := paths.Top() tempTopPath := t.TempDir() diff --git a/testing/integration/ess/diagnostics_test.go b/testing/integration/ess/diagnostics_test.go index 7a2c828a555..ee50b25dda5 100644 --- a/testing/integration/ess/diagnostics_test.go +++ b/testing/integration/ess/diagnostics_test.go @@ -7,8 +7,10 @@ package ess import ( + "archive/tar" "archive/zip" "bytes" + "compress/gzip" "context" "fmt" "io" @@ -547,6 +549,7 @@ agent.monitoring.enabled: false require.NoErrorf(t, err, "stat file %q failed", path) require.Greaterf(t, stat.Size(), int64(0), "file %s has incorrect size", path) } + verifyFilebeatRegistry(t, filepath.Join(extractionDir, "components/filestream-default/registry.tar.gz")) } func testDiagnosticsFactory(t *testing.T, compSetup map[string]integrationtest.ComponentState, diagFiles []string, diagCompFiles []string, fix *integrationtest.Fixture, cmd []string) func(ctx context.Context) error { @@ -731,3 +734,23 @@ type filePattern struct { pattern string optional bool } + +func verifyFilebeatRegistry(t *testing.T, path string) { + data, err := os.ReadFile(path) + require.NoError(t, err) + gzReader, err := gzip.NewReader(bytes.NewReader(data)) + require.NoError(t, err) + tarReader := tar.NewReader(gzReader) + hdr, err := tarReader.Next() + require.NoError(t, err) + assert.Equal(t, "registry", hdr.Name) + hdr, err = tarReader.Next() + require.NoError(t, err) + assert.Equal(t, filepath.Join("registry", "filebeat"), hdr.Name) + hdr, err = tarReader.Next() + require.NoError(t, err) + assert.Equal(t, filepath.Join("registry", "filebeat", "log.json"), hdr.Name) + hdr, err = tarReader.Next() + require.NoError(t, err) + assert.Equal(t, filepath.Join("registry", "filebeat", "meta.json"), hdr.Name) +}