
Commit 1a99e94

[ci] capture logs of pods in the test namespace and kube-system (#7341) (#7508)
* fix: path for pod logs dump and uploading them as artifacts
* fix: derive k8s namespace from random uuid
* fix: dump the logs of all pods and the container statuses into an archive for the test namespace and kube-system
* fix: properly set the namespace for clusterrole and clusterrole bindings in kustomize-based k8s integration tests
* fix: extend k8sCheckAgentStatus to catch container restarts
* test: make k8s integration tests be flaky
* fix: time box all exec in pods
* Revert "test: make k8s integration tests be flaky"

This reverts commit e776c64.

(cherry picked from commit 79bcfb7)

Co-authored-by: Panos Koutsovasilis <[email protected]>
Parent commit: 7aaf5bc

3 files changed: 172 additions, 44 deletions

.buildkite/bk.integration.pipeline.yml (1 addition, 0 deletions)

```diff
@@ -265,6 +265,7 @@ steps:
       artifact_paths:
         - build/**
         - build/diagnostics/**
+        - build/*.pod_logs_dump/*
       retry:
         automatic:
           limit: 1
```

.buildkite/scripts/buildkite-k8s-integration-tests.sh (1 addition, 1 deletion)

```diff
@@ -94,7 +94,7 @@ EOF
 fully_qualified_group_name="${CLUSTER_NAME}_${TARGET_ARCH}_${variant}"
 outputXML="build/${fully_qualified_group_name}.integration.xml"
 outputJSON="build/${fully_qualified_group_name}.integration.out.json"
-pod_logs_base="build/${fully_qualified_group_name}.integration"
+pod_logs_base="${PWD}/build/${fully_qualified_group_name}.pod_logs_dump"
 
 set +e
 K8S_TESTS_POD_LOGS_BASE="${pod_logs_base}" AGENT_IMAGE="${image}" DOCKER_VARIANT="${variant}" gotestsum --hide-summary=skipped --format testname --no-color -f standard-quiet --junitfile "${outputXML}" --jsonfile "${outputJSON}" -- -tags kubernetes,integration -test.shuffle on -test.timeout 2h0m0s github.com/elastic/elastic-agent/testing/integration -v -args -integration.groups="${group_name}" -integration.sudo="false"
```

testing/integration/kubernetes_agent_standalone_test.go (170 additions, 43 deletions)
```diff
@@ -7,6 +7,7 @@
 package integration
 
 import (
+    "archive/tar"
     "bufio"
     "bytes"
     "context"
@@ -24,6 +25,7 @@ import (
     "testing"
     "time"
 
+    "github.com/gofrs/uuid/v5"
     "github.com/stretchr/testify/require"
 
     "github.com/elastic/elastic-agent-libs/kibana"
```
```diff
@@ -794,8 +796,28 @@ func k8sCheckAgentStatus(ctx context.Context, client klient.Client, stdout *byte
     namespace string, agentPodName string, containerName string, componentPresence map[string]bool,
 ) error {
     command := []string{"elastic-agent", "status", "--output=json"}
+    stopCheck := errors.New("stop check")
+
+    // we will wait maximum 120 seconds for the agent to report healthy
+    ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
+    defer cancel()
 
     checkStatus := func() error {
+        pod := corev1.Pod{}
+        if err := client.Resources(namespace).Get(ctx, agentPodName, namespace, &pod); err != nil {
+            return err
+        }
+
+        for _, container := range pod.Status.ContainerStatuses {
+            if container.Name != containerName {
+                continue
+            }
+
+            if restarts := container.RestartCount; restarts != 0 {
+                return fmt.Errorf("container %q of pod %q has restarted %d times: %w", containerName, agentPodName, restarts, stopCheck)
+            }
+        }
+
         status := atesting.AgentStatusOutput{} // clear status output
         stdout.Reset()
         stderr.Reset()
@@ -826,16 +848,14 @@
         }
         return err
     }
-
-    // we will wait maximum 120 seconds for the agent to report healthy
-    timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 120*time.Second)
-    defer timeoutCancel()
     for {
         err := checkStatus()
         if err == nil {
             return nil
+        } else if errors.Is(err, stopCheck) {
+            return err
         }
-        if timeoutCtx.Err() != nil {
+        if ctx.Err() != nil {
             // timeout waiting for agent to become healthy
             return errors.Join(err, errors.New("timeout waiting for agent to become healthy"))
         }
```
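The restart check above relies on a sentinel error: checkStatus wraps stopCheck with %w, and the retry loop unwraps it with errors.Is so that a restarted container fails the check immediately instead of retrying until the two-minute deadline. A minimal, runnable sketch of that pattern, with illustrative names not taken from the diff:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStop is a sentinel: any error wrapping it aborts the retry loop at once.
var errStop = errors.New("stop check")

func check(attempt int) error {
	if attempt == 3 {
		// Wrap the sentinel with %w so errors.Is can find it through the chain.
		return fmt.Errorf("fatal condition on attempt %d: %w", attempt, errStop)
	}
	return fmt.Errorf("transient failure on attempt %d", attempt)
}

func main() {
	for attempt := 1; ; attempt++ {
		err := check(attempt)
		if err == nil {
			return // healthy
		} else if errors.Is(err, errStop) {
			fmt.Println("aborting retries:", err)
			return
		}
		time.Sleep(10 * time.Millisecond) // transient error: retry
	}
}
```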
```diff
@@ -851,7 +871,10 @@ func k8sGetAgentID(ctx context.Context, client klient.Client, stdout *bytes.Buff
     status := atesting.AgentStatusOutput{} // clear status output
     stdout.Reset()
     stderr.Reset()
-    if err := client.Resources().ExecInPod(ctx, namespace, agentPodName, containerName, command, stdout, stderr); err != nil {
+    ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
+    err := client.Resources().ExecInPod(ctx, namespace, agentPodName, containerName, command, stdout, stderr)
+    cancel()
+    if err != nil {
         return "", err
     }
 
```
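This is the commit's "time box all exec in pods" fix: the exec now runs under a derived context with a two-minute deadline, and cancel() is called right after the call returns rather than deferred, since the same pattern is repeated inside loops further down (inner tests, restrict-upgrade). A generic sketch of the pattern, under the assumption that the blocking call honors context cancellation; slowCall here is a stand-in, not code from the diff:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowCall stands in for any blocking call that respects its context,
// such as an exec into a pod.
func slowCall(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// Derive a time-boxed context; call cancel immediately after the call
	// returns so timer resources are released even on the success path.
	callCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	err := slowCall(callCtx)
	cancel()
	fmt.Println("result:", err)
}
```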
```diff
@@ -872,58 +895,157 @@ func getAgentComponentState(status atesting.AgentStatusOutput, componentName str
     return -1, false
 }
 
-// k8sDumpAllPodLogs dumps the logs of all pods in the given namespace to the given target directory
-func k8sDumpAllPodLogs(ctx context.Context, client klient.Client, testName string, namespace string, targetDir string) error {
-    podList := &corev1.PodList{}
+// k8sDumpPods creates an archive that contains logs of all pods in the given namespace and kube-system to the given target directory
+func k8sDumpPods(t *testing.T, ctx context.Context, client klient.Client, testName string, namespace string, targetDir string, testStartTime time.Time) {
+    // Create the tar file
+    archivePath := filepath.Join(targetDir, fmt.Sprintf("%s.tar.gz", namespace))
+    tarFile, err := os.Create(archivePath)
+    if err != nil {
+        t.Logf("failed to create archive at path %q", archivePath)
+        return
+    }
+    defer tarFile.Close()
+
+    t.Logf("archive %q contains the dump info for %q test", archivePath, testName)
+
+    // Create a new tar writer
+    tarWriter := tar.NewWriter(tarFile)
+    defer tarWriter.Close()
 
     clientSet, err := kubernetes.NewForConfig(client.RESTConfig())
     if err != nil {
-        return fmt.Errorf("error creating clientset: %w", err)
+        t.Logf("error creating clientset: %s", err)
+        return
     }
 
-    err = client.Resources(namespace).List(ctx, podList)
+    podList := &corev1.PodList{}
+    err = client.Resources("").List(ctx, podList)
     if err != nil {
-        return fmt.Errorf("error listing pods: %w", err)
+        t.Logf("error listing pods: %s", err)
+        return
     }
 
-    var errs error
+    type containerPodState struct {
+        Namespace              string `json:"namespace"`
+        PodName                string `json:"pod_name"`
+        ContainerName          string `json:"container_name"`
+        RestartCount           int32  `json:"restart_count"`
+        LastTerminationReason  string `json:"last_termination_reason"`
+        LastTerminationMessage string `json:"last_termination_message"`
+    }
+
+    var statesDump []containerPodState
+
     for _, pod := range podList.Items {
-        previous := false
-        for _, containerStatus := range pod.Status.ContainerStatuses {
-            if containerStatus.RestartCount > 0 {
-                previous = true
-                break
-            }
+        podNamespace := pod.GetNamespace()
+        if podNamespace != namespace && podNamespace != "kube-system" {
+            continue
         }
 
         for _, container := range pod.Spec.Containers {
-            logFilePath := filepath.Join(targetDir, fmt.Sprintf("%s-%s-%s.log", testName, pod.Name, container.Name))
-            logFile, err := os.Create(logFilePath)
-            if err != nil {
-                errs = errors.Join(fmt.Errorf("error creating log file: %w", err), errs)
-                continue
+            state := containerPodState{
+                Namespace:     podNamespace,
+                PodName:       pod.GetName(),
+                ContainerName: container.Name,
             }
+            previous := false
 
-            req := clientSet.CoreV1().Pods(namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
+            for _, containerStatus := range pod.Status.ContainerStatuses {
+                if container.Name != containerStatus.Name {
+                    continue
+                }
+
+                state.RestartCount = containerStatus.RestartCount
+                if containerStatus.RestartCount == 0 {
+                    break
+                }
+                // since we dump logs from pods that are expected to constantly run,
+                // namely kube-apiserver in the kube-system namespace, we need to identify
+                // whether a restart of such a pod happened during the test to correctly
+                // decide if we want the previous log
+                containerTerminated := containerStatus.LastTerminationState.Terminated
+                if containerTerminated != nil && containerTerminated.FinishedAt.After(testStartTime) {
+                    previous = true
+                    state.LastTerminationReason = containerTerminated.Reason
+                    state.LastTerminationMessage = containerTerminated.Message
+                }
+                break
+            }
+            statesDump = append(statesDump, state)
+
+            var logFileName string
+            if previous {
+                logFileName = fmt.Sprintf("%s-%s-%s-previous.log", podNamespace, pod.Name, container.Name)
+            } else {
+                logFileName = fmt.Sprintf("%s-%s-%s.log", podNamespace, pod.Name, container.Name)
+            }
+
+            req := clientSet.CoreV1().Pods(podNamespace).GetLogs(pod.Name, &corev1.PodLogOptions{
                 Container: container.Name,
                 Previous:  previous,
+                SinceTime: &metav1.Time{Time: testStartTime},
             })
-            podLogsStream, err := req.Stream(context.TODO())
+
+            streamCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+            podLogsStream, err := req.Stream(streamCtx)
             if err != nil {
-                errs = errors.Join(fmt.Errorf("error getting container %s of pod %s logs: %w", container.Name, pod.Name, err), errs)
+                cancel()
+                t.Logf("error getting container %q of pod %q logs: %s", container.Name, pod.Name, err)
                 continue
             }
 
-            _, err = io.Copy(logFile, podLogsStream)
+            b, err := io.ReadAll(podLogsStream)
+            _ = podLogsStream.Close()
+            cancel()
             if err != nil {
-                errs = errors.Join(fmt.Errorf("error writing container %s of pod %s logs: %w", container.Name, pod.Name, err), errs)
+                t.Logf("error reading container %q logs of pod %q: %s", container.Name, pod.Name, err)
+                continue
             }
 
-            _ = podLogsStream.Close()
+            header := &tar.Header{
+                Name:       logFileName,
+                Size:       int64(len(b)),
+                Mode:       0600,
+                ModTime:    time.Now(),
+                AccessTime: time.Now(),
+                ChangeTime: time.Now(),
+            }
+
+            if err := tarWriter.WriteHeader(header); err != nil {
+                t.Logf("error writing header of file %q in archive: %s", logFileName, err)
+                continue
+            }
+
+            if _, err := tarWriter.Write(b); err != nil {
+                t.Logf("error writing data of file %q in archive: %s", logFileName, err)
+            }
         }
     }
 
-    return errs
+    b, err := json.Marshal(statesDump)
+    if err != nil {
+        t.Logf("error marshalling pod states: %s", err)
+        return
+    }
+
+    statesDumpFile := "containerPodsStates.json"
+    header := &tar.Header{
+        Name:       statesDumpFile,
+        Size:       int64(len(b)),
+        Mode:       0600,
+        ModTime:    time.Now(),
+        AccessTime: time.Now(),
+        ChangeTime: time.Now(),
+    }
+
+    if err := tarWriter.WriteHeader(header); err != nil {
+        t.Logf("error writing header of file %q in archive: %s", statesDumpFile, err)
+        return
+    }
+
+    if _, err := tarWriter.Write(b); err != nil {
+        t.Logf("error writing data of file %q in archive: %s", statesDumpFile, err)
+    }
 }
 
 // k8sKustomizeAdjustObjects adjusts the namespace of given k8s objects and calls the given callbacks for the containers and the pod
```
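Each log is buffered fully in memory before being archived because a tar.Header must declare the entry's exact Size before any data is written. A minimal, runnable sketch of that tar-entry pattern; the file names are illustrative, and like the diff it writes a bare tar stream using only archive/tar:

```go
package main

import (
	"archive/tar"
	"log"
	"os"
	"time"
)

func main() {
	// Create the output archive; "example.tar" is an illustrative name.
	f, err := os.Create("example.tar")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	tw := tar.NewWriter(f)
	defer tw.Close()

	// The payload is buffered in memory first, because the tar header
	// must carry the exact byte count of the entry.
	data := []byte("log line 1\nlog line 2\n")
	hdr := &tar.Header{
		Name:    "pod-container.log",
		Size:    int64(len(data)),
		Mode:    0600,
		ModTime: time.Now(),
	}
	if err := tw.WriteHeader(hdr); err != nil {
		log.Fatal(err)
	}
	if _, err := tw.Write(data); err != nil {
		log.Fatal(err)
	}
}
```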
```diff
@@ -1145,12 +1267,10 @@ func k8sCreateObjects(ctx context.Context, client klient.Client, opts k8sCreateO
             for idx := range objWithType.Subjects {
                 objWithType.Subjects[idx].Namespace = opts.namespace
             }
-            continue
         case *rbacv1.RoleBinding:
             for idx := range objWithType.Subjects {
                 objWithType.Subjects[idx].Namespace = opts.namespace
             }
-            continue
         }
     }
     if err := client.Resources().Create(ctx, obj); err != nil {
```
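This is the "properly set the namespace for clusterrole and clusterrole bindings" fix: the removed continue statements jumped to the next object before the Create call at the bottom of the loop, so the namespace-patched bindings were never actually created. A stripped-down sketch of that control-flow bug, with illustrative types not taken from the test suite:

```go
package main

import "fmt"

type binding struct{ namespace string }

func create(obj any) { fmt.Printf("created %#v\n", obj) }

func main() {
	objects := []any{&binding{}, "configmap"}
	for _, obj := range objects {
		switch o := obj.(type) {
		case *binding:
			o.namespace = "test-ns"
			// BUG (before the fix): a `continue` here would jump to the next
			// object, so the patched binding never reached the create call below.
		}
		create(obj)
	}
}
```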
```diff
@@ -1263,12 +1383,18 @@ type k8sContext struct {
     esAPIKey string
     // enrollParams contains the information needed to enroll an agent with Fleet in the test
     enrollParams *fleettools.EnrollParams
+    // createdAt is the time when the k8sContext was created
+    createdAt time.Time
 }
 
 // getNamespace returns a unique namespace for the current test
-func (k8sContext) getNamespace(t *testing.T) string {
+func (k k8sContext) getNamespace(t *testing.T) string {
+    nsUUID, err := uuid.NewV4()
+    if err != nil {
+        t.Fatalf("error generating namespace UUID: %v", err)
+    }
     hasher := sha256.New()
-    hasher.Write([]byte(t.Name()))
+    hasher.Write([]byte(nsUUID.String()))
     testNamespace := strings.ToLower(base64.URLEncoding.EncodeToString(hasher.Sum(nil)))
     return noSpecialCharsRegexp.ReplaceAllString(testNamespace, "")
 }
```
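Deriving the namespace from t.Name() yielded the same value on every run of a given test, so a retried or leaked run could collide with the next one; hashing a fresh UUIDv4 (per the "derive k8s namespace from random uuid" bullet) makes each invocation unique. A self-contained sketch of the derivation, where the regexp is an assumed stand-in for the suite's noSpecialCharsRegexp:

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"regexp"
	"strings"

	"github.com/gofrs/uuid/v5"
)

// Assumed stand-in for noSpecialCharsRegexp: keep only [a-z0-9] so the
// result is usable as a DNS-1123 namespace name.
var noSpecialChars = regexp.MustCompile(`[^a-z0-9]`)

func main() {
	nsUUID, err := uuid.NewV4()
	if err != nil {
		panic(err)
	}
	hasher := sha256.New()
	hasher.Write([]byte(nsUUID.String()))
	ns := strings.ToLower(base64.URLEncoding.EncodeToString(hasher.Sum(nil)))
	fmt.Println(noSpecialChars.ReplaceAllString(ns, ""))
}
```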
```diff
@@ -1326,7 +1452,7 @@ func k8sGetContext(t *testing.T, info *define.Info) k8sContext {
     testLogsBasePath := os.Getenv("K8S_TESTS_POD_LOGS_BASE")
     require.NotEmpty(t, testLogsBasePath, "K8S_TESTS_POD_LOGS_BASE must be set")
 
-    err = os.MkdirAll(filepath.Join(testLogsBasePath, t.Name()), 0o755)
+    err = os.MkdirAll(testLogsBasePath, 0o755)
     require.NoError(t, err, "failed to create test logs directory")
 
     esHost := os.Getenv("ELASTICSEARCH_HOST")
@@ -1349,6 +1475,7 @@
         esHost:       esHost,
         esAPIKey:     esAPIKey,
         enrollParams: enrollParams,
+        createdAt:    time.Now(),
     }
 }
 
```
```diff
@@ -1473,9 +1600,7 @@ func k8sStepDeployKustomize(kustomizePath string, containerName string, override
 
         t.Cleanup(func() {
             if t.Failed() {
-                if err := k8sDumpAllPodLogs(ctx, kCtx.client, namespace, namespace, kCtx.logsBasePath); err != nil {
-                    t.Logf("failed to dump logs: %v", err)
-                }
+                k8sDumpPods(t, ctx, kCtx.client, t.Name(), namespace, kCtx.logsBasePath, kCtx.createdAt)
             }
 
             err := k8sDeleteObjects(ctx, kCtx.client, k8sDeleteOpts{wait: true}, objects...)
@@ -1484,7 +1609,7 @@
             }
         })
 
-        err = k8sCreateObjects(ctx, kCtx.client, k8sCreateOpts{wait: true}, objects...)
+        err = k8sCreateObjects(ctx, kCtx.client, k8sCreateOpts{wait: true, namespace: namespace}, objects...)
         require.NoError(t, err, "failed to create objects")
     }
 }
```
```diff
@@ -1547,8 +1672,10 @@ func k8sStepRunInnerTests(agentPodLabelSelector string, expectedPodNumber int, c
 
         for _, pod := range perNodePodList.Items {
             var stdout, stderr bytes.Buffer
+            ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
             err = kCtx.client.Resources().ExecInPod(ctx, namespace, pod.Name, containerName,
                 []string{"/usr/share/elastic-agent/k8s-inner-tests", "-test.v"}, &stdout, &stderr)
+            cancel()
             t.Logf("%s k8s-inner-tests output:", pod.Name)
             t.Log(stdout.String())
             if err != nil {
@@ -1593,9 +1720,7 @@ func k8sStepHelmDeploy(chartPath string, releaseName string, values map[string]a
 
         t.Cleanup(func() {
             if t.Failed() {
-                if err := k8sDumpAllPodLogs(ctx, kCtx.client, namespace, namespace, kCtx.logsBasePath); err != nil {
-                    t.Logf("failed to dump logs: %v", err)
-                }
+                k8sDumpPods(t, ctx, kCtx.client, t.Name(), namespace, kCtx.logsBasePath, kCtx.createdAt)
             }
 
             uninstallAction := action.NewUninstall(actionConfig)
@@ -1712,7 +1837,9 @@ func k8sStepCheckRestrictUpgrade(agentPodLabelSelector string, expectedPodNumber
             var stdout, stderr bytes.Buffer
 
             command := []string{"elastic-agent", "upgrade", "1.0.0"}
+            ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
             err := kCtx.client.Resources().ExecInPod(ctx, namespace, pod.Name, containerName, command, &stdout, &stderr)
+            cancel()
             require.Error(t, err)
             require.Contains(t, stderr.String(), coordinator.ErrNotUpgradable.Error())
         }
```
