diff --git a/datadog_sync/utils/resource_utils.py b/datadog_sync/utils/resource_utils.py
index ccbbfa1f..5f8d61de 100644
--- a/datadog_sync/utils/resource_utils.py
+++ b/datadog_sync/utils/resource_utils.py
@@ -7,7 +7,7 @@ import re
 import logging
 from copy import deepcopy
-from graphlib import TopologicalSorter
+from graphlib import TopologicalSorter, CycleError
 from dateutil.parser import parse
 from deepdiff import DeepDiff
@@ -256,3 +256,56 @@ def init_topological_sorter(
     sorter = TopologicalSorter(graph)
     sorter.prepare()
     return sorter
+
+
+def detect_circular_dependencies(graph: Dict[Tuple[str, str], Set[Tuple[str, str]]]) -> Optional[List[Tuple[str, str]]]:
+    """Detect circular dependencies in a dependency graph.
+
+    Args:
+        graph: Dependency graph to check
+
+    Returns:
+        None if no cycles, or a list showing one cycle path if found
+
+    Example:
+        >>> graph = {("monitors", "A"): {("monitors", "B")}, ("monitors", "B"): {("monitors", "A")}}
+        >>> detect_circular_dependencies(graph)
+        [('monitors', 'A'), ('monitors', 'B'), ('monitors', 'A')]
+    """
+    try:
+        sorter = TopologicalSorter(graph)
+        sorter.prepare()
+        # If prepare() succeeds, there are no cycles
+        return None
+    except CycleError:
+        # Cycle detected - recover one concrete cycle path with a DFS
+        visited = set()
+        path = []
+
+        def find_cycle(node):
+            if node in path:
+                # Found a cycle: return the path from its first occurrence back to this node
+                cycle_start = path.index(node)
+                return path[cycle_start:] + [node]
+
+            if node in visited:
+                return None
+
+            visited.add(node)
+            path.append(node)
+
+            for neighbor in graph.get(node, []):
+                cycle = find_cycle(neighbor)
+                if cycle:
+                    return cycle
+
+            path.pop()
+            return None
+
+        for node in graph:
+            if node not in visited:
+                cycle = find_cycle(node)
+                if cycle:
+                    return cycle
+
+        return None
diff --git a/datadog_sync/utils/resources_handler.py b/datadog_sync/utils/resources_handler.py
index d4d10ae5..b4a56010 100644
--- a/datadog_sync/utils/resources_handler.py
+++ b/datadog_sync/utils/resources_handler.py
@@ -36,6 +36,7 @@ class ResourcesHandler:
     def __init__(self, config: Configuration) -> None:
         self.config = config
         self.sorter: Optional[TopologicalSorter] = None
+        self.cleanup_sorter: Optional[TopologicalSorter] = None
         self.worker: Optional[Workers] = None
         self._dependency_graph = Optional[Dict[Tuple[str, str], List[Tuple[str, str]]]]
@@ -90,12 +91,64 @@ async def apply_resources(self) -> Tuple[int, int]:
         if cleanup_resources:
             cleanup = _cleanup_prompt(self.config, cleanup_resources)
             if cleanup:
-                self.config.logger.info("cleaning up resources...")
-                await self.worker.init_workers(self._cleanup_worker, None, None)
-                for i in cleanup_resources:
-                    self.worker.work_queue.put_nowait(i)
-                await self.worker.schedule_workers()
-                self.config.logger.info("finished cleaning up resources")
+                self.config.logger.info("cleaning up resources with dependency ordering...")
+
+                # Build reverse dependency graph for ordered cleanup
+                try:
+                    cleanup_graph = self.get_cleanup_dependency_graph(cleanup_resources)
+                    self.config.logger.info(f"Built cleanup dependency graph with {len(cleanup_graph)} resources")
+
+                    # Detect circular dependencies
+                    from datadog_sync.utils.resource_utils import detect_circular_dependencies
+
+                    cycle = detect_circular_dependencies(cleanup_graph)
+                    if cycle:
+                        # Circular dependency detected
+                        cycle_str = " -> ".join([f"{rt}:{rid}" for rt, rid in cycle])
+                        self.config.logger.error(f"Circular dependency detected in cleanup graph: {cycle_str}")
+                        self.config.logger.error(
+                            "Cannot safely delete resources. Please manually break circular references first."
+                        )
+                        raise ValueError(
+                            f"Circular dependency in cleanup graph: {cycle_str}. "
+                            "Manual intervention required to break the cycle."
+                        )
+
+                    # Initialize cleanup sorter
+                    self.cleanup_sorter = init_topological_sorter(cleanup_graph)
+
+                    # Initialize workers with cleanup sorter stop condition
+                    await self.worker.init_workers(
+                        self._cleanup_worker, lambda: not self.cleanup_sorter.is_active(), None
+                    )
+
+                    # Run cleanup with ordering
+                    if self.config.show_progress_bar:
+                        await self.worker.schedule_workers_with_pbar(
+                            total=len(cleanup_graph), additional_coros=[self.run_cleanup_sorter()]
+                        )
+                    else:
+                        await self.worker.schedule_workers(additional_coros=[self.run_cleanup_sorter()])
+
+                    self.config.logger.info("finished cleaning up resources")
+
+                except ValueError:
+                    # Circular dependency - already logged, re-raise
+                    raise
+                except Exception as e:
+                    # Unexpected error building or running the cleanup graph
+                    self.config.logger.error(f"Error during ordered cleanup: {str(e)}")
+                    self.config.logger.warning(
+                        "Falling back to unordered cleanup (may fail due to dependency issues)"
+                    )
+                    self.cleanup_sorter = None  # drop the abandoned sorter so _cleanup_worker skips its bookkeeping
+
+                    # Fallback to old unordered behavior
+                    await self.worker.init_workers(self._cleanup_worker, None, None)
+                    for i in cleanup_resources:
+                        self.worker.work_queue.put_nowait(i)
+                    await self.worker.schedule_workers()
+                    self.config.logger.info("finished cleaning up resources (unordered fallback)")
 
         # Run pre-apply hooks
         resource_types = set(i[0] for i in self._dependency_graph)
@@ -349,6 +401,10 @@ async def _cleanup_worker(self, q_item: List) -> None:
                 await r_class._send_action_metrics("delete", _id, Status.FAILURE.value)
                 self.config.logger.error(f"error deleting resource {resource_type} with id {_id}: {str(e)}")
         finally:
+            # When running an ordered cleanup, report the node back to the cleanup sorter
+            if self.cleanup_sorter is not None:
+                self.cleanup_sorter.done(q_item)
+
             if not r_class.resource_config.concurrent:
                 r_class.resource_config.async_lock.release()
@@ -370,6 +426,30 @@
                 await self.worker.work_queue.put(node)
             await asyncio.sleep(0)
+
+    async def run_cleanup_sorter(self):
+        """Mirror of run_sorter() but for cleanup operations.
+
+        Continuously feeds deletion-ready resources to the workers in the proper order.
+        A resource is only deleted after all of its dependents have been deleted.
+        """
+        loop = asyncio.get_event_loop()
+
+        while await loop.run_in_executor(None, self.cleanup_sorter.is_active):
+            for node in self.cleanup_sorter.get_ready():
+                resource_type, _id = node
+
+                # Verify the resource still exists in the destination
+                if _id not in self.config.state.destination[resource_type]:
+                    # Already deleted or doesn't exist
+                    self.config.logger.debug(f"Resource {resource_type}:{_id} already deleted, marking as done")
+                    self.cleanup_sorter.done(node)
+                    continue
+
+                # Add to the work queue for deletion
+                await self.worker.work_queue.put(node)
+
+            await asyncio.sleep(0)
 
     def get_dependency_graph(self) -> Tuple[Dict[Tuple[str, str], List[Tuple[str, str]]], Set[Tuple[str, str]]]:
         """Build the dependency graph for all resources.
@@ -428,6 +508,160 @@ def _resource_connections(self, resource_type: str, _id: str) -> Tuple[Set[Tupl
         return failed_connections, missing_resources
+
+    def get_cleanup_dependency_graph(
+        self, cleanup_resources: Dict[Tuple[str, str], str | None]
+    ) -> Dict[Tuple[str, str], Set[Tuple[str, str]]]:
+        """Build the REVERSE dependency graph for cleanup.
+
+        For deletion, dependents must be deleted BEFORE their dependencies.
+        This inverts the normal creation graph.
+
+        Args:
+            cleanup_resources: Resources to be deleted from destination
+
+        Returns:
+            Dict mapping (resource_type, _id) to the set of resources that depend on it
+
+        Example:
+            If Dashboard depends on Monitor:
+            Creation graph: {("dashboards", "dash-1"): [("monitors", "mon-1")]}
+            Cleanup graph: {("monitors", "mon-1"): {("dashboards", "dash-1")}}
+
+            This means Monitor "mon-1" can only be deleted AFTER Dashboard "dash-1" has been deleted.
+        """
+        reverse_graph = defaultdict(set)
+
+        # Initialize all cleanup resources with empty dependencies
+        for resource_key in cleanup_resources.keys():
+            reverse_graph[resource_key] = set()
+
+        # Build reverse dependencies by scanning the destination state
+        for resource_type, _id in cleanup_resources.keys():
+            if resource_type not in self.config.resources:
+                continue
+
+            r_config = self.config.resources[resource_type].resource_config
+            if not r_config.resource_connections:
+                continue
+
+            # Get the actual resource from the destination state (already in memory)
+            if _id not in self.config.state.destination[resource_type]:
+                self.config.logger.debug(
+                    f"Resource {resource_type}:{_id} not in destination state, skipping dependency analysis"
+                )
+                continue
+
+            resource = self.config.state.destination[resource_type][_id]
+
+            # For each dependency this resource has
+            for dep_type, attr_paths in r_config.resource_connections.items():
+                for attr_path in attr_paths:
+                    # Find dependency IDs in the resource
+                    dep_ids = self._extract_dependency_ids(resource, attr_path, dep_type)
+
+                    # For each dependency, add THIS resource as a dependent
+                    for dep_id in dep_ids:
+                        dep_key = (dep_type, dep_id)
+                        # Only include it if the dependency is also being cleaned up
+                        if dep_key in cleanup_resources:
+                            # This says: "dep_key must be deleted AFTER (resource_type, _id)"
+                            reverse_graph[dep_key].add((resource_type, _id))
+                            self.config.logger.debug(f"Dependency edge: {dep_type}:{dep_id} <- {resource_type}:{_id}")
+
+        return dict(reverse_graph)
+
+    def _extract_dependency_ids(self, resource: Dict, attr_path: str, dep_type: str) -> Set[str]:
+        """Extract dependency IDs from a resource at the given attribute path.
+
+        Args:
+            resource: The resource dict
+            attr_path: Dot-separated path like "widgets.definition.alert_id"
+            dep_type: Type of dependency (for context)
+
+        Returns:
+            Set of dependency IDs found
+        """
+        ids = set()
+
+        def extract_from_obj(obj, path_parts):
+            if not obj or not path_parts:
+                return
+
+            current_key = path_parts[0]
+            remaining = path_parts[1:]
+
+            if isinstance(obj, list):
+                for item in obj:
+                    extract_from_obj(item, path_parts)
+            elif isinstance(obj, dict):
+                if current_key in obj:
+                    if not remaining:
+                        # We're at the target attribute
+                        value = obj[current_key]
+                        if value:
+                            # Handle both lists and single values
+                            values = [value] if not isinstance(value, list) else value
+                            for v in values:
+                                if v:
+                                    # Parse prefixed IDs (e.g., "dashboard:abc-123" → "abc-123")
+                                    parsed_id = self._parse_prefixed_id(str(v), dep_type)
+                                    ids.add(parsed_id)
+                    else:
+                        # Keep traversing
+                        extract_from_obj(obj[current_key], remaining)
+
+        path_parts = attr_path.split(".")
+        extract_from_obj(resource, path_parts)
+        return ids
+
+    def _parse_prefixed_id(self, value: str, expected_resource_type: str) -> str:
+        """Parse IDs that may have resource type prefixes.
+
+        Restriction policies and some other resources use prefixed IDs like
+        "dashboard:abc-123" or "slo:xyz-789". This method strips the prefix
+        to get the actual resource ID that matches the state keys.
+
+        Examples:
+            "dashboard:abc-123" with expected_resource_type="dashboards" → "abc-123"
+            "slo:xyz-789" with expected_resource_type="service_level_objectives" → "xyz-789"
+            "abc-123" with any type → "abc-123" (passthrough)
+
+        Args:
+            value: The ID value, possibly prefixed
+            expected_resource_type: The resource type we expect (e.g., "dashboards")
+
+        Returns:
+            The parsed ID without prefix
+        """
+        # Check if value contains a colon (indicates prefix)
+        if ":" not in value:
+            return value
+
+        # Split on first colon only
+        prefix, actual_id = value.split(":", 1)
+
+        # Map prefix to resource type
+        prefix_to_resource_type = {
+            "dashboard": "dashboards",
+            "slo": "service_level_objectives",
+            "notebook": "notebooks",
+            "monitor": "monitors",
+            "user": "users",
+            "role": "roles",
+            "team": "teams",
+            "security-rule": "security_monitoring_rules",
+        }
+
+        # If prefix matches expected resource type, return just the ID
+        if prefix_to_resource_type.get(prefix) == expected_resource_type:
+            return actual_id
+
+        # If prefix doesn't match, return original value (might be a false positive)
+        self.config.logger.debug(
+            f"ID '{value}' has prefix '{prefix}' but expected resource type '{expected_resource_type}'"
+        )
+        return value
+
+
 def _cleanup_prompt(
     config: Configuration, resources_to_cleanup: Dict[Tuple[str, str], str | None], prompt: bool = True
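
A quick way to sanity-check the ordering this patch relies on is to feed a hand-built reverse graph (the same shape get_cleanup_dependency_graph() returns) to the stdlib sorter. The snippet below is a standalone sketch, not part of the patch; the resource IDs are made up and only graphlib from the standard library is used.

    from graphlib import TopologicalSorter

    # Reverse cleanup graph: a key may only be deleted AFTER every node in its value set.
    # Here the dashboard embeds both the monitor and the SLO, and the SLO references the monitor.
    cleanup_graph = {
        ("monitors", "mon-1"): {("dashboards", "dash-1"), ("service_level_objectives", "slo-1")},
        ("service_level_objectives", "slo-1"): {("dashboards", "dash-1")},
        ("dashboards", "dash-1"): set(),
    }

    print(list(TopologicalSorter(cleanup_graph).static_order()))
    # [('dashboards', 'dash-1'), ('service_level_objectives', 'slo-1'), ('monitors', 'mon-1')]
    # The dashboard is deleted first and the monitor last, which is the order run_cleanup_sorter()
    # feeds nodes to the workers. Adding ("monitors", "mon-1") to the dashboards entry would create
    # a two-node cycle, which detect_circular_dependencies() would report before any deletion starts.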