diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go
index 7d3fabeb2fa..c05c0778494 100644
--- a/testing/integration/ess/beat_receivers_test.go
+++ b/testing/integration/ess/beat_receivers_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/elastic/elastic-agent/pkg/component"
 
 	"github.com/elastic/go-elasticsearch/v8"
+	"github.com/elastic/go-elasticsearch/v8/esapi"
 	"github.com/gofrs/uuid/v5"
 
 	"gopkg.in/yaml.v2"
@@ -33,6 +34,7 @@ import (
 	"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
 	atesting "github.com/elastic/elastic-agent/pkg/testing"
 	"github.com/elastic/elastic-agent/pkg/testing/define"
+	"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
 	"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
 
 	"github.com/elastic/elastic-agent/testing/integration"
@@ -251,7 +253,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) {
 		status, statusErr := classicFixture.ExecStatus(ctx)
 		assert.NoError(collect, statusErr)
 		assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 3)
-		return
 	}, 1*time.Minute, 1*time.Second)
 
 	// 2. Assert monitoring logs and metrics are available on ES
@@ -322,7 +323,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) {
 		status, statusErr := beatReceiverFixture.ExecStatus(ctx)
 		assert.NoError(collect, statusErr)
 		assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 4)
-		return
 	}, 1*time.Minute, 1*time.Second)
 
 	// 5. Assert monitoring logs and metrics are available on ES (for otel mode)
@@ -781,7 +781,6 @@ agent.monitoring.enabled: false
 		status, statusErr := fixture.ExecStatus(ctx)
 		require.NoError(collect, statusErr)
 		assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
-		return
 	}, 2*time.Minute, 5*time.Second)
 
 	// change configuration and wait until the beats receiver is healthy
@@ -793,7 +792,6 @@ agent.monitoring.enabled: false
 		status, statusErr := fixture.ExecStatus(ctx)
 		require.NoError(collect, statusErr)
 		assertBeatsReady(collect, &status, component.OtelRuntimeManager)
-		return
 	}, 2*time.Minute, 5*time.Second)
 
 	logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
@@ -870,7 +868,6 @@ agent.monitoring.enabled: false
 		status, statusErr := fixture.ExecStatus(ctx)
 		assert.NoError(collect, statusErr)
 		// we should be running beats processes even though the otel runtime was requested
 		assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
-		return
 	}, 1*time.Minute, 1*time.Second)
 	logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
 	require.NoError(t, err)
@@ -1254,3 +1251,286 @@ func setStrictMapping(client *elasticsearch.Client, index string) error {
 	}
 	return nil
 }
+
+// TestMonitoringNoDuplicates checks that switching to the otel runtime
+// does not re-ingest monitoring logs. It also checks that restarting
+// elastic-agent while using the otel runtime for monitoring doesn't
+// re-ingest logs.
+//
+// Flow
+// 1. Create a policy in Kibana with just monitoring and the "process" runtime
+// 2. Install and enroll
+// 3. Switch monitoring to the "otel" runtime
+// 4. Restart the agent 3 times, making sure it is healthy between restarts
+// 5. Switch back to the "process" runtime
+// 6. Query ES for monitoring logs with an aggregation on fingerprint and offset;
+//    ideally 0 duplicates, but a small number is possible
+// 7. Uninstall
+func TestMonitoringNoDuplicates(t *testing.T) {
+	info := define.Require(t, define.Requirements{
+		Group: integration.Default,
+		Local: true,
+		OS: []define.OS{
+			{Type: define.Linux},
+			{Type: define.Darwin},
+			{Type: define.Windows},
+		},
+		Stack: &define.Stack{},
+		Sudo:  true,
+	})
+
+	ctx, cancel := testcontext.WithDeadline(t,
+		context.Background(),
+		time.Now().Add(5*time.Minute))
+	t.Cleanup(cancel)
+
+	policyName := fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String())
+	createPolicyReq := kibana.AgentPolicy{
+		Name:        policyName,
+		Namespace:   info.Namespace,
+		Description: fmt.Sprintf("%s policy", t.Name()),
+		MonitoringEnabled: []kibana.MonitoringEnabledOption{
+			kibana.MonitoringEnabledLogs,
+			kibana.MonitoringEnabledMetrics,
+		},
+		Overrides: map[string]any{
+			"agent": map[string]any{
+				"monitoring": map[string]any{
+					"_runtime_experimental": "process",
+				},
+			},
+		},
+	}
+	policyResponse, err := info.KibanaClient.CreatePolicy(ctx, createPolicyReq)
+	require.NoError(t, err, "error creating policy")
+
+	enrollmentToken, err := info.KibanaClient.CreateEnrollmentAPIKey(ctx,
+		kibana.CreateEnrollmentAPIKeyRequest{
+			PolicyID: policyResponse.ID,
+		})
+	require.NoError(t, err, "error creating enrollment API key")
+
+	fut, err := define.NewFixtureFromLocalBuild(t, define.Version())
+	require.NoError(t, err)
+
+	err = fut.Prepare(ctx)
+	require.NoError(t, err)
+
+	fleetServerURL, err := fleettools.DefaultURL(ctx, info.KibanaClient)
+	require.NoError(t, err, "failed getting Fleet Server URL")
+
+	installOpts := atesting.InstallOpts{
+		NonInteractive: true,
+		Privileged:     true,
+		Force:          true,
+		EnrollOpts: atesting.EnrollOpts{
+			URL:             fleetServerURL,
+			EnrollmentToken: enrollmentToken.APIKey,
+		},
+	}
+	combinedOutput, err := fut.Install(ctx, &installOpts)
+	require.NoErrorf(t, err, "error install with enroll: %s\ncombinedoutput:\n%s", err, string(combinedOutput))
+
+	// store timestamp to filter duplicate docs with timestamp greater than this value
+	installTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
+
+	healthCheck := func(ctx context.Context, message string, runtime component.RuntimeManager, componentCount int, timestamp string) {
+		require.EventuallyWithT(t, func(collect *assert.CollectT) {
+			status, statusErr := fut.ExecStatus(ctx)
+			assert.NoError(collect, statusErr)
+			assertBeatsHealthy(collect, &status, runtime, componentCount)
+		}, 1*time.Minute, 1*time.Second)
+		require.Eventuallyf(t,
+			func() bool {
+				findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second)
+				defer findCancel()
+				mustClauses := []map[string]any{
+					{"match_phrase": map[string]any{"message": message}},
+					{"match": map[string]any{"data_stream.type": "logs"}},
+					{"match": map[string]any{"data_stream.dataset": "elastic_agent"}},
+					{"match": map[string]any{"data_stream.namespace": info.Namespace}},
+				}
+				rawQuery := map[string]any{
+					"query": map[string]any{
+						"bool": map[string]any{
+							"must":   mustClauses,
+							"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
+						},
+					},
+					"sort": []map[string]any{
+						{"@timestamp": map[string]any{"order": "asc"}},
+					},
+				}
+				docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-*", info.ESClient)
+				require.NoError(t, err)
+				return docs.Hits.Total.Value > 0
+			},
+			4*time.Minute, 5*time.Second,
+			"health check failed: timestamp: %s", timestamp)
+	}
+
+	// make sure the agent is running and logs are making it to ES
+	healthCheck(ctx,
+		"control checkin v2 protocol has chunking enabled",
+		component.ProcessRuntimeManager,
+		3,
+		installTimestamp)
+
+	// Switch to otel monitoring
+	otelMonUpdateReq := kibana.AgentPolicyUpdateRequest{
+		Name:      policyName,
+		Namespace: info.Namespace,
+		Overrides: map[string]any{
+			"agent": map[string]any{
+				"monitoring": map[string]any{
+					"_runtime_experimental": "otel",
+				},
+			},
+		},
+	}
+
+	otelMonResp, err := info.KibanaClient.UpdatePolicy(ctx,
+		policyResponse.ID, otelMonUpdateReq)
+	require.NoError(t, err)
+
+	otelTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
+
+	// wait until the policy is applied
+	policyCheck := func(expectedRevision int) {
+		require.Eventually(t, func() bool {
+			inspectOutput, err := fut.ExecInspect(ctx)
+			require.NoError(t, err)
+			return expectedRevision == inspectOutput.Revision
+		}, 3*time.Minute, 1*time.Second)
+	}
+	policyCheck(otelMonResp.Revision)
+
+	// make sure the agent is running and logs are making it to ES
+	healthCheck(ctx,
+		"Everything is ready. Begin running and processing data.",
+		component.OtelRuntimeManager,
+		4,
+		otelTimestamp)
+
+	// restart 3 times, checking the agent stays healthy and the monitoring file paths remain stable
+	for range 3 {
+		restartTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
+		restartBytes, err := fut.Exec(ctx, []string{"restart"})
+		require.NoErrorf(t,
+			err,
+			"Restart error: %s, output was: %s",
+			err,
+			string(restartBytes))
+		healthCheck(ctx,
+			"Everything is ready. Begin running and processing data.",
+			component.OtelRuntimeManager,
+			4,
+			restartTimestamp)
+	}
+
+	// Switch back to process monitoring
+	processMonUpdateReq := kibana.AgentPolicyUpdateRequest{
+		Name:      policyName,
+		Namespace: info.Namespace,
+		Overrides: map[string]any{
+			"agent": map[string]any{
+				"monitoring": map[string]any{
+					"_runtime_experimental": "process",
+				},
+			},
+		},
+	}
+
+	processMonResp, err := info.KibanaClient.UpdatePolicy(ctx,
+		policyResponse.ID, processMonUpdateReq)
+	require.NoError(t, err)
+
+	processTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
+
+	// wait until the policy is applied
+	policyCheck(processMonResp.Revision)
+
+	// make sure the agent is running and logs are making it to ES
+	healthCheck(ctx,
+		"control checkin v2 protocol has chunking enabled",
+		component.ProcessRuntimeManager,
+		3,
+		processTimestamp)
+
+	// duplicate check
+	rawQuery := map[string]any{
+		"runtime_mappings": map[string]any{
+			"log.offset": map[string]any{
+				"type": "keyword",
+			},
+			"log.file.fingerprint": map[string]any{
+				"type": "keyword",
+			},
+		},
+		"query": map[string]any{
+			"bool": map[string]any{
+				"must": []map[string]any{
+					{"match": map[string]any{"data_stream.type": "logs"}},
+					{"match": map[string]any{"data_stream.dataset": "elastic_agent"}},
+					{"match": map[string]any{"data_stream.namespace": info.Namespace}},
+				},
+				"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": installTimestamp}}},
+			},
+		},
+		"aggs": map[string]any{
+			"duplicates": map[string]any{
+				"multi_terms": map[string]any{
+					"size":          500,
+					"min_doc_count": 2,
+					"terms": []map[string]any{
+						{"field": "log.file.fingerprint"},
+						{"field": "log.offset"},
+					},
+				},
+			},
+		},
+	}
+	var buf bytes.Buffer
+	err = json.NewEncoder(&buf).Encode(rawQuery)
+	require.NoError(t, err)
+
+	es := esapi.New(info.ESClient)
+	res, err := es.Search(
+		es.Search.WithIndex("logs-*"),
+		es.Search.WithSize(0),
+		es.Search.WithBody(&buf),
+		es.Search.WithPretty(),
+		es.Search.WithContext(ctx),
+	)
+	require.NoError(t, err)
+	defer res.Body.Close()
+	require.Falsef(t, (res.StatusCode >= http.StatusMultipleChoices || res.StatusCode < http.StatusOK), "status should be 2xx was: %d", res.StatusCode)
+	resultBuf, err := io.ReadAll(res.Body)
+	require.NoError(t, err)
+
+	aggResults := map[string]any{}
+	err = json.Unmarshal(resultBuf, &aggResults)
+	require.NoError(t, err)
+	aggs, ok := aggResults["aggregations"].(map[string]any)
+	require.Truef(t, ok, "'aggregations' wasn't a map[string]any, result was %s", string(resultBuf))
+	dups, ok := aggs["duplicates"].(map[string]any)
+	require.Truef(t, ok, "'duplicates' wasn't a map[string]any, result was %s", string(resultBuf))
+	buckets, ok := dups["buckets"].([]any)
+	require.Truef(t, ok, "'buckets' wasn't a []any, result was %s", string(resultBuf))
+
+	// It is possible for a batch in flight to not get acked
+	// before elastic-agent shuts down. That can lead to
+	// duplicates. So to account for that we have to tolerate a
+	// "few" duplicates. 10% of total docs seems reasonable.
+	hits, ok := aggResults["hits"].(map[string]any)
+	require.Truef(t, ok, "'hits' wasn't a map[string]any, result was %s", string(resultBuf))
+	total, ok := hits["total"].(map[string]any)
+	require.Truef(t, ok, "'total' wasn't a map[string]any, result was %s", string(resultBuf))
+	value, ok := total["value"].(float64)
+	require.Truef(t, ok, "'total.value' wasn't a float64, result was %s", string(resultBuf))
+
+	require.Truef(t, float64(len(buckets)) < (value/10), "len(buckets): %d, hits.total.value: %.0f, result was %s", len(buckets), value, string(resultBuf))
+
+	// Uninstall
+	combinedOutput, err = fut.Uninstall(ctx, &atesting.UninstallOpts{Force: true})
+	require.NoErrorf(t, err, "error uninstalling beat receiver agent monitoring, err: %s, combined output: %s", err, string(combinedOutput))
+}
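
For reference, the duplicate check above walks the multi_terms aggregation response through nested map[string]any assertions. The standalone sketch below (not part of the diff) decodes the same response shape into typed structs instead; the struct names and the sample JSON values are illustrative assumptions, not output from a real run, and a real caller would pass in the bytes read from the esapi search response.

package main

import (
	"encoding/json"
	"fmt"
)

// duplicateSearchResult mirrors only the parts of the Elasticsearch search
// response that the duplicate check inspects: the total hit count and the
// multi_terms buckets (one bucket per fingerprint/offset pair that occurs
// in two or more documents).
type duplicateSearchResult struct {
	Hits struct {
		Total struct {
			Value float64 `json:"value"`
		} `json:"total"`
	} `json:"hits"`
	Aggregations struct {
		Duplicates struct {
			Buckets []struct {
				KeyAsString string `json:"key_as_string"`
				DocCount    int    `json:"doc_count"`
			} `json:"buckets"`
		} `json:"duplicates"`
	} `json:"aggregations"`
}

func main() {
	// Illustrative response body; a real run would use the bytes read from
	// the esapi search response instead.
	body := []byte(`{
		"hits": {"total": {"value": 1200}},
		"aggregations": {"duplicates": {"buckets": [
			{"key_as_string": "abc123|512", "doc_count": 2}
		]}}
	}`)

	var result duplicateSearchResult
	if err := json.Unmarshal(body, &result); err != nil {
		panic(err)
	}

	// Same tolerance as the test: duplicate buckets must stay under 10% of total docs.
	dupes := len(result.Aggregations.Duplicates.Buckets)
	fmt.Printf("duplicate buckets: %d, total docs: %.0f, within tolerance: %t\n",
		dupes, result.Hits.Total.Value, float64(dupes) < result.Hits.Total.Value/10)
}

Either way of parsing enforces the same condition: the number of duplicate fingerprint/offset buckets must stay below 10% of the monitoring documents ingested since install.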