diff --git a/build.sbt b/build.sbt index f2c2e63cb..900f8ad3a 100644 --- a/build.sbt +++ b/build.sbt @@ -10,8 +10,8 @@ scalacOptions ++= Seq("-Xmax-classfile-name", "78") Test / fork := true Test / envVars := Map("OVERWATCH_ENV" -> " ","OVERWATCH_TOKEN" -> " ","OVERWATCH" -> " ") -logBuffered in Test := false -// parallelExecution in Test := false +Test / logBuffered := false +// Test / parallelExecution := false val sparkVersion = "3.1.2" libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % Provided diff --git a/src/main/scala/com/databricks/labs/overwatch/env/Database.scala b/src/main/scala/com/databricks/labs/overwatch/env/Database.scala index f5ce0d68b..b66b5b96f 100644 --- a/src/main/scala/com/databricks/labs/overwatch/env/Database.scala +++ b/src/main/scala/com/databricks/labs/overwatch/env/Database.scala @@ -272,7 +272,7 @@ class Database(config: Config) extends SparkSessionWrapper { val explicitDatePartitionCondition = if (datePartitionFields.nonEmpty & maxMergeScanDates.nonEmpty) { s" AND target.${datePartitionFields.get} in (${maxMergeScanDates.mkString("'", "', '", "'")})" } else "" - val mergeCondition: String = immutableColumns.map(k => s"updates.$k = target.$k").mkString(" AND ") + " " + + val mergeCondition: String = immutableColumns.map(k => s"updates.$k <=> target.$k").mkString(" AND ") + " " + s"AND target.organization_id = '${config.organizationId}'" + // force partition filter for concurrent merge explicitDatePartitionCondition // force right side scan to only scan relevant dates diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/ETLDefinition.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/ETLDefinition.scala index 02b0705dd..abe7b9c8a 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/ETLDefinition.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/ETLDefinition.scala @@ -27,6 +27,7 @@ class ETLDefinition( val transformedDF = transforms.foldLeft(verifiedSourceDF) { case (df, transform) => + /* * reverting Spark UI Job Group labels for now * @@ -37,8 +38,9 @@ class ETLDefinition( * s"${module.pipeline.config.workspaceName}:${module.moduleName}", * transform.toString) */ - - df.transform( transform) + + df.transform( transform) + } write(transformedDF, module) } diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala index 9605e2706..f2e7582ee 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala @@ -78,6 +78,7 @@ trait GoldTransforms extends SparkSessionWrapper { 'git_source, 'timeout_seconds, 'max_concurrent_runs, + 'queue, 'max_retries, 'retry_on_timeout, 'min_retry_interval_millis, @@ -141,8 +142,6 @@ trait GoldTransforms extends SparkSessionWrapper { 'min_retry_interval_millis, 'max_concurrent_runs, 'run_as_user_name, -// 'children, -// 'workflow_children, 'workflow_context, 'task_detail_legacy, 'submitRun_details, @@ -150,8 +149,9 @@ trait GoldTransforms extends SparkSessionWrapper { 'last_edited_by, 'requestDetails.alias("request_detail"), 'timeDetails.alias("time_detail"), - 'startEpochMS - ) + 'startEpochMS, + 'startTaskEpochMS) + jobRunsLag30D .select(jobRunCols: _*) } @@ -1053,7 +1053,7 @@ trait GoldTransforms extends SparkSessionWrapper { protected val jobViewColumnMapping: String = """ |organization_id, workspace_name, job_id, action, date, timestamp, 
job_name, tags, tasks, job_clusters, - |libraries, timeout_seconds, max_concurrent_runs, max_retries, retry_on_timeout, min_retry_interval_millis, + |libraries, timeout_seconds, max_concurrent_runs, queue, max_retries, retry_on_timeout, min_retry_interval_millis, |schedule, existing_cluster_id, new_cluster, git_source, task_detail_legacy, is_from_dlt, aclPermissionSet, |targetUserId, session_id, request_id, user_agent, response, source_ip_address, created_by, created_ts, |deleted_by, deleted_ts, last_edited_by, last_edited_ts @@ -1065,7 +1065,7 @@ trait GoldTransforms extends SparkSessionWrapper { |task_run_id, repair_id, task_key, cluster_type, cluster_id, cluster_name, job_cluster_key, job_cluster, |new_cluster, tags, task_detail, task_dependencies, task_runtime, task_execution_runtime, task_type, |terminal_state, job_trigger_type, schedule, libraries, manual_override_params, repair_details, timeout_seconds, - |retry_on_timeout, max_retries, min_retry_interval_millis, max_concurrent_runs, run_as_user_name, parent_run_id, + |retry_on_timeout, max_retries, min_retry_interval_millis, max_concurrent_runs, queue, run_as_user_name, parent_run_id, |workflow_context, task_detail_legacy, submitRun_details, created_by, last_edited_by, request_detail, time_detail |""".stripMargin diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala index 39f3adb16..5cc531a88 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala @@ -321,10 +321,15 @@ abstract class PipelineTargets(config: Config) { lazy private[overwatch] val dbJobRunsTarget: PipelineTable = PipelineTable( name = "jobrun_silver", - _keys = Array("runId", "startEpochMS"), + _keys = Array( + "runId", + "startEpochMS", // was incorrectly equal to `$"timeDetails.startTime"` through 0.8.1.2 + // via incorrect expression for `'TaskRunTime`; now `$"timeDetails.submissionTime"` + "startTaskEpochMS" // added to make key for task runs complete; can be null + ), config, _mode = WriteMode.merge, - incrementalColumns = Array("startEpochMS"), // don't load into gold until run is terminated + incrementalColumns = Array("startEpochMS"), zOrderBy = Array("runId", "jobId"), partitionBy = Seq("organization_id", "__overwatch_ctrl_noise"), persistBeforeWrite = true, diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/Schema.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/Schema.scala index fa7577f06..bffee9414 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/Schema.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/Schema.scala @@ -136,6 +136,8 @@ object Schema extends SparkSessionWrapper { StructField("name", StringType, nullable = true), StructField("timeout_seconds", LongType, nullable = true), StructField("max_concurrent_runs", LongType, nullable = true), + StructField("queue", StringType, nullable = true), + StructField("all_queued_runs", BooleanType, nullable = true), StructField("max_retries", LongType, nullable = true), StructField("retry_on_timeout", BooleanType, nullable = true), StructField("min_retry_interval_millis", LongType, nullable = true), @@ -464,6 +466,9 @@ object Schema extends SparkSessionWrapper { StructField("timezone_id", StringType, true) )) + val minimumQueueSchema: StructType = StructType(Seq( + StructField("enabled", BooleanType, nullable = true))) + val 
minimumEmailNotificationsSchema: StructType = StructType(Seq( StructField("no_alert_for_skipped_runs", BooleanType, nullable = true), StructField("on_failure", ArrayType(StringType, containsNull = true), nullable = true) @@ -502,20 +507,21 @@ object Schema extends SparkSessionWrapper { ))) // minimum new jobs settings struct - val minimumNewSettingsSchema: StructType = StructType(Seq( - StructField("existing_cluster_id", StringType, nullable = true), - StructField("max_concurrent_runs", LongType, nullable = true), - StructField("name", StringType, nullable = true), - StructField("new_cluster", minimumNewClusterSchema, nullable = true), - StructField("timeout_seconds", LongType, nullable = true), - StructField("notebook_task", minimumNotebookTaskSchema, nullable = true), - StructField("spark_python_task", minimumSparkPythonTaskSchema, nullable = true), - StructField("python_wheel_task", minimumPythonWheelTaskSchema, nullable = true), - StructField("spark_jar_task", minimumSparkJarTaskSchema, nullable = true), - StructField("spark_submit_task", minimumSparkSubmitTaskSchema, nullable = true), - StructField("shell_command_task", minimumShellCommandTaskSchema, nullable = true), - StructField("pipeline_task", minimumPipelineTaskSchema, nullable = true), - )) + // (not used; no references) + // val minimumNewSettingsSchema: StructType = StructType(Seq( + // StructField("existing_cluster_id", StringType, nullable = true), + // StructField("max_concurrent_runs", LongType, nullable = true), + // StructField("name", StringType, nullable = true), + // StructField("new_cluster", minimumNewClusterSchema, nullable = true), + // StructField("timeout_seconds", LongType, nullable = true), + // StructField("notebook_task", minimumNotebookTaskSchema, nullable = true), + // StructField("spark_python_task", minimumSparkPythonTaskSchema, nullable = true), + // StructField("python_wheel_task", minimumPythonWheelTaskSchema, nullable = true), + // StructField("spark_jar_task", minimumSparkJarTaskSchema, nullable = true), + // StructField("spark_submit_task", minimumSparkSubmitTaskSchema, nullable = true), + // StructField("shell_command_task", minimumShellCommandTaskSchema, nullable = true), + // StructField("pipeline_task", minimumPipelineTaskSchema, nullable = true), + // )) val minimumJobStatusSilverMetaLookupSchema: StructType = StructType(Seq( StructField("organization_id", StringType, nullable = false), @@ -527,6 +533,7 @@ object Schema extends SparkSessionWrapper { StructField("tags", MapType(StringType, StringType, valueContainsNull = true), nullable = true), StructField("schedule", minimumScheduleSchema, nullable = true), StructField("max_concurrent_runs", LongType, nullable = true), + StructField("queue", minimumQueueSchema, nullable = true), StructField("run_as_user_name", StringType, nullable = true), StructField("timeout_seconds", LongType, nullable = true), StructField("created_by", StringType, nullable = true), @@ -535,23 +542,24 @@ object Schema extends SparkSessionWrapper { )) // simplified new settings struct - private[overwatch] val simplifiedNewSettingsSchema = StructType(Seq( - StructField("email_notifications", minimumEmailNotificationsSchema, nullable = true), - StructField("existing_cluster_id", StringType, nullable = true), - StructField("max_concurrent_runs", LongType, nullable = true), - StructField("name", StringType, nullable = true), - StructField("new_cluster", minimumNewClusterSchema, nullable = true), - StructField("notebook_task", minimumNotebookTaskSchema, nullable = true), - 
StructField("schedule", minimumScheduleSchema, nullable = true), - StructField("notebook_task",minimumNotebookTaskSchema, nullable = true), - StructField("spark_python_task",minimumSparkPythonTaskSchema, nullable = true), - StructField("python_wheel_task", minimumPythonWheelTaskSchema, nullable = true), - StructField("spark_jar_task",minimumSparkJarTaskSchema, nullable = true), - StructField("spark_submit_task", minimumSparkSubmitTaskSchema, nullable = true), - StructField("shell_command_task",minimumShellCommandTaskSchema, nullable = true), - StructField("pipeline_task", minimumPipelineTaskSchema, nullable = true), - StructField("timeout_seconds", LongType, nullable = true) - )) + // (not used; no references) + // private[overwatch] val simplifiedNewSettingsSchema = StructType(Seq( + // StructField("email_notifications", minimumEmailNotificationsSchema, nullable = true), + // StructField("existing_cluster_id", StringType, nullable = true), + // StructField("max_concurrent_runs", LongType, nullable = true), + // StructField("name", StringType, nullable = true), + // StructField("new_cluster", minimumNewClusterSchema, nullable = true), + // StructField("notebook_task", minimumNotebookTaskSchema, nullable = true), + // StructField("schedule", minimumScheduleSchema, nullable = true), + // StructField("notebook_task",minimumNotebookTaskSchema, nullable = true), + // StructField("spark_python_task",minimumSparkPythonTaskSchema, nullable = true), + // StructField("python_wheel_task", minimumPythonWheelTaskSchema, nullable = true), + // StructField("spark_jar_task",minimumSparkJarTaskSchema, nullable = true), + // StructField("spark_submit_task", minimumSparkSubmitTaskSchema, nullable = true), + // StructField("shell_command_task",minimumShellCommandTaskSchema, nullable = true), + // StructField("pipeline_task", minimumPipelineTaskSchema, nullable = true), + // StructField("timeout_seconds", LongType, nullable = true) + // )) val streamingGoldMinimumSchema: StructType = StructType(Seq( StructField("cluster_id", StringType, nullable = false), @@ -717,6 +725,7 @@ object Schema extends SparkSessionWrapper { StructField("libraries", minimumLibrariesSchema, nullable = true), StructField("git_source", minimumGitSourceSchema, nullable = true), StructField("max_concurrent_runs", LongType, nullable = true), + StructField("queue", minimumQueueSchema, nullable = true), StructField("max_retries", LongType, nullable = true), StructField("timeout_seconds", LongType, nullable = true), StructField("retry_on_timeout", BooleanType, nullable = true), @@ -846,6 +855,7 @@ object Schema extends SparkSessionWrapper { StructField("git_source", minimumGitSourceSchema, nullable = true), StructField("timeout_seconds", LongType, nullable = true), StructField("max_concurrent_runs", LongType, nullable = true), + StructField("queue", minimumQueueSchema, nullable = true), StructField("max_retries", LongType, nullable = true), StructField("retry_on_timeout", BooleanType, nullable = true), StructField("min_retry_interval_millis", LongType, nullable = true), @@ -877,6 +887,7 @@ object Schema extends SparkSessionWrapper { StructField("jobId", LongType, nullable = true), StructField("runId", LongType, nullable = false), StructField("startEpochMS", LongType, nullable = false), + StructField("startTaskEpochMS", LongType, nullable = false), StructField("jobName", StringType, nullable = true), StructField("tags", MapType(StringType, StringType), nullable = true), StructField("jobRunId", LongType, nullable = true), @@ -908,6 +919,7 @@ 
object Schema extends SparkSessionWrapper { StructField("max_retries", LongType, nullable = true), StructField("min_retry_interval_millis", LongType, nullable = true), StructField("max_concurrent_runs", LongType, nullable = true), + StructField("queue", minimumQueueSchema, nullable = true), StructField("run_as_user_name", StringType, nullable = true), StructField("workflow_context", StringType, nullable = true), StructField("task_detail_legacy", minimumTaskDetailSchema, nullable = true), diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala index aaac41224..d88c35369 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala @@ -10,6 +10,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.{Column, DataFrame} + trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax[ SparkSessionWrapper] { import TransformationDescriber._ @@ -1323,6 +1324,7 @@ trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax[ SparkSe NamedColumn("tasks", lit(null).cast(Schema.minimumTasksSchema)), NamedColumn("job_clusters", lit(null).cast(Schema.minimumJobClustersSchema)), NamedColumn("libraries", lit(null).cast(Schema.minimumLibrariesSchema)), + NamedColumn("queue", lit(null).cast(Schema.minimumQueueSchema)), NamedColumn("access_control_list", lit(null).cast(Schema.minimumAccessControlListSchema)), NamedColumn("grants", lit(null).cast(Schema.minimumGrantsSchema)), ) @@ -1398,81 +1400,173 @@ trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax[ SparkSe "runSucceeded", "runFailed", "runTriggered", "runNow", "runStart", "submitRun", "cancel", "repairRun" ) val optimalCacheParts = Math.min(daysToProcess * getTotalCores * 2, 1000) - val jobRunsLag30D = getJobsBase(auditLogLag30D) - .filter('actionName.isin(jobRunActions: _*)) - .repartition(optimalCacheParts) - .cache() // cached df removed at end of module run - // eagerly force this highly reused DF into cache() - jobRunsLag30D.count() + logger.log( Level.INFO, s"optimalCacheParts: ${optimalCacheParts}") - - // TODO: remove or comment out or change log level or . . . 
+ val filterAuditLog = NamedTransformation { + (df: DataFrame) => { + val cachedActions = getJobsBase( df) + .filter('actionName.isin( jobRunActions: _*)) + .repartition( optimalCacheParts) + .cache() // cached df removed at end of module run - logger.log( Level.INFO, "Showing first 5 rows of `jobRunsLag30D`:") + // eagerly force this highly reused DF into cache() + val n = cachedActions.count() - jobRunsLag30D - .showLines(5, 20, true) - .foreach( logger.log( Level.INFO, _)) + logger.log( Level.INFO, s"jobRunsLag30D count: ${n}") + logger.log( Level.INFO, s"jobRunsLag30D.rdd.partitions.size: ${cachedActions.rdd.partitions.size}") - // Lookup to populate the clusterID/clusterName where missing from jobs - lazy val clusterSpecNameLookup = clusterSpec.asDF - .select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId")) - .filter('clusterId.isNotNull && 'cluster_name.isNotNull) + cachedActions + } + } - // Lookup to populate the clusterID/clusterName where missing from jobs - lazy val clusterSnapNameLookup = clusterSnapshot.asDF - .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000)) - .select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId")) - .filter('clusterId.isNotNull && 'cluster_name.isNotNull) + val jobRunsLag30D = auditLogLag30D.transformWithDescription( filterAuditLog) - // Lookup to populate the existing_cluster_id where missing from jobs -- it can be derived from name - lazy val jobStatusMetaLookup = jobsStatus.asDF - .verifyMinimumSchema(Schema.minimumJobStatusSilverMetaLookupSchema) - .select( - 'organization_id, - 'timestamp, - 'jobId, - 'jobName, - to_json('tags).alias("tags"), - 'schedule, - 'max_concurrent_runs, - 'run_as_user_name, - 'timeout_seconds, - 'created_by, - 'last_edited_by, - to_json('tasks).alias("tasks"), - to_json('job_clusters).alias("job_clusters"), - to_json($"task_detail_legacy.notebook_task").alias("notebook_task"), - to_json($"task_detail_legacy.spark_python_task").alias("spark_python_task"), - to_json($"task_detail_legacy.python_wheel_task").alias("python_wheel_task"), - to_json($"task_detail_legacy.spark_jar_task").alias("spark_jar_task"), - to_json($"task_detail_legacy.spark_submit_task").alias("spark_submit_task"), - to_json($"task_detail_legacy.shell_command_task").alias("shell_command_task"), - to_json($"task_detail_legacy.pipeline_task").alias("pipeline_task") - ) + /* + * keep original usage example of `.showLines()` for reference + * + * logger.log( Level.INFO, "Showing first 5 rows of `jobRunsLag30D`:") + * jobRunsLag30D + * .showLines(5, 20, true) + * .foreach( logger.log( Level.INFO, _)) + * + */ - lazy val jobSnapNameLookup = jobsSnapshot.asDF - .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000)) - .select('organization_id, 'timestamp, 'job_id.alias("jobId"), $"settings.name".alias("jobName")) - val jobRunsLookups = jobRunsInitializeLookups( - (clusterSpec, clusterSpecNameLookup), - (clusterSnapshot, clusterSnapNameLookup), - (jobsStatus, jobStatusMetaLookup), - (jobsSnapshot, jobSnapNameLookup) - ) + /** + * Look up the cluster_name based on id first from + * `job_status_silver`. 
If not present there fallback to latest + * snapshot prior to the run + */ + + val jobRunsAppendClusterName = NamedTransformation { + (df: DataFrame) => { + + val key = Seq( "organization_id", "clusterId") + + lazy val clusterSpecNameLookup = clusterSpec.asDF + .select( + 'organization_id, + 'timestamp, + 'cluster_name, + 'cluster_id.alias("clusterId")) + .filter( + 'clusterId.isNotNull + && 'cluster_name.isNotNull) + .toTSDF( "timestamp", key:_*) + + lazy val clusterSnapNameLookup = clusterSnapshot.asDF + .select( + // .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000)) + 'organization_id, + ( unix_timestamp('Pipeline_SnapTS) * lit(1000)).alias( "timestamp"), + 'cluster_name, + 'cluster_id.alias( "clusterId")) + .filter( + 'clusterId.isNotNull + && 'cluster_name.isNotNull) + .toTSDF( "timestamp", key:_*) + + df.toTSDF( "timestamp", key:_*) + .lookupWhen( clusterSpecNameLookup) + .lookupWhen( clusterSnapNameLookup) + .df + } + } + + /** + * looks up the job name based on id first from job_status_silver + * and if not present there fallback to latest snapshot prior to + * the run + */ + + val jobRunsAppendJobMeta = NamedTransformation { + (df: DataFrame) => { + + val key = Seq( "organization_id", "jobId") + + lazy val jobStatusMetaLookup = jobsStatus.asDF + .verifyMinimumSchema( + Schema.minimumJobStatusSilverMetaLookupSchema) + .select( + 'organization_id, + 'timestamp, + 'jobId, + 'jobName, + to_json('tags).alias("tags"), + 'schedule, + 'max_concurrent_runs, + 'queue, + 'run_as_user_name, + 'timeout_seconds, + 'created_by, + 'last_edited_by, + to_json( 'tasks).alias("tasks"), + to_json( 'job_clusters).alias("job_clusters"), + to_json( $"task_detail_legacy.notebook_task").alias( "notebook_task"), + to_json( $"task_detail_legacy.spark_python_task").alias( "spark_python_task"), + to_json( $"task_detail_legacy.python_wheel_task").alias( "python_wheel_task"), + to_json( $"task_detail_legacy.spark_jar_task").alias( "spark_jar_task"), + to_json( $"task_detail_legacy.spark_submit_task").alias( "spark_submit_task"), + to_json( $"task_detail_legacy.shell_command_task").alias( "shell_command_task"), + to_json( $"task_detail_legacy.pipeline_task").alias( "pipeline_task")) + .toTSDF( "timestamp", key:_*) + + lazy val jobSnapshotNameLookup = jobsSnapshot.asDF + // .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000)) + .select( + 'organization_id, + ( unix_timestamp( 'Pipeline_SnapTS) * lit( 1000)).alias( "timestamp"), + 'job_id.alias("jobId"), + $"settings.name".alias("jobName")) + .toTSDF( "timestamp", key:_*) + + df.toTSDF( "timestamp", key:_*) + .lookupWhen( jobStatusMetaLookup) + .lookupWhen( jobSnapshotNameLookup) + .df + .withColumn( "jobName", + coalesce('jobName, 'run_name)) + .withColumn( "tasks", + coalesce('tasks, 'submitRun_tasks)) + .withColumn( "job_clusters", + coalesce('job_clusters, 'submitRun_job_clusters)) + + // the following is only valid in Spark > 3.1.2, apparently + // (it compiles with 3.3.0) + + // .withColumns( Map( + // "jobName" + // -> coalesce('jobName, 'run_name), + // "tasks" + // -> coalesce('tasks, 'submitRun_tasks), + // "job_clusters" + // -> coalesce('job_clusters, 'submitRun_job_clusters))) + } + } + + val cancelAllQueuedRunsIntervals = + jobRunsLag30D.transform( jobRunsDeriveCancelAllQueuedRunsIntervals) // caching before structifying - jobRunsDeriveRunsBase(jobRunsLag30D, etlUntilTime) - .transformWithDescription( - jobRunsAppendClusterName( jobRunsLookups)) - .transform(jobRunsAppendJobMeta(jobRunsLookups)) - 
.transform(jobRunsStructifyLookupMeta(optimalCacheParts)) - .transform(jobRunsAppendTaskAndClusterDetails) - .transform(jobRunsCleanseCreatedNestedStructures(targetKeys)) - // .transform(jobRunsRollupWorkflowsAndChildren) - .drop("timestamp") // could be duplicated to enable asOf Lookups, dropping to clean up + val cachedDF = jobRunsLag30D + .transformWithDescription( jobRunsDeriveRunsBase( etlUntilTime)) + .transformWithDescription( jobRunsAppendClusterName) + .transformWithDescription( jobRunsAppendJobMeta) + .transform( jobRunsCancelAllQueuedRuns( cancelAllQueuedRunsIntervals)) + .repartition( optimalCacheParts) + .cache() // to speed up schema inference while "structifying" next + + logger.log( Level.INFO, s"cachedDF count before structifying: ${cachedDF.count()}") + logger.log( Level.INFO, s"cachedDF.rdd.partitions.size: ${cachedDF.rdd.partitions.size}") + + cachedDF + .transformWithDescription( jobRunsStructifyLookupMeta) + .transformWithDescription( jobRunsAppendTaskAndClusterDetails) + .transformWithDescription( jobRunsCleanseCreatedNestedStructures( targetKeys)) + .drop("timestamp") + // `timestamp` could be duplicated to enable `asOf` lookups; + // dropping to clean up } protected def notebookSummary()(df: DataFrame): DataFrame = { diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala index b44df97c7..eec5ea6d4 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala @@ -314,6 +314,7 @@ object WorkflowsTransforms extends SparkSessionWrapper { jobStatusBuildLookupLogic("git_source", lastJobStatus, rawDFSchema, Some("lookup_settings")), fillForward("is_from_dlt", lastJobStatus), jobStatusBuildLookupLogic("max_concurrent_runs", lastJobStatus, rawDFSchema, Some("lookup_settings")).cast("long"), + jobStatusBuildLookupLogic("queue", lastJobStatus, rawDFSchema, Some("lookup_settings")), jobStatusBuildLookupLogic("max_retries", lastJobStatus, rawDFSchema, Some("lookup_settings")).cast("long"), jobStatusBuildLookupLogic("timeout_seconds", lastJobStatus, rawDFSchema, Some("lookup_settings")).cast("long"), jobStatusBuildLookupLogic("retry_on_timeout", lastJobStatus, rawDFSchema, Some("lookup_settings")).cast("boolean"), @@ -399,6 +400,7 @@ object WorkflowsTransforms extends SparkSessionWrapper { $"settings.libraries".alias("libraries"), $"settings.git_source".alias("git_source"), $"settings.max_concurrent_runs".alias("max_concurrent_runs"), + $"settings.queue".alias("queue"), $"settings.max_retries".alias("max_retries"), $"settings.retry_on_timeout".alias("retry_on_timeout"), $"settings.min_retry_interval_millis".alias("min_retry_interval_millis"), @@ -460,12 +462,14 @@ object WorkflowsTransforms extends SparkSessionWrapper { dfc.count() val dfCols = dfc.columns val colsToRebuild = Array( + "queue", "email_notifications", "tags", "schedule", "libraries", "job_clusters", "tasks", "new_cluster", "git_source", "access_control_list", "grants", "notebook_task", "spark_python_task", "spark_jar_task", "python_wheel_task", "spark_submit_task", "pipeline_task", "shell_command_task" ).map(_.toLowerCase) val baseSelects = dfCols.filterNot(cName => colsToRebuild.contains(cName.toLowerCase)) map col val structifiedCols: Array[Column] = Array( + structFromJson(spark, dfc, "queue", allNullMinimumSchema = Schema.minimumQueueSchema), structFromJson(spark, dfc, 
"job_clusters", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumJobClustersSchema), structFromJson(spark, dfc, "email_notifications", allNullMinimumSchema = Schema.minimumEmailNotificationsSchema), structFromJson(spark, dfc, "tags"), @@ -545,13 +549,13 @@ object WorkflowsTransforms extends SparkSessionWrapper { */ // val jobRunsLookups: Map[String, DataFrame] = - def jobRunsInitializeLookups(lookups: (PipelineTable, DataFrame)*): Map[String, DataFrame] = { - lookups - .filter(_._1.exists) - .map(lookup => { - (lookup._1.name, lookup._2) - }).toMap - } + // def jobRunsInitializeLookups(lookups: (PipelineTable, DataFrame)*): Map[String, DataFrame] = { + // lookups + // .filter(_._1.exists) + // .map(lookup => { + // (lookup._1.name, lookup._2) + // }).toMap + // } def jobRunsDeriveCompletedRuns(df: DataFrame, firstRunSemanticsW: WindowSpec): DataFrame = { df @@ -608,7 +612,7 @@ object WorkflowsTransforms extends SparkSessionWrapper { /** * Primarily necessary to get runId from response and capture the submission time */ - def jobRunsDeriveRunsLaunched(df: DataFrame, firstRunSemanticsW: WindowSpec, arrayStringSchema: ArrayType): DataFrame = { + def jobRunsDeriveRunsLaunched(df: DataFrame, ws: WindowSpec, arrayStringSchema: ArrayType): DataFrame = { df .filter('actionName.isin("runNow")) .select( @@ -637,8 +641,8 @@ object WorkflowsTransforms extends SparkSessionWrapper { 'userIdentity.alias("submittedBy") ) .filter('jobRunId.isNotNull) - .withColumn("rnk", rank().over(firstRunSemanticsW)) - .withColumn("rn", row_number().over(firstRunSemanticsW)) + .withColumn("rnk", rank().over( ws)) + .withColumn("rn", row_number().over( ws)) .filter('rnk === 1 && 'rn === 1) .drop("rnk", "rn", "timestamp") } @@ -654,6 +658,7 @@ object WorkflowsTransforms extends SparkSessionWrapper { 'timestamp, 'jobId.cast("long").alias("submissionJobId"), 'runId.alias("jobRunId"), + 'queue, 'timestamp.alias("submissionTime"), 'jobTriggerType.alias("jobTriggerType_Triggered"), 'requestId.alias("submitRequestID"), @@ -675,6 +680,7 @@ object WorkflowsTransforms extends SparkSessionWrapper { 'timestamp, get_json_object($"response.result", "$.run_id").cast("long").alias("jobRunId"), 'run_name, + 'queue, 'timestamp.alias("submissionTime"), 'job_clusters, // json array struct string 'new_cluster, // json struct string @@ -769,216 +775,427 @@ object WorkflowsTransforms extends SparkSessionWrapper { .agg(collect_list('repair_details).alias("repair_details")) } - def jobRunsDeriveRunsBase(df: DataFrame, etlUntilTime: TimeTypes): DataFrame = { + def jobRunsDeriveCancelAllQueuedRunsIntervals( df: DataFrame): DataFrame = { - val arrayStringSchema = ArrayType(StringType, containsNull = true) - val firstTaskRunSemanticsW = Window.partitionBy('organization_id, 'jobRunId, 'taskRunId).orderBy('timestamp) - val firstJobRunSemanticsW = Window.partitionBy('organization_id, 'jobRunId).orderBy('timestamp) - val firstRunSemanticsW = Window.partitionBy('organization_id, 'runId).orderBy('timestamp) + val maxQueueDurationMS = lit( 48 * 60 * 60 * 1000) - // Completes must be >= etlStartTime as it is the driver endpoint - // All joiners to Completes may be from the past up to N days as defined in the incremental df - // Identify all completed jobs in scope for this overwatch run - val allCompletes = jobRunsDeriveCompletedRuns(df, firstTaskRunSemanticsW) + val requestWindow = Window + .partitionBy( 'organization_id, 'requestId) + .orderBy( 'timestamp) - // CancelRequests are still lookups from the driver "complete" as a cancel request is a 
request and still - // results in a runFailed after the cancellation - // Identify all cancelled jobs in scope for this overwatch run - val allCancellations = jobRunsDeriveCancelledRuns(df, firstRunSemanticsW) + val intervalWindow = Window + .partitionBy( 'organization_id, 'jobId) + .orderBy( 'timestamp) + + df.filter( + 'actionName isin "cancelAllRuns" + and 'all_queued_runs + and $"response.statusCode" === 200) + .withColumn( "is_distinct_request", + row_number.over( requestWindow) === lit( 1)) + .filter( 'is_distinct_request) + .select( + 'organization_id, + 'job_id cast "long" + alias "jobId", + greatest( + lag( $"timestamp", offset= 1, defaultValue= 0) + over intervalWindow, + 'timestamp - maxQueueDurationMS) + alias "fromMS", + 'timestamp + alias "untilMS", + lit( "Cancelled") + alias "cancelled", + 'requestId + alias "cancellationRequestId", + 'response + alias "cancellationResponse", + 'sessionId + alias "cancellationSessionId", + 'sourceIPAddress + alias "cancellationSourceIP", + 'timestamp + alias "cancellationTime", + 'userAgent + alias "cancelledUserAgent", + 'userIdentity + alias "cancelledBy") + } - // DF for jobs launched with actionName == "runNow" - // Lookback 30 days for laggard starts prior to current run - // only field from runNow that we care about is the response.result.runId - val runNowStart = jobRunsDeriveRunsLaunched(df, firstJobRunSemanticsW, arrayStringSchema) - val runTriggered = jobRunsDeriveRunsTriggered(df, firstJobRunSemanticsW) + def jobRunsCancelAllQueuedRuns( intervals: DataFrame)( runsBaseWithMeta: DataFrame): DataFrame = { - /** - * These are runs submitted using the "submitRun" API endpoint. These runs will have no corresponding job - * since the job was never scheduled. The entire definition of the job and the cluster must be sumitted - * in this API call. 
Does not reference an existing job_id present in the jobsStatus Target - */ - val runSubmitStart = jobRunsDeriveSubmittedRuns(df, firstJobRunSemanticsW) + val rangeJoinBinSize = 60 * 60 * 1000 // 1 hour in milliseconds - // DF to pull unify differing schemas from runNow and submitRun and pull all job launches into one DF - val allSubmissions = runNowStart - .unionByName(runTriggered, allowMissingColumns = true) - .unionByName(runSubmitStart, allowMissingColumns = true) + def joinByJobId( intervals: DataFrame)( df: DataFrame): DataFrame = { + df.hint( "range_join", rangeJoinBinSize) + .join( + intervals alias "intervals_by_job", + $"runs.organization_id" === $"intervals_by_job.organization_id" + and $"runs.jobId" === $"intervals_by_job.jobId" + and $"runs.startEpochMS".between( + $"intervals_by_job.fromMS", + $"intervals_by_job.untilMS"), + "left") + } - // Find the corresponding runStart action for the completed jobs - // Lookback 30 days for laggard starts prior to current run - val runStarts = jobRunsDeriveRunStarts(df, firstTaskRunSemanticsW) + def joinNullJobId( intervals: DataFrame)( df: DataFrame): DataFrame = { + df.hint( "range_join", rangeJoinBinSize) + .join( + intervals alias "intervals_null_job", + $"runs.organization_id" === $"intervals_null_job.organization_id" + and $"runs.startEpochMS".between( + $"intervals_null_job.fromMS", + $"intervals_null_job.untilMS"), + "left") + } - val repairDetails = jobRunsDeriveRepairRunsDetail(df, firstRunSemanticsW) + def populateTerminalState( df: DataFrame): DataFrame = { + df withColumn( "terminalState", + coalesce( + $"runs.terminalState", + $"intervals_by_job.cancelled", + $"intervals_null_job.cancelled")) + } - val jobRunsMaster = allSubmissions - .join(runStarts, Seq("organization_id", "jobRunId"), "left") - .join(allCompletes, Seq("organization_id", "jobRunId", "taskRunId"), "left") - .withColumn("runId", coalesce('taskRunId, 'jobRunId).cast("long")) - .join(allCancellations, Seq("organization_id", "runId"), "left") - .withColumn("repairId", coalesce('repairId_runStart, 'repairId_Completed).cast("long")) - .join(repairDetails, Seq("organization_id", "runId", "repairId"), "left") - .cache() // caching to speed up schema inference + // when intervals intersect, take the field value from the earliest action + + val withFieldOfEarliestInterval = ( c: Column, field: String) => { + c withField( field, + when( + $"intervals_by_job.untilMS" <=> least( + $"intervals_by_job.untilMS", + $"intervals_null_job.untilMS"), + col( s"intervals_by_job.$field")) + otherwise + col( s"intervals_null_job.$field")) + } - jobRunsMaster.count + def populateTimeDetails( df: DataFrame): DataFrame = { + df withColumn( "timeDetails", + when( ! $"runs.queue_enabled", $"runs.timeDetails") + otherwise withFieldOfEarliestInterval( $"runs.timeDetails", "cancellationTime")) + } - jobRunsMaster + def populateRequestDetails( df: DataFrame): DataFrame = { + + val cancellatioRequestFieldNames = Seq( + "cancellationRequestId", + "cancellationResponse", + "cancellationSessionId", + "cancellationSourceIp", + "cancelledUserAgent", + "cancelledBy") + + val cr = col( "runs.requestDetails.cancellationRequest") + + df withColumn( "requestDetails", + when( ! 
$"runs.queue_enabled", + $"runs.requestDetails") + otherwise( $"runs.requestDetails" + withField( "cancellationRequest", + cancellatioRequestFieldNames + .foldLeft( cr)( withFieldOfEarliestInterval)))) + } + + + // avoid bad column references after modification + + // def matchModifiedColumn( c: String): Column = + + val modifiedColumns = Set( + "requestDetails", + "timeDetails", + "terminalState") + + val selectColumns = runsBaseWithMeta.columns map { + case c if modifiedColumns.contains(c) => col( c) + case c => col( s"runs.$c") + } + + + runsBaseWithMeta + .withColumn( "queue_enabled", + get_json_object('queue, "$.enabled") + cast BooleanType) + .alias( "runs") + .transform( joinByJobId( + intervals where $"jobId".isNotNull)) + .transform( joinNullJobId( + intervals where $"jobId".isNull)) + .transform( populateTimeDetails) + .transform( populateRequestDetails) + .transform( populateTerminalState) .select( - 'organization_id, - coalesce('runStartJobId, 'completedJobId, 'submissionJobId).cast("long").alias("jobId"), - 'jobRunId.cast("long"), - 'taskRunId.cast("long"), - coalesce('taskKey_runStart, 'taskKey_Completed).alias("taskKey"), - from_json(coalesce('taskDependencies_runStart, 'taskDependencies_Completed), arrayStringSchema).alias("taskDependencies"), - 'runId, - coalesce('multitaskParentRunId_Started, 'multitaskParentRunId_Completed).cast("long").alias("multitaskParentRunId"), - coalesce('parentRunId_Started, 'parentRunId_Completed).cast("long").alias("parentRunId"), - coalesce('taskRunId, 'idInJob).cast("long").alias("idInJob"), - TransformFunctions.subtractTime( - 'startTime, - array_max(array('completionTime, 'cancellationTime)) // endTS must remain null if still open - ).alias("TaskRunTime"), // run launch time until terminal event - TransformFunctions.subtractTime( - 'startTime, - array_max(array('completionTime, 'cancellationTime)) // endTS must remain null if still open - ).alias("TaskExecutionRunTime"), // from cluster up and run begin until terminal event - 'run_name, - when(coalesce('jobClusterType_Started, 'jobClusterType_Completed).isNull and ('job_clusters.isNotNull or 'new_cluster.isNotNull), "new") - .otherwise(coalesce('jobClusterType_Started, 'jobClusterType_Completed)).alias("clusterType"), - coalesce('jobTaskType_Started, 'jobTaskType_Completed).alias("taskType"), - coalesce('jobTriggerType_Triggered,'jobTriggerType_Started, 'jobTriggerType_Completed, 'jobTriggerType_runNow).alias("jobTriggerType"), - when('cancellationRequestId.isNotNull, "Cancelled") - .otherwise('jobTerminalState) - .alias("terminalState"), - 'clusterId, - 'existing_cluster_id, - 'new_cluster, - 'tasks.alias("submitRun_tasks"), - 'job_clusters.alias("submitRun_job_clusters"), - 'libraries, - 'access_control_list, - 'git_source, - 'manual_override_params, - coalesce('workflow_context_runNow, 'workflow_context_submitRun).alias("workflow_context"), - 'notebook_task, - 'spark_python_task, - 'python_wheel_task, - 'spark_jar_task, - 'spark_submit_task, - 'shell_command_task, - 'pipeline_task, - 'repairId, - 'repair_details, - struct( - 'startTime, - 'submissionTime, - 'cancellationTime, - 'completionTime, - 'timeout_seconds - ).alias("timeDetails"), - struct( - struct( - 'submitRequestId, - 'submitResponse, - 'submitSessionId, - 'submitSourceIP, - 'submitUserAgent, - 'submittedBy - ).alias("submissionRequest"), - struct( - 'cancellationRequestId, - 'cancellationResponse, - 'cancellationSessionId, - 'cancellationSourceIP, - 'cancelledUserAgent, - 'cancelledBy - ).alias("cancellationRequest"), + 
selectColumns:_*) + } + + + val jobRunsDeriveRunsBase = (etlUntilTime: TimeTypes) => NamedTransformation { + (df: DataFrame) => { + + val arrayStringSchema = ArrayType(StringType, containsNull = true) + val firstTaskRunSemanticsW = Window.partitionBy('organization_id, 'jobRunId, 'taskRunId).orderBy('timestamp) + val firstJobRunSemanticsW = Window.partitionBy('organization_id, 'jobRunId).orderBy('timestamp) + val firstRunSemanticsW = Window.partitionBy('organization_id, 'runId).orderBy('timestamp) + + // Completes must be >= etlStartTime as it is the driver endpoint + // All joiners to Completes may be from the past up to N days as defined in the incremental df + // Identify all completed jobs in scope for this overwatch run + val allCompletes = jobRunsDeriveCompletedRuns(df, firstTaskRunSemanticsW) + + // CancelRequests are still lookups from the driver "complete" as a cancel request is a request and still + // results in a runFailed after the cancellation + // Identify all cancelled jobs in scope for this overwatch run + val allCancellations = jobRunsDeriveCancelledRuns(df, firstRunSemanticsW) + + // DF for jobs launched with actionName == "runNow" + // Lookback 30 days for laggard starts prior to current run + // only field from runNow that we care about is the response.result.runId + val runNowStart = jobRunsDeriveRunsLaunched(df, firstJobRunSemanticsW, arrayStringSchema) + + val runTriggered = jobRunsDeriveRunsTriggered(df, firstJobRunSemanticsW) + + /** + * These are runs submitted using the "submitRun" API endpoint. These runs will have no corresponding job + * since the job was never scheduled. The entire definition of the job and the cluster must be sumitted + * in this API call. Does not reference an existing job_id present in the jobsStatus Target + */ + val runSubmitStart = jobRunsDeriveSubmittedRuns(df, firstJobRunSemanticsW) + + // DF to pull unify differing schemas from runNow and submitRun and pull all job launches into one DF + val allSubmissions = runNowStart + .unionByName(runTriggered, allowMissingColumns = true) + .unionByName(runSubmitStart, allowMissingColumns = true) + + // Find the corresponding runStart action for the completed jobs + // Lookback 30 days for laggard starts prior to current run + val runStarts = jobRunsDeriveRunStarts(df, firstTaskRunSemanticsW) + + val repairDetails = jobRunsDeriveRepairRunsDetail(df, firstRunSemanticsW) + + val jobRunsMaster = allSubmissions + .join(runStarts, Seq("organization_id", "jobRunId"), "left") + .join(allCompletes, Seq("organization_id", "jobRunId", "taskRunId"), "left") + .withColumn("runId", coalesce('taskRunId, 'jobRunId).cast("long")) + .join(allCancellations, Seq("organization_id", "runId"), "left") + .withColumn("repairId", coalesce('repairId_runStart, 'repairId_Completed).cast("long")) + .join(repairDetails, Seq("organization_id", "runId", "repairId"), "left") + .cache() // caching to speed up schema inference + + jobRunsMaster.count + + jobRunsMaster + .select( + 'organization_id, + coalesce('runStartJobId, 'completedJobId, 'submissionJobId).cast("long").alias("jobId"), + 'jobRunId.cast("long"), + 'taskRunId.cast("long"), + coalesce('taskKey_runStart, 'taskKey_Completed).alias("taskKey"), + from_json(coalesce('taskDependencies_runStart, 'taskDependencies_Completed), arrayStringSchema).alias("taskDependencies"), + 'runId, + coalesce('multitaskParentRunId_Started, 'multitaskParentRunId_Completed).cast("long").alias("multitaskParentRunId"), + coalesce('parentRunId_Started, 
'parentRunId_Completed).cast("long").alias("parentRunId"), + coalesce('taskRunId, 'idInJob).cast("long").alias("idInJob"), + 'queue, + TransformFunctions.subtractTime( + 'submissionTime, // was `'startTime` through 0.8.1.2 + array_max(array('completionTime, 'cancellationTime)) // endTS must remain null if still open + ).alias("TaskRunTime"), // run launch time until terminal event + TransformFunctions.subtractTime( + 'startTime, + array_max(array('completionTime, 'cancellationTime)) // endTS must remain null if still open + ).alias("TaskExecutionRunTime"), // from cluster up and run begin until terminal event + 'run_name, + when(coalesce('jobClusterType_Started, 'jobClusterType_Completed).isNull and ('job_clusters.isNotNull or 'new_cluster.isNotNull), "new") + .otherwise(coalesce('jobClusterType_Started, 'jobClusterType_Completed)).alias("clusterType"), + coalesce('jobTaskType_Started, 'jobTaskType_Completed).alias("taskType"), + coalesce('jobTriggerType_Triggered,'jobTriggerType_Started, 'jobTriggerType_Completed, 'jobTriggerType_runNow).alias("jobTriggerType"), + when('cancellationRequestId.isNotNull, "Cancelled") + .otherwise('jobTerminalState) + .alias("terminalState"), + 'clusterId, + 'existing_cluster_id, + 'new_cluster, + 'tasks.alias("submitRun_tasks"), + 'job_clusters.alias("submitRun_job_clusters"), + 'libraries, + 'access_control_list, + 'git_source, + 'manual_override_params, + coalesce('workflow_context_runNow, 'workflow_context_submitRun).alias("workflow_context"), + 'notebook_task, + 'spark_python_task, + 'python_wheel_task, + 'spark_jar_task, + 'spark_submit_task, + 'shell_command_task, + 'pipeline_task, + 'repairId, + 'repair_details, struct( - 'completionRequestId, - 'completionResponse - ).alias("completionRequest"), + 'startTime, + 'submissionTime, + 'cancellationTime, + 'completionTime, + 'timeout_seconds + ).alias("timeDetails"), struct( - 'startRequestId - ).alias("startRequest") - ).alias("requestDetails") - ) - .withColumn("timestamp", $"TaskRunTime.startEpochMS") // TS lookup key added for next steps (launch time) - .withColumn("startEpochMS", $"TaskRunTime.startEpochMS") // set launch time as TS key -// .scrubSchema + struct( + 'submitRequestId, + 'submitResponse, + 'submitSessionId, + 'submitSourceIP, + 'submitUserAgent, + 'submittedBy + ).alias("submissionRequest"), + struct( + 'cancellationRequestId, + 'cancellationResponse, + 'cancellationSessionId, + 'cancellationSourceIP, + 'cancelledUserAgent, + 'cancelledBy + ).alias("cancellationRequest"), + struct( + 'completionRequestId, + 'completionResponse + ).alias("completionRequest"), + struct( + 'startRequestId + ).alias("startRequest") + ).alias("requestDetails") + ) + // TS lookup key added for next steps (launch time) + .withColumn( "timestamp", + $"TaskRunTime.startEpochMS") // was 'startTime; now 'submissionTime + .withColumn( "startEpochMS", + $"TaskRunTime.startEpochMS") // was 'startTime; now 'submissionTime + .withColumn( "startTaskEpochMS", + $"TaskExecutionRunTime.startEpochMS") // <=> timeDetails.startTime + // .scrubSchema + } } - def jobRunsStructifyLookupMeta(cacheParts: Int)(df: DataFrame): DataFrame = { - val dfc = df.repartition(cacheParts).cache() // caching to speed up schema inference - dfc.count() - val colsToOverride = Array("tasks", "job_clusters", "tags").toSet - val dfOrigCols = (dfc.columns.toSet -- colsToOverride).toArray map col - val colsToAppend: Array[Column] = Array( - structFromJson(spark, dfc, "tasks", isArrayWrapped = true, allNullMinimumSchema = 
Schema.minimumTasksSchema).alias("tasks"), - structFromJson(spark, dfc, "job_clusters", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumJobClustersSchema).alias("job_clusters"), - struct( - structFromJson(spark, dfc, "new_cluster", allNullMinimumSchema = Schema.minimumNewClusterSchema), - structFromJson(spark, dfc, "submitRun_tasks", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumTasksSchema).alias("tasks"), - structFromJson(spark, dfc, "submitRun_job_clusters", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumJobClustersSchema).alias("job_clusters"), - structFromJson(spark, dfc, "libraries", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumLibrariesSchema), - structFromJson(spark, dfc, "access_control_list", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumAccessControlListSchema), - structFromJson(spark, dfc, "git_source", allNullMinimumSchema = Schema.minimumGitSourceSchema) - ).alias("submitRun_details"), - struct( - structFromJson(spark, dfc, "notebook_task", allNullMinimumSchema = Schema.minimumNotebookTaskSchema), - structFromJson(spark, dfc, "spark_python_task", allNullMinimumSchema = Schema.minimumSparkPythonTaskSchema), - structFromJson(spark, dfc, "python_wheel_task", allNullMinimumSchema = Schema.minimumPythonWheelTaskSchema), - structFromJson(spark, dfc, "spark_jar_task", allNullMinimumSchema = Schema.minimumSparkJarTaskSchema), - structFromJson(spark, dfc, "spark_submit_task", allNullMinimumSchema = Schema.minimumSparkSubmitTaskSchema), - structFromJson(spark, dfc, "shell_command_task", allNullMinimumSchema = Schema.minimumShellCommandTaskSchema), - structFromJson(spark, dfc, "pipeline_task", allNullMinimumSchema = Schema.minimumPipelineTaskSchema) - ).alias("task_detail_legacy"), - structFromJson(spark, dfc, "manual_override_params.notebook_params").alias("notebook_params_overwatch_ctrl"), - structFromJson(spark, dfc, "manual_override_params.python_named_params").alias("python_named_params_overwatch_ctrl"), - structFromJson(spark, dfc, "manual_override_params.sql_params").alias("sql_params_overwatch_ctrl"), - structFromJson(spark, dfc, "manual_override_params.pipeline_params").alias("pipeline_params_overwatch_ctrl"), - structFromJson(spark, dfc, "tags").alias("tags") - ) - val selectCols = dfOrigCols ++ colsToAppend - dfc.select(selectCols: _*) - .drop( - "notebook_task", "spark_python_task", "spark_jar_task", "python_wheel_task", - "spark_submit_task", "shell_command_task", "pipeline_task", "new_cluster", "libraries", "access_control_list", - "git_source", "submitRun_tasks", "submitRun_job_clusters" + val jobRunsStructifyLookupMeta = NamedTransformation { + (dfc: DataFrame) => { + val colsToOverride = Array( "queue", "tasks", "job_clusters", "tags").toSet + val dfOrigCols = (dfc.columns.toSet -- colsToOverride).toArray map col + val colsToAppend: Array[Column] = Array( + structFromJson(spark, dfc, "queue", allNullMinimumSchema = Schema.minimumQueueSchema).alias("queue"), + structFromJson(spark, dfc, "tasks", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumTasksSchema).alias("tasks"), + structFromJson(spark, dfc, "job_clusters", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumJobClustersSchema).alias("job_clusters"), + struct( + structFromJson(spark, dfc, "new_cluster", allNullMinimumSchema = Schema.minimumNewClusterSchema), + structFromJson(spark, dfc, "submitRun_tasks", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumTasksSchema).alias("tasks"), + structFromJson(spark, dfc, 
"submitRun_job_clusters", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumJobClustersSchema).alias("job_clusters"), + structFromJson(spark, dfc, "libraries", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumLibrariesSchema), + structFromJson(spark, dfc, "access_control_list", isArrayWrapped = true, allNullMinimumSchema = Schema.minimumAccessControlListSchema), + structFromJson(spark, dfc, "git_source", allNullMinimumSchema = Schema.minimumGitSourceSchema) + ).alias("submitRun_details"), + struct( + structFromJson(spark, dfc, "notebook_task", allNullMinimumSchema = Schema.minimumNotebookTaskSchema), + structFromJson(spark, dfc, "spark_python_task", allNullMinimumSchema = Schema.minimumSparkPythonTaskSchema), + structFromJson(spark, dfc, "python_wheel_task", allNullMinimumSchema = Schema.minimumPythonWheelTaskSchema), + structFromJson(spark, dfc, "spark_jar_task", allNullMinimumSchema = Schema.minimumSparkJarTaskSchema), + structFromJson(spark, dfc, "spark_submit_task", allNullMinimumSchema = Schema.minimumSparkSubmitTaskSchema), + structFromJson(spark, dfc, "shell_command_task", allNullMinimumSchema = Schema.minimumShellCommandTaskSchema), + structFromJson(spark, dfc, "pipeline_task", allNullMinimumSchema = Schema.minimumPipelineTaskSchema) + ).alias("task_detail_legacy"), + structFromJson(spark, dfc, "manual_override_params.notebook_params").alias("notebook_params_overwatch_ctrl"), + structFromJson(spark, dfc, "manual_override_params.python_named_params").alias("python_named_params_overwatch_ctrl"), + structFromJson(spark, dfc, "manual_override_params.sql_params").alias("sql_params_overwatch_ctrl"), + structFromJson(spark, dfc, "manual_override_params.pipeline_params").alias("pipeline_params_overwatch_ctrl"), + structFromJson(spark, dfc, "tags").alias("tags") ) - .scrubSchema + val selectCols = dfOrigCols ++ colsToAppend + dfc.select(selectCols: _*) + .drop( + "notebook_task", "spark_python_task", "spark_jar_task", "python_wheel_task", + "spark_submit_task", "shell_command_task", "pipeline_task", "new_cluster", "libraries", "access_control_list", + "git_source", "submitRun_tasks", "submitRun_job_clusters" + ) + .scrubSchema - } + }} - def jobRunsCleanseCreatedNestedStructures(keys: Array[String])(df: DataFrame): DataFrame = { - val emptyKeysDF = Seq.empty[(String, Long, Long)].toDF("organization_id", "runId", "startEpochMS") - - val cleansedTasksDF = workflowsCleanseTasks(df, keys, emptyKeysDF, "submitRun_details.tasks") - val cleansedJobClustersDF = workflowsCleanseJobClusters(df, keys, emptyKeysDF, "submitRun_details.job_clusters") - - val dfWCleansedJobsAndTasks = df - .join(cleansedTasksDF, keys.toSeq, "left") - .join(cleansedJobClustersDF, keys.toSeq, "left") - - val tasksAndJobClustersCleansingInventory = Map( - "tags" -> handleRootNull(dfWCleansedJobsAndTasks, "tags", SchemaTools.structToMap(dfWCleansedJobsAndTasks, "tags"), MapType(StringType, StringType)), - "submitRun_details.tasks" -> col("cleansedTasks"), - "submitRun_details.job_clusters" -> col("cleansedJobsClusters"), - "task_detail.notebook_task.base_parameters" -> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "task_detail.notebook_task.base_parameters"), - "task_detail.shell_command_task.env_vars" -> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "task_detail.shell_command_task.env_vars"), - "task_detail_legacy.notebook_task.base_parameters" -> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "task_detail_legacy.notebook_task.base_parameters"), - "task_detail_legacy.shell_command_task.env_vars" 
-> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "task_detail_legacy.shell_command_task.env_vars"), - "manual_override_params.notebook_params" -> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "notebook_params_overwatch_ctrl"), - "manual_override_params.python_named_params" -> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "python_named_params_overwatch_ctrl"), - "manual_override_params.sql_params" -> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "sql_params_overwatch_ctrl"), - "manual_override_params.pipeline_params" -> SchemaTools.structToMap(dfWCleansedJobsAndTasks, "pipeline_params_overwatch_ctrl") - ) ++ - PipelineFunctions.newClusterCleaner(dfWCleansedJobsAndTasks, "submitRun_details.new_cluster") ++ - PipelineFunctions.newClusterCleaner(dfWCleansedJobsAndTasks, "new_cluster") ++ - PipelineFunctions.newClusterCleaner(dfWCleansedJobsAndTasks, "job_cluster") + val jobRunsCleanseCreatedNestedStructures = (keys: Array[String]) => NamedTransformation { + (df: DataFrame) => { + + val emptyKeysDF = + Seq.empty[ ( String, Long, Long, Long)] + .toDF( "organization_id", "runId", "startEpochMS", "startTaskEpochMS") + + val cleansedTasksDF = workflowsCleanseTasks(df, keys, emptyKeysDF, "submitRun_details.tasks") + val cleansedJobClustersDF = workflowsCleanseJobClusters(df, keys, emptyKeysDF, "submitRun_details.job_clusters") + + val dfWCleansedJobsAndTasks = df + .join(cleansedTasksDF, keys.toSeq, "left") + .join(cleansedJobClustersDF, keys.toSeq, "left") + + val tasksAndJobClustersCleansingInventory = { + Map( + "tags" -> + handleRootNull( + dfWCleansedJobsAndTasks, + "tags", + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "tags"), + MapType(StringType, StringType)), + "submitRun_details.tasks" -> + col("cleansedTasks"), + "submitRun_details.job_clusters" -> + col("cleansedJobsClusters"), + "task_detail.notebook_task.base_parameters" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "task_detail.notebook_task.base_parameters"), + "task_detail.shell_command_task.env_vars" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "task_detail.shell_command_task.env_vars"), + "task_detail_legacy.notebook_task.base_parameters" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "task_detail_legacy.notebook_task.base_parameters"), + "task_detail_legacy.shell_command_task.env_vars" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "task_detail_legacy.shell_command_task.env_vars"), + "manual_override_params.notebook_params" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "notebook_params_overwatch_ctrl"), + "manual_override_params.python_named_params" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "python_named_params_overwatch_ctrl"), + "manual_override_params.sql_params" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "sql_params_overwatch_ctrl"), + "manual_override_params.pipeline_params" -> + SchemaTools.structToMap( + dfWCleansedJobsAndTasks, + "pipeline_params_overwatch_ctrl")) ++ + PipelineFunctions.newClusterCleaner( + dfWCleansedJobsAndTasks, + "submitRun_details.new_cluster") ++ + PipelineFunctions.newClusterCleaner( + dfWCleansedJobsAndTasks, + "new_cluster") ++ + PipelineFunctions.newClusterCleaner( + dfWCleansedJobsAndTasks, + "job_cluster") + } // val dfWStructedTasksAndCleansedJobs = dfWCleansedJobsAndTasks - .modifyStruct(tasksAndJobClustersCleansingInventory) // overwrite nested complex structures with cleansed structures + .modifyStruct( + // overwrite nested complex structures with cleansed structures + 
tasksAndJobClustersCleansingInventory) .drop( "cleansedTasks", "cleansedJobsClusters", @@ -989,73 +1206,48 @@ object WorkflowsTransforms extends SparkSessionWrapper { ) // cleanup temporary cleaner fields .scrubSchema - } - - /** - * Look up the cluster_name based on id first from - * `job_status_silver`. If not present there fallback to latest - * snapshot prior to the run - */ - - val jobRunsAppendClusterName = (lookups: Map[String,DataFrame]) => NamedTransformation { - - (df: DataFrame) => { - - val runsWClusterNames1 = if (lookups.contains("cluster_spec_silver")) { - df.toTSDF("timestamp", "organization_id", "clusterId") - .lookupWhen( - lookups("cluster_spec_silver") - .toTSDF("timestamp", "organization_id", "clusterId") - ).df - } else df - - val runsWClusterNames2 = if (lookups.contains("clusters_snapshot_bronze")) { - runsWClusterNames1 - .toTSDF("timestamp", "organization_id", "clusterId") - .lookupWhen( - lookups("clusters_snapshot_bronze") - .toTSDF("timestamp", "organization_id", "clusterId") - ).df - } else runsWClusterNames1 - - runsWClusterNames2 - } - - } + }} + + // /** + // * Look up the cluster_name based on id first from + // * `job_status_silver`. If not present there fallback to latest + // * snapshot prior to the run + // */ + + // val jobRunsAppendClusterName = NamedTransformation { + // (df: DataFrame) => { + // val key = Seq( "timestamp", "organization_id", "clusterId") + // df.toTSDF( key:_*) + // .lookupWhen( clusterSpecNameLookup.toTSDF( key:_*)) + // .lookupWhen( clusterSnapNameLookup.toTSDF( key:_*)) + // .df + // } + // } + + + // /** + // * looks up the job name based on id first from job_status_silver and if not present there fallback to latest + // * snapshot prior to the run + // */ + // val jobRunsAppendJobMeta = NamedTransformation { + // (df: DataFrame) => { + // val key = Seq( "timestamp", "organization_id", "jobId") + // df.toTSDF( key:_*) + // .lookupWhen( jobStatusMetaLookup.toTSDF( key:_*)) + // .lookupWhen( jobSnapNameLookup.toTSDF( key:_*)) + // .df + // .withColumns( Map( + // "jobName" + // -> coalesce('jobName, 'run_name), + // "tasks" + // -> coalesce('tasks, 'submitRun_tasks), + // "job_clusters" + // -> coalesce('job_clusters, 'submitRun_job_clusters))) + // } + // } + + val jobRunsAppendTaskAndClusterDetails = NamedTransformation { (df: DataFrame) => { - - /** - * looks up the job name based on id first from job_status_silver and if not present there fallback to latest - * snapshot prior to the run - */ - def jobRunsAppendJobMeta(lookups: Map[String, DataFrame])(df: DataFrame): DataFrame = { - - val runsWithJobName1 = if (lookups.contains("job_status_silver")) { - df - .toTSDF("timestamp", "organization_id", "jobId") - .lookupWhen( - lookups("job_status_silver") - .toTSDF("timestamp", "organization_id", "jobId") - ).df - } else df - - val runsWithJobName2 = if (lookups.contains("jobs_snapshot_bronze")) { - runsWithJobName1 - .toTSDF("timestamp", "organization_id", "jobId") - .lookupWhen( - lookups("jobs_snapshot_bronze") - .toTSDF("timestamp", "organization_id", "jobId") - ).df - } else df - - runsWithJobName2 - .withColumn("jobName", coalesce('jobName, 'run_name)) - .withColumn("tasks", coalesce('tasks, 'submitRun_tasks)) - .withColumn("job_clusters", coalesce('job_clusters, 'submitRun_job_clusters)) - - } - - def jobRunsAppendTaskAndClusterDetails(df: DataFrame): DataFrame = { val computeIsSQLWarehouse = $"task_detail.sql_task.warehouse_id".isNotNull val dfHasTasks = SchemaTools.nestedColExists(df.schema, "tasks") @@ -1123,70 +1315,8 
@@ -1123,70 +1315,8 @@ object WorkflowsTransforms extends SparkSessionWrapper {
             .drop("tasks")
         }
       } else df // tasks not present in schema
-    }
+  }}
-  /**
-   * It's imperative that all nested runs be nested within the jobRun record to ensure cost accuracy downstream in
-   * jrcp -- without it jrcp will double count costs as both the parent and the child will have an associated cost
-   *
-   * A "workflow" in this context isa dbutils.notebook.run execution -- it does spin up a job run in the back end
-   * and will have a workflow_context (field) with root_run_id and parent_run_id. All of these are rolled to the root
-   * to avoid the need to multi-layer joining. It's up to the customer to complete this as needed as the depths can
-   * get very large.
-   *
-   * An multi-task job (mtj) is a job that has at least one task identified (all jobs runs after the change in 2022).
-   * MTJs can execute notebooks which can also run nested workflows using dbutils.notebook.run.
-   *
-   * Workflows can be launched interactively from a notebook or through an mtj; thus it's necessary to account for
-   * both scenarios hence the double join in the last DF.
-   *
-   * Nested runs DO NOT mean tasks inside a jobrun as these are still considered root level tasks. A nested run
-   * is only launched via dbutils.notebook.run either manually or through an MTJ.
-   *
-   * It may be possible to utilize a single field to report both of these as it appears there can never be a child
-   * without a workflow child but the reverse is not true. This can be reviewed with customer to determine if this
-   * is a valid assumption and these can be coalesced, but for now, for safety, they are being kept separate until
-   * all scenarios can be identified
-   */
-  def jobRunsRollupWorkflowsAndChildren(df: DataFrame): DataFrame = {
-
-    // identify root level task runs
-    val rootTaskRuns = df
-      .filter('parentRunId.isNull && get_json_object('workflow_context, "$.root_run_id").isNull)
-
-    // pull only workflow children as defined by having a workflow_context.root_run_id
-    val workflowChildren = df
-      .filter(get_json_object('workflow_context, "$.root_run_id").isNotNull)
-
-    // prepare the nesting by getting keys and the entire record as a nested record
-    val workflowChildrenForNesting = workflowChildren
-      .withColumn("parentRunId", get_json_object('workflow_context, "$.root_run_id").cast("long"))
-      .withColumn("workflowChild", struct(workflowChildren.schema.fieldNames map col: _*))
-      .groupBy('organization_id, 'parentRunId)
-      .agg(collect_list('workflowChild).alias("workflow_children"))
-
-    // get all the children identified as having a parentRunId as they need to be rolled up
-    val children = df
-      .filter('parentRunId.isNotNull)
-      .join(workflowChildrenForNesting, Seq("organization_id", "parentRunId"), "left")
-
-    // prepare the nesting by getting keys and the entire record as a nested record
-    val childrenForNesting = children
-      .withColumn("child", struct(children.schema.fieldNames map col: _*))
-      .groupBy('organization_id, 'parentRunId)
-      .agg(collect_list('child).alias("children"))
-      .withColumnRenamed("parentRunId", "taskRunId") // for simple joining
-
-    // deliver root task runs with workflows and children nested within the root
-    rootTaskRuns
-      .join(childrenForNesting, Seq("organization_id", "taskRunId"), "left") // workflows in mtjs
-      .join(
-        workflowChildrenForNesting.withColumnRenamed("parentRunId", "taskRunId"), // for simple joining
-        Seq("organization_id", "taskRunId"),
-        "left"
-      )
-
-  }
  /**
   * BEGIN JRCP Transforms
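// --- Illustrative sketch (not part of the patch) ---------------------------------
// The removed jobRunsRollupWorkflowsAndChildren above nests child runs under their
// root run so downstream cost logic does not double count parent and child. The core
// pattern: collect each child row as a struct, group by the parent's run id, and
// left-join the resulting array back onto the root runs. A simplified, hypothetical
// rendering of that pattern follows.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ChildRunRollupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("childRunRollupSketch").getOrCreate()
    import spark.implicits._

    val runs = Seq(
      ("ws1", 10L, None: Option[Long], "root task run"),
      ("ws1", 11L, Some(10L), "child launched by run 10"),
      ("ws1", 12L, Some(10L), "another child of run 10")
    ).toDF("organization_id", "taskRunId", "parentRunId", "note")

    val rootRuns = runs.filter('parentRunId.isNull)

    // nest every child row as a struct and aggregate the structs per parent run
    val childrenForNesting = runs
      .filter('parentRunId.isNotNull)
      .withColumn("child", struct(runs.columns.map(col): _*))
      .groupBy('organization_id, 'parentRunId)
      .agg(collect_list('child).alias("children"))
      .withColumnRenamed("parentRunId", "taskRunId") // align the key for the join back to the root

    rootRuns
      .join(childrenForNesting, Seq("organization_id", "taskRunId"), "left")
      .show(false) // run 10 carries an array with runs 11 and 12 nested inside
  }
}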
diff --git a/src/main/scala/com/databricks/labs/overwatch/utils/DataFrameSyntax.scala b/src/main/scala/com/databricks/labs/overwatch/utils/DataFrameSyntax.scala
index 1ae815252..e713b06a2 100644
--- a/src/main/scala/com/databricks/labs/overwatch/utils/DataFrameSyntax.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/utils/DataFrameSyntax.scala
@@ -1,6 +1,5 @@
 package com.databricks.labs.overwatch.utils
-
 import org.apache.log4j.{Level,Logger}
 import org.apache.spark.sql.Dataset
 import java.io.ByteArrayOutputStream
@@ -100,8 +99,6 @@ trait DataFrameSyntax[ SPARK <: SparkSessionWrapper] {
 
   private val logger: Logger = Logger.getLogger(this.getClass)
 
-  val dataFrameLoggerSparkConfKey = "overwatch.dataframelogger.level"
-
   implicit class DataFrameShower[T]( val df: Dataset[T]) {
 
     def showLines(): Iterator[String] =
@@ -133,6 +130,8 @@ trait DataFrameSyntax[ SPARK <: SparkSessionWrapper] {
     }
   }
 
+
+  val dataFrameLoggerSparkConfKey = "overwatch.dataframelogger.level"
 
   def getDataFrameLoggerSparkConfValue(): Option[ String] =
     spark.conf.getOption( dataFrameLoggerSparkConfKey)
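// --- Illustrative sketch (not part of the patch) ---------------------------------
// One way a Spark-conf-driven logger level such as "overwatch.dataframelogger.level"
// could be resolved, falling back to a default when the key is unset or unparsable.
// The helper name and the DEBUG default are assumptions, not the trait's actual
// behaviour; only spark.conf.getOption and the conf key come from the change above.
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession

object DataFrameLoggerLevelSketch {
  private val confKey = "overwatch.dataframelogger.level"

  // Level.toLevel returns the supplied default for unknown or empty strings
  def resolveLevel(spark: SparkSession, default: Level = Level.DEBUG): Level =
    spark.conf.getOption(confKey)
      .map(v => Level.toLevel(v.trim.toUpperCase, default))
      .getOrElse(default)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("dfLoggerLevelSketch").getOrCreate()
    spark.conf.set(confKey, "INFO")
    println(resolveLevel(spark)) // INFO
  }
}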