Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/datadoghq/v1alpha1/datadogmonitor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ const (
DatadogMonitorConditionTypeUpdated DatadogMonitorConditionType = "Updated"
// DatadogMonitorConditionTypeError means the DatadogMonitor has an error
DatadogMonitorConditionTypeError DatadogMonitorConditionType = "Error"
// DatadogMonitorConditionTypeDriftDetected means drift was detected between the resource and Datadog
DatadogMonitorConditionTypeDriftDetected DatadogMonitorConditionType = "DriftDetected"
// DatadogMonitorConditionTypeRecreated means the DatadogMonitor was recreated due to drift
DatadogMonitorConditionTypeRecreated DatadogMonitorConditionType = "Recreated"
)

// DatadogMonitorState represents the overall DatadogMonitor state
Expand Down
253 changes: 224 additions & 29 deletions internal/controller/datadogmonitor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,34 +165,49 @@ func (r *Reconciler) internalReconcile(ctx context.Context, instance *datadoghqv
if instance.Status.ID == 0 {
shouldCreate = true
} else {
var m datadogV1.Monitor
if instanceSpecHash != statusSpecHash {
// Custom resource manifest has changed, need to update the API
logger.V(1).Info("DatadogMonitor manifest has changed")
shouldUpdate = true
} else if instance.Status.MonitorLastForceSyncTime == nil || (forceSyncPeriod-now.Sub(instance.Status.MonitorLastForceSyncTime.Time)) <= 0 {
// Periodically force a sync with the API monitor to ensure parity
// Get monitor to make sure it exists before trying any updates. If it doesn't, set shouldCreate
_, err = r.get(instance, newStatus)
// Perform drift detection to check if monitor exists in Datadog
driftDetected, err := r.detectDrift(ctx, logger, instance, newStatus)
if err != nil {
logger.Error(err, "error during drift detection", "Monitor ID", instance.Status.ID)
// Continue with reconciliation even if drift detection fails
}

if driftDetected {
logger.Info("Drift detected: monitor not found in Datadog, will recreate", "Monitor ID", instance.Status.ID)
err = r.handleMonitorRecreation(ctx, logger, instance, newStatus, now, instanceSpecHash)
if err != nil {
logger.Error(err, "error getting monitor", "Monitor ID", instance.Status.ID)
if strings.Contains(err.Error(), ctrutils.NotFoundString) {
shouldCreate = true
}
} else {
shouldUpdate = true
logger.Error(err, "error recreating monitor", "Monitor ID", instance.Status.ID)
}
} else if instance.Status.MonitorStateLastUpdateTime == nil || (defaultRequeuePeriod-now.Sub(instance.Status.MonitorStateLastUpdateTime.Time)) <= 0 {
// If other conditions aren't met, and we have passed the defaultRequeuePeriod, then update monitor state
// Get monitor to make sure it exists before trying any updates. If it doesn't, set shouldCreate
m, err = r.get(instance, newStatus)
if err != nil {
logger.Error(err, "error getting monitor", "Monitor ID", instance.Status.ID)
if strings.Contains(err.Error(), ctrutils.NotFoundString) {
shouldCreate = true
} else {
var m datadogV1.Monitor
if instanceSpecHash != statusSpecHash {
// Custom resource manifest has changed, need to update the API
logger.V(1).Info("DatadogMonitor manifest has changed")
shouldUpdate = true
} else if instance.Status.MonitorLastForceSyncTime == nil || (forceSyncPeriod-now.Sub(instance.Status.MonitorLastForceSyncTime.Time)) <= 0 {
// Periodically force a sync with the API monitor to ensure parity
// Get monitor to make sure it exists before trying any updates. If it doesn't, set shouldCreate
_, err = r.get(instance, newStatus)
if err != nil {
logger.Error(err, "error getting monitor", "Monitor ID", instance.Status.ID)
if strings.Contains(err.Error(), ctrutils.NotFoundString) {
shouldCreate = true
}
} else {
shouldUpdate = true
}
} else if instance.Status.MonitorStateLastUpdateTime == nil || (defaultRequeuePeriod-now.Sub(instance.Status.MonitorStateLastUpdateTime.Time)) <= 0 {
// If other conditions aren't met, and we have passed the defaultRequeuePeriod, then update monitor state
// Get monitor to make sure it exists before trying any updates. If it doesn't, set shouldCreate
m, err = r.get(instance, newStatus)
if err != nil {
logger.Error(err, "error getting monitor", "Monitor ID", instance.Status.ID)
if strings.Contains(err.Error(), ctrutils.NotFoundString) {
shouldCreate = true
}
}
updateMonitorState(m, now, newStatus)
}
updateMonitorState(m, now, newStatus)
}
}

Expand Down Expand Up @@ -236,6 +251,10 @@ func (r *Reconciler) internalReconcile(ctx context.Context, instance *datadoghqv
}

// create handles the initial creation of a monitor for the given DatadogMonitor.
// It delegates to createInternal with isRecreation set to false, so the
// creation event, the "Created" condition and the "Created a new DatadogMonitor"
// log line are used rather than their recreation counterparts.
// It returns whatever error createInternal returns.
func (r *Reconciler) create(logger logr.Logger, datadogMonitor *datadoghqv1alpha1.DatadogMonitor, status *datadoghqv1alpha1.DatadogMonitorStatus, now metav1.Time, instanceSpecHash string) error {
return r.createInternal(logger, datadogMonitor, status, now, instanceSpecHash, false)
}

func (r *Reconciler) createInternal(logger logr.Logger, datadogMonitor *datadoghqv1alpha1.DatadogMonitor, status *datadoghqv1alpha1.DatadogMonitorStatus, now metav1.Time, instanceSpecHash string, isRecreation bool) error {
// Validate monitor in Datadog
if err := validateMonitor(r.datadogAuth, logger, r.datadogClient, datadogMonitor); err != nil {
return err
Expand All @@ -246,10 +265,22 @@ func (r *Reconciler) create(logger logr.Logger, datadogMonitor *datadoghqv1alpha
if err != nil {
return err
}
event := buildEventInfo(datadogMonitor.Name, datadogMonitor.Namespace, pkgutils.CreationEvent)

// Determine event type based on whether this is recreation or initial creation
var eventType pkgutils.EventType
var logMessage string
if isRecreation {
eventType = pkgutils.RecreationEvent
logMessage = "Recreated DatadogMonitor"
} else {
eventType = pkgutils.CreationEvent
logMessage = "Created a new DatadogMonitor"
}

event := buildEventInfo(datadogMonitor.Name, datadogMonitor.Namespace, eventType)
r.recordEvent(datadogMonitor, event)

// As this is a new monitor, add static information to status
// Update status with new monitor information
status.ID = int(m.GetId())
creator := m.GetCreator()
status.Creator = creator.GetEmail()
Expand All @@ -259,13 +290,84 @@ func (r *Reconciler) create(logger logr.Logger, datadogMonitor *datadoghqv1alpha
status.MonitorStateSyncStatus = ""
status.CurrentHash = instanceSpecHash

// Set Created Condition
condition.UpdateDatadogMonitorConditions(status, now, datadoghqv1alpha1.DatadogMonitorConditionTypeCreated, corev1.ConditionTrue, "DatadogMonitor Created")
logger.Info("Created a new DatadogMonitor", "Monitor Namespace", datadogMonitor.Namespace, "Monitor Name", datadogMonitor.Name, "Monitor ID", m.GetId())
// Set appropriate condition based on operation type
if isRecreation {
condition.UpdateDatadogMonitorConditions(status, now, datadoghqv1alpha1.DatadogMonitorConditionTypeRecreated, corev1.ConditionTrue, "DatadogMonitor Recreated")
} else {
condition.UpdateDatadogMonitorConditions(status, now, datadoghqv1alpha1.DatadogMonitorConditionTypeCreated, corev1.ConditionTrue, "DatadogMonitor Created")
}

logger.Info(logMessage, "Monitor Namespace", datadogMonitor.Namespace, "Monitor Name", datadogMonitor.Name, "Monitor ID", m.GetId())

return nil
}

// detectDrift reports whether the monitor recorded in instance.Status.ID is
// missing from Datadog.
//
// It returns (true, nil) only when the lookup error contains
// ctrutils.NotFoundString; in that case the DriftDetected condition is set on
// status. Any other lookup error is recorded on status as an Error condition
// and returned (wrapped for rate-limit, timeout and uncategorised errors,
// unwrapped for authentication/authorization errors) together with false.
// When the lookup succeeds the Error condition is set to False.
// A zero Status.ID short-circuits to (false, nil) without calling the API.
func (r *Reconciler) detectDrift(ctx context.Context, logger logr.Logger, instance *datadoghqv1alpha1.DatadogMonitor, status *datadoghqv1alpha1.DatadogMonitorStatus) (bool, error) {
	monitorID := instance.Status.ID
	if monitorID == 0 {
		// Nothing has been created yet, so there is nothing to drift from.
		return false, nil
	}

	_, lookupErr := getMonitor(r.datadogAuth, r.datadogClient, monitorID)
	if lookupErr == nil {
		// The monitor is still there: clear any previously reported error.
		condition.UpdateDatadogMonitorConditions(status, metav1.Now(), datadoghqv1alpha1.DatadogMonitorConditionTypeError, corev1.ConditionFalse, "")
		return false, nil
	}

	detail := lookupErr.Error()

	// mentions reports whether the error text contains any of the fragments.
	mentions := func(fragments ...string) bool {
		for _, fragment := range fragments {
			if strings.Contains(detail, fragment) {
				return true
			}
		}
		return false
	}

	// flag marks the sync status as a get error and raises the given condition.
	flag := func(conditionType datadoghqv1alpha1.DatadogMonitorConditionType, description string) {
		status.MonitorStateSyncStatus = datadoghqv1alpha1.MonitorStateSyncStatusGetError
		condition.UpdateDatadogMonitorConditions(status, metav1.Now(), conditionType, corev1.ConditionTrue, description)
	}

	// The order of the cases matters: the first matching category wins.
	switch {
	case mentions(ctrutils.NotFoundString):
		logger.Info("Drift detected: monitor not found in Datadog", "Monitor ID", monitorID)
		flag(datadoghqv1alpha1.DatadogMonitorConditionTypeDriftDetected, fmt.Sprintf("Monitor ID %d not found in Datadog API", monitorID))
		return true, nil

	case mentions("rate limit", "429"):
		logger.V(1).Info("Rate limit encountered during drift detection, will retry later", "Monitor ID", monitorID)
		flag(datadoghqv1alpha1.DatadogMonitorConditionTypeError, fmt.Sprintf("Rate limit during drift detection for monitor ID %d: %s", monitorID, detail))
		return false, fmt.Errorf("rate limit during drift detection, will retry: %w", lookupErr)

	case mentions("unauthorized", "401"):
		logger.Error(lookupErr, "Authentication error during drift detection", "Monitor ID", monitorID)
		flag(datadoghqv1alpha1.DatadogMonitorConditionTypeError, fmt.Sprintf("Authentication error during drift detection for monitor ID %d: credentials may be invalid", monitorID))
		return false, lookupErr

	case mentions("forbidden", "403"):
		logger.Error(lookupErr, "Authorization error during drift detection", "Monitor ID", monitorID)
		flag(datadoghqv1alpha1.DatadogMonitorConditionTypeError, fmt.Sprintf("Authorization error during drift detection for monitor ID %d: insufficient permissions", monitorID))
		return false, lookupErr

	case mentions("timeout", "context deadline exceeded"):
		logger.V(1).Info("Timeout during drift detection, will retry", "Monitor ID", monitorID)
		flag(datadoghqv1alpha1.DatadogMonitorConditionTypeError, fmt.Sprintf("Timeout during drift detection for monitor ID %d: API request timed out", monitorID))
		return false, fmt.Errorf("timeout during drift detection, will retry: %w", lookupErr)

	default:
		// Anything else (API unavailable, service errors, ...) is treated as retryable.
		logger.V(1).Info("Error during drift detection, will retry", "Monitor ID", monitorID, "error", detail)
		flag(datadoghqv1alpha1.DatadogMonitorConditionTypeError, fmt.Sprintf("API error during drift detection for monitor ID %d: %s", monitorID, detail))
		return false, fmt.Errorf("error during drift detection, will retry: %w", lookupErr)
	}
}

func (r *Reconciler) update(logger logr.Logger, datadogMonitor *datadoghqv1alpha1.DatadogMonitor, status *datadoghqv1alpha1.DatadogMonitorStatus, now metav1.Time, instanceSpecHash string) error {
// Validate monitor in Datadog
if err := validateMonitor(r.datadogAuth, logger, r.datadogClient, datadogMonitor); err != nil {
Expand Down Expand Up @@ -414,3 +516,96 @@ func isSupportedMonitorType(monitorType datadoghqv1alpha1.DatadogMonitorType) bo
// isTriggered reports whether groupStatus equals the string form of the
// Alert, Warn or NoData DatadogMonitor state.
func isTriggered(groupStatus string) bool {
	switch groupStatus {
	case string(datadoghqv1alpha1.DatadogMonitorStateAlert),
		string(datadoghqv1alpha1.DatadogMonitorStateWarn),
		string(datadoghqv1alpha1.DatadogMonitorStateNoData):
		return true
	default:
		return false
	}
}

// handleMonitorRecreation recreates in Datadog a monitor whose previously
// recorded ID (instance.Status.ID) no longer exists there.
//
// Parameters:
//   - ctx: checked for cancellation before and after the creation call.
//   - logger: receives progress and error logs.
//   - instance: the DatadogMonitor whose spec is validated and recreated.
//   - status: the status being built for this reconcile; it is mutated in place.
//   - now: timestamp forwarded to createInternal.
//   - instanceSpecHash: spec hash forwarded to createInternal.
//
// Behavior:
//   - If ctx is already cancelled, or the spec fails IsValidDatadogMonitor,
//     nothing is created and an error is returned.
//   - If createInternal fails, status.ID is restored to the old ID, an Error
//     condition describing the failure category is set on status, and a
//     wrapped error is returned.
//   - If createInternal succeeds, status keeps whatever createInternal wrote
//     (including the new monitor ID). If ctx was cancelled in the meantime the
//     cancellation error is still returned, but the new ID is NOT rolled back:
//     the monitor now exists in Datadog, and reverting to the deleted ID would
//     orphan it and cause a duplicate to be created on the next reconcile.
func (r *Reconciler) handleMonitorRecreation(ctx context.Context, logger logr.Logger, instance *datadoghqv1alpha1.DatadogMonitor, status *datadoghqv1alpha1.DatadogMonitorStatus, now metav1.Time, instanceSpecHash string) error {
	logger.Info("Starting monitor recreation", "Monitor ID", instance.Status.ID, "Monitor Name", instance.Spec.Name)

	// Remember the old monitor ID for logging and for rollback on failure.
	oldMonitorID := instance.Status.ID

	// Bail out early if the reconcile was cancelled before any API call.
	if err := ctx.Err(); err != nil {
		logger.V(1).Info("Context cancelled during recreation, aborting", "Monitor ID", oldMonitorID)
		return err
	}

	// Validate the monitor spec before attempting recreation; an invalid spec
	// can never be recreated, so no API call is made.
	if err := datadoghqv1alpha1.IsValidDatadogMonitor(&instance.Spec); err != nil {
		logger.Error(err, "Invalid monitor spec, cannot recreate", "Monitor ID", oldMonitorID)
		return fmt.Errorf("validation error prevents recreation: %w", err)
	}

	// The resource version is captured for diagnostics only: it is logged when
	// a conflict-looking error is reported. No locking is performed here.
	originalResourceVersion := instance.ResourceVersion

	// Reset the monitor ID so the status looks like a not-yet-created monitor
	// while createInternal runs.
	status.ID = 0

	if err := r.createInternal(logger, instance, status, now, instanceSpecHash, true); err != nil {
		// Restore the original ID on error to keep the status consistent.
		status.ID = oldMonitorID

		errorMessage := err.Error()
		contains := func(fragments ...string) bool {
			for _, fragment := range fragments {
				if strings.Contains(errorMessage, fragment) {
					return true
				}
			}
			return false
		}
		// setErrorCondition records the failure on status with a fresh timestamp.
		setErrorCondition := func(description string) {
			condition.UpdateDatadogMonitorConditions(status, metav1.Now(), datadoghqv1alpha1.DatadogMonitorConditionTypeError, corev1.ConditionTrue, description)
		}

		// The order of the cases matters: the first matching category wins.
		switch {
		case contains("conflict", "resource version"):
			logger.V(1).Info("Concurrent modification detected during recreation, will retry", "Monitor ID", oldMonitorID, "ResourceVersion", originalResourceVersion)
			setErrorCondition(fmt.Sprintf("Concurrent modification detected during recreation of monitor ID %d: resource version conflict", oldMonitorID))
			return fmt.Errorf("concurrent modification during recreation, will retry: %w", err)

		case contains("rate limit", "429"):
			logger.V(1).Info("Rate limit during recreation, will retry", "Old Monitor ID", oldMonitorID)
			setErrorCondition(fmt.Sprintf("Rate limit during recreation of monitor ID %d: API rate limit exceeded, will retry", oldMonitorID))
			return fmt.Errorf("rate limit during recreation, will retry: %w", err)

		case contains("unauthorized", "401"):
			logger.Error(err, "Authentication error during recreation", "Old Monitor ID", oldMonitorID)
			setErrorCondition(fmt.Sprintf("Authentication error during recreation of monitor ID %d: credentials are invalid or expired", oldMonitorID))
			return fmt.Errorf("authentication error during recreation: %w", err)

		case contains("forbidden", "403"):
			logger.Error(err, "Authorization error during recreation", "Old Monitor ID", oldMonitorID)
			setErrorCondition(fmt.Sprintf("Authorization error during recreation of monitor ID %d: insufficient permissions to create monitors", oldMonitorID))
			return fmt.Errorf("authorization error during recreation: %w", err)

		case contains("validation", "400"):
			logger.Error(err, "Validation error during recreation", "Old Monitor ID", oldMonitorID)
			setErrorCondition(fmt.Sprintf("Validation error during recreation of monitor ID %d: monitor configuration is invalid", oldMonitorID))
			return fmt.Errorf("validation error during recreation: %w", err)

		case contains("timeout", "context deadline exceeded"):
			logger.V(1).Info("Timeout during recreation, will retry", "Old Monitor ID", oldMonitorID)
			setErrorCondition(fmt.Sprintf("Timeout during recreation of monitor ID %d: API request timed out, will retry", oldMonitorID))
			return fmt.Errorf("timeout during recreation, will retry: %w", err)

		default:
			// Generic handling for any other API error.
			logger.Error(err, "Failed to recreate monitor", "Old Monitor ID", oldMonitorID)
			setErrorCondition(fmt.Sprintf("Failed to recreate monitor ID %d: %s", oldMonitorID, errorMessage))
			return fmt.Errorf("failed to recreate monitor: %w", err)
		}
	}

	// The monitor now exists in Datadog under the new ID. If the context was
	// cancelled meanwhile, report it, but keep the new ID in status: rolling
	// back to the deleted ID would orphan the new monitor and trigger another
	// recreation (a duplicate) on the next reconcile.
	if err := ctx.Err(); err != nil {
		logger.V(1).Info("Context cancelled after recreation, operation may be incomplete", "Old Monitor ID", oldMonitorID, "New Monitor ID", status.ID)
		return err
	}

	logger.Info("Successfully recreated monitor", "Old Monitor ID", oldMonitorID, "New Monitor ID", status.ID)
	return nil
}
Loading
Loading