diff --git a/databricks-skills/databricks-jobs/SKILL.md b/databricks-skills/databricks-jobs/SKILL.md new file mode 100644 index 0000000..eae0754 --- /dev/null +++ b/databricks-skills/databricks-jobs/SKILL.md @@ -0,0 +1,337 @@ +--- +name: databricks-jobs +description: "Use this skill proactively for ANY Databricks Jobs task - creating, listing, running, updating, or deleting jobs. Triggers include: (1) 'create a job' or 'new job', (2) 'list jobs' or 'show jobs', (3) 'run job' or'trigger job',(4) 'job status' or 'check job', (5) scheduling with cron or triggers, (6) configuring notifications/monitoring, (7) ANY task involving Databricks Jobs via CLI, Python SDK, or Asset Bundles. ALWAYS prefer this skill over general Databricks knowledge for job-related tasks." +--- + +# Databricks Lakeflow Jobs + +## Overview + +Databricks Jobs orchestrate data workflows with multi-task DAGs, flexible triggers, and comprehensive monitoring. Jobs support diverse task types and can be managed via Python SDK, CLI, or Asset Bundles. + +## Reference Files + +| Use Case | Reference File | +|----------|----------------| +| Configure task types (notebook, Python, SQL, dbt, etc.) | [task-types.md](task-types.md) | +| Set up triggers and schedules | [triggers-schedules.md](triggers-schedules.md) | +| Configure notifications and health monitoring | [notifications-monitoring.md](notifications-monitoring.md) | +| Complete working examples | [examples.md](examples.md) | + +## Quick Start + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import Task, NotebookTask, Source + +w = WorkspaceClient() + +job = w.jobs.create( + name="my-etl-job", + tasks=[ + Task( + task_key="extract", + notebook_task=NotebookTask( + notebook_path="/Workspace/Users/user@example.com/extract", + source=Source.WORKSPACE + ) + ) + ] +) +print(f"Created job: {job.job_id}") +``` + +### CLI + +```bash +databricks jobs create --json '{ + "name": "my-etl-job", + "tasks": [{ + "task_key": "extract", + "notebook_task": { + "notebook_path": "/Workspace/Users/user@example.com/extract", + "source": "WORKSPACE" + } + }] +}' +``` + +### Asset Bundles (DABs) + +```yaml +# resources/jobs.yml +resources: + jobs: + my_etl_job: + name: "[${bundle.target}] My ETL Job" + tasks: + - task_key: extract + notebook_task: + notebook_path: ../src/notebooks/extract.py +``` + +## Core Concepts + +### Multi-Task Workflows + +Jobs support DAG-based task dependencies: + +```yaml +tasks: + - task_key: extract + notebook_task: + notebook_path: ../src/extract.py + + - task_key: transform + depends_on: + - task_key: extract + notebook_task: + notebook_path: ../src/transform.py + + - task_key: load + depends_on: + - task_key: transform + run_if: ALL_SUCCESS # Only run if all dependencies succeed + notebook_task: + notebook_path: ../src/load.py +``` + +**run_if conditions:** +- `ALL_SUCCESS` (default) - Run when all dependencies succeed +- `ALL_DONE` - Run when all dependencies complete (success or failure) +- `AT_LEAST_ONE_SUCCESS` - Run when at least one dependency succeeds +- `NONE_FAILED` - Run when no dependencies failed +- `ALL_FAILED` - Run when all dependencies failed +- `AT_LEAST_ONE_FAILED` - Run when at least one dependency failed + +### Task Types Summary + +| Task Type | Use Case | Reference | +|-----------|----------|-----------| +| `notebook_task` | Run notebooks | [task-types.md#notebook-task](task-types.md#notebook-task) | +| `spark_python_task` | Run Python scripts | 
[task-types.md#spark-python-task](task-types.md#spark-python-task) | +| `python_wheel_task` | Run Python wheels | [task-types.md#python-wheel-task](task-types.md#python-wheel-task) | +| `sql_task` | Run SQL queries/files | [task-types.md#sql-task](task-types.md#sql-task) | +| `dbt_task` | Run dbt projects | [task-types.md#dbt-task](task-types.md#dbt-task) | +| `pipeline_task` | Trigger DLT/SDP pipelines | [task-types.md#pipeline-task](task-types.md#pipeline-task) | +| `spark_jar_task` | Run Spark JARs | [task-types.md#spark-jar-task](task-types.md#spark-jar-task) | +| `run_job_task` | Trigger other jobs | [task-types.md#run-job-task](task-types.md#run-job-task) | +| `for_each_task` | Loop over inputs | [task-types.md#for-each-task](task-types.md#for-each-task) | + +### Trigger Types Summary + +| Trigger Type | Use Case | Reference | +|--------------|----------|-----------| +| `schedule` | Cron-based scheduling | [triggers-schedules.md#cron-schedule](triggers-schedules.md#cron-schedule) | +| `trigger.periodic` | Interval-based | [triggers-schedules.md#periodic-trigger](triggers-schedules.md#periodic-trigger) | +| `trigger.file_arrival` | File arrival events | [triggers-schedules.md#file-arrival-trigger](triggers-schedules.md#file-arrival-trigger) | +| `trigger.table_update` | Table change events | [triggers-schedules.md#table-update-trigger](triggers-schedules.md#table-update-trigger) | +| `continuous` | Always-running jobs | [triggers-schedules.md#continuous-jobs](triggers-schedules.md#continuous-jobs) | + +## Compute Configuration + +### Job Clusters (Recommended) + +Define reusable cluster configurations: + +```yaml +job_clusters: + - job_cluster_key: shared_cluster + new_cluster: + spark_version: "15.4.x-scala2.12" + node_type_id: "i3.xlarge" + num_workers: 2 + spark_conf: + spark.speculation: "true" + +tasks: + - task_key: my_task + job_cluster_key: shared_cluster + notebook_task: + notebook_path: ../src/notebook.py +``` + +### Autoscaling Clusters + +```yaml +new_cluster: + spark_version: "15.4.x-scala2.12" + node_type_id: "i3.xlarge" + autoscale: + min_workers: 2 + max_workers: 8 +``` + +### Existing Cluster + +```yaml +tasks: + - task_key: my_task + existing_cluster_id: "0123-456789-abcdef12" + notebook_task: + notebook_path: ../src/notebook.py +``` + +### Serverless Compute + +For notebook and Python tasks, omit cluster configuration to use serverless: + +```yaml +tasks: + - task_key: serverless_task + notebook_task: + notebook_path: ../src/notebook.py + # No cluster config = serverless +``` + +## Job Parameters + +### Define Parameters + +```yaml +parameters: + - name: env + default: "dev" + - name: date + default: "{{start_date}}" # Dynamic value reference +``` + +### Access in Notebook + +```python +# In notebook +dbutils.widgets.get("env") +dbutils.widgets.get("date") +``` + +### Pass to Tasks + +```yaml +tasks: + - task_key: my_task + notebook_task: + notebook_path: ../src/notebook.py + base_parameters: + env: "{{job.parameters.env}}" + custom_param: "value" +``` + +## Common Operations + +### Python SDK Operations + +```python +from databricks.sdk import WorkspaceClient + +w = WorkspaceClient() + +# List jobs +jobs = w.jobs.list() + +# Get job details +job = w.jobs.get(job_id=12345) + +# Run job now +run = w.jobs.run_now(job_id=12345) + +# Run with parameters +run = w.jobs.run_now( + job_id=12345, + job_parameters={"env": "prod", "date": "2024-01-15"} +) + +# Cancel run +w.jobs.cancel_run(run_id=run.run_id) + +# Delete job +w.jobs.delete(job_id=12345) +``` + +### CLI 
Operations + +```bash +# List jobs +databricks jobs list + +# Get job details +databricks jobs get 12345 + +# Run job +databricks jobs run-now 12345 + +# Run with parameters +databricks jobs run-now 12345 --job-params '{"env": "prod"}' + +# Cancel run +databricks jobs cancel-run 67890 + +# Delete job +databricks jobs delete 12345 +``` + +### Asset Bundle Operations + +```bash +# Validate configuration +databricks bundle validate + +# Deploy job +databricks bundle deploy + +# Run job +databricks bundle run my_job_resource_key + +# Deploy to specific target +databricks bundle deploy -t prod + +# Destroy resources +databricks bundle destroy +``` + +## Permissions (DABs) + +```yaml +resources: + jobs: + my_job: + name: "My Job" + permissions: + - level: CAN_VIEW + group_name: "data-analysts" + - level: CAN_MANAGE_RUN + group_name: "data-engineers" + - level: CAN_MANAGE + user_name: "admin@example.com" +``` + +**Permission levels:** +- `CAN_VIEW` - View job and run history +- `CAN_MANAGE_RUN` - View, trigger, and cancel runs +- `CAN_MANAGE` - Full control including edit and delete + +## Common Issues + +| Issue | Solution | +|-------|----------| +| Job cluster startup slow | Use job clusters with `job_cluster_key` for reuse across tasks | +| Task dependencies not working | Verify `task_key` references match exactly in `depends_on` | +| Schedule not triggering | Check `pause_status: UNPAUSED` and valid timezone | +| File arrival not detecting | Ensure path has proper permissions and uses cloud storage URL | +| Table update trigger missing events | Verify Unity Catalog table and proper grants | +| Parameter not accessible | Use `dbutils.widgets.get()` in notebooks | +| "admins" group error | Cannot modify admins permissions on jobs | +| Serverless task fails | Ensure task type supports serverless (notebook, Python) | + +## Related Skills + +- **[asset-bundles](../asset-bundles/SKILL.md)** - Deploy jobs via Databricks Asset Bundles +- **[spark-declarative-pipelines](../spark-declarative-pipelines/SKILL.md)** - Configure pipelines triggered by jobs + +## Resources + +- [Jobs API Reference](https://docs.databricks.com/api/workspace/jobs) +- [Jobs Documentation](https://docs.databricks.com/en/jobs/index.html) +- [DABs Job Task Types](https://docs.databricks.com/en/dev-tools/bundles/job-task-types.html) +- [Bundle Examples Repository](https://github.com/databricks/bundle-examples) diff --git a/databricks-skills/databricks-jobs/examples.md b/databricks-skills/databricks-jobs/examples.md new file mode 100644 index 0000000..6ae53fa --- /dev/null +++ b/databricks-skills/databricks-jobs/examples.md @@ -0,0 +1,721 @@ +# Complete Examples + +## Contents +- [ETL Pipeline with Multiple Tasks](#etl-pipeline-with-multiple-tasks) +- [Scheduled Data Warehouse Refresh](#scheduled-data-warehouse-refresh) +- [Event-Driven Pipeline](#event-driven-pipeline) +- [ML Training Pipeline](#ml-training-pipeline) +- [Multi-Environment Deployment](#multi-environment-deployment) +- [Streaming Job](#streaming-job) +- [Cross-Job Orchestration](#cross-job-orchestration) + +--- + +## ETL Pipeline with Multiple Tasks + +A classic extract-transform-load pipeline with task dependencies. + +### DABs YAML + +```yaml +# resources/etl_job.yml +resources: + jobs: + daily_etl: + name: "[${bundle.target}] Daily ETL Pipeline" + + # Schedule: Daily at 6 AM UTC + schedule: + quartz_cron_expression: "0 0 6 * * ?" 
+ timezone_id: "UTC" + pause_status: UNPAUSED + + # Job parameters + parameters: + - name: load_date + default: "{{start_date}}" + - name: env + default: "${bundle.target}" + + # Shared cluster for all tasks + job_clusters: + - job_cluster_key: etl_cluster + new_cluster: + spark_version: "15.4.x-scala2.12" + node_type_id: "i3.xlarge" + num_workers: 4 + spark_conf: + spark.sql.shuffle.partitions: "200" + + # Email notifications + email_notifications: + on_failure: + - "data-team@example.com" + on_success: + - "data-team@example.com" + + tasks: + # Extract from source systems + - task_key: extract_orders + job_cluster_key: etl_cluster + notebook_task: + notebook_path: ../src/notebooks/extract_orders.py + base_parameters: + load_date: "{{job.parameters.load_date}}" + + - task_key: extract_customers + job_cluster_key: etl_cluster + notebook_task: + notebook_path: ../src/notebooks/extract_customers.py + base_parameters: + load_date: "{{job.parameters.load_date}}" + + - task_key: extract_products + job_cluster_key: etl_cluster + notebook_task: + notebook_path: ../src/notebooks/extract_products.py + + # Transform: wait for all extracts + - task_key: transform_facts + depends_on: + - task_key: extract_orders + - task_key: extract_customers + - task_key: extract_products + job_cluster_key: etl_cluster + notebook_task: + notebook_path: ../src/notebooks/transform_facts.py + base_parameters: + load_date: "{{job.parameters.load_date}}" + + # Load: run after transform + - task_key: load_warehouse + depends_on: + - task_key: transform_facts + job_cluster_key: etl_cluster + notebook_task: + notebook_path: ../src/notebooks/load_warehouse.py + + # Data quality check + - task_key: validate_data + depends_on: + - task_key: load_warehouse + run_if: ALL_SUCCESS + job_cluster_key: etl_cluster + notebook_task: + notebook_path: ../src/notebooks/validate_data.py + + permissions: + - level: CAN_VIEW + group_name: "data-analysts" + - level: CAN_MANAGE_RUN + group_name: "data-engineers" +``` + +### Python SDK Equivalent + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import ( + Task, NotebookTask, Source, + JobCluster, ClusterSpec, + CronSchedule, PauseStatus, + JobEmailNotifications, + JobParameterDefinition +) + +w = WorkspaceClient() + +job = w.jobs.create( + name="Daily ETL Pipeline", + schedule=CronSchedule( + quartz_cron_expression="0 0 6 * * ?", + timezone_id="UTC", + pause_status=PauseStatus.UNPAUSED + ), + parameters=[ + JobParameterDefinition(name="load_date", default="{{start_date}}"), + JobParameterDefinition(name="env", default="prod") + ], + job_clusters=[ + JobCluster( + job_cluster_key="etl_cluster", + new_cluster=ClusterSpec( + spark_version="15.4.x-scala2.12", + node_type_id="i3.xlarge", + num_workers=4 + ) + ) + ], + email_notifications=JobEmailNotifications( + on_failure=["data-team@example.com"], + on_success=["data-team@example.com"] + ), + tasks=[ + Task( + task_key="extract_orders", + job_cluster_key="etl_cluster", + notebook_task=NotebookTask( + notebook_path="/Workspace/etl/extract_orders", + source=Source.WORKSPACE, + base_parameters={"load_date": "{{job.parameters.load_date}}"} + ) + ), + Task( + task_key="extract_customers", + job_cluster_key="etl_cluster", + notebook_task=NotebookTask( + notebook_path="/Workspace/etl/extract_customers", + source=Source.WORKSPACE + ) + ), + Task( + task_key="transform_facts", + depends_on=[ + {"task_key": "extract_orders"}, + {"task_key": "extract_customers"} + ], + job_cluster_key="etl_cluster", + 
notebook_task=NotebookTask( + notebook_path="/Workspace/etl/transform_facts", + source=Source.WORKSPACE + ) + ), + Task( + task_key="load_warehouse", + depends_on=[{"task_key": "transform_facts"}], + job_cluster_key="etl_cluster", + notebook_task=NotebookTask( + notebook_path="/Workspace/etl/load_warehouse", + source=Source.WORKSPACE + ) + ) + ] +) + +print(f"Created job: {job.job_id}") +``` + +--- + +## Scheduled Data Warehouse Refresh + +SQL-based warehouse refresh with multiple queries. + +### DABs YAML + +```yaml +resources: + jobs: + warehouse_refresh: + name: "[${bundle.target}] Warehouse Refresh" + + schedule: + quartz_cron_expression: "0 0 4 * * ?" # 4 AM daily + timezone_id: "America/New_York" + pause_status: UNPAUSED + + tasks: + # Refresh dimension tables + - task_key: refresh_dim_customers + sql_task: + file: + path: ../src/sql/refresh_dim_customers.sql + source: WORKSPACE + warehouse_id: ${var.warehouse_id} + + - task_key: refresh_dim_products + sql_task: + file: + path: ../src/sql/refresh_dim_products.sql + source: WORKSPACE + warehouse_id: ${var.warehouse_id} + + # Refresh fact tables (depends on dimensions) + - task_key: refresh_fact_sales + depends_on: + - task_key: refresh_dim_customers + - task_key: refresh_dim_products + sql_task: + file: + path: ../src/sql/refresh_fact_sales.sql + source: WORKSPACE + warehouse_id: ${var.warehouse_id} + + # Update aggregations + - task_key: update_aggregations + depends_on: + - task_key: refresh_fact_sales + sql_task: + file: + path: ../src/sql/update_aggregations.sql + source: WORKSPACE + warehouse_id: ${var.warehouse_id} + + # Refresh dashboard + - task_key: refresh_dashboard + depends_on: + - task_key: update_aggregations + sql_task: + dashboard: + dashboard_id: "dashboard-uuid-here" + warehouse_id: ${var.warehouse_id} +``` + +--- + +## Event-Driven Pipeline + +Pipeline triggered by file arrival and table updates. + +### DABs YAML + +```yaml +resources: + jobs: + event_driven_pipeline: + name: "[${bundle.target}] Event-Driven Pipeline" + + # Trigger on file arrival + trigger: + pause_status: UNPAUSED + file_arrival: + url: "s3://data-lake/incoming/orders/" + min_time_between_triggers_seconds: 300 # 5 min cooldown + wait_after_last_change_seconds: 60 # Wait for batch completion + + # Health monitoring + health: + rules: + - metric: RUN_DURATION_SECONDS + op: GREATER_THAN + value: 1800 # Alert if > 30 min + + email_notifications: + on_failure: + - "data-alerts@example.com" + on_duration_warning_threshold_exceeded: + - "data-alerts@example.com" + + tasks: + - task_key: process_incoming + notebook_task: + notebook_path: ../src/notebooks/process_incoming_files.py + new_cluster: + spark_version: "15.4.x-scala2.12" + node_type_id: "i3.xlarge" + autoscale: + min_workers: 2 + max_workers: 10 +``` + +### Table Update Trigger Example + +```yaml +resources: + jobs: + table_triggered_job: + name: "[${bundle.target}] Table Update Handler" + + trigger: + pause_status: UNPAUSED + table_update: + table_names: + - "main.bronze.raw_orders" + - "main.bronze.raw_inventory" + condition: ANY_UPDATED + min_time_between_triggers_seconds: 600 + wait_after_last_change_seconds: 120 + + tasks: + - task_key: process_updates + notebook_task: + notebook_path: ../src/notebooks/process_table_updates.py +``` + +--- + +## ML Training Pipeline + +Machine learning workflow with training, evaluation, and deployment. 
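+
+The bundle below wires up the weekly schedule. For an ad-hoc retrain you can also trigger the deployed job from the Python SDK and override its parameters; a minimal sketch, assuming the deployed job's ID is known (12345 is a placeholder):
+
+```python
+from databricks.sdk import WorkspaceClient
+
+w = WorkspaceClient()
+
+# Trigger an ad-hoc retraining run with overridden job parameters and wait for it
+run = w.jobs.run_now_and_wait(
+    job_id=12345,  # placeholder for the deployed job's ID
+    job_parameters={
+        "model_name": "sales_forecaster",
+        "experiment_name": "/Shared/experiments/sales_forecast"
+    }
+)
+print(f"Run finished with state: {run.state.result_state}")
+```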
+ +### DABs YAML + +```yaml +resources: + jobs: + ml_training: + name: "[${bundle.target}] ML Training Pipeline" + + # Weekly retraining + schedule: + quartz_cron_expression: "0 0 2 ? * SUN" # Sunday 2 AM + timezone_id: "UTC" + pause_status: UNPAUSED + + parameters: + - name: model_name + default: "sales_forecaster" + - name: experiment_name + default: "/Shared/experiments/sales_forecast" + + # GPU cluster for training + job_clusters: + - job_cluster_key: gpu_cluster + new_cluster: + spark_version: "15.4.x-gpu-ml-scala2.12" + node_type_id: "g5.xlarge" + num_workers: 2 + aws_attributes: + first_on_demand: 1 + + - job_cluster_key: cpu_cluster + new_cluster: + spark_version: "15.4.x-scala2.12" + node_type_id: "i3.xlarge" + num_workers: 4 + + # ML environment + environments: + - environment_key: ml_env + spec: + dependencies: + - mlflow>=2.10.0 + - scikit-learn>=1.4.0 + - pandas>=2.0.0 + - xgboost>=2.0.0 + + tasks: + # Data preparation + - task_key: prepare_training_data + job_cluster_key: cpu_cluster + environment_key: ml_env + notebook_task: + notebook_path: ../src/ml/prepare_training_data.py + base_parameters: + output_table: "main.ml.training_data" + + # Feature engineering + - task_key: engineer_features + depends_on: + - task_key: prepare_training_data + job_cluster_key: cpu_cluster + environment_key: ml_env + notebook_task: + notebook_path: ../src/ml/engineer_features.py + + # Model training + - task_key: train_model + depends_on: + - task_key: engineer_features + job_cluster_key: gpu_cluster + environment_key: ml_env + notebook_task: + notebook_path: ../src/ml/train_model.py + base_parameters: + model_name: "{{job.parameters.model_name}}" + experiment_name: "{{job.parameters.experiment_name}}" + + # Model evaluation + - task_key: evaluate_model + depends_on: + - task_key: train_model + job_cluster_key: cpu_cluster + environment_key: ml_env + notebook_task: + notebook_path: ../src/ml/evaluate_model.py + + # Conditional deployment (only on success) + - task_key: deploy_model + depends_on: + - task_key: evaluate_model + run_if: ALL_SUCCESS + job_cluster_key: cpu_cluster + environment_key: ml_env + notebook_task: + notebook_path: ../src/ml/deploy_model.py + base_parameters: + model_name: "{{job.parameters.model_name}}" +``` + +--- + +## Multi-Environment Deployment + +Job configuration with environment-specific settings. + +### databricks.yml + +```yaml +bundle: + name: data-pipeline + +include: + - resources/*.yml + +variables: + warehouse_id: + lookup: + warehouse: "Shared SQL Warehouse" + notification_email: + default: "data-team@example.com" + +targets: + dev: + default: true + mode: development + workspace: + profile: dev-profile + variables: + notification_email: "dev-team@example.com" + + staging: + mode: development + workspace: + profile: staging-profile + + prod: + mode: production + workspace: + profile: prod-profile + run_as: + service_principal_name: "production-sp" +``` + +### resources/jobs.yml + +```yaml +resources: + jobs: + data_pipeline: + name: "[${bundle.target}] Data Pipeline" + + # Only schedule in prod + schedule: + quartz_cron_expression: "0 0 6 * * ?" 
+ timezone_id: "UTC" + pause_status: ${if(bundle.target == "prod", "UNPAUSED", "PAUSED")} + + # Environment-specific cluster sizing + job_clusters: + - job_cluster_key: main_cluster + new_cluster: + spark_version: "15.4.x-scala2.12" + node_type_id: ${if(bundle.target == "prod", "i3.2xlarge", "i3.xlarge")} + num_workers: ${if(bundle.target == "prod", 8, 2)} + + email_notifications: + on_failure: + - ${var.notification_email} + + tasks: + - task_key: process_data + job_cluster_key: main_cluster + notebook_task: + notebook_path: ../src/notebooks/process_data.py + base_parameters: + env: "${bundle.target}" + catalog: "${bundle.target}_catalog" + + permissions: + - level: CAN_VIEW + group_name: "data-analysts" + - level: CAN_MANAGE_RUN + group_name: "data-engineers" + - level: CAN_MANAGE + service_principal_name: "deployment-sp" +``` + +--- + +## Streaming Job + +Continuous streaming job with monitoring. + +### DABs YAML + +```yaml +resources: + jobs: + streaming_processor: + name: "[${bundle.target}] Streaming Processor" + + # Continuous execution + continuous: + pause_status: UNPAUSED + + # Health monitoring for streaming + health: + rules: + - metric: STREAMING_BACKLOG_SECONDS + op: GREATER_THAN + value: 300 # Alert if > 5 min behind + - metric: STREAMING_BACKLOG_RECORDS + op: GREATER_THAN + value: 1000000 # Alert if > 1M records behind + + email_notifications: + on_failure: + - "streaming-alerts@example.com" + on_streaming_backlog_exceeded: + - "streaming-alerts@example.com" + + webhook_notifications: + on_failure: + - id: "pagerduty-streaming-alerts" + on_streaming_backlog_exceeded: + - id: "slack-streaming-channel" + + tasks: + - task_key: stream_processor + notebook_task: + notebook_path: ../src/notebooks/stream_processor.py + new_cluster: + spark_version: "15.4.x-scala2.12" + node_type_id: "i3.xlarge" + autoscale: + min_workers: 2 + max_workers: 16 + spark_conf: + spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled: "true" + spark.sql.streaming.stateStore.providerClass: "com.databricks.sql.streaming.state.RocksDBStateStoreProvider" +``` + +--- + +## Cross-Job Orchestration + +Multiple jobs with dependencies using run_job_task. + +### DABs YAML + +```yaml +resources: + jobs: + # Data ingestion job + ingestion_job: + name: "[${bundle.target}] Data Ingestion" + tasks: + - task_key: ingest + notebook_task: + notebook_path: ../src/notebooks/ingest.py + + # Data transformation job + transformation_job: + name: "[${bundle.target}] Data Transformation" + tasks: + - task_key: transform + notebook_task: + notebook_path: ../src/notebooks/transform.py + + # Master orchestration job + orchestrator: + name: "[${bundle.target}] Master Orchestrator" + + schedule: + quartz_cron_expression: "0 0 1 * * ?" + timezone_id: "UTC" + pause_status: UNPAUSED + + tasks: + # Run ingestion first + - task_key: run_ingestion + run_job_task: + job_id: ${resources.jobs.ingestion_job.id} + + # Run transformation after ingestion + - task_key: run_transformation + depends_on: + - task_key: run_ingestion + run_job_task: + job_id: ${resources.jobs.transformation_job.id} + + # Final validation + - task_key: validate_all + depends_on: + - task_key: run_transformation + notebook_task: + notebook_path: ../src/notebooks/validate_all.py +``` + +--- + +## For Each Task - Parallel Processing + +Process multiple items in parallel using for_each_task. 
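+
+Because the aggregation step below uses `run_if: ALL_DONE`, it runs even when some iterations fail. After a run completes, per-task outcomes can be inspected from the Python SDK; a minimal sketch (67890 is a placeholder run ID):
+
+```python
+from databricks.sdk import WorkspaceClient
+
+w = WorkspaceClient()
+
+# Inspect the result state of each task in a finished run
+run = w.jobs.get_run(run_id=67890)  # placeholder run ID
+for task in run.tasks:
+    print(f"{task.task_key}: {task.state.result_state}")
+```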
+ +### DABs YAML + +```yaml +resources: + jobs: + parallel_processor: + name: "[${bundle.target}] Parallel Region Processor" + + schedule: + quartz_cron_expression: "0 0 8 * * ?" + timezone_id: "UTC" + pause_status: UNPAUSED + + tasks: + # Generate list of items to process + - task_key: get_regions + notebook_task: + notebook_path: ../src/notebooks/get_active_regions.py + + # Process each region in parallel + - task_key: process_regions + depends_on: + - task_key: get_regions + for_each_task: + inputs: "{{tasks.get_regions.values.regions}}" + concurrency: 10 # Max 10 parallel + task: + task_key: process_region + notebook_task: + notebook_path: ../src/notebooks/process_region.py + base_parameters: + region: "{{input}}" + + # Aggregate results after all regions processed + - task_key: aggregate_results + depends_on: + - task_key: process_regions + run_if: ALL_DONE # Run even if some regions failed + notebook_task: + notebook_path: ../src/notebooks/aggregate_results.py +``` + +### Notebook: get_active_regions.py + +```python +# Get list of active regions to process +regions = spark.sql(""" + SELECT DISTINCT region_code + FROM main.config.active_regions + WHERE is_active = true +""").collect() + +region_list = [row.region_code for row in regions] + +# Pass to downstream for_each_task +dbutils.jobs.taskValues.set(key="regions", value=region_list) +``` + +### Notebook: process_region.py + +```python +# Get region from parameter +region = dbutils.widgets.get("region") + +# Process data for this region +df = spark.sql(f""" + SELECT * FROM main.bronze.orders + WHERE region = '{region}' +""") + +# Transform and write +df_transformed = transform_orders(df) +df_transformed.write.mode("append").saveAsTable(f"main.silver.orders_{region}") + +print(f"Processed region: {region}") +``` diff --git a/databricks-skills/databricks-jobs/notifications-monitoring.md b/databricks-skills/databricks-jobs/notifications-monitoring.md new file mode 100644 index 0000000..9933cc7 --- /dev/null +++ b/databricks-skills/databricks-jobs/notifications-monitoring.md @@ -0,0 +1,548 @@ +# Notifications and Monitoring Reference + +## Contents +- [Email Notifications](#email-notifications) +- [Webhook Notifications](#webhook-notifications) +- [Health Rules](#health-rules) +- [Timeout Configuration](#timeout-configuration) +- [Retry Configuration](#retry-configuration) +- [Run Queue Settings](#run-queue-settings) + +--- + +## Email Notifications + +Send email alerts for job lifecycle events. + +### DABs YAML + +```yaml +resources: + jobs: + monitored_job: + name: "Monitored Job" + email_notifications: + on_start: + - "team@example.com" + on_success: + - "team@example.com" + on_failure: + - "oncall@example.com" + - "team@example.com" + on_duration_warning_threshold_exceeded: + - "oncall@example.com" + no_alert_for_skipped_runs: true + tasks: + - task_key: main + notebook_task: + notebook_path: ../src/main.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import JobEmailNotifications + +w = WorkspaceClient() + +job = w.jobs.create( + name="Monitored Job", + email_notifications=JobEmailNotifications( + on_start=["team@example.com"], + on_success=["team@example.com"], + on_failure=["oncall@example.com", "team@example.com"], + on_duration_warning_threshold_exceeded=["oncall@example.com"], + no_alert_for_skipped_runs=True + ), + tasks=[...] 
+) +``` + +### Email Notification Events + +| Event | Description | +|-------|-------------| +| `on_start` | When job run starts | +| `on_success` | When job run completes successfully | +| `on_failure` | When job run fails | +| `on_duration_warning_threshold_exceeded` | When run exceeds duration warning threshold | +| `on_streaming_backlog_exceeded` | When streaming backlog exceeds threshold | + +### Task-Level Email Notifications + +```yaml +tasks: + - task_key: critical_task + email_notifications: + on_start: + - "task-owner@example.com" + on_success: + - "task-owner@example.com" + on_failure: + - "oncall@example.com" + notebook_task: + notebook_path: ../src/critical.py +``` + +--- + +## Webhook Notifications + +Send HTTP webhooks for job events (Slack, PagerDuty, custom endpoints). + +### Create Notification Destination First + +Before using webhooks, create a notification destination in the workspace: + +**Python SDK:** +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.settings import ( + CreateNotificationDestinationRequest, + DestinationType, + SlackConfig +) + +w = WorkspaceClient() + +# Create Slack destination +destination = w.notification_destinations.create( + display_name="Slack Alerts", + config=SlackConfig( + url="https://hooks.slack.com/services/XXX/YYY/ZZZ" + ) +) + +print(f"Destination ID: {destination.id}") +``` + +### DABs YAML + +```yaml +resources: + jobs: + webhook_job: + name: "Job with Webhooks" + webhook_notifications: + on_start: + - id: "notification-destination-uuid" + on_success: + - id: "notification-destination-uuid" + on_failure: + - id: "pagerduty-destination-uuid" + on_duration_warning_threshold_exceeded: + - id: "slack-destination-uuid" + tasks: + - task_key: main + notebook_task: + notebook_path: ../src/main.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import WebhookNotifications, Webhook + +w = WorkspaceClient() + +job = w.jobs.create( + name="Job with Webhooks", + webhook_notifications=WebhookNotifications( + on_start=[Webhook(id="notification-destination-uuid")], + on_success=[Webhook(id="notification-destination-uuid")], + on_failure=[Webhook(id="pagerduty-destination-uuid")], + on_duration_warning_threshold_exceeded=[Webhook(id="slack-destination-uuid")] + ), + tasks=[...] +) +``` + +### Supported Destinations + +| Type | Configuration | +|------|---------------| +| Slack | Slack webhook URL | +| Microsoft Teams | Teams webhook URL | +| PagerDuty | PagerDuty integration key | +| Generic Webhook | Custom HTTP endpoint | +| Email | Email addresses | + +### Task-Level Webhooks + +```yaml +tasks: + - task_key: critical_task + webhook_notifications: + on_failure: + - id: "pagerduty-destination-uuid" + notebook_task: + notebook_path: ../src/critical.py +``` + +--- + +## Health Rules + +Monitor job health metrics and trigger alerts. 
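+
+Health rules can also be attached to an existing job without recreating it, via a partial update. A minimal sketch, assuming the job ID is known (12345 is a placeholder):
+
+```python
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.service.jobs import (
+    JobSettings, JobsHealthRules, JobsHealthRule,
+    JobsHealthMetric, JobsHealthOperator
+)
+
+w = WorkspaceClient()
+
+# Add a run-duration health rule to an existing job
+w.jobs.update(
+    job_id=12345,  # placeholder job ID
+    new_settings=JobSettings(
+        health=JobsHealthRules(
+            rules=[
+                JobsHealthRule(
+                    metric=JobsHealthMetric.RUN_DURATION_SECONDS,
+                    op=JobsHealthOperator.GREATER_THAN,
+                    value=3600
+                )
+            ]
+        )
+    )
+)
+```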
+ +### DABs YAML + +```yaml +resources: + jobs: + health_monitored: + name: "Health Monitored Job" + health: + rules: + - metric: RUN_DURATION_SECONDS + op: GREATER_THAN + value: 3600 # Alert if run > 1 hour + - metric: STREAMING_BACKLOG_BYTES + op: GREATER_THAN + value: 1073741824 # Alert if backlog > 1GB + - metric: STREAMING_BACKLOG_SECONDS + op: GREATER_THAN + value: 300 # Alert if backlog > 5 minutes + - metric: STREAMING_BACKLOG_FILES + op: GREATER_THAN + value: 1000 # Alert if backlog > 1000 files + - metric: STREAMING_BACKLOG_RECORDS + op: GREATER_THAN + value: 100000 # Alert if backlog > 100k records + email_notifications: + on_duration_warning_threshold_exceeded: + - "oncall@example.com" + on_streaming_backlog_exceeded: + - "oncall@example.com" + tasks: + - task_key: streaming + notebook_task: + notebook_path: ../src/streaming.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import JobsHealthRules, JobsHealthRule, JobsHealthMetric, JobsHealthOperator + +w = WorkspaceClient() + +job = w.jobs.create( + name="Health Monitored Job", + health=JobsHealthRules( + rules=[ + JobsHealthRule( + metric=JobsHealthMetric.RUN_DURATION_SECONDS, + op=JobsHealthOperator.GREATER_THAN, + value=3600 + ), + JobsHealthRule( + metric=JobsHealthMetric.STREAMING_BACKLOG_BYTES, + op=JobsHealthOperator.GREATER_THAN, + value=1073741824 + ) + ] + ), + tasks=[...] +) +``` + +### Health Metrics + +| Metric | Description | Use Case | +|--------|-------------|----------| +| `RUN_DURATION_SECONDS` | Total run time | Detect stuck/slow jobs | +| `STREAMING_BACKLOG_BYTES` | Unprocessed data size | Streaming lag | +| `STREAMING_BACKLOG_SECONDS` | Processing delay time | Streaming lag | +| `STREAMING_BACKLOG_FILES` | Unprocessed file count | File processing lag | +| `STREAMING_BACKLOG_RECORDS` | Unprocessed record count | Record processing lag | + +### Operators + +| Operator | Description | +|----------|-------------| +| `GREATER_THAN` | Value exceeds threshold | + +--- + +## Timeout Configuration + +### Job-Level Timeout + +```yaml +resources: + jobs: + timeout_job: + name: "Job with Timeout" + timeout_seconds: 7200 # 2 hours max run time + tasks: + - task_key: main + notebook_task: + notebook_path: ../src/main.py +``` + +### Task-Level Timeout + +```yaml +tasks: + - task_key: long_running + timeout_seconds: 3600 # 1 hour max for this task + notebook_task: + notebook_path: ../src/long_running.py +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, NotebookTask + +Task( + task_key="long_running", + timeout_seconds=3600, + notebook_task=NotebookTask( + notebook_path="/Workspace/long_running" + ) +) +``` + +### Timeout Behavior + +- Value `0` = no timeout (default) +- When timeout exceeds, task/job is cancelled +- Partial results may be lost +- Triggers `on_failure` notifications + +--- + +## Retry Configuration + +### Task Retry Settings + +```yaml +tasks: + - task_key: flaky_task + max_retries: 3 + min_retry_interval_millis: 30000 # 30 seconds between retries + retry_on_timeout: true + notebook_task: + notebook_path: ../src/flaky_task.py +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, NotebookTask + +Task( + task_key="flaky_task", + max_retries=3, + min_retry_interval_millis=30000, + retry_on_timeout=True, + notebook_task=NotebookTask( + notebook_path="/Workspace/flaky_task" + ) +) +``` + +### Retry Parameters + +| Parameter | Default | Description | 
+|-----------|---------|-------------| +| `max_retries` | 0 | Number of retry attempts | +| `min_retry_interval_millis` | 0 | Minimum wait between retries | +| `retry_on_timeout` | false | Retry when task times out | + +### Retry Behavior + +- Retries only apply to task failures +- Each retry is a new task attempt +- Retry count resets for each job run +- Dependencies wait for retries to complete + +--- + +## Run Queue Settings + +Control concurrent run behavior. + +### Maximum Concurrent Runs + +```yaml +resources: + jobs: + concurrent_job: + name: "Concurrent Job" + max_concurrent_runs: 5 # Allow up to 5 simultaneous runs + tasks: + - task_key: main + notebook_task: + notebook_path: ../src/main.py +``` + +### Queue Settings + +```yaml +resources: + jobs: + queued_job: + name: "Queued Job" + max_concurrent_runs: 1 + queue: + enabled: true # Queue additional runs instead of skipping + tasks: + - task_key: main + notebook_task: + notebook_path: ../src/main.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import QueueSettings + +w = WorkspaceClient() + +job = w.jobs.create( + name="Queued Job", + max_concurrent_runs=1, + queue=QueueSettings(enabled=True), + tasks=[...] +) +``` + +### Behavior Options + +| Setting | Behavior | +|---------|----------| +| `max_concurrent_runs=1`, `queue.enabled=false` | Skip if already running | +| `max_concurrent_runs=1`, `queue.enabled=true` | Queue runs, execute sequentially | +| `max_concurrent_runs=N` | Allow N simultaneous runs | + +--- + +## Notification Settings + +Fine-tune notification behavior. + +### Job-Level Settings + +```yaml +resources: + jobs: + notification_settings_job: + name: "Job with Notification Settings" + notification_settings: + no_alert_for_skipped_runs: true + no_alert_for_canceled_runs: true + email_notifications: + on_failure: + - "team@example.com" + tasks: + - task_key: main + notebook_task: + notebook_path: ../src/main.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import JobNotificationSettings + +w = WorkspaceClient() + +job = w.jobs.create( + name="Job with Notification Settings", + notification_settings=JobNotificationSettings( + no_alert_for_skipped_runs=True, + no_alert_for_canceled_runs=True + ), + tasks=[...] 
+) +``` + +### Settings + +| Setting | Default | Description | +|---------|---------|-------------| +| `no_alert_for_skipped_runs` | false | Suppress alerts when runs are skipped | +| `no_alert_for_canceled_runs` | false | Suppress alerts when runs are canceled | + +--- + +## Complete Monitoring Example + +```yaml +resources: + jobs: + fully_monitored: + name: "[${bundle.target}] Fully Monitored ETL" + + # Timeout and retries + timeout_seconds: 14400 # 4 hours max + max_concurrent_runs: 1 + queue: + enabled: true + + # Health monitoring + health: + rules: + - metric: RUN_DURATION_SECONDS + op: GREATER_THAN + value: 7200 # Warn if > 2 hours + + # Email notifications + email_notifications: + on_start: + - "team@example.com" + on_success: + - "team@example.com" + on_failure: + - "oncall@example.com" + - "team@example.com" + on_duration_warning_threshold_exceeded: + - "oncall@example.com" + no_alert_for_skipped_runs: true + + # Webhook notifications + webhook_notifications: + on_failure: + - id: "pagerduty-destination-uuid" + on_duration_warning_threshold_exceeded: + - id: "slack-alerts-uuid" + + # Notification settings + notification_settings: + no_alert_for_canceled_runs: true + + tasks: + - task_key: extract + max_retries: 2 + min_retry_interval_millis: 60000 + timeout_seconds: 3600 + notebook_task: + notebook_path: ../src/extract.py + + - task_key: transform + depends_on: + - task_key: extract + max_retries: 1 + timeout_seconds: 3600 + notebook_task: + notebook_path: ../src/transform.py + + - task_key: load + depends_on: + - task_key: transform + timeout_seconds: 1800 + # Critical task - specific notifications + email_notifications: + on_failure: + - "data-team-lead@example.com" + notebook_task: + notebook_path: ../src/load.py +``` diff --git a/databricks-skills/databricks-jobs/task-types.md b/databricks-skills/databricks-jobs/task-types.md new file mode 100644 index 0000000..7d973c3 --- /dev/null +++ b/databricks-skills/databricks-jobs/task-types.md @@ -0,0 +1,651 @@ +# Task Types Reference + +## Contents +- [Notebook Task](#notebook-task) +- [Spark Python Task](#spark-python-task) +- [Python Wheel Task](#python-wheel-task) +- [SQL Task](#sql-task) +- [dbt Task](#dbt-task) +- [Pipeline Task](#pipeline-task) +- [Spark JAR Task](#spark-jar-task) +- [Run Job Task](#run-job-task) +- [For Each Task](#for-each-task) + +--- + +## Notebook Task + +Run Databricks notebooks. Most common task type. 
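+
+Beyond the configuration shown below, a notebook task can hand results back to the job: `dbutils.notebook.exit()` sets the task's output string, and `dbutils.jobs.taskValues.set()` publishes values that downstream tasks can reference. A minimal sketch of notebook code (the key name is illustrative):
+
+```python
+# At the end of the notebook task
+row_count = 42  # illustrative result
+
+# Publish a value that downstream tasks can reference,
+# e.g. "{{tasks.run_notebook.values.row_count}}"
+dbutils.jobs.taskValues.set(key="row_count", value=row_count)
+
+# End the notebook run and set the task's output string
+dbutils.notebook.exit(f"processed {row_count} rows")
+```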
+ +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, NotebookTask, Source + +Task( + task_key="run_notebook", + notebook_task=NotebookTask( + notebook_path="/Workspace/Users/user@example.com/etl_notebook", + source=Source.WORKSPACE, + base_parameters={ + "env": "prod", + "date": "2024-01-15" + } + ) +) +``` + +### DABs YAML + +```yaml +tasks: + - task_key: run_notebook + notebook_task: + notebook_path: ../src/notebooks/etl_notebook.py + source: WORKSPACE + base_parameters: + env: "{{job.parameters.env}}" + date: "{{job.parameters.date}}" +``` + +### CLI JSON + +```json +{ + "task_key": "run_notebook", + "notebook_task": { + "notebook_path": "/Workspace/Users/user@example.com/etl_notebook", + "source": "WORKSPACE", + "base_parameters": { + "env": "prod", + "date": "2024-01-15" + } + } +} +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `notebook_path` | Yes | Absolute path to notebook | +| `source` | No | `WORKSPACE` (default) or `GIT` | +| `base_parameters` | No | Key-value parameters passed to notebook | +| `warehouse_id` | No | SQL warehouse for SQL cells (optional) | + +### Access Parameters in Notebook + +```python +# Get parameter with default +env = dbutils.widgets.get("env") + +# Or define widget first +dbutils.widgets.text("env", "dev") +env = dbutils.widgets.get("env") +``` + +--- + +## Spark Python Task + +Run Python files directly on Spark cluster. + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, SparkPythonTask + +Task( + task_key="run_python", + spark_python_task=SparkPythonTask( + python_file="/Workspace/Users/user@example.com/scripts/process.py", + parameters=["--env", "prod", "--date", "2024-01-15"] + ) +) +``` + +### DABs YAML + +```yaml +tasks: + - task_key: run_python + spark_python_task: + python_file: ../src/scripts/process.py + parameters: + - "--env" + - "prod" + - "--date" + - "2024-01-15" +``` + +### CLI JSON + +```json +{ + "task_key": "run_python", + "spark_python_task": { + "python_file": "/Workspace/Users/user@example.com/scripts/process.py", + "parameters": ["--env", "prod", "--date", "2024-01-15"] + } +} +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `python_file` | Yes | Path to Python file (workspace, DBFS, or Unity Catalog volume) | +| `parameters` | No | Command-line arguments passed to script | +| `source` | No | `WORKSPACE` (default) or `GIT` | + +--- + +## Python Wheel Task + +Run Python packages distributed as wheels. 
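+
+The wheel's entry point (configured below) is an ordinary Python function; positional `parameters` are typically delivered as command-line arguments. A minimal sketch of what such an entry point might look like (package and argument names are illustrative):
+
+```python
+# my_package/main.py (illustrative)
+import argparse
+
+
+def run():
+    # Positional task parameters arrive as command-line arguments
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--env", default="dev")
+    args, _ = parser.parse_known_args()
+    print(f"Running in {args.env}")
+
+
+if __name__ == "__main__":
+    run()
+```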
+ +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, PythonWheelTask + +Task( + task_key="run_wheel", + python_wheel_task=PythonWheelTask( + package_name="my_package", + entry_point="main", + parameters=["--env", "prod"] + ), + libraries=[ + {"whl": "/Volumes/catalog/schema/libs/my_package-1.0.0-py3-none-any.whl"} + ] +) +``` + +### DABs YAML + +```yaml +tasks: + - task_key: run_wheel + python_wheel_task: + package_name: my_package + entry_point: main + parameters: + - "--env" + - "prod" + libraries: + - whl: /Volumes/catalog/schema/libs/my_package-1.0.0-py3-none-any.whl +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `package_name` | Yes | Name of the Python package | +| `entry_point` | Yes | Entry point function or module | +| `parameters` | No | Command-line arguments | +| `named_parameters` | No | Named parameters as key-value pairs | + +### Entry Point Configuration + +In your package's `setup.py` or `pyproject.toml`: + +```python +# setup.py +entry_points={ + 'console_scripts': [ + 'main=my_package.main:run', + ], +} +``` + +--- + +## SQL Task + +Run SQL queries, files, or refresh dashboards/alerts. + +### Run SQL Query + +```yaml +tasks: + - task_key: run_query + sql_task: + query: + query_id: "abc123-def456" # Existing query ID + warehouse_id: "1234567890abcdef" +``` + +### Run SQL File + +```yaml +tasks: + - task_key: run_sql_file + sql_task: + file: + path: ../src/sql/transform.sql + source: WORKSPACE + warehouse_id: "1234567890abcdef" +``` + +### Refresh Dashboard + +```yaml +tasks: + - task_key: refresh_dashboard + sql_task: + dashboard: + dashboard_id: "dashboard-uuid" + warehouse_id: "1234567890abcdef" +``` + +### Refresh Alert + +```yaml +tasks: + - task_key: refresh_alert + sql_task: + alert: + alert_id: "alert-uuid" + warehouse_id: "1234567890abcdef" +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, SqlTask, SqlTaskFile + +Task( + task_key="run_sql", + sql_task=SqlTask( + warehouse_id="1234567890abcdef", + file=SqlTaskFile( + path="/Workspace/Users/user@example.com/queries/transform.sql", + source=Source.WORKSPACE + ) + ) +) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `warehouse_id` | Yes | SQL warehouse ID | +| `query` | One of | Run existing query by ID | +| `file` | One of | Run SQL file | +| `dashboard` | One of | Refresh dashboard | +| `alert` | One of | Refresh alert | +| `parameters` | No | Query parameters | + +--- + +## dbt Task + +Run dbt projects with Databricks. 
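+
+When `warehouse_id`, `catalog`, and `schema` are provided (as in the examples below), the task can generate the dbt profile for you. If you instead point `profiles_directory` at your own profile, a minimal `profiles.yml` for the dbt-databricks adapter might look like this (host, HTTP path, and token are placeholders):
+
+```yaml
+# profiles.yml (placeholders throughout)
+# The top-level key must match the `profile:` entry in dbt_project.yml
+my_dbt_project:
+  target: databricks
+  outputs:
+    databricks:
+      type: databricks
+      catalog: main
+      schema: analytics
+      host: your-workspace.cloud.databricks.com
+      http_path: /sql/1.0/warehouses/1234567890abcdef
+      token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}"
+      threads: 4
+```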
+ +### DABs YAML + +```yaml +tasks: + - task_key: run_dbt + dbt_task: + project_directory: ../src/dbt_project + commands: + - "dbt deps" + - "dbt seed" + - "dbt run --select tag:daily" + - "dbt test" + warehouse_id: "1234567890abcdef" + catalog: "main" + schema: "analytics" +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, DbtTask + +Task( + task_key="run_dbt", + dbt_task=DbtTask( + project_directory="/Workspace/Users/user@example.com/dbt_project", + commands=["dbt deps", "dbt run", "dbt test"], + warehouse_id="1234567890abcdef", + catalog="main", + schema="analytics" + ) +) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `project_directory` | Yes | Path to dbt project | +| `commands` | Yes | List of dbt commands to run | +| `warehouse_id` | No | SQL warehouse (required if not using cluster) | +| `catalog` | No | Unity Catalog catalog | +| `schema` | No | Target schema | +| `profiles_directory` | No | Path to profiles.yml directory | +| `source` | No | `WORKSPACE` (default) or `GIT` | + +--- + +## Pipeline Task + +Trigger DLT or Spark Declarative Pipelines. + +### DABs YAML + +```yaml +tasks: + - task_key: run_pipeline + pipeline_task: + pipeline_id: "pipeline-uuid-here" + full_refresh: false +``` + +### With Pipeline Resource Reference (DABs) + +```yaml +resources: + pipelines: + my_pipeline: + name: "My Data Pipeline" + # ... pipeline config + + jobs: + my_job: + name: "Orchestrate Pipeline" + tasks: + - task_key: run_pipeline + pipeline_task: + pipeline_id: ${resources.pipelines.my_pipeline.id} +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, PipelineTask + +Task( + task_key="run_pipeline", + pipeline_task=PipelineTask( + pipeline_id="pipeline-uuid-here", + full_refresh=False + ) +) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `pipeline_id` | Yes | ID of the pipeline to trigger | +| `full_refresh` | No | Force full refresh (default: false) | + +--- + +## Spark JAR Task + +Run Scala/Java JAR files on Spark. + +### DABs YAML + +```yaml +tasks: + - task_key: run_jar + spark_jar_task: + main_class_name: "com.example.Main" + parameters: + - "--env" + - "prod" + libraries: + - jar: /Volumes/catalog/schema/libs/my-app.jar +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, SparkJarTask + +Task( + task_key="run_jar", + spark_jar_task=SparkJarTask( + main_class_name="com.example.Main", + parameters=["--env", "prod"] + ), + libraries=[ + {"jar": "/Volumes/catalog/schema/libs/my-app.jar"} + ] +) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `main_class_name` | Yes | Main class to execute | +| `parameters` | No | Command-line arguments | + +--- + +## Run Job Task + +Trigger another job as a task (job chaining). 
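+
+A common pattern is for the triggered job to declare matching job-level parameters and map them into its tasks, so values passed via `job_parameters` are available as `{{job.parameters.*}}`. A minimal sketch of such a child job (names are illustrative):
+
+```yaml
+resources:
+  jobs:
+    child_job:
+      name: "Child Job"
+      parameters:
+        - name: source_table
+          default: "catalog.schema.table"
+      tasks:
+        - task_key: process
+          notebook_task:
+            notebook_path: ../src/process.py
+            base_parameters:
+              source_table: "{{job.parameters.source_table}}"
+```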
+ +### DABs YAML + +```yaml +tasks: + - task_key: trigger_downstream + run_job_task: + job_id: 12345 + job_parameters: + source_table: "catalog.schema.table" +``` + +### With Job Resource Reference (DABs) + +```yaml +resources: + jobs: + upstream_job: + name: "Upstream Job" + tasks: + - task_key: process + notebook_task: + notebook_path: ../src/process.py + + downstream_job: + name: "Downstream Job" + tasks: + - task_key: trigger_upstream + run_job_task: + job_id: ${resources.jobs.upstream_job.id} +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, RunJobTask + +Task( + task_key="trigger_downstream", + run_job_task=RunJobTask( + job_id=12345, + job_parameters={"source_table": "catalog.schema.table"} + ) +) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `job_id` | Yes | ID of job to trigger | +| `job_parameters` | No | Parameters to pass to triggered job | + +--- + +## For Each Task + +Loop over a collection and run a nested task for each item. + +### DABs YAML - Static Inputs + +```yaml +tasks: + - task_key: process_regions + for_each_task: + inputs: '["us-east", "us-west", "eu-west"]' + task: + task_key: process_region + notebook_task: + notebook_path: ../src/process_region.py + base_parameters: + region: "{{input}}" +``` + +### DABs YAML - Dynamic Inputs from Previous Task + +```yaml +tasks: + - task_key: generate_list + notebook_task: + notebook_path: ../src/generate_countries.py + + - task_key: process_countries + depends_on: + - task_key: generate_list + for_each_task: + inputs: "{{tasks.generate_list.values.countries}}" + task: + task_key: process_country + notebook_task: + notebook_path: ../src/process_country.py + base_parameters: + country: "{{input}}" +``` + +### Generate Dynamic Inputs + +In the generating notebook, return values using task values: + +```python +# generate_countries.py notebook +countries = ["USA", "UK", "Germany", "France"] + +# Set task value for downstream for_each_task +dbutils.jobs.taskValues.set(key="countries", value=countries) +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import Task, ForEachTask, NotebookTask + +Task( + task_key="process_regions", + for_each_task=ForEachTask( + inputs='["us-east", "us-west", "eu-west"]', + task=Task( + task_key="process_region", + notebook_task=NotebookTask( + notebook_path="/Workspace/process_region", + base_parameters={"region": "{{input}}"} + ) + ) + ) +) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `inputs` | Yes | JSON array string or task value reference | +| `task` | Yes | Nested task to run for each input | +| `concurrency` | No | Max parallel iterations (default: 20) | + +### Access Current Item + +Inside the nested task, access the current item: +- In parameters: `{{input}}` +- In notebook: Use the parameter passed via `base_parameters` + +--- + +## Task Libraries + +Add libraries to tasks for dependencies. 
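+
+The same libraries can be attached through the Python SDK using the typed classes from `databricks.sdk.service.compute`; a minimal sketch:
+
+```python
+from databricks.sdk.service.compute import Library, MavenLibrary, PythonPyPiLibrary
+from databricks.sdk.service.jobs import Task, NotebookTask
+
+Task(
+    task_key="with_libraries",
+    notebook_task=NotebookTask(notebook_path="/Workspace/notebook"),
+    libraries=[
+        Library(pypi=PythonPyPiLibrary(package="pandas==2.0.0")),
+        Library(whl="/Volumes/catalog/schema/libs/custom-1.0.0-py3-none-any.whl"),
+        Library(maven=MavenLibrary(coordinates="org.apache.spark:spark-avro_2.12:3.5.0"))
+    ]
+)
+```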
+ +### DABs YAML + +```yaml +tasks: + - task_key: with_libraries + notebook_task: + notebook_path: ../src/notebook.py + libraries: + - pypi: + package: pandas==2.0.0 + - pypi: + package: scikit-learn + - whl: /Volumes/catalog/schema/libs/custom-1.0.0-py3-none-any.whl + - jar: /Volumes/catalog/schema/libs/custom.jar + - maven: + coordinates: "org.apache.spark:spark-avro_2.12:3.5.0" +``` + +### Library Types + +| Type | Format | Example | +|------|--------|---------| +| PyPI | `pypi.package` | `pandas==2.0.0` | +| Wheel | `whl` | Path to .whl file | +| JAR | `jar` | Path to .jar file | +| Maven | `maven.coordinates` | `group:artifact:version` | +| Egg | `egg` | Path to .egg file | + +--- + +## Environments + +Define reusable Python environments for tasks. + +### DABs YAML + +```yaml +environments: + - environment_key: ml_env + spec: + client: "1" + dependencies: + - pandas==2.0.0 + - scikit-learn==1.3.0 + - mlflow + +tasks: + - task_key: ml_task + environment_key: ml_env + notebook_task: + notebook_path: ../src/train_model.py +``` + +### Python SDK + +```python +from databricks.sdk.service.jobs import JobEnvironment, EnvironmentSpec + +environments = [ + JobEnvironment( + environment_key="ml_env", + spec=EnvironmentSpec( + client="1", + dependencies=["pandas==2.0.0", "scikit-learn==1.3.0"] + ) + ) +] +``` diff --git a/databricks-skills/databricks-jobs/triggers-schedules.md b/databricks-skills/databricks-jobs/triggers-schedules.md new file mode 100644 index 0000000..9022c71 --- /dev/null +++ b/databricks-skills/databricks-jobs/triggers-schedules.md @@ -0,0 +1,520 @@ +# Triggers and Schedules Reference + +## Contents +- [Cron Schedule](#cron-schedule) +- [Periodic Trigger](#periodic-trigger) +- [File Arrival Trigger](#file-arrival-trigger) +- [Table Update Trigger](#table-update-trigger) +- [Continuous Jobs](#continuous-jobs) +- [Manual Runs](#manual-runs) + +--- + +## Cron Schedule + +Run jobs on a cron-based schedule. + +### DABs YAML + +```yaml +resources: + jobs: + daily_etl: + name: "Daily ETL" + schedule: + quartz_cron_expression: "0 0 8 * * ?" # Daily at 8 AM + timezone_id: "America/New_York" + pause_status: UNPAUSED + tasks: + - task_key: etl + notebook_task: + notebook_path: ../src/etl.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import CronSchedule, PauseStatus + +w = WorkspaceClient() + +job = w.jobs.create( + name="Daily ETL", + schedule=CronSchedule( + quartz_cron_expression="0 0 8 * * ?", + timezone_id="America/New_York", + pause_status=PauseStatus.UNPAUSED + ), + tasks=[...] +) +``` + +### CLI JSON + +```json +{ + "name": "Daily ETL", + "schedule": { + "quartz_cron_expression": "0 0 8 * * ?", + "timezone_id": "America/New_York", + "pause_status": "UNPAUSED" + }, + "tasks": [...] +} +``` + +### Cron Expression Reference + +Format: `seconds minutes hours day-of-month month day-of-week` + +| Expression | Description | +|------------|-------------| +| `0 0 8 * * ?` | Daily at 8:00 AM | +| `0 0 8 * * MON-FRI` | Weekdays at 8:00 AM | +| `0 0 */2 * * ?` | Every 2 hours | +| `0 30 9 * * ?` | Daily at 9:30 AM | +| `0 0 0 1 * ?` | First day of month at midnight | +| `0 0 6 ? 
* MON` | Every Monday at 6:00 AM | +| `0 0 8 15 * ?` | 15th of each month at 8:00 AM | +| `0 0 8 L * ?` | Last day of month at 8:00 AM | + +### Common Timezones + +| Timezone ID | Description | +|-------------|-------------| +| `UTC` | Coordinated Universal Time | +| `America/New_York` | Eastern Time (US) | +| `America/Chicago` | Central Time (US) | +| `America/Denver` | Mountain Time (US) | +| `America/Los_Angeles` | Pacific Time (US) | +| `Europe/London` | British Time | +| `Europe/Paris` | Central European Time | +| `Asia/Tokyo` | Japan Standard Time | + +--- + +## Periodic Trigger + +Run jobs at fixed intervals (simpler than cron). + +### DABs YAML + +```yaml +resources: + jobs: + hourly_sync: + name: "Hourly Sync" + trigger: + pause_status: UNPAUSED + periodic: + interval: 1 + unit: HOURS + tasks: + - task_key: sync + notebook_task: + notebook_path: ../src/sync.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import TriggerSettings, Periodic, PeriodicTriggerConfigurationTimeUnit, PauseStatus + +w = WorkspaceClient() + +job = w.jobs.create( + name="Hourly Sync", + trigger=TriggerSettings( + pause_status=PauseStatus.UNPAUSED, + periodic=Periodic( + interval=1, + unit=PeriodicTriggerConfigurationTimeUnit.HOURS + ) + ), + tasks=[...] +) +``` + +### Interval Units + +| Unit | Description | +|------|-------------| +| `HOURS` | Run every N hours | +| `DAYS` | Run every N days | +| `WEEKS` | Run every N weeks | + +### Examples + +```yaml +# Every 30 minutes (not supported - use cron) +# Minimum periodic interval is 1 hour + +# Every 4 hours +trigger: + periodic: + interval: 4 + unit: HOURS + +# Every 2 days +trigger: + periodic: + interval: 2 + unit: DAYS + +# Weekly +trigger: + periodic: + interval: 1 + unit: WEEKS +``` + +--- + +## File Arrival Trigger + +Run jobs when new files arrive in cloud storage. + +### DABs YAML + +```yaml +resources: + jobs: + process_uploads: + name: "Process Uploads" + trigger: + pause_status: UNPAUSED + file_arrival: + url: "s3://my-bucket/uploads/" + min_time_between_triggers_seconds: 60 + wait_after_last_change_seconds: 30 + tasks: + - task_key: process + notebook_task: + notebook_path: ../src/process_files.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import TriggerSettings, FileArrivalTriggerConfiguration, PauseStatus + +w = WorkspaceClient() + +job = w.jobs.create( + name="Process Uploads", + trigger=TriggerSettings( + pause_status=PauseStatus.UNPAUSED, + file_arrival=FileArrivalTriggerConfiguration( + url="s3://my-bucket/uploads/", + min_time_between_triggers_seconds=60, + wait_after_last_change_seconds=30 + ) + ), + tasks=[...] 
+) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `url` | Yes | Cloud storage URL to monitor | +| `min_time_between_triggers_seconds` | No | Minimum wait between triggers (default: 0) | +| `wait_after_last_change_seconds` | No | Wait time after last file change (default: 0) | + +### Supported URL Formats + +| Cloud | Format | Example | +|-------|--------|---------| +| AWS S3 | `s3://bucket/path/` | `s3://my-bucket/data/uploads/` | +| Azure ADLS | `abfss://container@account.dfs.core.windows.net/path/` | `abfss://data@myaccount.dfs.core.windows.net/uploads/` | +| GCS | `gs://bucket/path/` | `gs://my-bucket/uploads/` | +| Unity Catalog Volume | `/Volumes/catalog/schema/volume/path/` | `/Volumes/main/data/uploads/` | + +### Access File Information in Notebook + +```python +# The trigger provides file information via task context +import json + +# Get trigger info from job context +trigger_info = dbutils.jobs.taskValues.get( + taskKey="__trigger_info__", + key="file_arrival", + debugValue={} +) + +# Contains: url, files (list of new files) +print(f"New files: {trigger_info.get('files', [])}") +``` + +--- + +## Table Update Trigger + +Run jobs when Unity Catalog tables are updated. + +### DABs YAML + +```yaml +resources: + jobs: + process_updates: + name: "Process Table Updates" + trigger: + pause_status: UNPAUSED + table_update: + table_names: + - "catalog.schema.source_table" + - "catalog.schema.other_table" + condition: ANY_UPDATED + min_time_between_triggers_seconds: 300 + wait_after_last_change_seconds: 60 + tasks: + - task_key: process + notebook_task: + notebook_path: ../src/process_changes.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import ( + TriggerSettings, + TableUpdateTriggerConfiguration, + Condition, + PauseStatus +) + +w = WorkspaceClient() + +job = w.jobs.create( + name="Process Table Updates", + trigger=TriggerSettings( + pause_status=PauseStatus.UNPAUSED, + table_update=TableUpdateTriggerConfiguration( + table_names=["catalog.schema.source_table"], + condition=Condition.ANY_UPDATED, + min_time_between_triggers_seconds=300, + wait_after_last_change_seconds=60 + ) + ), + tasks=[...] +) +``` + +### Parameters + +| Parameter | Required | Description | +|-----------|----------|-------------| +| `table_names` | Yes | List of Unity Catalog tables to monitor | +| `condition` | No | `ANY_UPDATED` (default) - trigger when any table updates | +| `min_time_between_triggers_seconds` | No | Minimum wait between triggers | +| `wait_after_last_change_seconds` | No | Wait time after last change | + +### Requirements + +- Tables must be in Unity Catalog +- Job identity needs `SELECT` permission on monitored tables +- Works with Delta tables (managed and external) + +--- + +## Continuous Jobs + +Always-running jobs that automatically restart. + +### DABs YAML + +```yaml +resources: + jobs: + streaming_job: + name: "Streaming Processor" + continuous: + pause_status: UNPAUSED + tasks: + - task_key: stream + notebook_task: + notebook_path: ../src/streaming_processor.py +``` + +### Python SDK + +```python +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import Continuous, PauseStatus + +w = WorkspaceClient() + +job = w.jobs.create( + name="Streaming Processor", + continuous=Continuous( + pause_status=PauseStatus.UNPAUSED + ), + tasks=[...] 
+
+---
+
+## Pause and Resume
+
+### Pause Scheduled Job
+
+```yaml
+schedule:
+  quartz_cron_expression: "0 0 8 * * ?"
+  timezone_id: "UTC"
+  pause_status: PAUSED  # Job won't run on schedule
+```
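+
+### Pause Triggers and Continuous Jobs
+
+`pause_status` works the same way for event triggers as it does for schedules (and for `continuous`, as shown in the Continuous Jobs section above). A minimal sketch based on the file arrival block shown earlier:
+
+```yaml
+trigger:
+  pause_status: PAUSED  # the file_arrival / table_update / periodic trigger stops firing
+  file_arrival:
+    url: "s3://my-bucket/uploads/"
+```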
+
+### Pause via SDK
+
+```python
+from databricks.sdk.service.jobs import JobSettings, CronSchedule, PauseStatus
+
+w.jobs.update(
+    job_id=12345,
+    new_settings=JobSettings(
+        schedule=CronSchedule(
+            quartz_cron_expression="0 0 8 * * ?",
+            timezone_id="UTC",
+            pause_status=PauseStatus.PAUSED
+        )
+    )
+)
+```
+
+### Pause via CLI
+
+```bash
+# Top-level fields in new_settings replace the existing values wholesale,
+# so re-send the full schedule when flipping pause_status
+databricks jobs update 12345 --json '{
+  "new_settings": {
+    "schedule": {
+      "quartz_cron_expression": "0 0 8 * * ?",
+      "timezone_id": "UTC",
+      "pause_status": "PAUSED"
+    }
+  }
+}'
+```
diff --git a/databricks-skills/install_skills.sh b/databricks-skills/install_skills.sh
index b731a9c..b1ec8e2 100755
--- a/databricks-skills/install_skills.sh
+++ b/databricks-skills/install_skills.sh
@@ -38,7 +38,7 @@ INSTALL_FROM_LOCAL=false
 SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
 
 # All available skills
-ALL_SKILLS="asset-bundles databricks-app-apx databricks-app-python databricks-python-sdk mlflow-evaluation spark-declarative-pipelines synthetic-data-generation"
+ALL_SKILLS="asset-bundles databricks-app-apx databricks-app-python databricks-jobs databricks-python-sdk mlflow-evaluation spark-declarative-pipelines synthetic-data-generation"
 
 # Get skill description
 get_skill_description() {
@@ -46,6 +46,7 @@ get_skill_description() {
     "asset-bundles") echo "Databricks Asset Bundles - deployment and configuration" ;;
     "databricks-app-apx") echo "Databricks Apps with React/Next.js (APX framework)" ;;
     "databricks-app-python") echo "Databricks Apps with Python (Dash, Streamlit)" ;;
+    "databricks-jobs") echo "Databricks Lakeflow Jobs - workflow orchestration" ;;
     "databricks-python-sdk") echo "Databricks Python SDK, Connect, and REST API" ;;
     "mlflow-evaluation") echo "MLflow evaluation, scoring, and trace analysis" ;;
     "spark-declarative-pipelines") echo "Spark Declarative Pipelines (SDP/LDP/DLT)" ;;
@@ -59,6 +60,7 @@ get_skill_extra_files() {
   case "$1" in
     "databricks-app-apx") echo "backend-patterns.md best-practices.md frontend-patterns.md" ;;
     "databricks-app-python") echo "dash.md streamlit.md README.md" ;;
+    "databricks-jobs") echo "task-types.md triggers-schedules.md notifications-monitoring.md examples.md" ;;
     "mlflow-evaluation") echo "references/CRITICAL-interfaces.md references/GOTCHAS.md references/patterns-context-optimization.md references/patterns-datasets.md references/patterns-evaluation.md references/patterns-scorers.md references/patterns-trace-analysis.md references/user-journeys.md" ;;
     "spark-declarative-pipelines") echo "1-ingestion-patterns.md 2-streaming-patterns.md 3-scd-patterns.md 4-performance-tuning.md 5-python-api.md 6-dlt-migration.md 7-advanced-configuration.md" ;;
     *) echo "" ;;