Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *collector) pullFromKubelet(ctx context.Context) error {
return nil
}

events = append(events, util.ParseKubeletPods(podList.Items, c.collectEphemeralContainers)...)
events = append(events, util.ParseKubeletPods(podList.Items, c.collectEphemeralContainers, c.store)...)

// Mark return pods and containers as seen now
now := time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock"
workloadmetamock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/mock"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"
"github.com/DataDog/datadog-agent/pkg/util/pointer"
Expand Down Expand Up @@ -162,7 +167,12 @@ func TestPodParser(t *testing.T) {
},
}

events := util.ParseKubeletPods(referencePod, true)
mockStore := fxutil.Test[workloadmetamock.Mock](t, fx.Options(
core.MockBundle(),
workloadmetafxmock.MockModule(workloadmeta.NewParams()),
))

events := util.ParseKubeletPods(referencePod, true, mockStore)
parsedEntities := make([]workloadmeta.Entity, 0, len(events))
for _, event := range events {
parsedEntities = append(parsedEntities, event.Entity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"time"

"go.uber.org/fx"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
apiv1 "github.com/DataDog/datadog-agent/pkg/clusteragent/api/v1"
"github.com/DataDog/datadog-agent/pkg/config/env"
Expand All @@ -32,15 +34,17 @@ import (
)

const (
collectorID = "kube_metadata"
componentName = "workloadmeta-kube_metadata"
collectorID = "kube_metadata"
componentName = "workloadmeta-kube_metadata"
namespaceMetadataTTL = 25 * time.Hour
)

type collector struct {
id string
store workloadmeta.Component
catalog workloadmeta.AgentType
seen map[workloadmeta.EntityID]struct{}
namespaceLastSeen map[string]time.Time
kubeUtil kubelet.KubeUtilInterface
apiClient *apiserver.APIClient
dcaClient clusteragent.DCAClientInterface
Expand All @@ -55,9 +59,10 @@ type collector struct {
func NewCollector() (workloadmeta.CollectorProvider, error) {
return workloadmeta.CollectorProvider{
Collector: &collector{
id: collectorID,
seen: make(map[workloadmeta.EntityID]struct{}),
catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent,
id: collectorID,
seen: make(map[workloadmeta.EntityID]struct{}),
namespaceLastSeen: make(map[string]time.Time),
catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent,
},
}, nil
}
Expand Down Expand Up @@ -152,18 +157,18 @@ func (c *collector) Pull(ctx context.Context) error {
return err
}

// Create unset events for entities that are no longer seen
for seenID := range c.seen {
if _, ok := seen[seenID]; ok {
continue
}

events = append(events, workloadmeta.CollectorEvent{
Type: workloadmeta.EventTypeUnset,
Source: workloadmeta.SourceClusterOrchestrator,
Entity: &workloadmeta.KubernetesPod{
EntityID: seenID,
},
})
if c.shouldKeepNamespaceAlive(seenID) {
continue
}

// Unset entities that are no longer seen
events = append(events, c.createUnsetEvent(seenID))
}

c.seen = seen
Expand All @@ -183,6 +188,49 @@ func (c *collector) GetTargetCatalog() workloadmeta.AgentType {
return c.catalog
}

// createUnsetEvent creates an unset event for the appropriate entity type.
func (c *collector) createUnsetEvent(seenID workloadmeta.EntityID) workloadmeta.CollectorEvent {
var entity workloadmeta.Entity
switch seenID.Kind {
case workloadmeta.KindKubernetesMetadata:
entity = &workloadmeta.KubernetesMetadata{EntityID: seenID}
default:
entity = &workloadmeta.KubernetesPod{EntityID: seenID}
}

return workloadmeta.CollectorEvent{
Type: workloadmeta.EventTypeUnset,
Source: workloadmeta.SourceClusterOrchestrator,
Entity: entity,
}
}

// shouldKeepNamespaceAlive checks if a namespace metadata entity is within its TTL.
// Returns false for non-namespace entities or expired namespaces (also cleans up tracking).
func (c *collector) shouldKeepNamespaceAlive(seenID workloadmeta.EntityID) bool {
if seenID.Kind != workloadmeta.KindKubernetesMetadata {
return false
}

group, resource, _, name, err := util.ParseKubeMetadataEntityID(workloadmeta.KubeMetadataEntityID(seenID.ID))
if err != nil || group != "" || resource != "namespaces" {
return false
}

lastSeen, ok := c.namespaceLastSeen[name]
if !ok {
return false
}

if time.Since(lastSeen) < namespaceMetadataTTL {
return true
}

// Expired, remove from tracking
delete(c.namespaceLastSeen, name)
return false
}

// parsePods returns collection events based on a given podlist.
func (c *collector) parsePods(
ctx context.Context,
Expand Down Expand Up @@ -213,7 +261,6 @@ func (c *collector) parsePods(

// To get metadata/labels once per namespace.
metadataByNS := make(map[string]*clusteragent.Metadata)
labelsByNS := make(map[string]map[string]string)

for _, pod := range pods {
if pod.Metadata.UID == "" {
Expand Down Expand Up @@ -272,17 +319,23 @@ func (c *collector) parsePods(
}
} else {
// Cluster agent with version older than 7.55
var ok bool
nsLabels, ok = labelsByNS[pod.Metadata.Namespace]
nsMetadata, ok := metadataByNS[pod.Metadata.Namespace]
if !ok {
nsLabels, err = c.getNamespaceLabels(pod.Metadata.Namespace)
if err == nil {
labelsByNS[pod.Metadata.Namespace] = nsLabels
nsMetadata = &clusteragent.Metadata{
Labels: nsLabels,
}
metadataByNS[pod.Metadata.Namespace] = nsMetadata
} else {
log.Errorf("Could not fetch namespace labels for pod %s/%s: %v", pod.Metadata.Namespace, pod.Metadata.Name, err)
}
}

if nsMetadata != nil {
nsLabels = nsMetadata.Labels
}

if c.collectNamespaceAnnotations {
log.Errorf("Could not fetch namespace annotations for pod %s/%s: kubernetes_namespace_annotations_as_tags requires version 7.55 or later of the cluster agent", pod.Metadata.Namespace, pod.Metadata.Name)
}
Expand Down Expand Up @@ -315,6 +368,21 @@ func (c *collector) parsePods(
})
}

// Save kubernetes namespace metadata entities for caching
for ns, nsMetadata := range metadataByNS {
nsEntity := createNamespaceEntity(ns, nsMetadata)
nsEntityID := nsEntity.GetID()

events = append(events, workloadmeta.CollectorEvent{
Source: workloadmeta.SourceClusterOrchestrator,
Type: workloadmeta.EventTypeSet,
Entity: nsEntity,
})

seen[nsEntityID] = struct{}{}
c.namespaceLastSeen[ns] = time.Now()
}

return events, nil
}

Expand Down Expand Up @@ -377,6 +445,38 @@ func (c *collector) isDCAEnabled() bool {
return false
}

// createNamespaceEntity creates a KubernetesMetadata entity for a namespace
func createNamespaceEntity(namespaceName string, metadata *clusteragent.Metadata) *workloadmeta.KubernetesMetadata {
labels := make(map[string]string)
annotations := make(map[string]string)

if metadata != nil {
if metadata.Labels != nil {
labels = metadata.Labels
}
if metadata.Annotations != nil {
annotations = metadata.Annotations
}
}

return &workloadmeta.KubernetesMetadata{
EntityID: workloadmeta.EntityID{
Kind: workloadmeta.KindKubernetesMetadata,
ID: string(util.GenerateKubeMetadataEntityID("", "namespaces", "", namespaceName)),
},
EntityMeta: workloadmeta.EntityMeta{
Name: namespaceName,
Labels: labels,
Annotations: annotations,
},
GVR: &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "namespaces",
},
}
}

// addToCacheMetadataMapping is acting like the DCA at the node level.
func (c *collector) addToCacheMetadataMapping(kubeletPodList []*kubelet.Pod) error {
if len(kubeletPodList) == 0 {
Expand Down
Loading
Loading