
Commit f9c8dd0

Authored by gueniaiaman-db, souravbaner-da, and mohanbaabu1996
0812 release (#1249)
* Initial commit
* adding fix for schemaScrubber and StructToMap (#1232)
* fix for null driver_type_id and node_type_id in jrcp (#1236)
* Modify Cluster_snapshot_bronze column (#1234)
  * Convert all the struct fields inside the 'spec' column of cluster_snapshot_bronze to MapType
  * Dropped the 'spec' column from the snapshot
  * Removed redundant VerifyMinSchema
* Update_AWS_instance_types (#1248)
* Update_gcp_instance_types (#1244)

Co-authored-by: Aman <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>
Co-authored-by: Mohan Baabu <[email protected]>
1 parent 7390d4a commit f9c8dd0

6 files changed: +158 −46 lines


build.sbt

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ name := "overwatch"
 
 organization := "com.databricks.labs"
 
-version := "0.8.1.1"
+version := "0.8.1.2"
 
 scalaVersion := "2.12.12"
 scalacOptions ++= Seq("-Xmax-classfile-name", "78")

src/main/resources/AWS_Instance_Details.csv

Lines changed: 139 additions & 0 deletions
Large diffs are not rendered by default.
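(These 139 added rows presumably extend the AWS instance-details lookup that feeds the driverSpecs/workerSpecs columns — API_Name, Compute_Contract_Price, Hourly_DBUs, vCPUs — consumed in WorkflowsTransforms below.)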

src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala

Lines changed: 10 additions & 39 deletions
@@ -552,46 +552,17 @@ trait BronzeTransforms extends SparkSessionWrapper {
 
     val rawDF = deriveRawApiResponseDF(spark.read.json(tmpClusterSnapshotSuccessPath))
     if (rawDF.columns.contains("cluster_id")) {
-      val outputDF = SchemaScrubber.scrubSchema(rawDF)
-      val finalDF = outputDF.withColumn("default_tags", SchemaTools.structToMap(outputDF, "default_tags"))
-        .withColumn("custom_tags", SchemaTools.structToMap(outputDF, "custom_tags"))
-        .withColumn("spark_conf", SchemaTools.structToMap(outputDF, "spark_conf"))
-        .withColumn("spark_env_vars", SchemaTools.structToMap(outputDF, "spark_env_vars"))
-        .withColumn(s"aws_attributes", SchemaTools.structToMap(outputDF, s"aws_attributes"))
-        .withColumn(s"azure_attributes", SchemaTools.structToMap(outputDF, s"azure_attributes"))
-        .withColumn(s"gcp_attributes", SchemaTools.structToMap(outputDF, s"gcp_attributes"))
+      val scrubbedDF = SchemaScrubber.scrubSchema(rawDF)
+      val df = scrubbedDF.withColumn("default_tags", SchemaTools.structToMap(scrubbedDF, "default_tags"))
+        .withColumn("custom_tags", SchemaTools.structToMap(scrubbedDF, "custom_tags"))
+        .withColumn("spark_conf", SchemaTools.structToMap(scrubbedDF, "spark_conf"))
+        .withColumn("spark_env_vars", SchemaTools.structToMap(scrubbedDF, "spark_env_vars"))
+        .withColumn(s"aws_attributes", SchemaTools.structToMap(scrubbedDF, s"aws_attributes"))
+        .withColumn(s"azure_attributes", SchemaTools.structToMap(scrubbedDF, s"azure_attributes"))
+        .withColumn(s"gcp_attributes", SchemaTools.structToMap(scrubbedDF, s"gcp_attributes"))
         .withColumn("organization_id", lit(config.organizationId))
-        .verifyMinimumSchema(clusterSnapMinimumSchema)
-
-      val explodedDF = finalDF
-        .withColumnRenamed("custom_tags", "custom_tags_old")
-        .selectExpr("*", "spec.custom_tags")
-
-      val normalizedDf = explodedDF.withColumn("custom_tags", SchemaTools.structToMap(explodedDF, "custom_tags"))
-
-      // Replace the custom_tags field inside the spec struct with custom_tags outside of spec column
-      val updatedDf = normalizedDf.schema.fields.find(_.name == "spec") match {
-        case Some(field) =>
-          field.dataType match {
-            case structType: StructType =>
-              // Create a new struct expression, replacing the specified field with the new column
-              val newFields = structType.fields.map { f =>
-                if (f.name.equalsIgnoreCase("custom_tags")) {
-                  col("custom_tags").as("custom_tags") // Replace with new column if names match
-                } else {
-                  col(s"spec.${f.name}") // Keep existing fields as is
-                }
-              }
-              // Update the DataFrame with the new struct replacing the old one
-              normalizedDf.withColumn("spec", struct(newFields: _*))
-            case _ => normalizedDf // No action if the specified structColName is not a struct type
-          }
-        case None => normalizedDf // No action if the specified structColName does not exist
-      }
-
-      updatedDf.drop("custom_tags")
-        .withColumnRenamed("custom_tags_old", "custom_tags")
-
+        .drop("spec")
+      df.verifyMinimumSchema(clusterSnapMinimumSchema)
     } else {
       throw new NoNewDataException(msg, Level.WARN, true)
    }
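The refactor replaces the custom_tags surgery on the spec struct with a simpler rule: scrub the schema, convert each loosely-typed struct (tags, conf, cloud attributes) to MapType, drop spec outright, and verify the minimum schema last. SchemaTools.structToMap is Overwatch's own helper; as a rough sketch of what such a conversion does conceptually (not the actual implementation), one can build a map<string,string> from a struct column's field names and stringified values:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

// Sketch only: turn a struct column with arbitrary fields into a map column,
// so snapshots whose structs carry different field sets can still be unioned.
def structToMapSketch(df: DataFrame, structCol: String): Column = {
  val fieldNames = df.schema(structCol).dataType match {
    case st: StructType => st.fieldNames.toSeq
    case _ => Seq.empty[String]
  }
  if (fieldNames.isEmpty) lit(null).cast("map<string,string>")
  else map_from_arrays(
    array(fieldNames.map(lit(_)): _*),                                        // keys: the struct's field names
    array(fieldNames.map(f => col(s"`$structCol`.`$f`").cast("string")): _*)  // values, cast to string
  )
}

// Usage, mirroring the new code path:
// scrubbedDF.withColumn("custom_tags", structToMapSketch(scrubbedDF, "custom_tags"))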

src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala

Lines changed: 2 additions & 2 deletions
@@ -1325,10 +1325,10 @@ object WorkflowsTransforms extends SparkSessionWrapper {
       clusterStateEndOrPipelineEnd.alias("unixTimeMS_state_end"), // if clusterState still open -- close it for calculations
       'timestamp_state_start,
       'timestamp_state_end, 'state, 'cloud_billable, 'databricks_billable, 'uptime_in_state_H, 'current_num_workers, 'target_num_workers,
-      $"driverSpecs.API_Name".alias("driver_node_type_id"),
+      coalesce('driver_node_type_id, $"driverSpecs.API_Name").alias("driver_node_type_id"),
       $"driverSpecs.Compute_Contract_Price".alias("driver_compute_hourly"),
       $"driverSpecs.Hourly_DBUs".alias("driver_dbu_hourly"),
-      $"workerSpecs.API_Name".alias("node_type_id"),
+      coalesce('node_type_id, $"workerSpecs.API_Name").alias("node_type_id"),
       $"workerSpecs.Compute_Contract_Price".alias("worker_compute_hourly"),
       $"workerSpecs.Hourly_DBUs".alias("worker_dbu_hourly"),
       $"workerSpecs.vCPUs".alias("worker_cores"),

src/main/scala/com/databricks/labs/overwatch/utils/SchemaScrubber.scala

Lines changed: 4 additions & 2 deletions
@@ -86,10 +86,12 @@ class SchemaScrubber(
       s"DUPLICATE FIELDS:\n" +
       s"${dups.mkString("\n")}"
     logger.log(Level.WARN, warnMsg)
+    val counterMap = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
     fields.map(f => {
-      val fieldName = if (caseSensitive) f.sanitizedField.name else f.sanitizedField.name.toLowerCase
+      val fieldName = if (caseSensitive) f.sanitizedField.name.trim else f.sanitizedField.name.toLowerCase.trim
       if (dups.contains(fieldName)) {
-        val generatedUniqueName = f.sanitizedField.name + "_UNIQUESUFFIX_" + f.originalField.name.hashCode.toString
+        counterMap(fieldName) += 1
+        val generatedUniqueName = f.sanitizedField.name.trim + "_UNIQUESUFFIX_" + f.originalField.name.trim.hashCode.toString + "_" + counterMap(fieldName)
         val uniqueColumnMapping = s"\n${f.originalField.name} --> ${generatedUniqueName}"
         logger.log(Level.WARN, uniqueColumnMapping)
         f.sanitizedField.copy(name = generatedUniqueName)
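The old suffix was derived solely from the original field name's hashCode, so two occurrences of the same name produced identical "unique" names — visible in the old test expectation below, where dup1_UNIQUESUFFIX_95946320 appears twice. The per-name counter makes each generated name distinct. A standalone sketch of the new behavior (field names hypothetical):

import scala.collection.mutable

// A per-name counter appended after the hash suffix guarantees uniqueness
// even when duplicate fields share the same originalName hash.
val dups = Set("dup1")
val counter = mutable.Map[String, Int]().withDefaultValue(0)

def uniquify(sanitizedName: String, originalName: String): String = {
  val key = sanitizedName.toLowerCase.trim
  if (dups.contains(key)) {
    counter(key) += 1
    s"${sanitizedName.trim}_UNIQUESUFFIX_${originalName.trim.hashCode}_${counter(key)}"
  } else sanitizedName
}

uniquify("dup1", "dup1") // dup1_UNIQUESUFFIX_<hash>_1
uniquify("dup1", "dup1") // dup1_UNIQUESUFFIX_<hash>_2 -- same hash, distinct name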

src/test/scala/com/databricks/labs/overwatch/utils/SchemaToolsTest.scala

Lines changed: 2 additions & 2 deletions
@@ -270,8 +270,8 @@ class SchemaToolsTest extends AnyFunSpec with SparkSessionTestWrapper with Given
 
     val expectedResString = "`b_2_2_2` STRUCT<`abc`: STRING, `c_1__45`: BIGINT>,`exception_parent` " +
       "STRUCT<`dup1`: BIGINT, `dup2`: BIGINT, `xyz`: STRUCT<`_mixed`: BIGINT, `_bad`: BIGINT, " +
-      "`dup1_UNIQUESUFFIX_95946320`: BIGINT, `dup1_UNIQUESUFFIX_95946320`: BIGINT, `dup2_UNIQUESUFFIX_3095059`: " +
-      "BIGINT, `dup2_UNIQUESUFFIX_3095059`: STRING, `good_col`: BIGINT, `jkl`: BIGINT, `otherexcept`: BIGINT>, " +
+      "`dup1_UNIQUESUFFIX_95946320_1`: BIGINT, `dup1_UNIQUESUFFIX_95946320_2`: BIGINT, `dup2_UNIQUESUFFIX_3095059_1`: " +
+      "BIGINT, `dup2_UNIQUESUFFIX_3095059_2`: STRING, `good_col`: BIGINT, `jkl`: BIGINT, `otherexcept`: BIGINT>, " +
       "`zyx`: BIGINT>,`i_1` BIGINT,`parentwspace` STRING,`validParent` STRING"
     val ddlFromLogic = df.scrubSchema(exceptionScrubber).schema.toDDL
     assertResult(expectedResString) {
