
Commit 5337cad

0713 Release Staging (#823)
* initial 0713 commit
* fixed (#815)
* fixed (#816)
* fixed (#818)
* jrcp worker potential fixed (#822)
* omitted sql warehouses from cluster spec silver (#831)
* only publish successful change requests
* added constant for response status
* backporting fix for 709 into 713 (#833)

Co-authored-by: geeksheikh <[email protected]>
1 parent cbf4cd2 commit 5337cad

File tree

build.sbt
src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala
src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala
src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala

4 files changed: +31 −16 lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ name := "overwatch"

 organization := "com.databricks.labs"

-version := "0.7.1.2"
+version := "0.7.1.3"

 scalaVersion := "2.12.12"
 scalacOptions ++= Seq("-Xmax-classfile-name", "78")

src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala

Lines changed: 14 additions & 8 deletions
@@ -333,13 +333,22 @@ trait GoldTransforms extends SparkSessionWrapper {
       // using noise generators to fill with two passes to reduce skew
       .fillMeta(clusterPotMetaToFill, clusterPotKeys, clusterPotIncrementals, noiseBuckets = getTotalCores) // scan back then forward to fill all

-
-    val workerPotentialCoreS = when('databricks_billable, $"workerSpecs.vCPUs" * 'current_num_workers * 'uptime_in_state_S).otherwise(lit(0))
+    val isSingleNode = when(get_json_object('custom_tags, "$.ResourceClass") === "SingleNode", lit(true)).otherwise(lit(false))
+    val workerPotentialCoreS = when('databricks_billable && isSingleNode, $"driverSpecs.vCPUs" * 'uptime_in_state_S)
+      .when('databricks_billable, $"workerSpecs.vCPUs" * 'current_num_workers * 'uptime_in_state_S)
+      .otherwise(lit(0))
     val isPhotonEnabled = upper('runtime_engine).equalTo("PHOTON")
     val isNotAnSQlWarehouse = !upper('sku).equalTo("SQLCOMPUTE")
-    val photonDBUMultiplier = when(isPhotonEnabled && isNotAnSQlWarehouse, lit(2)).otherwise(lit(1))
+    val photonDBUMultiplier = when(isPhotonEnabled && isNotAnSQlWarehouse && isAutomated('cluster_name), lit(2.9))
+      .when(isPhotonEnabled && isNotAnSQlWarehouse && !isAutomated('cluster_name), lit(2.0))
+      .otherwise(lit(1.0))
     val driverDBUs = when('databricks_billable, $"driverSpecs.Hourly_DBUs" * 'uptime_in_state_H * photonDBUMultiplier).otherwise(lit(0)).alias("driver_dbus")
-    val workerDBUs = when('databricks_billable, $"workerSpecs.Hourly_DBUs" * 'current_num_workers * 'uptime_in_state_H * photonDBUMultiplier).otherwise(lit(0)).alias("worker_dbus")
+    val workerDBUs = when('databricks_billable && !isSingleNode, $"workerSpecs.Hourly_DBUs" * 'current_num_workers * 'uptime_in_state_H * photonDBUMultiplier).otherwise(lit(0)).alias("worker_dbus")
+    val driverCoreHours = round(TransformFunctions.getNodeInfo("driver", "vCPUs", true) / lit(3600), 2)
+    val workerCoreHours = round(TransformFunctions.getNodeInfo("worker", "vCPUs", true) / lit(3600), 2)
+    val coreHours = when('isRunning && isSingleNode, driverCoreHours)
+      .when('isRunning && !isSingleNode, workerCoreHours)
+      .otherwise(lit(0.0)).alias("core_hours")
     val driverComputeCost = Costs.compute('cloud_billable, $"driverSpecs.Compute_Contract_Price", lit(1), 'uptime_in_state_H).alias("driver_compute_cost")
     val workerComputeCost = Costs.compute('cloud_billable, $"workerSpecs.Compute_Contract_Price", 'target_num_workers, 'uptime_in_state_H).alias("worker_compute_cost")
     val driverDBUCost = Costs.dbu(driverDBUs, 'dbu_rate).alias("driver_dbu_cost")

@@ -375,10 +384,7 @@
       driverDBUs,
       workerDBUs,
       (driverDBUs + workerDBUs).alias("total_dbus"),
-      when('isRunning,
-        round(TransformFunctions.getNodeInfo("driver", "vCPUs", true) / lit(3600), 2) +
-          round(TransformFunctions.getNodeInfo("worker", "vCPUs", true) / lit(3600), 2)
-      ).otherwise(lit(0.0)).alias("core_hours"),
+      coreHours,
       'driverSpecs,
       'workerSpecs,
       driverComputeCost,
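For context on the single-node handling above: a single-node cluster carries a ResourceClass=SingleNode custom tag and has zero workers, so its driver's cores are the compute to credit. A minimal standalone sketch of the same detection and potential-core attribution, on a toy DataFrame with hypothetical column names (not the Overwatch schema):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("singleNodePotential").getOrCreate()
import spark.implicits._

// toy cluster-state rows; column names are illustrative only
val states = Seq(
  ("""{"ResourceClass":"SingleNode"}""", 8, 0, 0, 3600L), // single-node: driver does the work
  ("""{}""", 8, 16, 4, 3600L)                             // standard: workers do the work
).toDF("custom_tags", "driver_vcpus", "worker_vcpus", "num_workers", "uptime_s")

// same technique as the diff: read ResourceClass out of the cluster's custom tags
val isSingleNode = get_json_object($"custom_tags", "$.ResourceClass") === "SingleNode"

// credit the driver's cores when single-node, otherwise worker cores * worker count
val potentialCoreS = when(isSingleNode, $"driver_vcpus" * $"uptime_s")
  .otherwise($"worker_vcpus" * $"num_workers" * $"uptime_s")

states.withColumn("potential_core_s", potentialCoreS).show(false)
// single-node row -> 8 * 3600 = 28800 core-seconds
// standard row    -> 16 * 4 * 3600 = 230400 core-seconds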

src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala

Lines changed: 6 additions & 3 deletions
@@ -14,6 +14,7 @@ trait SilverTransforms extends SparkSessionWrapper {
   import spark.implicits._

   private val logger: Logger = Logger.getLogger(this.getClass)
+  private val responseSuccessFilter: Column = $"response.statusCode" === 200

   private def appendPowerProperties: Column = {
     struct(

@@ -419,6 +420,7 @@
       $"userIdentity.email".alias("userEmail"), 'requestId, 'response)

   private def clusterBase(auditRawDF: DataFrame): DataFrame = {
+    val isWarehouse = get_json_object('custom_tags, "$.SqlEndpointId").isNotNull
     val cluster_id_gen_w = Window.partitionBy('organization_id, 'cluster_name).orderBy('timestamp).rowsBetween(Window.currentRow, 1000)
     val cluster_name_gen_w = Window.partitionBy('organization_id, 'cluster_id).orderBy('timestamp).rowsBetween(Window.currentRow, 1000)
     val cluster_id_gen = first('cluster_id, true).over(cluster_id_gen_w)

@@ -465,7 +467,9 @@
     val clusterRaw = auditRawDF
       .filter('serviceName === "clusters" && !'actionName.isin("changeClusterAcl"))
       .selectExpr("*", "requestParams.*").drop("requestParams", "Overwatch_RunID")
+      .filter(responseSuccessFilter) // only publish successful edits into the spec table
       .select(clusterSummaryCols: _*)
+      .filter(!isWarehouse)
       .withColumn("cluster_id", cluster_id_gen)
       .withColumn("cluster_name", cluster_name_gen)

@@ -512,14 +516,14 @@
       'poolSnapDetails
     )

-    val deleteCol = when('actionName === "delete" && $"response.statusCode" === 200,
+    val deleteCol = when('actionName === "delete" && responseSuccessFilter,
       struct(
         $"userIdentity.email".alias("deleted_by"),
         'timestamp.alias("deleted_at_epochMillis"),
         from_unixtime('timestamp / 1000).cast("timestamp").alias("deleted_at")
       )).otherwise(lit(null).cast(Schema.poolsDeleteSchema)).alias("delete_details")

-    val createCol = when('actionName === "create" && $"response.statusCode" === 200,
+    val createCol = when('actionName === "create" && responseSuccessFilter,
       struct(
         $"userIdentity.email".alias("created_by"),
         'timestamp.alias("created_at_epochMillis"),

@@ -873,7 +877,6 @@
     )

     val clustersRemoved = clusterBaseDF
-      .filter($"response.statusCode" === 200) // only successful delete statements get applied
       .filter('actionName.isin("permanentDelete"))
       .select('organization_id, 'cluster_id, 'userEmail.alias("deleted_by"))
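A note on the two new filters above: DBSQL warehouses surface in the clusters audit stream tagged with a SqlEndpointId, and failed change requests come back with a non-200 response, so both are screened out before rows reach the cluster spec table. A minimal sketch of the same pair of filters on a toy audit DataFrame (column names hypothetical; the real events nest the status under response.statusCode):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("clusterSpecFilters").getOrCreate()
import spark.implicits._

// toy audit rows, flattened for brevity
val audit = Seq(
  ("""{"SqlEndpointId":"abc-123"}""", 200), // SQL warehouse event -> excluded
  ("""{}""", 200),                          // successful cluster edit -> kept
  ("""{}""", 400)                           // failed change request -> excluded
).toDF("custom_tags", "statusCode")

val responseSuccessFilter = $"statusCode" === 200
val isWarehouse = get_json_object($"custom_tags", "$.SqlEndpointId").isNotNull

audit
  .filter(responseSuccessFilter) // only publish successful change requests
  .filter(!isWarehouse)          // omit SQL warehouses from the cluster spec
  .show(false)                   // leaves only the successful cluster edit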

src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala

Lines changed: 10 additions & 4 deletions
@@ -653,7 +653,7 @@ object WorkflowsTransforms extends SparkSessionWrapper {
       'jobId.cast("long").alias("submissionJobId"),
       'runId.alias("jobRunId"),
       'timestamp.alias("submissionTime"),
-      'jobTriggerType,
+      'jobTriggerType.alias("jobTriggerType_Triggered"),
       'requestId.alias("submitRequestID"),
       'response.alias("submitResponse"),
       'userIdentity.alias("submittedBy")

@@ -833,7 +833,7 @@
       coalesce('parentRunId_Started, 'parentRunId_Completed).cast("long").alias("parentRunId"),
       coalesce('taskRunId, 'idInJob).cast("long").alias("idInJob"),
       TransformFunctions.subtractTime(
-        'submissionTime,
+        'startTime,
         array_max(array('completionTime, 'cancellationTime)) // endTS must remain null if still open
       ).alias("TaskRunTime"), // run launch time until terminal event
       TransformFunctions.subtractTime(

@@ -843,7 +843,7 @@
       'run_name,
       coalesce('jobClusterType_Started, 'jobClusterType_Completed).alias("clusterType"),
       coalesce('jobTaskType_Started, 'jobTaskType_Completed).alias("taskType"),
-      coalesce('jobTriggerType_Started, 'jobTriggerType_Completed, 'jobTriggerType_runNow).alias("jobTriggerType"),
+      coalesce('jobTriggerType_Triggered, 'jobTriggerType_Started, 'jobTriggerType_Completed, 'jobTriggerType_runNow).alias("jobTriggerType"),
       when('cancellationRequestId.isNotNull, "Cancelled")
         .otherwise('jobTerminalState)
         .alias("terminalState"),

@@ -1221,6 +1221,8 @@
       .withColumn("runtime_in_cluster_state", // all runs have an initial cluster state
         when('state.isin("CREATING", "STARTING") || 'cluster_type === "new", 'uptime_in_state_H * 1000 * 3600) // get true cluster time when state is guaranteed fully initial
           .otherwise(runStateFirstToEnd - $"task_runtime.startEpochMS")) // otherwise use jobStart as beginning time and min of stateEnd or jobEnd for end time
+      .withColumn("hourly_core_potential", 'cluster_state_worker_potential_core_H / 'uptime_in_state_H) // executor potential per hour
+      .withColumn("worker_potential_core_H", ('runtime_in_cluster_state / 1000.0 / 60.0 / 60.0) * 'hourly_core_potential)
       .withColumn("lifecycleState", lit("init"))
   }

@@ -1256,6 +1258,8 @@
       )
       .join(jobRunInitialStates.select(stateLifecycleKeys map col: _*), stateLifecycleKeys, "leftanti") // filter out beginning states
       .withColumn("runtime_in_cluster_state", taskRunEndOrPipelineEnd - runStateLastToStart)
+      .withColumn("hourly_core_potential", 'cluster_state_worker_potential_core_H / 'uptime_in_state_H) // executor potential per hour
+      .withColumn("worker_potential_core_H", ('runtime_in_cluster_state / 1000.0 / 60.0 / 60.0) * 'hourly_core_potential)
       .withColumn("lifecycleState", lit("terminal"))
   }

@@ -1295,6 +1299,8 @@
       .join(jobRunInitialStates.select(stateLifecycleKeys map col: _*), stateLifecycleKeys, "leftanti") // filter out beginning states
       .join(jobRunTerminalStates.select(stateLifecycleKeys map col: _*), stateLifecycleKeys, "leftanti") // filter out ending states
       .withColumn("runtime_in_cluster_state", 'unixTimeMS_state_end - 'unixTimeMS_state_start)
+      .withColumn("hourly_core_potential", 'cluster_state_worker_potential_core_H / 'uptime_in_state_H) // executor potential per hour
+      .withColumn("worker_potential_core_H", ('runtime_in_cluster_state / 1000.0 / 60.0 / 60.0) * 'hourly_core_potential)
       .withColumn("lifecycleState", lit("intermediate"))

   }

@@ -1327,7 +1333,7 @@
       $"workerSpecs.vCPUs".alias("worker_cores"),
       'isAutomated,
       'dbu_rate,
-      'worker_potential_core_H,
+      'worker_potential_core_H.alias("cluster_state_worker_potential_core_H"),
       'driver_compute_cost,
       'worker_compute_cost,
       'driver_dbu_cost,
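The pair of withColumn lines repeated across the three lifecycle branches above is the jrcp worker-potential fix (#822): rather than crediting a job run with a cluster state's entire worker potential, the state's potential core-hours are normalized to a per-hour rate and scaled by how long the run actually overlapped that state. A sketch of the arithmetic as a plain function (names and values hypothetical):

// Prorate a cluster state's worker potential down to the slice a job run occupied.
def prorateWorkerPotential(
  statePotentialCoreH: Double, // cluster_state_worker_potential_core_H
  stateUptimeH: Double,        // uptime_in_state_H
  runMsInState: Long           // runtime_in_cluster_state, in milliseconds
): Double = {
  val hourlyCorePotential = statePotentialCoreH / stateUptimeH // cores available per hour of state
  val runHoursInState = runMsInState / 1000.0 / 60.0 / 60.0    // ms -> hours
  runHoursInState * hourlyCorePotential
}

// e.g. a 2-hour state offering 64 potential core-hours (32 cores), with a run
// overlapping it for 30 minutes, is credited 0.5 h * 32 cores = 16 core-hours:
assert(prorateWorkerPotential(64.0, 2.0, 30L * 60 * 1000) == 16.0)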
