diff --git a/README.md b/README.md index fe1b272..3fa6728 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ For example, consider the following use-cases: | [Job Cluster Update](./notebooks/update_job_cluster/README.md) | Use the Databricks API to mass-update Job and Task configs | | [Workflow Config Exporter](./notebooks/workflow_config_exporter/README.md) | Export existing workflow configuration and save it for future consumption | | [Workflow Schedule Semaphore](./notebooks/databricks_workflow_semaphore/README.md) | Pause/Unpause all workflows' schedules with a certain keyword | +| [Delta Table Staleness Monitor](./notebooks/delta_table_staleness_monitor/README.md) | Automatically identify, notify on, and clean up stale Delta Lake tables according to configurable business rules. | ## Discussions diff --git a/notebooks/delta_table_staleness_monitor/README.md b/notebooks/delta_table_staleness_monitor/README.md new file mode 100644 index 0000000..f0f5fb7 --- /dev/null +++ b/notebooks/delta_table_staleness_monitor/README.md @@ -0,0 +1,27 @@ +

+# Automated Staleness Monitoring for Delta Lake Tables 🕵️

+

+    <!-- Badge images: Databricks, Delta, Slack -->
+

+ +## Introduction + +This notebook provides an automated solution for monitoring and managing stale Delta Lake tables within a Databricks Unity Catalog environment. It systematically analyzes table history to identify assets that haven't had recent data-changing operations, helping to maintain a clean and cost-effective data lake. + +> This notebook is designed as a configurable utility. Before execution, users must define their own rules for staleness thresholds, table exclusion policies, and notification settings to tailor the tool to their specific environment. + +## Use Cases + +The Delta Lake Staleness Monitor is a helpful tool with the below use cases: + +1. **Cost Optimization**: Reduce cloud storage costs by automatically identifying and cleaning up unused or abandoned Delta tables. + +2. **Improved Data Governance**: Maintain a high-quality data environment by flagging stale assets that may contain outdated or irrelevant information. + +3. **Automated Housekeeping**: Save significant manual effort by creating a scheduled workflow that automatically detects and manages stale tables across your workspace. + +4. **Proactive Team Alerts**: Keep data teams informed by sending automatic Slack notifications about tables that require review, enabling them to take action before data becomes obsolete. + +--- +See more details in the notebook (ipynb) \ No newline at end of file diff --git a/notebooks/delta_table_staleness_monitor/delta_table_staleness_monitor.ipynb b/notebooks/delta_table_staleness_monitor/delta_table_staleness_monitor.ipynb new file mode 100644 index 0000000..914e7b0 --- /dev/null +++ b/notebooks/delta_table_staleness_monitor/delta_table_staleness_monitor.ipynb @@ -0,0 +1,919 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "e6e53af0-18e0-46fc-89db-409c47bc55e5", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Delta Table Staleness Monitor\n", + "\n", + "## Overview\n", + "This notebook monitors Delta tables across your Databricks Unity Catalog environment to identify stale tables that may need attention. It uses `DESCRIBE HISTORY` to evaluate the freshness of tables and classifies them for action (e.g., warn or auto-delete) based on configurable thresholds and catalog-specific rules.\n", + "\n", + "## ⚙️ Configuration Guide\n", + "This notebook is designed to be highly configurable. Before running, you will need to customize several sections to match your environment's specific needs. This guide points you to the key areas that require your attention.\n", + "\n", + "**1. Skipping Specific Assets:**\n", + "You must define which catalogs, schemas, or tables should be ignored by the monitor. This is critical for excluding system tables, temporary schemas, or irrelevant data.\n", + "\n", + "**2. Defining Staleness Thresholds:**\n", + "The core logic of this tool depends on rules that define when a table is considered \"stale.\" You can set different time thresholds (in days) for different catalogs and schemas.\n", + "\n", + "**3. Customizing Actions (Warn vs. Auto-Delete):**\n", + "Based on the staleness rules, the script takes actions like sending a warning or automatically deleting a table. You must configure which action applies to which tables.\n", + "\n", + "**4. Setting Up Notifications:**\n", + "The notebook is pre-configured to send alerts to a Slack channel. 
You will need to add your `SLACK_WEBHOOK_URL` to databricks secrets.\n", + " \n", + " \n", + " \n", + "## Core Functionality\n", + "- Iterates through all eligible tables using the Databricks SDK\n", + "- Extracts metadata from `DESCRIBE HISTORY`, such as creation date, last data update, and last modified by\n", + "- Applies staleness thresholds based on catalog and schema pre-defined rules\n", + "- Flags tables with no recent data changes\n", + "- Outputs Slack alerts grouped by catalog, summarizing stale tables\n", + "- Autodeletes stale tables, if configured to do so\n", + "\n", + "## Data Sources and Output\n", + "| Name | Type | Source/Destination | Description |\n", + "|------|------|--------------------|-------------|\n", + "| `table_metadata` | Input | Databricks SDK | List of all catalog.schema.table entries |\n", + "| `history_df` | Input | `DESCRIBE HISTORY` | History logs per Delta table |\n", + "| `stale_df` | Output | In-memory DataFrame | Table metadata with staleness status and actions |\n", + "| `Slack Alerts` | Output | Slack channel | Grouped alerts per catalog with stale table summary |\n", + "\n", + "## Recommended Actions\n", + "| Action | Description |\n", + "|--------|-------------|\n", + "| AUTO DELETE | Table is in the `developer` catalog and has not had recent data changes; eligible for automatic deletion |\n", + "| WARN | Table has exceeded the staleness threshold and should be reviewed for potential cleanup |\n", + "| NONE | Table is either fresh or does not meet the criteria for a warning or deletion |\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1e4c963c-314b-4c57-83da-08bd0dac72cf", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "!pip install loguru -q" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d34d1c0f-608c-4ae7-b64d-e57243eaeaf2", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Imports" + } + }, + "outputs": [], + "source": [ + "from loguru import logger\n", + "import os\n", + "import pandas as pd\n", + "import re\n", + "from databricks.sdk import WorkspaceClient\n", + "from dateutil.relativedelta import relativedelta\n", + "import requests" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f0971d04-87e0-458f-afca-53b5f6afc06c", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Constant" + } + }, + "outputs": [], + "source": [ + "NOW = pd.Timestamp.now()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "490a1ede-593f-45a9-b7f7-aac596de7f32", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "### 1. 
Constants for Skipping\n", + "This section defines which assets (catalogs, schemas, table types, and tables matching a specific name pattern) should be **excluded** from the staleness check.\n", + "\n", + "**ACTION REQUIRED:** Modify the Python sets below to fit your Databricks environment. For example, you may want to skip catalogs used for sandboxing, schemas containing only views, or tables created by specific automated processes.\n", + "\n", + "- `SKIP_CATALOGS`: Add any catalogs you wish to ignore entirely. The `system` catalog is included by default.\n", + "- `SKIP_SCHEMAS`: Add any schemas you wish to ignore. `information_schema` is a common one to exclude.\n", + "- `SKIP_TABLE_TYPES`: By default, `VIEW` and `MATERIALIZED_VIEW` are skipped as they don't contain their own data. You can add other table types if needed.\n", + "- `SKIP_TABLE_PATTERN`: Use this for more complex rules. It allows you to skip tables in a specific catalog and schema that match a regular expression.\n", + "- `SKIP_SECURABLE_KINDS`: Skips catalogs based on their metastore type, such as a PostgreSQL-linked catalog." + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "708383d9-9c88-436b-ad96-6c9906168022", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Constants for Skipping" + } + }, + "outputs": [], + "source": [ + "SKIP_CATALOGS = {\"system\"}\n", + "SKIP_SCHEMAS = {\"information_schema\"}\n", + "SKIP_TABLE_TYPES = {\"VIEW\", \"MATERIALIZED_VIEW\"}\n", + "SKIP_TABLE_PATTERN = {\n", + " (\"forge\", \"restaurants_universe\"): r\".*_[a-z]{2}$\"\n", + "}\n", + "SKIP_SECURABLE_KINDS = {\"POSTGRES\"}" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "89e93b7a-ec42-49b0-8730-bc4986f1ec72", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "### 2. Stale Threshold Rules\n", + "This function defines the business logic for what makes a table \"stale.\" The rules are based on the number of days since a table's data was last changed.\n", + "\n", + "**ACTION REQUIRED:** Modify the `thresholds` dictionary inside the `get_stale_threshold` function to define your organization's staleness policies.\n", + "\n", + "The rules are applied with the following priority:\n", + "1. A specific `(catalog, schema)` pair.\n", + "2. A general `(catalog, None)` rule that applies to all schemas within that catalog.\n", + "3. 
A default value if no other rule matches.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0cf5e546-b34f-4dee-afad-1e1151689663", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Stale Threshold Rules" + } + }, + "outputs": [], + "source": [ + "def get_stale_threshold(catalog, schema):\n", + " \"\"\"\n", + " Determines the number of days after which a table is considered stale, based on catalog and schema.\n", + "\n", + " Rules:\n", + " - developer.* → 120 days (auto-delete)\n", + " - frontiers.* → 90 days (Slack warning)\n", + " - web.* → 45 days (Slack warning)\n", + " - cargo.samples → 90 days (Slack warning)\n", + " - All other tables → 45 days (Slack warning)\n", + "\n", + " Parameters:\n", + " catalog_name (str): The catalog name (e.g., 'web', 'developer')\n", + " schema_name (str): The schema name within the catalog (e.g., 'samples')\n", + "\n", + " Returns:\n", + " int: Number of days to consider a table stale\n", + " \"\"\"\n", + " default_threshold = 45\n", + " thresholds = {\n", + " (\"developer\", None): 120,\n", + " (\"frontiers\", None): 90,\n", + " (\"web\", None): 45,\n", + " (\"cargo\", \"samples\"): 90\n", + " }\n", + " return thresholds.get((catalog, schema), thresholds.get((catalog, None), default_threshold))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9fe35aaa-ea9e-424f-9a0e-ae49b6056c73", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "### 3. Action Logic\n", + "This section controls which action is taken for a stale table. The configuration is based on simple sets, making it easy to define which catalogs are targeted for which action.\n", + "\n", + "**ACTION REQUIRED:**\n", + "* **`AUTO_DELETE_CATALOGS`**: Add the names of any catalogs where stale tables should be automatically dropped. **Use extreme caution with this list.**\n", + "\n", + "The logic in `get_action_for_table` uses this set to assign the final action. If stale, not skipped and not in `AUTO_DELETE_CATALOGS`, it will default to **WARN**." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "68ddf843-008f-452e-a7b4-f96c22c0d661", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "AUTO_DELETE_CATALOGS = {\"developer\"}\n", + "\n", + "def get_action_for_table(is_stale, table_info):\n", + " \"\"\"\n", + " Determines the action to take based on table properties and staleness.\n", + "\n", + " Args:\n", + " is_stale (bool): Whether the table has been flagged as stale.\n", + " table_info (dict): A dictionary containing table metadata \n", + " (e.g., {\"catalog\": \"dev\", \"schema\": \"temp\", ...}).\n", + "\n", + " Returns:\n", + " str or None: The action to take (\"AUTO DELETE\", \"WARN\") or None.\n", + " \"\"\"\n", + " if not is_stale:\n", + " return None\n", + " \n", + " catalog = table_info.get(\"catalog\")\n", + " if catalog in AUTO_DELETE_CATALOGS:\n", + " return \"AUTO DELETE\"\n", + " \n", + " # Default action for any other stale table\n", + " return \"WARN\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d1befac0-486f-4f45-9dfc-667bdbb8707c", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "### 4. Slack Warning Config\n", + "This section sets the function that sends warnings for stale tables to a designated Slack channel.\n", + "\n", + "**ACTION REQUIRED:**\n", + "1. **Get a Slack Webhook URL**\n", + "2. **Set Enviroment Variables:** Update the `SLACK_WEBHOOK_URL` secret variable for the compute." + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1ada3532-7dd0-45f7-91f5-6a3dd6039a0d", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "SLACK_WEBHOOK_URL = os.getenv(\"SLACK_WEBHOOK_URL\")\n", + "SLACK_CHANNEL = \"slack-api-test\"\n", + "\n", + "def post_to_slack(payload: dict) -> bool:\n", + " \"\"\"Posts a payload to a Slack incoming webhook.\n", + "\n", + " This function sends a JSON payload to the `SLACK_WEBHOOK_URL`. It handles\n", + " the request and logs errors if the message fails to send.\n", + "\n", + " Args:\n", + " payload (dict): The message payload, typically containing a \"blocks\"\n", + " or \"text\" key, formatted for the Slack API.\n", + "\n", + " Returns:\n", + " bool: True if the message was posted successfully (HTTP 200), False otherwise.\n", + " \"\"\"\n", + " if not SLACK_WEBHOOK_URL:\n", + " logger.error(\"SLACK_WEBHOOK_URL environment variable is not set. 
Cannot post message.\")\n", + " return False\n", + "\n", + " try:\n", + " # Send the POST request to the Slack webhook URL\n", + " response = requests.post(SLACK_WEBHOOK_URL, json=payload)\n", + "\n", + " # Raise an exception if the request returned an unsuccessful status code (4xx or 5xx)\n", + " response.raise_for_status()\n", + "\n", + " return True\n", + " except requests.exceptions.RequestException as e:\n", + " # Log any network-related errors or unsuccessful status codes\n", + " logger.error(f\"Error posting message to Slack: {e}\")\n", + " return False\n", + " except Exception as e:\n", + " # Catch any other unexpected errors\n", + " logger.error(f\"An unexpected error occurred: {e}\")\n", + " return False" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "27280d7f-4d5b-4b4a-8446-8c9c51d8c52f", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "### Table Staleness Detection Logic" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d67681f4-fbda-484a-b11f-85b841ee3732", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Operation Categories" + } + }, + "outputs": [], + "source": [ + "DATA_CHANGING_OPS = {\n", + " \"WRITE\", \"UPDATE\", \"DELETE\", \"INSERT\", \"MERGE\", \"COPY INTO\",\n", + " \"RESTORE\", \"CREATE TABLE AS SELECT\", \"CREATE OR REPLACE TABLE AS SELECT\",\n", + " \"REPLACE TABLE AS SELECT\", \"STREAMING UPDATE\", \"ROW TRACKING BACKFILL\"\n", + "}\n", + "\n", + "NON_DATA_CHANGING_OPS = {\n", + " \"CREATE\", \"ALTER\", \"DROP\", \"OPTIMIZE\", \"VACUUM START\",\n", + " \"VACUUM END\", \"SET TBLPROPERTIES\", \"UPGRADE PROTOCOL\"\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "bddac2c1-70da-4bec-827c-e53499b66bc2", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Should the Table be Skipped Over Based on Constants" + } + }, + "outputs": [], + "source": [ + "def should_skip_table(catalog, schema, table):\n", + " \"\"\"\n", + " Determines if a table should be skipped based on catalog, schema, type, or name pattern.\n", + "\n", + " Args:\n", + " catalog: Catalog Object\n", + " schema: Schema Object\n", + " table: Table Object\n", + "\n", + " Returns:\n", + " bool: True if the table should be excluded from processing\n", + " \"\"\"\n", + " catalog_name = catalog.name\n", + " schema_name = schema.name\n", + " table_name = table.name\n", + " table_type = table.table_type.value.upper() if table.table_type else None\n", + " catalog_securable_kind = catalog.securable_kind.value.upper() if catalog.securable_kind else None\n", + "\n", + " if any(skip_kind in (catalog_securable_kind or '') for skip_kind in SKIP_SECURABLE_KINDS):\n", + " logger.info(f\"Skipping table: `{catalog_name}.{schema_name}.{table.name}` because the catalog has the securable kind:{catalog_securable_kind}.\")\n", + " return True\n", + " if catalog_name in SKIP_CATALOGS:\n", + " logger.info(f\"Skipping table: `{catalog_name}.{schema_name}.{table.name}` because of the catalog.\")\n", + " return True\n", + " if schema_name in SKIP_SCHEMAS:\n", + " 
logger.info(f\"Skipping table: `{catalog_name}.{schema_name}.{table.name}` because of the schema.\")\n", + " return True\n", + " if table_type and table_type in SKIP_TABLE_TYPES:\n", + " logger.info(f\"Skipping table: `{catalog_name}.{schema_name}.{table.name}` because of the table type ({table_type}).\")\n", + " return True\n", + " if (catalog_name, schema_name) in SKIP_TABLE_PATTERN:\n", + " logger.info(f\"Skipping table: `{catalog_name}.{schema_name}.{table.name}` because of the regex pattern in `{catalog_name}.{schema_name}` table.\")\n", + " pattern = SKIP_TABLE_PATTERN[(catalog_name, schema_name)]\n", + " return re.match(pattern, table_name) is not None\n", + " \n", + " return False" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "8c6c6eb3-9e3b-4dc6-bbb3-7e88cb517327", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Format Staleness Time as Descriptive String" + } + }, + "outputs": [], + "source": [ + "def format_age(from_time, to_time=NOW):\n", + " \"\"\"\n", + " Converts a timestamp difference into a human-readable string \n", + " like '2 months, 5 days' or '0 days' if recent.\n", + "\n", + " Parameters:\n", + " from_time (pd.Timestamp): The earlier timestamp.\n", + " to_time (pd.Timestamp): The later timestamp (default: current time).\n", + "\n", + " Returns:\n", + " str: Human-readable duration between timestamps.\n", + " \"\"\"\n", + " delta = relativedelta(to_time, from_time)\n", + " parts = []\n", + " if delta.years:\n", + " parts.append(f\"{delta.years} year{'s' if delta.years > 1 else ''}\")\n", + " if delta.months:\n", + " parts.append(f\"{delta.months} month{'s' if delta.months > 1 else ''}\")\n", + " if delta.days:\n", + " parts.append(f\"{delta.days} day{'s' if delta.days > 1 else ''}\")\n", + " return \", \".join(parts) if parts else \"0 days\"" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0776e196-0caf-4883-897a-e944f2a77b2b", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Categorize Staleness" + } + }, + "outputs": [], + "source": [ + "def categorize_staleness(from_time, to_time=NOW):\n", + " \"\"\"\n", + " Categorizes how old a timestamp is relative to a reference point, \n", + " returning a label like 'Years', 'Months', 'Days', or 'Recent'.\n", + "\n", + " Parameters:\n", + " from_time (pd.Timestamp): The earlier timestamp.\n", + " to_time (pd.Timestamp): The later timestamp (default: current time).\n", + "\n", + " Returns:\n", + " str: A staleness category label.\n", + " - 'Years' for 365+ days old\n", + " - 'Months' for 30–364 days old\n", + " - 'Days' for 1–29 days old\n", + " - 'Recent' for 0 days\n", + " \"\"\"\n", + " days_old = (to_time - from_time).days\n", + " thresholds = {\n", + " \"Years\": 365,\n", + " \"Months\": 30,\n", + " \"Days\": 1,\n", + " \"Recent\": 0\n", + " }\n", + " \n", + " # Evaluate thresholds in descending order of staleness\n", + " for label, threshold in thresholds.items():\n", + " if days_old >= threshold:\n", + " return label\n", + " return \"Unknown\"" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, 
+ "nuid": "f3039875-3e9e-49fb-b75c-bc08a8956d2a", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Main metadeta collection with SDK" + } + }, + "outputs": [], + "source": [ + "# Initialize Databricks SDK workspace client\n", + "w = WorkspaceClient()\n", + "\n", + "# This list will collect metadata about all eligible tables\n", + "table_metadata = []\n", + "\n", + "# Loop through all catalogs returned by the SDK\n", + "for catalog in w.catalogs.list():\n", + " catalog_name = catalog.name\n", + " for schema in w.schemas.list(catalog_name):\n", + " schema_name = schema.name\n", + " for table in w.tables.list(catalog_name, schema_name):\n", + " # Safely get the table type (e.g., MANAGED, VIEW)\n", + " table_type = table.table_type.value if table.table_type else None\n", + "\n", + " # Apply custom logic to skip specific tables\n", + " if should_skip_table(catalog, schema, table):\n", + " continue\n", + "\n", + " # Store the metadata of tables that passed all filters\n", + " table_metadata.append({\n", + " \"catalog\": catalog_name,\n", + " \"schema\": schema_name,\n", + " \"table\": table.name,\n", + " \"full_path\": f\"{catalog_name}.{schema_name}.{table.name}\", # Used for DESCRIBE HISTORY\n", + " \"table_type\": table_type,\n", + " \"comment\": table.comment\n", + " })" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "73b1a811-b715-44ed-884c-dbc4d939c8a1", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Form final df" + } + }, + "outputs": [], + "source": [ + "def parse_history_df(history_df, table_info):\n", + " \"\"\"\n", + " Parses the output of DESCRIBE HISTORY for a Delta table to extract key metadata.\n", + "\n", + " This function analyzes the Delta table's operation history to:\n", + " - Identify when the table was created and by whom.\n", + " - Identify the most recent operation (e.g., INSERT, UPDATE, OPTIMIZE).\n", + " - Identify the most recent *data-changing* operation (e.g., WRITE, DELETE, MERGE).\n", + " - Calculate how long it has been since the last data change.\n", + " - Determine if the table is considered stale based on catalog/schema-specific thresholds.\n", + " - Return a dictionary of relevant fields for tracking table freshness and usage.\n", + "\n", + " Returns `None` if there are no data-changing operations, since such tables are not relevant for staleness tracking.\n", + " \n", + " Parameters:\n", + " - history_df (pd.DataFrame): Output of `DESCRIBE HISTORY ` converted to a pandas DataFrame.\n", + "\n", + " Returns:\n", + " - dict or None: Parsed metadata about the table, or None if no data-changing operations are found.\n", + " \"\"\" \n", + " # The last row in DESCRIBE HISTORY is the table's creation event\n", + " created_row = history_df.iloc[-1]\n", + " # The first row is the most recent operation (update, insert, etc.)\n", + " last_updated_row = history_df.iloc[0]\n", + "\n", + " # Filter history to only include data-changing operations\n", + " write_ops = history_df[history_df[\"operation\"].isin(DATA_CHANGING_OPS)]\n", + " # If present, pick the most recent data-changing operation\n", + " last_data_row = write_ops.iloc[0] if not write_ops.empty else None\n", + "\n", + " # Extract creation timestamp and user\n", + " created_at = created_row[\"timestamp\"]\n", + " created_by = created_row[\"userName\"]\n", + "\n", + " # Extract last operation timestamp and user 
(regardless of whether it changed data)\n", + " last_updated_at = last_updated_row[\"timestamp\"]\n", + " last_updated_by = last_updated_row[\"userName\"]\n", + "\n", + " # Extract most recent data-changing operation info (if any)\n", + " last_data_change = last_data_row[\"timestamp\"] if last_data_row is not None else None\n", + " last_data_changed_by = last_data_row[\"userName\"] if last_data_row is not None else None\n", + "\n", + " # If no data-changing op exists, skip this table (maybe it's only been ALTER-ed or OPTIMIZE-ed)\n", + " if last_data_change is None:\n", + " logger.warning(f\"No data-changing ops found in: {table_path}\")\n", + " return None\n", + "\n", + " # Calculate how many days it's been since the last data change\n", + " days_outdated = (NOW - last_data_change).days\n", + "\n", + " # Determine if the table is considered stale based on catalog+schema rules\n", + " is_stale = days_outdated > get_stale_threshold(table_info['catalog'], table_info['schema'])\n", + "\n", + " return {\n", + " \"table_path\": table_path,\n", + " \"created_at\": created_at.date(),\n", + " \"created_by\": created_by,\n", + " \"last_updated_at\": last_updated_at.date(),\n", + " \"last_updated_by\": last_updated_by,\n", + " \"last_data_change\": last_data_change.date(),\n", + " \"last_data_changed_by\": last_data_changed_by,\n", + " \"days_outdated\": days_outdated,\n", + " \"is_stale\": is_stale,\n", + " \"outdated_for\": format_age(last_data_change),\n", + " \"staleness_category\": categorize_staleness(last_data_change),\n", + " \"action\": get_action_for_table(is_stale, table_info),\n", + " \"comment\": table_info[\"comment\"]\n", + " }\n", + "\n", + "# Iterate over each valid table we've collected from the SDK\n", + "records = []\n", + "for table_info in table_metadata[:5]:\n", + " table_path = table_info[\"full_path\"] # e.g., catalog.schema.table\n", + " logger.info(f\"Analyzing table: {table_path}\")\n", + "\n", + " history_df = spark.sql(f\"DESCRIBE HISTORY {table_path}\").toPandas()\n", + " \n", + " result = parse_history_df(history_df, table_info)\n", + " if not result:\n", + " continue\n", + "\n", + " # Append all relevant information into the records list\n", + " records.append(result)\n", + "\n", + "stale_df = pd.DataFrame(records)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9be0838d-1717-40b3-82fa-9411e4756838", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Delta Table Staleness Report" + } + }, + "outputs": [], + "source": [ + "display(stale_df.sort_values(by=\"days_outdated\", ascending=False))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "66c7ac16-5234-4747-8196-ee17ec898ba8", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "### Take Actions Based on Staleness\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1a8e2e20-38e0-43ef-8cbd-5772a29693d0", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Auto-Delete" + } + }, + "outputs": [], + "source": [ + "# Filter relevant rows\n", + "delete_records = 
stale_df[\n", + " (stale_df[\"is_stale\"]) & (stale_df[\"action\"] == \"AUTO DELETE\")\n", + "]\n", + "\n", + "for table_path in delete_records[\"table_path\"].values:\n", + " spark.sql(f\"DROP TABLE {table_path}\")\n", + " logger.info(f\"Dropped table: `{table_path}`\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "bbc682bc-9165-4094-ab7b-7941d9c2ff43", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Send Warning in Slack" + } + }, + "outputs": [], + "source": [ + "# Extract catalog, schema, and table from table_path\n", + "stale_df[[\"catalog\", \"schema\", \"table\"]] = stale_df[\"table_path\"].str.split(\".\", expand=True)\n", + "\n", + "# Filter relevant rows\n", + "warn_records = stale_df[\n", + " (stale_df[\"is_stale\"]) & (stale_df[\"action\"] == \"WARN\")\n", + "]\n", + "\n", + "# Group by catalog\n", + "grouped = warn_records.groupby(\"catalog\")\n", + "\n", + "for catalog, group in grouped:\n", + " # Header message\n", + " header = f\"*Catalog: `{catalog}`* — These tables appear stale :warning:\\n\"\n", + "\n", + " # Build markdown table inside a code block\n", + " table_md = \"```\" + group[[\"schema\", \"table\", \"last_data_change\"]].to_markdown(index=False) + \"```\"\n", + " message_text = header + table_md\n", + "\n", + " # Slack block\n", + " message_payload = {\n", + " \"blocks\": [\n", + " {\n", + " \"type\": \"section\",\n", + " \"text\": {\n", + " \"type\": \"mrkdwn\",\n", + " \"text\": message_text\n", + " }\n", + " }\n", + " ]\n", + " }\n", + "\n", + " # Send to Slack\n", + " logger.info(f\"Attempting to post message for catalog: {catalog}\")\n", + " if post_to_slack(message_payload):\n", + " logger.info(f\"Successfully posted message for catalog: {catalog}\")\n", + " else:\n", + " logger.error(f\"Failed to post message for catalog: {catalog}\")" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "2" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "mostRecentlyExecutedCommandWithImplicitDF": { + "commandId": 7081826483846517, + "dataframes": [ + "_sqldf" + ] + }, + "pythonIndentUnit": 4 + }, + "notebookName": "delta_table_staleness_monitor (1)", + "widgets": {} + }, + "kernelspec": { + "display_name": "dotlas", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}