Integration test to check data re-ingest when switching runtimes #10544
base: main

@@ -35,6 +35,7 @@ import (
    "github.com/elastic/elastic-agent/pkg/testing/define"
    "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
    "github.com/elastic/elastic-agent/testing/integration"
+   "github.com/elastic/go-elasticsearch/v8/esapi"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

@@ -251,7 +252,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) {
        status, statusErr := classicFixture.ExecStatus(ctx)
        assert.NoError(collect, statusErr)
        assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 3)
-       return
    }, 1*time.Minute, 1*time.Second)

    // 2. Assert monitoring logs and metrics are available on ES

@@ -322,7 +322,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) {
        status, statusErr := beatReceiverFixture.ExecStatus(ctx)
        assert.NoError(collect, statusErr)
        assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 4)
-       return
    }, 1*time.Minute, 1*time.Second)

    // 5. Assert monitoring logs and metrics are available on ES (for otel mode)

@@ -781,7 +780,6 @@ agent.monitoring.enabled: false
        status, statusErr := fixture.ExecStatus(ctx)
        require.NoError(collect, statusErr)
        assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
-       return
    }, 2*time.Minute, 5*time.Second)

    // change configuration and wait until the beats receiver is healthy

@@ -793,7 +791,6 @@ agent.monitoring.enabled: false
        status, statusErr := fixture.ExecStatus(ctx)
        require.NoError(collect, statusErr)
        assertBeatsReady(collect, &status, component.OtelRuntimeManager)
-       return
    }, 2*time.Minute, 5*time.Second)

    logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})

@@ -870,7 +867,6 @@ agent.monitoring.enabled: false
        assert.NoError(collect, statusErr)
        // we should be running beats processes even though the otel runtime was requested
        assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
-       return
    }, 1*time.Minute, 1*time.Second)
    logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
    require.NoError(t, err)

@@ -1254,3 +1250,280 @@ func setStrictMapping(client *elasticsearch.Client, index string) error {
    }
    return nil
}

// TestMonitoringNoDuplicates checks that switching to the otel
// runtime does not re-ingest logs. It also checks that restarting
// elastic-agent when using the otel runtime for monitoring doesn't
// re-ingest logs.
//
// Flow
// 1) Create policy in Kibana with just monitoring and download it
// 2) Install with monitoring using the "process" runtime
// 3) Restart agent, to roll log files
// 4) Switch to the "otel" monitoring runtime
// 5) Restart agent 3 times, making sure the agent is healthy between restarts
// 6) Query ES for monitoring logs with an aggregation on file fingerprint and offset; ideally 0 duplicates, but a small number is possible
// 7) Uninstall
func TestMonitoringNoDuplicates(t *testing.T) {
    info := define.Require(t, define.Requirements{
        Group: integration.Default,
        Local: true,
        OS: []define.OS{
            {Type: define.Linux},
            {Type: define.Darwin},
            {Type: define.Windows},
        },
        Stack: &define.Stack{},
        Sudo:  true,
    })

    installOpts := atesting.InstallOpts{
        NonInteractive: true,
        Privileged:     true,
        Force:          true,
        Develop:        true,
    }

    ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
    t.Cleanup(cancel)

    policyCtx, policyCancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
    t.Cleanup(policyCancel)

    createPolicyReq := kibana.AgentPolicy{
        Name:        fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String()),
        Namespace:   info.Namespace,
        Description: fmt.Sprintf("%s policy", t.Name()),
        MonitoringEnabled: []kibana.MonitoringEnabledOption{
            kibana.MonitoringEnabledLogs,
            kibana.MonitoringEnabledMetrics,
        },
    }
    policyResponse, err := info.KibanaClient.CreatePolicy(policyCtx, createPolicyReq)
    require.NoError(t, err, "error creating policy")

    downloadURL := fmt.Sprintf("/api/fleet/agent_policies/%s/download", policyResponse.ID)
    resp, err := info.KibanaClient.Connection.SendWithContext(policyCtx, http.MethodGet, downloadURL, nil, nil, nil)
    require.NoError(t, err, "error downloading policy")
    policyBytes, err := io.ReadAll(resp.Body)
    require.NoError(t, err, "error reading policy response")
    defer resp.Body.Close()

    apiKeyResponse, err := createESApiKey(info.ESClient)
    require.NoError(t, err, "failed to get api key")
    require.True(t, len(apiKeyResponse.Encoded) > 1, "api key is invalid %q", apiKeyResponse)
    apiKey, err := getDecodedApiKey(apiKeyResponse)
    require.NoError(t, err, "error decoding api key")

    type PolicyOutputs struct {
        Type   string   `yaml:"type"`
        Hosts  []string `yaml:"hosts"`
        Preset string   `yaml:"preset"`
        ApiKey string   `yaml:"api_key"`
    }
    type PolicyStruct struct {
        ID                string                   `yaml:"id"`
        Revision          int                      `yaml:"revision"`
        Outputs           map[string]PolicyOutputs `yaml:"outputs"`
        Fleet             map[string]any           `yaml:"fleet"`
        OutputPermissions map[string]any           `yaml:"output_permissions"`
        Agent             struct {
            Monitoring map[string]any `yaml:"monitoring"`
            Rest       map[string]any `yaml:",inline"`
        } `yaml:"agent"`
        Inputs           []map[string]any `yaml:"inputs"`
        Signed           map[string]any   `yaml:"signed"`
        SecretReferences []map[string]any `yaml:"secret_references"`
        Namespaces       []string         `yaml:"namespaces"`
    }

    policy := PolicyStruct{}
    err = yaml.Unmarshal(policyBytes, &policy)
    require.NoError(t, err, "error unmarshalling policy: %s", string(policyBytes))
    d, prs := policy.Outputs["default"]
    require.True(t, prs, "default must be in outputs")
    d.ApiKey = string(apiKey)
    policy.Outputs["default"] = d
    policy.Agent.Monitoring["_runtime_experimental"] = "process"

    updatedPolicyBytes, err := yaml.Marshal(policy)
    require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
    t.Cleanup(func() {
        if t.Failed() {
            t.Logf("policy was %s", string(updatedPolicyBytes))
        }
    })

    fut, err := define.NewFixtureFromLocalBuild(t, define.Version())
    require.NoError(t, err)
    err = fut.Prepare(ctx)
    require.NoError(t, err)
    err = fut.Configure(ctx, updatedPolicyBytes)
    require.NoError(t, err)
    combinedOutput, err := fut.InstallWithoutEnroll(ctx, &installOpts)
    require.NoErrorf(t, err, "error install without enroll: %s\ncombinedoutput:\n%s", err, string(combinedOutput))

    // store timestamp to filter otel docs with timestamp greater than this value
    installTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

    healthcheck := func(ctx context.Context, message string, runtime component.RuntimeManager, componentCount int, timestamp string) {
        require.EventuallyWithT(t, func(collect *assert.CollectT) {
            var statusErr error
            status, statusErr := fut.ExecStatus(ctx)
            assert.NoError(collect, statusErr)
            assertBeatsHealthy(collect, &status, runtime, componentCount)
        }, 1*time.Minute, 1*time.Second)
        require.Eventuallyf(t,
            func() bool {
                findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second)
                defer findCancel()
                mustClauses := []map[string]any{
                    {"match_phrase": map[string]any{"message": message}},
                    {"match": map[string]any{"data_stream.type": "logs"}},
                    {"match": map[string]any{"data_stream.dataset": "elastic_agent"}},
                    {"match": map[string]any{"data_stream.namespace": info.Namespace}},
                }
                rawQuery := map[string]any{
                    "query": map[string]any{
                        "bool": map[string]any{
                            "must":   mustClauses,
                            "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
                        },
                    },
                    "sort": []map[string]any{
                        {"@timestamp": map[string]any{"order": "asc"}},
                    },
                }
                docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-*", info.ESClient)
                require.NoError(t, err)
                return docs.Hits.Total.Value > 0
            },
            4*time.Minute, 5*time.Second,
            "health check failed: timestamp: %s", timestamp)
    }

    // make sure running and logs are making it to ES
    healthcheck(ctx, "control checkin v2 protocol has chunking enabled", component.ProcessRuntimeManager, 3, installTimestamp)

    // restart process monitoring, gives us multiple files to track in registry
    restartTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
    restartBytes, err := fut.Exec(ctx, []string{"restart"})
    require.NoErrorf(t, err, "Restart error: %s, output was: %s", err, string(restartBytes))
    healthcheck(ctx, "control checkin v2 protocol has chunking enabled", component.ProcessRuntimeManager, 3, restartTimestamp)

    // turn off monitoring
    policy.Agent.Monitoring["enabled"] = false
    updatedPolicyBytes, err = yaml.Marshal(policy)
    require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
    err = fut.Configure(ctx, updatedPolicyBytes)
    require.NoError(t, err)

    // restart to make sure beat processes have exited
    restartBytes, err = fut.Exec(ctx, []string{"restart"})
    require.NoErrorf(t, err, "Restart error: %s, output was: %s", err, string(restartBytes))

    // make sure agent is running, with no processes
    require.EventuallyWithT(t, func(collect *assert.CollectT) {
        var statusErr error
        status, statusErr := fut.ExecStatus(ctx)
        assert.NoError(collect, statusErr)
        assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 0)
    }, 1*time.Minute, 1*time.Second)

    // Switch to otel monitoring
    policy.Agent.Monitoring["enabled"] = true
    policy.Agent.Monitoring["_runtime_experimental"] = "otel"
    updatedPolicyBytes, err = yaml.Marshal(policy)
    require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
    err = fut.Configure(ctx, updatedPolicyBytes)
    require.NoError(t, err)

    // make sure running and logs are making it to ES
    otelTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
    healthcheck(ctx, "Everything is ready. Begin running and processing data.", component.OtelRuntimeManager, 4, otelTimestamp)

    // restart 3 times, checks path definition is stable
    for range 3 {
        restartTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
        restartBytes, err := fut.Exec(ctx, []string{"restart"})
        require.NoErrorf(t, err, "Restart error: %s, output was: %s", err, string(restartBytes))
        healthcheck(ctx, "Everything is ready. Begin running and processing data.", component.OtelRuntimeManager, 4, restartTimestamp)
    }

    // duplicate check
    dupCtx, dupCancel := context.WithTimeout(ctx, 10*time.Second)
    defer dupCancel()
    rawQuery := map[string]any{
        "runtime_mappings": map[string]any{
            "log.offset": map[string]any{
                "type": "keyword",
            },
            "log.file.fingerprint": map[string]any{
                "type": "keyword",
            },
        },
        "query": map[string]any{
            "bool": map[string]any{
                "must": []map[string]any{
                    {"match": map[string]any{"data_stream.type": "logs"}},
                    {"match": map[string]any{"data_stream.dataset": "elastic_agent"}},
                    {"match": map[string]any{"data_stream.namespace": info.Namespace}},
                },
                "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": installTimestamp}}},
            },
        },
        "aggs": map[string]any{
            "duplicates": map[string]any{
                "multi_terms": map[string]any{
                    "size":          500,
                    "min_doc_count": 2,
                    "terms": []map[string]any{
                        {"field": "log.file.fingerprint"},
                        {"field": "log.offset"},
                    },
                },
            },
        },
    }
    var buf bytes.Buffer
    err = json.NewEncoder(&buf).Encode(rawQuery)
    require.NoError(t, err)

    es := esapi.New(info.ESClient)
    res, err := es.Search(
        es.Search.WithIndex("logs-*"),
        es.Search.WithSize(0),
        es.Search.WithBody(&buf),
        es.Search.WithPretty(),
        es.Search.WithContext(dupCtx),
    )
    require.NoError(t, err)
    defer res.Body.Close()
    require.Falsef(t, (res.StatusCode >= http.StatusMultipleChoices || res.StatusCode < http.StatusOK), "status should be 2xx was: %d", res.StatusCode)
    resultBuf, err := io.ReadAll(res.Body)
    require.NoError(t, err)
    aggResults := map[string]any{}
    err = json.Unmarshal(resultBuf, &aggResults)
    require.NoError(t, err, "error unmarshalling search response: %s", string(resultBuf))
    aggs, ok := aggResults["aggregations"].(map[string]any)
    require.Truef(t, ok, "'aggregations' wasn't a map[string]any, result was %s", string(resultBuf))
    dups, ok := aggs["duplicates"].(map[string]any)
    require.Truef(t, ok, "'duplicates' wasn't a map[string]any, result was %s", string(resultBuf))
    buckets, ok := dups["buckets"].([]any)
    require.Truef(t, ok, "'buckets' wasn't a []any, result was %s", string(resultBuf))

    // It is possible to have a batch in flight not get acked
    // before elastic-agent shuts down. That can lead to
    // duplicates. So to account for that we have to tolerate a
    // "few" duplicates. 10% of total docs seems reasonable.

Comment on lines +1519 to +1520

Reviewer: Is there no way to wait (for a reasonable amount of time, maybe a few seconds?) on that last batch to be indexed before restarting Agent in the test? That would make the test more deterministic IMO.

Author: The test is checking that the "last" log message from startup has been indexed; that is part of what the health check does. There is always a window where elastic-agent produces a log, elastic-agent or agentbeat "sees" the log and tries to send it, and then we kill elastic-agent. In that scenario the registry on disk will always be behind what elasticsearch has indexed, and we will produce a duplicate on the next restart. But this should be a small number of duplicates and should not be the "entire file", which is what we are trying to make sure isn't happening.

Reviewer: We don't kill elastic-agent though, we restart it in an orderly way. In principle, there shouldn't be any duplicates in this case, right?

Author: I started out with this assumption too. But we don't have an "only once" delivery guarantee, we have an "at least once" delivery guarantee. Because of this, having occasional duplicates is normal behavior, but re-ingesting the whole file would not be, which is why I switched to accepting a "small percentage" of duplicates. Even if we shut down in an orderly way, there are still timeouts that could cause us to exit without receiving a reply from elasticsearch even though the elasticsearch bulk request eventually succeeds. And these timeouts are necessary because we don't want to prevent shutting down due to network issues etc.

    hits, ok := aggResults["hits"].(map[string]any)
    require.Truef(t, ok, "'hits' wasn't a map[string]any, result was %s", string(resultBuf))
    total, ok := hits["total"].(map[string]any)
    require.Truef(t, ok, "'total' wasn't a map[string]any, result was %s", string(resultBuf))
    value, ok := total["value"].(float64)
    require.Truef(t, ok, "'total.value' wasn't a float64, result was %s", string(resultBuf))

    require.True(t, float64(len(buckets)) < (value/10), "buckets contained duplicates, result was %s", string(resultBuf))

    // Uninstall
    combinedOutput, err = fut.Uninstall(ctx, &atesting.UninstallOpts{Force: true})
    require.NoErrorf(t, err, "error uninstalling beat receiver agent monitoring, err: %s, combined output: %s", err, string(combinedOutput))
}

Reviewer: This is probably fine, but the more idiomatic way would be to update the policy in Kibana and wait until agent reports it as applied. Any specific reason to reload the configuration locally instead?

Author: That's a good idea. Do we have an example of using the API to change the runtime?

Author: @swiatekm updated to using the API to make changes, plus enrolled in Fleet.
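
For reference, here is roughly what the Kibana-API route discussed above could look like. This is only a minimal sketch, not code from this PR: it reuses identifiers that already exist in the test (info, createPolicyReq, policyResponse, healthcheck, component.OtelRuntimeManager), and it assumes that PUT /api/fleet/agent_policies/{id} accepts an "overrides" object layered onto the compiled policy, that Connection.SendWithContext takes (params, headers, body) in that order, and that the kibana client handles the kbn-xsrf header on write requests. The actual follow-up commits that switch to the API and enroll in Fleet may do this differently.

    // Sketch only: push the runtime change through the Fleet API instead of fut.Configure.
    // The endpoint and the "overrides" payload shape are assumptions, not taken from this PR.
    updateBody := map[string]any{
        "name":      createPolicyReq.Name,
        "namespace": createPolicyReq.Namespace,
        "overrides": map[string]any{
            "agent": map[string]any{
                "monitoring": map[string]any{
                    "_runtime_experimental": "otel",
                },
            },
        },
    }
    updateBytes, err := json.Marshal(updateBody)
    require.NoError(t, err, "error marshalling policy update")

    updateURL := fmt.Sprintf("/api/fleet/agent_policies/%s", policyResponse.ID)
    updateResp, err := info.KibanaClient.Connection.SendWithContext(ctx, http.MethodPut, updateURL,
        nil, http.Header{"Content-Type": []string{"application/json"}}, bytes.NewReader(updateBytes))
    require.NoError(t, err, "error updating policy via Fleet API")
    defer updateResp.Body.Close()
    require.Equal(t, http.StatusOK, updateResp.StatusCode, "unexpected status updating policy")

    // Wait until the agent has picked up the change: the beat receivers are healthy
    // and monitoring logs written after the update are indexed, reusing the test's helper.
    updateTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
    healthcheck(ctx, "Everything is ready. Begin running and processing data.",
        component.OtelRuntimeManager, 4, updateTimestamp)

The final healthcheck call mirrors the pattern already used in the test: it is one way to "wait until agent reports it as applied" without polling the Fleet API for the policy revision.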