Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,9 +555,11 @@ func checkRequiredCredentials(opts *options, credErr error) error {

func setupAndStartOperatorMetadataForwarder(logger logr.Logger, client client.Reader, kubernetesVersion string, options *options, credsManager *config.CredentialManager) {
omf := metadata.NewOperatorMetadataForwarder(logger, client, kubernetesVersion, version.GetVersion(), credsManager)
if omf == nil {
logger.Error(fmt.Errorf("operator metadata forwarder initialization failed"), "Continuing without metadata forwarding")
return
}
omf.OperatorMetadata = metadata.OperatorMetadata{
OperatorVersion: version.GetVersion(),
KubernetesVersion: kubernetesVersion,
InstallMethodTool: "datadog-operator",
InstallMethodToolVersion: version.GetVersion(),
IsLeader: true,
Expand Down Expand Up @@ -593,10 +595,18 @@ func setupAndStartCRDMetadataForwarder(logger logr.Logger, client client.Reader,
DatadogAgentProfileEnabled: options.datadogAgentProfileEnabled,
},
)
if cmf == nil {
logger.Error(fmt.Errorf("CRD metadata forwarder initialization failed"), "Continuing without metadata forwarding")
return
}
cmf.Start()
}

func setupAndStartHelmMetadataForwarder(logger logr.Logger, client client.Reader, kubernetesVersion string, credsManager *config.CredentialManager) {
hmf := metadata.NewHelmMetadataForwarder(logger, client, kubernetesVersion, version.GetVersion(), credsManager)
if hmf == nil {
logger.Error(fmt.Errorf("helm metadata forwarder initialization failed"), "Continuing without metadata forwarding")
return
}
hmf.Start()
}
36 changes: 19 additions & 17 deletions pkg/controller/utils/metadata/crd_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
)

type CRDMetadataForwarder struct {
*BaseForwarder
*SharedMetadata

enabledCRDs EnabledCRDKindsConfig
Expand All @@ -45,10 +46,7 @@ type CRDMetadataPayload struct {
}

type CRDMetadata struct {
// Shared
OperatorVersion string `json:"operator_version"`
KubernetesVersion string `json:"kubernetes_version"`
ClusterID string `json:"cluster_id"`
SharedMetadata

CRDKind string `json:"crd_kind"`
CRDName string `json:"crd_name"`
Expand Down Expand Up @@ -79,10 +77,21 @@ type EnabledCRDKindsConfig struct {
}

// NewCRDMetadataForwarder creates a new instance of the CRD metadata forwarder
// Returns nil if shared metadata cannot be initialized
func NewCRDMetadataForwarder(logger logr.Logger, k8sClient client.Reader, kubernetesVersion string, operatorVersion string, credsManager *config.CredentialManager, config EnabledCRDKindsConfig) *CRDMetadataForwarder {
forwarderLogger := logger.WithName("crd")

sharedMetadata, err := NewSharedMetadata(operatorVersion, kubernetesVersion, k8sClient)
if err != nil {
forwarderLogger.Info("Failed to initialize shared metadata", "error", err)
return nil
}

baseForwarder := NewBaseForwarder(forwarderLogger, k8sClient, credsManager)

return &CRDMetadataForwarder{
SharedMetadata: NewSharedMetadata(forwarderLogger, k8sClient, kubernetesVersion, operatorVersion, credsManager),
BaseForwarder: baseForwarder,
SharedMetadata: sharedMetadata,
enabledCRDs: config,
crdCache: make(map[string]string),
}
Expand Down Expand Up @@ -126,12 +135,7 @@ func (cmf *CRDMetadataForwarder) sendMetadata() error {
}

func (cmf *CRDMetadataForwarder) sendCRDMetadata(crdInstance CRDInstance) error {
clusterUID, err := cmf.GetOrCreateClusterUID()
if err != nil {
return fmt.Errorf("error getting cluster UID: %w", err)
}

payload := cmf.buildPayload(clusterUID, crdInstance)
payload := cmf.buildPayload(crdInstance)

cmf.logger.V(1).Info("Sending metadata HTTP request",
"kind", crdInstance.Kind,
Expand Down Expand Up @@ -179,17 +183,15 @@ func (cmf *CRDMetadataForwarder) marshalToJSON(data interface{}, fieldName strin
return jsonBytes
}

func (cmf *CRDMetadataForwarder) buildPayload(clusterUID string, crdInstance CRDInstance) []byte {
func (cmf *CRDMetadataForwarder) buildPayload(crdInstance CRDInstance) []byte {
now := time.Now().Unix()

specJSON := cmf.marshalToJSON(crdInstance.Spec, "spec", crdInstance)
labelsJSON := cmf.marshalToJSON(crdInstance.Labels, "labels", crdInstance)
annotationsJSON := cmf.marshalToJSON(crdInstance.Annotations, "annotations", crdInstance)

crdMetadata := CRDMetadata{
OperatorVersion: cmf.operatorVersion,
KubernetesVersion: cmf.kubernetesVersion,
ClusterID: clusterUID,
SharedMetadata: *cmf.SharedMetadata,
CRDKind: crdInstance.Kind,
CRDName: crdInstance.Name,
CRDNamespace: crdInstance.Namespace,
Expand All @@ -201,9 +203,9 @@ func (cmf *CRDMetadataForwarder) buildPayload(clusterUID string, crdInstance CRD
}

payload := CRDMetadataPayload{
UUID: clusterUID,
UUID: cmf.SharedMetadata.ClusterID,
Timestamp: now,
ClusterID: clusterUID,
ClusterID: cmf.SharedMetadata.ClusterID,
Metadata: crdMetadata,
}

Expand Down
27 changes: 17 additions & 10 deletions pkg/controller/utils/metadata/crd_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"testing"

"github.com/DataDog/datadog-operator/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

Expand All @@ -25,12 +24,14 @@ func Test_CRDBuildPayload(t *testing.T) {
expectedCRDAPIVersion := "datadoghq.com/v2alpha1"
expectedCRDUID := "crd-uid-67890"

client := newFakeClientWithKubeSystem("test-cluster-uid-12345")

cmf := NewCRDMetadataForwarder(
zap.New(zap.UseDevMode(true)),
nil,
client,
expectedKubernetesVersion,
expectedOperatorVersion,
config.NewCredentialManager(fake.NewFakeClient()),
config.NewCredentialManager(client),
EnabledCRDKindsConfig{
DatadogAgentEnabled: true,
DatadogAgentInternalEnabled: true,
Expand Down Expand Up @@ -66,7 +67,7 @@ func Test_CRDBuildPayload(t *testing.T) {
Annotations: testAnnotations,
}

payload := cmf.buildPayload(expectedClusterUID, crdInstance)
payload := cmf.buildPayload(crdInstance)

// Verify payload is valid JSON
if len(payload) == 0 {
Expand Down Expand Up @@ -170,12 +171,14 @@ func Test_CRDBuildPayload(t *testing.T) {

// Test that hash-based change detection works correctly
func Test_CRDCacheDetection(t *testing.T) {
client := newFakeClientWithKubeSystem("test-cluster-uid-12345")

cmf := NewCRDMetadataForwarder(
zap.New(zap.UseDevMode(true)),
nil,
client,
"v1.28.0",
"v1.19.0",
config.NewCredentialManager(fake.NewFakeClient()),
config.NewCredentialManager(client),
EnabledCRDKindsConfig{
DatadogAgentEnabled: true,
DatadogAgentInternalEnabled: true,
Expand Down Expand Up @@ -246,12 +249,14 @@ func Test_CRDCacheDetection(t *testing.T) {

// Test that cache cleanup works correctly
func Test_CRDCacheCleanup(t *testing.T) {
client := newFakeClientWithKubeSystem("test-cluster-uid-12345")

cmf := NewCRDMetadataForwarder(
zap.New(zap.UseDevMode(true)),
nil,
client,
"v1.28.0",
"v1.19.0",
config.NewCredentialManager(fake.NewFakeClient()),
config.NewCredentialManager(client),
EnabledCRDKindsConfig{DatadogAgentEnabled: true},
)

Expand Down Expand Up @@ -294,12 +299,14 @@ func Test_CRDCacheCleanup(t *testing.T) {

// Test that per-kind error handling preserves cache correctly
func Test_CRDPerKindErrorHandling(t *testing.T) {
client := newFakeClientWithKubeSystem("test-cluster-uid-12345")

cmf := NewCRDMetadataForwarder(
zap.New(zap.UseDevMode(true)),
nil,
client,
"v1.28.0",
"v1.19.0",
config.NewCredentialManager(fake.NewFakeClient()),
config.NewCredentialManager(client),
EnabledCRDKindsConfig{
DatadogAgentEnabled: true,
DatadogAgentInternalEnabled: true,
Expand Down
10 changes: 4 additions & 6 deletions pkg/controller/utils/metadata/credential_setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,14 @@ func TestSetupRequestPrerequisites(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&v2alpha1.DatadogAgent{}).WithObjects(clientObjects...).Build()

credsManager := config.NewCredentialManagerWithDecryptor(client, &mockDecryptor{})
sharedMetadata, _ := NewSharedMetadata("v1.0.0", "v1.28.0", client)
omf := &OperatorMetadataForwarder{
SharedMetadata: NewSharedMetadata(
BaseForwarder: NewBaseForwarder(
zap.New(zap.UseDevMode(true)),
client,
"v1.28.0",
"v1.0.0",
credsManager,
),
SharedMetadata: sharedMetadata,
OperatorMetadata: OperatorMetadata{},
}

Expand Down Expand Up @@ -334,9 +334,7 @@ func TestSetupRequestPrerequisites(t *testing.T) {
assert.Equal(t, "application/json", req.Header.Get("Accept"), "Accept header should be set")

// Verify cluster UID is set
clusterUID, err := omf.GetOrCreateClusterUID()
assert.NoError(t, err)
assert.NotEmpty(t, clusterUID, "Cluster UID should be set")
assert.NotEmpty(t, omf.SharedMetadata.ClusterID, "Cluster UID should be set")
})
}
}
35 changes: 19 additions & 16 deletions pkg/controller/utils/metadata/helm_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
)

type HelmMetadataForwarder struct {
*BaseForwarder
*SharedMetadata

allHelmReleasesCache allHelmReleasesCache
Expand All @@ -57,10 +58,7 @@ type HelmMetadataPayload struct {
}

type HelmMetadata struct {
// Shared
OperatorVersion string `json:"operator_version"`
KubernetesVersion string `json:"kubernetes_version"`
ClusterID string `json:"cluster_id"`
SharedMetadata

ChartName string `json:"chart_name"`
ChartReleaseName string `json:"chart_release_name"`
Expand Down Expand Up @@ -113,10 +111,21 @@ type HelmReleaseMinimal struct {
}

// NewHelmMetadataForwarder creates a new instance of the helm metadata forwarder
// Returns nil if shared metadata cannot be initialized
func NewHelmMetadataForwarder(logger logr.Logger, k8sClient client.Reader, kubernetesVersion string, operatorVersion string, credsManager *config.CredentialManager) *HelmMetadataForwarder {
forwarderLogger := logger.WithName("helm")

sharedMetadata, err := NewSharedMetadata(operatorVersion, kubernetesVersion, k8sClient)
if err != nil {
forwarderLogger.Info("Failed to initialize shared metadata", "error", err)
return nil
}

baseForwarder := NewBaseForwarder(forwarderLogger, k8sClient, credsManager)

return &HelmMetadataForwarder{
SharedMetadata: NewSharedMetadata(forwarderLogger, k8sClient, kubernetesVersion, operatorVersion, credsManager),
BaseForwarder: baseForwarder,
SharedMetadata: sharedMetadata,
}
}

Expand Down Expand Up @@ -188,11 +197,7 @@ func (hmf *HelmMetadataForwarder) sendMetadata() error {
}

func (hmf *HelmMetadataForwarder) sendSingleReleasePayload(release HelmReleaseData) error {
clusterUID, err := hmf.GetOrCreateClusterUID()
if err != nil {
return fmt.Errorf("error getting cluster UID: %w", err)
}
payload := hmf.buildPayload(release, clusterUID)
payload := hmf.buildPayload(release)

hmf.logger.V(1).Info("Built metadata payload",
"release", release.ReleaseName,
Expand Down Expand Up @@ -231,13 +236,11 @@ func (hmf *HelmMetadataForwarder) sendSingleReleasePayload(release HelmReleaseDa
return nil
}

func (hmf *HelmMetadataForwarder) buildPayload(release HelmReleaseData, clusterUID string) []byte {
func (hmf *HelmMetadataForwarder) buildPayload(release HelmReleaseData) []byte {
now := time.Now().Unix()

helmMetadata := HelmMetadata{
OperatorVersion: hmf.operatorVersion,
KubernetesVersion: hmf.kubernetesVersion,
ClusterID: clusterUID,
SharedMetadata: *hmf.SharedMetadata,
ChartName: release.ChartName,
ChartReleaseName: release.ReleaseName,
ChartAppVersion: release.AppVersion,
Expand All @@ -249,9 +252,9 @@ func (hmf *HelmMetadataForwarder) buildPayload(release HelmReleaseData, clusterU
}

payload := HelmMetadataPayload{
UUID: clusterUID,
UUID: hmf.SharedMetadata.ClusterID,
Timestamp: now,
ClusterID: clusterUID,
ClusterID: hmf.SharedMetadata.ClusterID,
Metadata: helmMetadata,
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/controller/utils/metadata/helm_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/DataDog/datadog-operator/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

Expand All @@ -27,7 +26,9 @@ func Test_HelmMetadataForwarder_getPayload(t *testing.T) {
expectedChartVersion := "3.10.0"
expectedAppVersion := "7.50.0"

hmf := NewHelmMetadataForwarder(zap.New(zap.UseDevMode(true)), nil, expectedKubernetesVersion, expectedOperatorVersion, config.NewCredentialManager(fake.NewFakeClient()))
client := newFakeClientWithKubeSystem("test-cluster-uid-123")

hmf := NewHelmMetadataForwarder(zap.New(zap.UseDevMode(true)), client, expectedKubernetesVersion, expectedOperatorVersion, config.NewCredentialManager(client))

release := HelmReleaseData{
ReleaseName: expectedReleaseName,
Expand All @@ -42,7 +43,7 @@ func Test_HelmMetadataForwarder_getPayload(t *testing.T) {
Status: "deployed",
}

payload := hmf.buildPayload(release, expectedClusterUID)
payload := hmf.buildPayload(release)

// Verify payload is valid JSON
if len(payload) == 0 {
Expand Down Expand Up @@ -118,7 +119,9 @@ func Test_HelmMetadataForwarder_getPayload(t *testing.T) {
}

func Test_parseHelmResource(t *testing.T) {
hmf := NewHelmMetadataForwarder(zap.New(zap.UseDevMode(true)), nil, "v1.28.0", "v1.19.0", config.NewCredentialManager(fake.NewFakeClient()))
client := newFakeClientWithKubeSystem("test-cluster-uid-123")

hmf := NewHelmMetadataForwarder(zap.New(zap.UseDevMode(true)), client, "v1.28.0", "v1.19.0", config.NewCredentialManager(client))

// Create a minimal valid Helm release JSON
releaseData := HelmReleaseMinimal{
Expand Down Expand Up @@ -264,7 +267,9 @@ func Test_allHelmReleasesCache(t *testing.T) {
}

func Test_mergeValues(t *testing.T) {
hmf := NewHelmMetadataForwarder(zap.New(zap.UseDevMode(true)), nil, "v1.28.0", "v1.19.0", config.NewCredentialManager(fake.NewFakeClient()))
client := newFakeClientWithKubeSystem("test-cluster-uid-123")

hmf := NewHelmMetadataForwarder(zap.New(zap.UseDevMode(true)), client, "v1.28.0", "v1.19.0", config.NewCredentialManager(client))

tests := []struct {
name string
Expand Down
Loading
Loading