Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e134bd4
Initial commit
gueniai May 31, 2024
da14f88
adding fix for schemaScrubber and StructToMap (#1232)
aman-db Jun 4, 2024
a5c8b54
Convert all the struct fields inside 'spec' column for cluster_snapsho…
souravbaner-da Jun 11, 2024
aeef7ff
Dropped Spec column from snapshot
souravbaner-da Jun 11, 2024
4f64d83
Squash merge of show_dataframe_in_logs
neilbest-db Jun 13, 2024
0fdac8e
Squash merge of named_transformation
neilbest-db Jun 13, 2024
1175940
Refactor lookups in Silver Job Runs
neilbest-db Jun 13, 2024
98cfed6
use `NamedTransformation`s in Silver Job Runs
neilbest-db Jun 12, 2024
d751d5f
adjust Silver Job Runs module configuration and caching
neilbest-db Jun 21, 2024
0c8c9c2
add more Spark UI labels to Silver Job Runs transformations
neilbest-db Jun 25, 2024
2ead752
flip transformation names to beginning of label
neilbest-db Jun 25, 2024
97236ae
Initial commit
gueniai May 8, 2024
9fe9f8c
revert Spark UI Job Group labels
neilbest-db Jun 29, 2024
995e0da
Merge branch '0820_release' into 1228-silver-job-runs-spark312-r0812
neilbest-db Jun 29, 2024
25671b7
recon enhancement done to deal with different columns in source and t…
sriram251-code Jul 8, 2024
f7460bd
adjust Silver Job Runs module configuration (#1256)
neilbest-db Jul 30, 2024
caa3282
append null columns from cluster snapshot for cluster_spec_silver (#1…
sriram251-code Aug 7, 2024
f3ffd7c
1201 collect all event logs on first run (#1255)
souravbaner-da Aug 7, 2024
3c16b5f
Redefine views so that they are created from tables not locations (#1…
souravbaner-da Aug 7, 2024
d6fa441
1030 pipeline validation framework (#1071)
souravbaner-da Aug 7, 2024
59daae5
adding fix for duplicate accountId in module 2010 and 3019 (#1270)
aman-db Aug 8, 2024
3055a22
1218 warehouse state details (#1254)
aman-db Aug 12, 2024
bbdb61f
Add descriptive `NamedTransformation`s to Spark UI (#1223)
neilbest-db Aug 20, 2024
12fd6ac
adding code for warehouseStateFact gold (#1265)
aman-db Aug 23, 2024
1eb6a00
Show `DataFrame` records in logs (#1224)
neilbest-db Aug 23, 2024
75c8760
Merge branch '0820_release' into 1228-silver-job-runs-spark312-r0812
neilbest-db Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.8.1.2"
version := "0.8.2.0"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
Expand All @@ -18,6 +18,7 @@ libraryDependencies += "com.databricks" % "dbutils-api_2.12" % "0.0.5" % Provide
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.595" % Provided
libraryDependencies += "io.delta" % "delta-core_2.12" % "1.0.0" % Provided
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
libraryDependencies += "com.lihaoyi" %% "sourcecode" % "0.4.1"

//libraryDependencies += "org.apache.hive" % "hive-metastore" % "2.3.9"

Expand Down Expand Up @@ -51,4 +52,4 @@ assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,19 @@ class ETLDefinition(

val transformedDF = transforms.foldLeft(verifiedSourceDF) {
case (df, transform) =>
df.transform(transform)

/*
* reverting Spark UI Job Group labels for now
*
* TODO: enumerate the regressions this would introduce
* when the labels set by the platform are replaced
* this way.
* df.sparkSession.sparkContext.setJobGroup(
* s"${module.pipeline.config.workspaceName}:${module.moduleName}",
* transform.toString)
*/

df.transform( transform)
}
write(transformedDF, module)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,11 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config)
)
}

lazy private[overwatch] val jobRunsModule = Module(2011, "Silver_JobsRuns", this, Array(1004, 2010, 2014), shuffleFactor = 12.0)
lazy private[overwatch] val jobRunsModule =
Module( 2011, "Silver_JobsRuns", this, Array(1004, 2010, 2014), shuffleFactor = 12.0)
.withSparkOverrides( Map(
"spark.databricks.adaptive.autoOptimizeShuffle.enabled" -> "true"))

lazy private val appendJobRunsProcess: () => ETLDefinition = {
() =>
ETLDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}


trait SilverTransforms extends SparkSessionWrapper {
trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax {

import TransformationDescriber._
import spark.implicits._

private val logger: Logger = Logger.getLogger(this.getClass)
Expand Down Expand Up @@ -1305,71 +1306,178 @@ trait SilverTransforms extends SparkSessionWrapper {
"runSucceeded", "runFailed", "runTriggered", "runNow", "runStart", "submitRun", "cancel", "repairRun"
)
val optimalCacheParts = Math.min(daysToProcess * getTotalCores * 2, 1000)
val jobRunsLag30D = getJobsBase(auditLogLag30D)
.filter('actionName.isin(jobRunActions: _*))
.repartition(optimalCacheParts)
.cache() // cached df removed at end of module run

// eagerly force this highly reused DF into cache()
jobRunsLag30D.count()

// Lookup to populate the clusterID/clusterName where missing from jobs
lazy val clusterSpecNameLookup = clusterSpec.asDF
.select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
.filter('clusterId.isNotNull && 'cluster_name.isNotNull)

// Lookup to populate the clusterID/clusterName where missing from jobs
lazy val clusterSnapNameLookup = clusterSnapshot.asDF
.withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
.filter('clusterId.isNotNull && 'cluster_name.isNotNull)

// Lookup to populate the existing_cluster_id where missing from jobs -- it can be derived from name
lazy val jobStatusMetaLookup = jobsStatus.asDF
.verifyMinimumSchema(Schema.minimumJobStatusSilverMetaLookupSchema)
.select(
'organization_id,
'timestamp,
'jobId,
'jobName,
to_json('tags).alias("tags"),
'schedule,
'max_concurrent_runs,
'run_as_user_name,
'timeout_seconds,
'created_by,
'last_edited_by,
to_json('tasks).alias("tasks"),
to_json('job_clusters).alias("job_clusters"),
to_json($"task_detail_legacy.notebook_task").alias("notebook_task"),
to_json($"task_detail_legacy.spark_python_task").alias("spark_python_task"),
to_json($"task_detail_legacy.python_wheel_task").alias("python_wheel_task"),
to_json($"task_detail_legacy.spark_jar_task").alias("spark_jar_task"),
to_json($"task_detail_legacy.spark_submit_task").alias("spark_submit_task"),
to_json($"task_detail_legacy.shell_command_task").alias("shell_command_task"),
to_json($"task_detail_legacy.pipeline_task").alias("pipeline_task")
)
// val optimalCacheParts = Math.min( daysToProcess * 2, getTotalCores * 2)

lazy val jobSnapNameLookup = jobsSnapshot.asDF
.withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select('organization_id, 'timestamp, 'job_id.alias("jobId"), $"settings.name".alias("jobName"))
logger.log( Level.INFO, s"optimalCacheParts: ${optimalCacheParts}")

val jobRunsLookups = jobRunsInitializeLookups(
(clusterSpec, clusterSpecNameLookup),
(clusterSnapshot, clusterSnapNameLookup),
(jobsStatus, jobStatusMetaLookup),
(jobsSnapshot, jobSnapNameLookup)
)
val filterAuditLog = NamedTransformation {
(df: DataFrame) => {
val cachedActions = getJobsBase( df)
.filter('actionName.isin( jobRunActions: _*))
.repartition( optimalCacheParts)
.cache() // cached df removed at end of module run

// eagerly force this highly reused DF into cache()
val n = cachedActions.count()

logger.log( Level.INFO, s"jobRunsLag30D count: ${n}")
logger.log( Level.INFO, s"jobRunsLag30D.rdd.partitions.size: ${cachedActions.rdd.partitions.size}")

cachedActions
}
}

val jobRunsLag30D = auditLogLag30D.transformWithDescription( filterAuditLog)

/*
* keep original usage example of `.showLines()` for reference
*
* logger.log( Level.INFO, "Showing first 5 rows of `jobRunsLag30D`:")
* jobRunsLag30D
* .showLines(5, 20, true)
* .foreach( logger.log( Level.INFO, _))
*
*/


/**
* Look up the cluster_name based on id first from
* `cluster_spec_silver`. If not present there, fall back to the
* latest cluster snapshot prior to the run.
*/

val jobRunsAppendClusterName = NamedTransformation {
(df: DataFrame) => {

val key = Seq( "organization_id", "clusterId")

lazy val clusterSpecNameLookup = clusterSpec.asDF
.select(
'organization_id,
'timestamp,
'cluster_name,
'cluster_id.alias("clusterId"))
.filter(
'clusterId.isNotNull
&& 'cluster_name.isNotNull)
.toTSDF( "timestamp", key:_*)

lazy val clusterSnapNameLookup = clusterSnapshot.asDF
.select(
// .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
'organization_id,
( unix_timestamp('Pipeline_SnapTS) * lit(1000)).alias( "timestamp"),
'cluster_name,
'cluster_id.alias( "clusterId"))
.filter(
'clusterId.isNotNull
&& 'cluster_name.isNotNull)
.toTSDF( "timestamp", key:_*)

df.toTSDF( "timestamp", key:_*)
.lookupWhen( clusterSpecNameLookup)
.lookupWhen( clusterSnapNameLookup)
.df
}
}

/**
* looks up the job name based on id first from job_status_silver
* and if not present there, fall back to the latest snapshot
* prior to the run.
*/

val jobRunsAppendJobMeta = NamedTransformation {
(df: DataFrame) => {

val key = Seq( "organization_id", "jobId")

lazy val jobStatusMetaLookup = jobsStatus.asDF
.verifyMinimumSchema(
Schema.minimumJobStatusSilverMetaLookupSchema)
.select(
'organization_id,
'timestamp,
'jobId,
'jobName,
to_json('tags).alias("tags"),
'schedule,
'max_concurrent_runs,
'run_as_user_name,
'timeout_seconds,
'created_by,
'last_edited_by,
to_json( 'tasks).alias("tasks"),
to_json( 'job_clusters).alias("job_clusters"),
to_json( $"task_detail_legacy.notebook_task").alias( "notebook_task"),
to_json( $"task_detail_legacy.spark_python_task").alias( "spark_python_task"),
to_json( $"task_detail_legacy.python_wheel_task").alias( "python_wheel_task"),
to_json( $"task_detail_legacy.spark_jar_task").alias( "spark_jar_task"),
to_json( $"task_detail_legacy.spark_submit_task").alias( "spark_submit_task"),
to_json( $"task_detail_legacy.shell_command_task").alias( "shell_command_task"),
to_json( $"task_detail_legacy.pipeline_task").alias( "pipeline_task"))
.toTSDF( "timestamp", key:_*)

lazy val jobSnapshotNameLookup = jobsSnapshot.asDF
// .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select(
'organization_id,
( unix_timestamp( 'Pipeline_SnapTS) * lit( 1000)).alias( "timestamp"),
'job_id.alias("jobId"),
$"settings.name".alias("jobName"))
.toTSDF( "timestamp", key:_*)

df.toTSDF( "timestamp", key:_*)
.lookupWhen( jobStatusMetaLookup)
.lookupWhen( jobSnapshotNameLookup)
.df
.withColumn( "jobName",
coalesce('jobName, 'run_name))
.withColumn( "tasks",
coalesce('tasks, 'submitRun_tasks))
.withColumn( "job_clusters",
coalesce('job_clusters, 'submitRun_job_clusters))

// the following is only valid in Spark > 3.1.2, apparently
// (it compiles with 3.3.0)

// .withColumns( Map(
// "jobName"
// -> coalesce('jobName, 'run_name),
// "tasks"
// -> coalesce('tasks, 'submitRun_tasks),
// "job_clusters"
// -> coalesce('job_clusters, 'submitRun_job_clusters)))
}
}



// val jobRunsLookups = jobRunsInitializeLookups(
// (clusterSpec, clusterSpecNameLookup),
// (clusterSnapshot, clusterSnapNameLookup),
// (jobsStatus, jobStatusMetaLookup),
// (jobsSnapshot, jobSnapNameLookup)
// )

// caching before structifying
jobRunsDeriveRunsBase(jobRunsLag30D, etlUntilTime)
.transform(jobRunsAppendClusterName(jobRunsLookups))
.transform(jobRunsAppendJobMeta(jobRunsLookups))
.transform(jobRunsStructifyLookupMeta(optimalCacheParts))
.transform(jobRunsAppendTaskAndClusterDetails)
.transform(jobRunsCleanseCreatedNestedStructures(targetKeys))
// .transform(jobRunsRollupWorkflowsAndChildren)
.drop("timestamp") // could be duplicated to enable asOf Lookups, dropping to clean up
val cachedDF = jobRunsLag30D
.transformWithDescription( jobRunsDeriveRunsBase( etlUntilTime))
.transformWithDescription( jobRunsAppendClusterName)
.transformWithDescription( jobRunsAppendJobMeta)
.repartition( optimalCacheParts)
.cache() // to speed up schema inference while "structifying" next

logger.log( Level.INFO, s"cachedDF count before structifying: ${cachedDF.count()}")
logger.log( Level.INFO, s"cachedDF.rdd.partitions.size: ${cachedDF.rdd.partitions.size}")

cachedDF
.transformWithDescription( jobRunsStructifyLookupMeta) // ( optimalCacheParts))
.transformWithDescription( jobRunsAppendTaskAndClusterDetails)
.transformWithDescription( jobRunsCleanseCreatedNestedStructures( targetKeys))
.drop("timestamp")
// `timestamp` could be duplicated to enable `asOf` lookups;
// dropping to clean up
}

protected def notebookSummary()(df: DataFrame): DataFrame = {
Expand Down
Loading