263 changes: 263 additions & 0 deletions acceptance/bundle/es-1617244/databricks.yml
@@ -0,0 +1,263 @@
bundle:
name: test-bundle

variables:
server_events_workload_name:
default: "server-events-pipeline"

bundle_target:
default: "dev"

server_events_dev_icm_severity:
default: "3"

server_events_prod_icm_severity:
default: "2"

shared_slack_notification_destination:
default: "slack-webhook-id-12345"

jar_exportjobssafety:
default: "safety-export-jobs-1.0.0.jar"

server_events_jar_base_path:
default: "dbfs:/mnt/artifacts/server-events/jars"

jar_dataquality:
default: "data-quality-framework-1.2.0.jar"

data_quality_jar_base_path:
default: "dbfs:/mnt/artifacts/data-quality/jars"

spark_version:
default: "13.3.x-scala2.12"

spark_pool_d8ds_v5_id:
default: "pool-1234567890abcdef"

databricks_uami_service_principal:
default: "[email protected]"

# Safety Pipeline Jobs with Asynchronous Workflow (ADB Native)
# =============================================================
#
# WORKFLOW DESIGN:
# 1. safety_augment_workflow: Runs hourly at :00 (15-20 mins)
# - Augmentation + quality check tasks
# - Runs in parallel with messages_augmented_workflow
# 2. safety_scrub: Independent scrubbing job (triggered by scrub_orchestrator)
# - Runs in queue if previous scrub still active
# - Joins safety_data with fact_messages_augmented_scrubbed
resources:
jobs:
# Main Workflow: Hourly Augment + Quality + Async Scrub Trigger
safety_augment_workflow:
name: "${var.server_events_workload_name}-safety-augment-${var.bundle_target}"
description: "Hourly augment+QC workflow (runs in parallel with messages)"
schedule:
quartz_cron_expression: "0 0 * * * ?" # Top of every hour
timezone_id: "UTC"
pause_status: UNPAUSED
parameters: # Workflow-level parameter to share startDateTime across all tasks
- name: start_datetime
default: "{{job.trigger.time.iso_datetime}}"
- name: severity
default: "${var.server_events_${var.bundle_target}_icm_severity}"
- name: title
default: "Safety Augment Workflow Failure"
- name: summary
default: "Safety augment workflow failed - requires investigation"
- name: description
default: "The safety augment workflow encountered a failure during processing. This may affect hourly safety data augmentation and downstream processes."
- name: correlationId
default: "icmConnect://SAFETY-AUGMENT-{{job.start_time.iso_datetime}}"
webhook_notifications:
on_failure:
- id: "${var.shared_slack_notification_destination}"
max_concurrent_runs: 3
tasks:
# Task 1: Safety Data Augmentation
- task_key: safety_augmented
spark_jar_task:
jar_uri: ""
main_class_name: com.microsoft.dfw.safety.SafetyDataAugmented
parameters:
- --env
- "${var.bundle_target}"
- --startDateTime
- "{{job.parameters.start_datetime}}" # Use workflow parameter
- --window
- HOUR
run_as_repl: true
job_cluster_key: safety_augment_cluster
libraries:
- jar: "${var.server_events_jar_base_path}/${var.jar_exportjobssafety}"
# Task 2: Data Quality Check (runs after augmentation)
- task_key: safety_data_quality
depends_on:
- task_key: safety_augmented
spark_jar_task:
jar_uri: ""
main_class_name: com.microsoft.dfw.dataqc.dataquality.Main
parameters:
- --env
- "${var.bundle_target}"
- --job
- SafetyDataQuality
- --start
- "{{job.parameters.start_datetime}}"
- --config-file
- data_quality_adb.conf
- --window
- HOUR
run_as_repl: true
job_cluster_key: safety_augment_cluster
libraries:
- jar: "${var.data_quality_jar_base_path}/${var.jar_dataquality}"
# Task 3: Create ICM on any task failure
- task_key: create_icm_on_failure
depends_on:
- task_key: safety_augmented
- task_key: safety_data_quality
run_if: AT_LEAST_ONE_FAILED
job_cluster_key: icm_minimal_cluster
spark_jar_task:
main_class_name: com.microsoft.dfw.common.icm.ICMApi
parameters:
- --title
- "{{job.parameters.title}}"
- --severity
- "{{job.parameters.severity}}"
- --desc
- "{{job.parameters.description}}"
- --summary
- "{{job.parameters.summary}}"
- --correlationId
- "{{job.parameters.correlationId}}"
- --env
- "${var.bundle_target}"
libraries:
- jar: "${var.data_quality_jar_base_path}/${var.jar_dataquality}"
job_clusters:
- job_cluster_key: safety_augment_cluster
new_cluster:
cluster_name: ""
spark_version: "${var.spark_version}"
instance_pool_id: "${var.spark_pool_d8ds_v5_id}"
driver_instance_pool_id: "${var.spark_pool_d8ds_v5_id}"
autoscale:
min_workers: 2 # Smaller for JAR tasks (15-20 min runtime)
max_workers: 12 # Moderate scale for quick completion
custom_tags:
COMPUTE_PLATFORM: Databricks
workload: "safety-augment-workflow"
team: "data-flywheel"
environment: "${var.bundle_target}"
spark_env_vars:
COMPUTE_PLATFORM: Databricks
spark_conf:
"spark.sql.session.timeZone": "UTC"
"spark.sql.adaptive.enabled": "true"
"spark.sql.adaptive.coalescePartitions.enabled": "true"
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
"spark.kryoserializer.buffer.max": "1024m"
"spark.sql.shuffle.partitions": "3000"
"spark.sql.files.maxPartitionBytes": "128m"
"spark.databricks.unityCatalog.enabled": "true"
"spark.rdd.compress": "true"
"spark.speculation": "false"
# Optimized for 15-20 min JAR task completion
"spark.executor.cores": "8"
"spark.executor.memory": "16G"
"spark.executor.memoryOverhead": "4G"
enable_elastic_disk: true
data_security_mode: SINGLE_USER
runtime_engine: PHOTON
kind: CLASSIC_PREVIEW
is_single_node: false
- job_cluster_key: icm_minimal_cluster
new_cluster:
spark_version: "${var.spark_version}"
node_type_id: "Standard_D4ds_v5"
num_workers: 0 # single node
data_security_mode: "SINGLE_USER"
runtime_engine: "STANDARD"
enable_elastic_disk: false
custom_tags:
COMPUTE_PLATFORM: "Databricks"
workload: "create-icm-on-failure"
team: "data-flywheel"
environment: "${var.bundle_target}"
queue:
enabled: true
run_as:
service_principal_name: "${var.databricks_uami_service_principal}"
# Independent Scrub Job: Joins safety_data with fact_messages_augmented_scrubbed
safety_scrub:
name: "${var.server_events_workload_name}-safety-augmented-scrub-${var.bundle_target}"
description: "Safety data scrubbing job using fact_messages_augmented_scrubbed (Scala/Spark)"
# No schedule - triggered by workflow above
parameters: # Declare job-level parameters so run_job_task overrides can bind
- name: start_datetime # Provided by upstream workflow via run_job_task
default: "{{job.trigger.time.iso_datetime}}" # Fallback to scrub job's trigger time
webhook_notifications:
on_failure:
- id: "${var.shared_slack_notification_destination}"
max_concurrent_runs: 2 # QUEUE: Only two scrubs at a time, others wait
tasks:
- task_key: safety_data_scrubbed
spark_jar_task:
jar_uri: ""
main_class_name: com.microsoft.dfw.safety.SafetyDataScrubbed
parameters:
- --env
- "${var.bundle_target}"
- --startDateTime
- "{{job.parameters.start_datetime}}" # From augment workflow trigger
- --window
- HOUR
run_as_repl: true
job_cluster_key: safety_scrubber_cluster
libraries:
- jar: "${var.server_events_jar_base_path}/${var.jar_exportjobssafety}"
job_clusters:
- job_cluster_key: safety_scrubber_cluster
new_cluster:
cluster_name: ""
spark_version: "${var.spark_version}"
instance_pool_id: "${var.spark_pool_d8ds_v5_id}"
driver_instance_pool_id: "${var.spark_pool_d8ds_v5_id}"
autoscale:
min_workers: 2 # Reduced for JAR task joining tables
max_workers: 12 # Moderate scale for join operations
custom_tags:
COMPUTE_PLATFORM: Databricks
workload: "safety-scrub"
team: "data-flywheel"
environment: "${var.bundle_target}"
spark_env_vars:
COMPUTE_PLATFORM: Databricks
spark_conf:
"spark.sql.session.timeZone": "UTC"
"spark.sql.adaptive.enabled": "true"
"spark.sql.adaptive.coalescePartitions.enabled": "true"
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
"spark.kryoserializer.buffer.max": "1024m"
"spark.sql.shuffle.partitions": "3000"
"spark.sql.files.maxPartitionBytes": "128m"
"spark.databricks.unityCatalog.enabled": "true"
"spark.rdd.compress": "true"
"spark.speculation": "false"
# Optimized for JAR task with joins
"spark.executor.cores": "8"
"spark.executor.memory": "16G"
"spark.executor.memoryOverhead": "4G"
enable_elastic_disk: true
data_security_mode: SINGLE_USER
runtime_engine: PHOTON
kind: CLASSIC_PREVIEW
is_single_node: false
queue:
enabled: true # IMPORTANT: Enables queueing when max_concurrent_runs=2
run_as:
service_principal_name: "${var.databricks_uami_service_principal}"
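
Note: the comments in the workflow above ("Async Scrub Trigger", "Provided by upstream workflow via run_job_task") refer to a trigger task that this fixture does not actually define. A minimal sketch of what such a task could look like inside safety_augment_workflow, assuming the standard run_job_task shape and an illustrative task_key, is:

      - task_key: trigger_safety_scrub   # illustrative name, not part of the fixture
        depends_on:
          - task_key: safety_data_quality
        run_job_task:
          job_id: ${resources.jobs.safety_scrub.id}
          job_parameters:
            start_datetime: "{{job.parameters.start_datetime}}"

Without it, safety_scrub simply falls back to its own default start_datetime, so the fixture still deploys and runs on its own.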
22 changes: 22 additions & 0 deletions acceptance/bundle/es-1617244/out.deploy1.direct-exp.txt
@@ -0,0 +1,22 @@

>>> errcode [CLI] bundle deploy
Warning: Single node cluster is not correctly configured
at resources.jobs.safety_augment_workflow.job_clusters[1].new_cluster
in databricks.yml:180:13

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

spark_conf:
spark.databricks.cluster.profile: singleNode
spark.master: local[*]

custom_tags:
ResourceClass: SingleNode


Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!
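
The warning above is produced by the icm_minimal_cluster definition, which sets num_workers: 0 without the single-node markers. Following the properties listed in the warning itself, a corrected cluster spec would look roughly like this (other fields unchanged):

    - job_cluster_key: icm_minimal_cluster
      new_cluster:
        spark_version: "${var.spark_version}"
        node_type_id: "Standard_D4ds_v5"
        num_workers: 0
        spark_conf:
          "spark.databricks.cluster.profile": "singleNode"   # markers taken from the warning text above
          "spark.master": "local[*]"
        custom_tags:
          ResourceClass: "SingleNode"

The fixture presumably leaves the cluster as-is so the warning remains part of the expected output.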
40 changes: 40 additions & 0 deletions acceptance/bundle/es-1617244/out.deploy1.terraform.txt
@@ -0,0 +1,40 @@

>>> errcode [CLI] bundle deploy
Warning: Single node cluster is not correctly configured
at resources.jobs.safety_augment_workflow.job_clusters[1].new_cluster
in databricks.yml:180:13

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

spark_conf:
spark.databricks.cluster.profile: singleNode
spark.master: local[*]

custom_tags:
ResourceClass: SingleNode


Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
Deploying resources...
Error: terraform apply: exit status 1

Error: cannot read job: panic: runtime error: invalid memory address or nil pointer dereference

with databricks_job.safety_augment_workflow,
on bundle.tf.json line 220, in resource.databricks_job.safety_augment_workflow:
220: },


Error: cannot read job: panic: runtime error: invalid memory address or nil pointer dereference

with databricks_job.safety_scrub,
on bundle.tf.json line 318, in resource.databricks_job.safety_scrub:
318: }



Updating deployment state...

Exit code: 1
37 changes: 37 additions & 0 deletions acceptance/bundle/es-1617244/out.deploy2.direct-exp.txt
@@ -0,0 +1,37 @@

>>> errcode [CLI] bundle deploy
Warning: Single node cluster is not correctly configured
at resources.jobs.safety_augment_workflow.job_clusters[1].new_cluster
in databricks.yml:180:13

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

spark_conf:
spark.databricks.cluster.profile: singleNode
spark.master: local[*]

custom_tags:
ResourceClass: SingleNode


Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
Error: cannot plan resources.jobs.safety_scrub: failed to read id="[NUMID]": Internal Server Error (500)

Endpoint: GET [DATABRICKS_URL]/api/2.2/jobs/get?job_id=[NUMID]
HTTP Status: 500 Internal Server Error
API error_code:
API message: Internal Server Error

Error: cannot plan resources.jobs.safety_augment_workflow: failed to read id="[NUMID]": Internal Server Error (500)

Endpoint: GET [DATABRICKS_URL]/api/2.2/jobs/get?job_id=[NUMID]
HTTP Status: 500 Internal Server Error
API error_code:
API message: Internal Server Error

Error: planning failed


Exit code: 1
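
Separately, the severity parameter in databricks.yml relies on nested variable interpolation ("${var.server_events_${var.bundle_target}_icm_severity}"). If that form is not resolved by the CLI, the more conventional pattern is a single variable with per-target overrides; a sketch using an illustrative server_events_icm_severity variable (not part of the fixture) would be:

variables:
  server_events_icm_severity:
    default: "3"   # illustrative variable name; not part of the fixture

targets:
  dev:
    variables:
      server_events_icm_severity: "3"
  prod:
    variables:
      server_events_icm_severity: "2"

The job parameter would then reference "${var.server_events_icm_severity}" directly.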