Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 285 additions & 5 deletions testing/integration/ess/beat_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"

"github.com/gofrs/uuid/v5"
"gopkg.in/yaml.v2"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
"github.com/elastic/elastic-agent/testing/integration"

Expand Down Expand Up @@ -251,7 +253,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) {
status, statusErr := classicFixture.ExecStatus(ctx)
assert.NoError(collect, statusErr)
assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 3)
return
}, 1*time.Minute, 1*time.Second)

// 2. Assert monitoring logs and metrics are available on ES
Expand Down Expand Up @@ -322,7 +323,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) {
status, statusErr := beatReceiverFixture.ExecStatus(ctx)
assert.NoError(collect, statusErr)
assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 4)
return
}, 1*time.Minute, 1*time.Second)

// 5. Assert monitoring logs and metrics are available on ES (for otel mode)
Expand Down Expand Up @@ -781,7 +781,6 @@ agent.monitoring.enabled: false
status, statusErr := fixture.ExecStatus(ctx)
require.NoError(collect, statusErr)
assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
return
}, 2*time.Minute, 5*time.Second)

// change configuration and wait until the beats receiver is healthy
Expand All @@ -793,7 +792,6 @@ agent.monitoring.enabled: false
status, statusErr := fixture.ExecStatus(ctx)
require.NoError(collect, statusErr)
assertBeatsReady(collect, &status, component.OtelRuntimeManager)
return
}, 2*time.Minute, 5*time.Second)

logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
Expand Down Expand Up @@ -870,7 +868,6 @@ agent.monitoring.enabled: false
assert.NoError(collect, statusErr)
// we should be running beats processes even though the otel runtime was requested
assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
return
}, 1*time.Minute, 1*time.Second)
logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
require.NoError(t, err)
Expand Down Expand Up @@ -1254,3 +1251,286 @@ func setStrictMapping(client *elasticsearch.Client, index string) error {
}
return nil
}

// TestMonitoringNoDuplicates checks to see if switching to otel
// runtime re-ingests logs. Also checks to make sure restarting
// elastic-agent when using otel runtime for monitoring doesn't
// re-ingest logs.
//
// Flow
// 1. Create policy in Kibana with just monitoring and "process" runtime
// 2. Install and Enroll
// 3. Switch to monitoring "otel" runtime
// 4. restart agent 3 times, making sure healthy between restarts
// 5. switch back to "process" runtime
// 6. query ES for monitoring logs with aggregation on fingerprint and line number,
// ideally 0 duplicates but possible to have a small number
// 7. uninstall
func TestMonitoringNoDuplicates(t *testing.T) {
info := define.Require(t, define.Requirements{
Group: integration.Default,
Local: true,
OS: []define.OS{
{Type: define.Linux},
{Type: define.Darwin},
{Type: define.Windows},
},
Stack: &define.Stack{},
Sudo: true,
})

ctx, cancel := testcontext.WithDeadline(t,
context.Background(),
time.Now().Add(5*time.Minute))
t.Cleanup(cancel)

policyName := fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String())
createPolicyReq := kibana.AgentPolicy{
Name: policyName,
Namespace: info.Namespace,
Description: fmt.Sprintf("%s policy", t.Name()),
MonitoringEnabled: []kibana.MonitoringEnabledOption{
kibana.MonitoringEnabledLogs,
kibana.MonitoringEnabledMetrics,
},
Overrides: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"_runtime_experimental": "process",
},
},
},
}
policyResponse, err := info.KibanaClient.CreatePolicy(ctx, createPolicyReq)
require.NoError(t, err, "error creating policy")

enrollmentToken, err := info.KibanaClient.CreateEnrollmentAPIKey(ctx,
kibana.CreateEnrollmentAPIKeyRequest{
PolicyID: policyResponse.ID,
})

fut, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

err = fut.Prepare(ctx)
require.NoError(t, err)

fleetServerURL, err := fleettools.DefaultURL(ctx, info.KibanaClient)
require.NoError(t, err, "failed getting Fleet Server URL")

installOpts := atesting.InstallOpts{
NonInteractive: true,
Privileged: true,
Force: true,
EnrollOpts: atesting.EnrollOpts{
URL: fleetServerURL,
EnrollmentToken: enrollmentToken.APIKey,
},
}
combinedOutput, err := fut.Install(ctx, &installOpts)
require.NoErrorf(t, err, "error install with enroll: %s\ncombinedoutput:\n%s", err, string(combinedOutput))

// store timestamp to filter duplicate docs with timestamp greater than this value
installTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

healthCheck := func(ctx context.Context, message string, runtime component.RuntimeManager, componentCount int, timestamp string) {
require.EventuallyWithT(t, func(collect *assert.CollectT) {
var statusErr error
status, statusErr := fut.ExecStatus(ctx)
assert.NoError(collect, statusErr)
assertBeatsHealthy(collect, &status, runtime, componentCount)
}, 1*time.Minute, 1*time.Second)
require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second)
defer findCancel()
mustClauses := []map[string]any{
{"match_phrase": map[string]any{"message": message}},
{"match": map[string]any{"data_stream.type": "logs"}},
{"match": map[string]any{"data_stream.dataset": "elastic_agent"}},
{"match": map[string]any{"data_stream.namespace": info.Namespace}},
}
rawQuery := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": mustClauses,
"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
},
},
"sort": []map[string]any{
{"@timestamp": map[string]any{"order": "asc"}},
},
}
docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-*", info.ESClient)
require.NoError(t, err)
return docs.Hits.Total.Value > 0
},
4*time.Minute, 5*time.Second,
"health check failed: timestamp: %s", timestamp)
}

// make sure running and logs are making it to ES
healthCheck(ctx,
"control checkin v2 protocol has chunking enabled",
component.ProcessRuntimeManager,
3,
installTimestamp)

// Switch to otel monitoring
otelMonUpdateReq := kibana.AgentPolicyUpdateRequest{
Name: policyName,
Namespace: info.Namespace,
Overrides: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"_runtime_experimental": "otel",
},
},
},
}

otelMonResp, err := info.KibanaClient.UpdatePolicy(ctx,
policyResponse.ID, otelMonUpdateReq)
require.NoError(t, err)

otelTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

// wait until policy is applied
policyCheck := func(expectedRevision int) {
require.Eventually(t, func() bool {
inspectOutput, err := fut.ExecInspect(ctx)
require.NoError(t, err)
return expectedRevision == inspectOutput.Revision
}, 3*time.Minute, 1*time.Second)
}
policyCheck(otelMonResp.Revision)

// make sure running and logs are making it to ES
healthCheck(ctx,
"Everything is ready. Begin running and processing data.",
component.OtelRuntimeManager,
4,
otelTimestamp)

// restart 3 times, checks path definition is stable
for range 3 {
restartTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
restartBytes, err := fut.Exec(ctx, []string{"restart"})
require.NoErrorf(t,
err,
"Restart error: %s, output was: %s",
err,
string(restartBytes))
healthCheck(ctx,
"Everything is ready. Begin running and processing data.",
component.OtelRuntimeManager,
4,
restartTimestamp)
}

// Switch back to process monitoring
processMonUpdateReq := kibana.AgentPolicyUpdateRequest{
Name: policyName,
Namespace: info.Namespace,
Overrides: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"_runtime_experimental": "process",
},
},
},
}

processMonResp, err := info.KibanaClient.UpdatePolicy(ctx,
policyResponse.ID, processMonUpdateReq)
require.NoError(t, err)

processTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

// wait until policy is applied
policyCheck(processMonResp.Revision)

// make sure running and logs are making it to ES
healthCheck(ctx,
"control checkin v2 protocol has chunking enabled",
component.ProcessRuntimeManager,
3,
processTimestamp)

// duplicate check
rawQuery := map[string]any{
"runtime_mappings": map[string]any{
"log.offset": map[string]any{
"type": "keyword",
},
"log.file.fingerprint": map[string]any{
"type": "keyword",
},
},
"query": map[string]any{
"bool": map[string]any{
"must": []map[string]any{
{"match": map[string]any{"data_stream.type": "logs"}},
{"match": map[string]any{"data_stream.dataset": "elastic_agent"}},
{"match": map[string]any{"data_stream.namespace": info.Namespace}},
},
"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": installTimestamp}}},
},
},
"aggs": map[string]any{
"duplicates": map[string]any{
"multi_terms": map[string]any{
"size": 500,
"min_doc_count": 2,
"terms": []map[string]any{
{"field": "log.file.fingerprint"},
{"field": "log.offset"},
},
},
},
},
}
var buf bytes.Buffer
err = json.NewEncoder(&buf).Encode(rawQuery)
require.NoError(t, err)

es := esapi.New(info.ESClient)
res, err := es.Search(
es.Search.WithIndex("logs-*"),
es.Search.WithSize(0),
es.Search.WithBody(&buf),
es.Search.WithPretty(),
es.Search.WithContext(ctx),
)
require.NoError(t, err)
require.Falsef(t, (res.StatusCode >= http.StatusMultipleChoices || res.StatusCode < http.StatusOK), "status should be 2xx was: %d", res.StatusCode)
resultBuf, err := io.ReadAll(res.Body)
require.NoError(t, err)

aggResults := map[string]any{}
err = json.Unmarshal(resultBuf, &aggResults)
aggs, ok := aggResults["aggregations"].(map[string]any)
require.Truef(t, ok, "'aggregations' wasn't a map[string]any, result was %s", string(resultBuf))
dups, ok := aggs["duplicates"].(map[string]any)
require.Truef(t, ok, "'duplicates' wasn't a map[string]any, result was %s", string(resultBuf))
buckets, ok := dups["buckets"].([]any)
require.Truef(t, ok, "'buckets' wasn't a []any, result was %s", string(resultBuf))

// It is possible to have a batch in flight not get acked
// before elastic-agent shutsdown. That can lead to
Comment on lines +1519 to +1520
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to wait (for a reasonable amount of time, maybe a few seconds?) on that last batch to be indexed before restarting Agent in the test? That would make the test more deterministic IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is checking to see if the "last" log message from startup has been indexed. That is part of what what the healthcheck function is doing. This does make the test much more consistent. This is also why the test disables monitoring and then restarts before enabling it with otel mode. But I haven't found a way to make this totally deterministic.

There is always a window where elastic-agent can produce a log, elastic-agent or agentbeat "sees" the log and tries to send it, then we kill elastic-agent. In that scenario the registry on disk will always be behind what elasticsearch has indexed and we will produce a duplicate on the next restart. But this should be a small number of duplicates and should not be the "entire file" which is what we are trying to make sure isn't happening.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't kill elastic-agent though, we restart it in an orderly way. In principle, there shouldn't be any duplicates in this case, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle, there shouldn't be any duplicates in this case, right?

I started out with this assumption too. But, we don't have a "only once" delivery guarantee, we have a "at least once" delivery guarantee. Because of this having occasional duplicates is "normal behavior", but re-ingesting the whole file would not be normal behavior. Which is why I switched to accepting a "small percentage" of duplicates. Even if we shut down "in an orderly way", there are still timeouts that could cause us to exit without receiving a reply from elasticsearch even though the elasticsearch bulk request eventually succeeds. And these timeouts are necessary because we don't want to prevent shutting down due to network issues etc.

// duplicates. So to account for that we have to tolerate a
// "few" duplicates. 10% of total docs seems reasonable.
hits, ok := aggResults["hits"].(map[string]any)
require.Truef(t, ok, "'hits' wasn't a map[string]any, result was %s", string(resultBuf))
total, ok := hits["total"].(map[string]any)
require.Truef(t, ok, "'total' wasn't a map[string]any, result was %s", string(resultBuf))
value, ok := total["value"].(float64)
require.Truef(t, ok, "'total' wasn't an int, result was %s", string(resultBuf))

require.Equalf(t, 0, len(buckets), "len(buckets): %d, hits.total.value: %d, result was %s", len(buckets), value, string(resultBuf))
require.True(t, float64(len(buckets)) < (value/10), "len(buckets): %d, hits.total.value: %d, result was %s", len(buckets), value, string(resultBuf))

// Uninstall
combinedOutput, err = fut.Uninstall(ctx, &atesting.UninstallOpts{Force: true})
require.NoErrorf(t, err, "error uninstalling beat receiver agent monitoring, err: %s, combined output: %s", err, string(combinedOutput))
}