Changes from all commits
26 commits
e134bd4
Initial commit
gueniai May 31, 2024
da14f88
adding fix for schemaScrubber and StructToMap (#1232)
aman-db Jun 4, 2024
a5c8b54
Convert all the struct fields inside 'spec' column for cluster_snapsho…
souravbaner-da Jun 11, 2024
aeef7ff
Dropped Spec column from snapshot
souravbaner-da Jun 11, 2024
4f64d83
Squash merge of show_dataframe_in_logs
neilbest-db Jun 13, 2024
0fdac8e
Squash merge of named_transformation
neilbest-db Jun 13, 2024
1175940
Refactor lookups in Silver Job Runs
neilbest-db Jun 13, 2024
98cfed6
use `NamedTransformation`s in Silver Job Runs
neilbest-db Jun 12, 2024
d751d5f
adjust Silver Job Runs module configuration and caching
neilbest-db Jun 21, 2024
0c8c9c2
add more Spark UI labels to Silver Job Runs transformations
neilbest-db Jun 25, 2024
2ead752
flip transformation names to beginning of label
neilbest-db Jun 25, 2024
97236ae
Initial commit
gueniai May 8, 2024
9fe9f8c
revert Spark UI Job Group labels
neilbest-db Jun 29, 2024
995e0da
Merge branch '0820_release' into 1228-silver-job-runs-spark312-r0812
neilbest-db Jun 29, 2024
25671b7
recon enhancement done to deal with different columns in source and t…
sriram251-code Jul 8, 2024
f7460bd
adjust Silver Job Runs module configuration (#1256)
neilbest-db Jul 30, 2024
caa3282
append null columns from cluster snapshot for cluster_spec_silver (#1…
sriram251-code Aug 7, 2024
f3ffd7c
1201 collect all event logs on first run (#1255)
souravbaner-da Aug 7, 2024
3c16b5f
Redefine views so that they are created from tables not locations (#1…
souravbaner-da Aug 7, 2024
d6fa441
1030 pipeline validation framework (#1071)
souravbaner-da Aug 7, 2024
59daae5
adding fix for duplicate accountId in module 2010 and 3019 (#1270)
aman-db Aug 8, 2024
3055a22
1218 warehouse state details (#1254)
aman-db Aug 12, 2024
bbdb61f
Add descriptive `NamedTransformation`s to Spark UI (#1223)
neilbest-db Aug 20, 2024
12fd6ac
adding code for warehouseStateFact gold (#1265)
aman-db Aug 23, 2024
1eb6a00
Show `DataFrame` records in logs (#1224)
neilbest-db Aug 23, 2024
75c8760
Merge branch '0820_release' into 1228-silver-job-runs-spark312-r0812
neilbest-db Aug 26, 2024
8 changes: 6 additions & 2 deletions build.sbt
@@ -2,14 +2,17 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.8.1.2"
version := "0.8.2.0"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")

Test / fork := true
Test / envVars := Map("OVERWATCH_ENV" -> " ","OVERWATCH_TOKEN" -> " ","OVERWATCH" -> " ")

logBuffered in Test := false
// parallelExecution in Test := false

val sparkVersion = "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % Provided
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
@@ -18,6 +21,7 @@ libraryDependencies += "com.databricks" % "dbutils-api_2.12" % "0.0.5" % Provide
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.595" % Provided
libraryDependencies += "io.delta" % "delta-core_2.12" % "1.0.0" % Provided
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
libraryDependencies += "com.lihaoyi" %% "sourcecode" % "0.4.1"

//libraryDependencies += "org.apache.hive" % "hive-metastore" % "2.3.9"

@@ -51,4 +55,4 @@ assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
19 changes: 19 additions & 0 deletions src/main/resources/Warehouse_DBU_Details.csv
@@ -0,0 +1,19 @@
cloud,cluster_size,driver_size,worker_count,total_dbus
AWS,2X-Small,i3.2xlarge,1,4
AWS,X-Small,i3.2xlarge,2,6
AWS,Small,i3.4xlarge,4,12
AWS,Medium,i3.8xlarge,8,24
AWS,Large,i3.8xlarge,16,40
AWS,X-Large,i3.16xlarge,32,80
AWS,2X-Large,i3.16xlarge,64,144
AWS,3X-Large,i3.16xlarge,128,272
AWS,4X-Large,i3.16xlarge,256,528
AZURE,2X-Small,E8ds v4,1,4
AZURE,X-Small,E8ds v4,2,6
AZURE,Small,E16ds v4,4,12
AZURE,Medium,E32ds v4,8,24
AZURE,Large,E32ds v4,16,40
AZURE,X-Large,E64ds v4,32,80
AZURE,2X-Large,E64ds v4,64,144
AZURE,3X-Large,E64ds v4,128,272
AZURE,4X-Large,E64ds v4,256,528
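This lookup maps each warehouse t-shirt size to a driver type, worker count, and total DBU figure per cloud. A minimal sketch of how such a resource file might be loaded into a lookup DataFrame, assuming a SparkSession and an illustrative path (the actual wiring into the warehouse DBU target is not shown in this hunk):

// Hypothetical loader for the warehouse DBU lookup; Overwatch's real plumbing may differ.
val warehouseDbuLookup = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("src/main/resources/Warehouse_DBU_Details.csv")

// Illustrative use (assuming some DataFrame `warehouseStates` with cloud and cluster_size columns):
// val statesWithDbus = warehouseStates.join(warehouseDbuLookup, Seq("cloud", "cluster_size"), "left")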
@@ -122,7 +122,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
val sqlComputerDBUPrice: Double = config.sql_compute_dbu_price
val jobsLightDBUPrice: Double = config.jobs_light_dbu_price
val customWorkspaceName: String = config.workspace_name
val standardScopes = "audit,sparkEvents,jobs,clusters,clusterEvents,notebooks,pools,accounts,dbsql,notebookCommands".split(",")
val standardScopes = OverwatchScope.toArray
val scopesToExecute = (standardScopes.map(_.toLowerCase).toSet --
config.excluded_scopes.getOrElse("").split(":").map(_.toLowerCase).toSet).toArray

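With this change the standard scope list comes from OverwatchScope itself rather than a hard-coded string, while excluded_scopes still arrives as a colon-delimited string and is subtracted case-insensitively. A small self-contained sketch of that subtraction with made-up values:

// Stand-ins for OverwatchScope.toArray and config.excluded_scopes.
val standardScopes = Array("audit", "sparkEvents", "clusterEvents", "dbsql")
val excludedScopes = Some("DBSQL:clusterEvents")

val scopesToExecute = (standardScopes.map(_.toLowerCase).toSet --
  excludedScopes.getOrElse("").split(":").map(_.toLowerCase).toSet).toArray
// yields the lower-cased remainder, e.g. Array("audit", "sparkevents"); ordering is not guaranteed after the Set round-trip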
26 changes: 26 additions & 0 deletions src/main/scala/com/databricks/labs/overwatch/api/ApiMeta.scala
@@ -200,6 +200,32 @@ trait ApiMeta {
jsonObject.toString
}

/**
* Adds API traceability metadata to the raw API response.
*
* @param response  raw HTTP response returned by the API call
* @param jsonQuery JSON request body sent with POST calls, used as the batch key filter
* @param queryMap  query parameters sent with GET calls
* @return a JSON string containing the raw API response and the traceability metadata for the call.
*/
private[overwatch] def enrichAPIResponse(response: HttpResponse[String], jsonQuery: String, queryMap: Map[String, String]): String = {
val filter: String = if (apiCallType.equals("POST")) jsonQuery else {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.writeValueAsString(queryMap)
}
val jsonObject = new JSONObject();
val apiTraceabilityMeta = new JSONObject();
apiTraceabilityMeta.put("endPoint", apiName)
apiTraceabilityMeta.put("type", apiCallType)
apiTraceabilityMeta.put("apiVersion", apiV)
apiTraceabilityMeta.put("responseCode", response.code)
apiTraceabilityMeta.put("batchKeyFilter", filter)
jsonObject.put("rawResponse", response.body.trim)
jsonObject.put("apiTraceabilityMeta", apiTraceabilityMeta)
jsonObject.toString
}

}

/**
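The enriched payload wraps the raw API body together with traceability fields (endPoint, type, apiVersion, responseCode, batchKeyFilter). As a reference only, a minimal sketch of how a downstream reader might unwrap that string with the same org.json API used above; the function name is illustrative, not part of this PR:

import org.json.JSONObject

// Hypothetical consumer of the string produced by enrichAPIResponse.
def summarizeEnrichedResponse(enriched: String): String = {
  val parsed  = new JSONObject(enriched)
  val rawBody = parsed.getString("rawResponse")              // original API response body
  val meta    = parsed.getJSONObject("apiTraceabilityMeta")
  s"${meta.getString("type")} ${meta.getString("endPoint")} -> HTTP ${meta.getInt("responseCode")} (${rawBody.length} chars)"
}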
36 changes: 36 additions & 0 deletions src/main/scala/com/databricks/labs/overwatch/env/Workspace.scala
@@ -17,6 +17,8 @@ import scala.concurrent.Future
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter


/**
@@ -422,6 +424,40 @@ class Workspace(config: Config) extends SparkSessionWrapper {
addReport
}

/**
* Fetches warehouse event data from system.compute.warehouse_events.
* @param fromTime       start of the window to fetch
* @param untilTime      end of the window to fetch
* @param config         Overwatch config, used for the workspace id and the audit log source check
* @param maxHistoryDays maximum number of days to look back before fromTime (default 30)
* @return DataFrame of warehouse events with columns renamed to Overwatch conventions
*/
def getWarehousesEventDF(fromTime: TimeTypes,
untilTime: TimeTypes,
config: Config,
maxHistoryDays: Int = 30
): DataFrame = {
val sysTableFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
val moduleFromTime = fromTime.asLocalDateTime.format(sysTableFormat)
val moduleUntilTime = untilTime.asLocalDateTime.format(sysTableFormat)
val useSystemTableMessage = "Use system tables as the source for audit logs"
val tableDoesNotExistsMessage = "Table system.compute.warehouse_events does not exist"

if(config.auditLogConfig.systemTableName.isEmpty)
throw new NoNewDataException(useSystemTableMessage, Level.WARN, allowModuleProgression = false)

if(!spark.catalog.tableExists("system.compute.warehouse_events"))
throw new NoNewDataException(tableDoesNotExistsMessage, Level.WARN, allowModuleProgression = false)

spark.sql(s"""
select * from system.compute.warehouse_events
WHERE workspace_id = '${config.organizationId}'
and event_time >= DATE_SUB('${moduleFromTime}', ${maxHistoryDays})
and event_time <= '${moduleUntilTime}'
""")
.withColumnRenamed("event_type","state")
.withColumnRenamed("workspace_id","organization_id")
.withColumnRenamed("event_time","timestamp")
}
}


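The query above restricts system.compute.warehouse_events to the current workspace and to a window opening up to maxHistoryDays before fromTime and closing at untilTime. A rough DataFrame-API equivalent of the same filter, shown only as a sketch and assuming the variables defined in the method above:

import org.apache.spark.sql.functions.{col, date_sub, lit}

spark.table("system.compute.warehouse_events")
  .filter(col("workspace_id") === config.organizationId)
  .filter(col("event_time") >= date_sub(lit(moduleFromTime), maxHistoryDays))   // look back up to maxHistoryDays
  .filter(col("event_time") <= lit(moduleUntilTime))
  .withColumnRenamed("event_type", "state")
  .withColumnRenamed("workspace_id", "organization_id")
  .withColumnRenamed("event_time", "timestamp")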
@@ -104,8 +104,6 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)

}



lazy private[overwatch] val jobsSnapshotModule = Module(1001, "Bronze_Jobs_Snapshot", this)
lazy private val appendJobsProcess: () => ETLDefinition = {
() =>
@@ -171,6 +169,7 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
BronzeTargets.clustersSnapshotTarget.asDF,
Seq(
prepClusterEventLogs(
clusterEventLogsModule.isFirstRun,
BronzeTargets.auditLogsTarget.asIncrementalDF(clusterEventLogsModule, BronzeTargets.auditLogsTarget.incrementalColumns, additionalLagDays = 1), // 1 lag day to get laggard records
clusterEventLogsModule.fromTime,
clusterEventLogsModule.untilTime,
@@ -410,12 +410,21 @@ trait BronzeTransforms extends SparkSessionWrapper {
pipelineSnapTime: Long,
tmpClusterEventsSuccessPath: String,
tmpClusterEventsErrorPath: String,
config: Config) = {
config: Config,
isFirstRun: Boolean) = {
val finalResponseCount = clusterIDs.length
val clusterEventsEndpoint = "clusters/events"

val lagTime = 86400000 //1 day
val lagStartTime = startTime.asUnixTimeMilli - lagTime

val lagStartTime = if (isFirstRun) {
logger.log(Level.INFO, "First run, acquiring all cluster events")
0.toLong
} else {
logger.log(Level.INFO, "Subsequent run, acquiring new cluster events")
startTime.asUnixTimeMilli - lagTime
}

// creating Json input for parallel API calls
val jsonInput = Map(
"start_value" -> "0",
@@ -601,6 +610,7 @@ }
}

protected def prepClusterEventLogs(
isFirstRun : Boolean,
filteredAuditLogDF: DataFrame,
startTime: TimeTypes,
endTime: TimeTypes,
@@ -626,18 +636,23 @@

val tmpClusterEventsSuccessPath = s"${config.tempWorkingDir}/${apiEndpointTempDir}/success_" + pipelineSnapTS.asUnixTimeMilli
val tmpClusterEventsErrorPath = s"${config.tempWorkingDir}/${apiEndpointTempDir}/error_" + pipelineSnapTS.asUnixTimeMilli
try{
landClusterEvents(clusterIDs, startTime, endTime, pipelineSnapTS.asUnixTimeMilli, tmpClusterEventsSuccessPath,
tmpClusterEventsErrorPath, config)
}catch {
try {
landClusterEvents(
clusterIDs, startTime, endTime,
pipelineSnapTS.asUnixTimeMilli,
tmpClusterEventsSuccessPath,
tmpClusterEventsErrorPath,
config,
isFirstRun)
} catch {
case e: Throwable =>
val errMsg = s"Error in landing cluster events: ${e.getMessage}"
logger.log(Level.ERROR, errMsg)
throw e
}
if (Helpers.pathExists(tmpClusterEventsErrorPath)) {
persistErrors(
deriveRawApiResponseDF(spark.read.json(tmpClusterEventsErrorPath))
spark.read.json(tmpClusterEventsErrorPath)
.withColumn("from_ts", toTS(col("from_epoch")))
.withColumn("until_ts", toTS(col("until_epoch"))),
database,
@@ -1375,4 +1390,4 @@ }
}


}
}
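In effect, isFirstRun only changes the lower bound of the clusters/events request window: the first run starts from epoch zero to collect all available history, while subsequent runs look back one lag day before the module's fromTime. A compact, self-contained sketch of that decision (the function name is illustrative, not part of the PR):

// Lower bound, in epoch milliseconds, for the clusters/events API window.
def clusterEventsLagStart(isFirstRun: Boolean, fromTimeMillis: Long): Long = {
  val oneDayMillis = 86400000L
  if (isFirstRun) 0L                      // first run: acquire all cluster events
  else fromTimeMillis - oneDayMillis      // subsequent runs: one lag day before fromTime
}

For example, clusterEventsLagStart(isFirstRun = false, startTime.asUnixTimeMilli) reproduces the pre-existing behaviour, while passing isFirstRun = true widens the window to the beginning of time.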
@@ -27,7 +27,20 @@ class ETLDefinition(

val transformedDF = transforms.foldLeft(verifiedSourceDF) {
case (df, transform) =>
df.transform(transform)

/*
* reverting Spark UI Job Group labels for now
*
* TODO: enumerate the regressions this would introduce
* when the labels set by the platform are replaced
* this way.
* df.sparkSession.sparkContext.setJobGroup(
* s"${module.pipeline.config.workspaceName}:${module.moduleName}",
* transform.toString)
*/

df.transform( transform)

}
write(transformedDF, module)
}
22 changes: 22 additions & 0 deletions src/main/scala/com/databricks/labs/overwatch/pipeline/Gold.scala
@@ -320,6 +320,24 @@ class Gold(_workspace: Workspace, _database: Database, _config: Config)
)
}

lazy private[overwatch] val warehouseStateFactModule = Module(3020, "Gold_WarehouseStateFact", this, Array(2022, 2021), 3.0)
lazy private val appendWarehouseStateFactProcess: () => ETLDefinition = {
() =>
ETLDefinition(
SilverTargets.warehousesStateDetailTarget.asIncrementalDF(
warehouseStateFactModule,
SilverTargets.warehousesStateDetailTarget.incrementalColumns,
GoldTargets.warehouseStateFactTarget.maxMergeScanDates
),
Seq(buildWarehouseStateFact(
BronzeTargets.cloudMachineDetail,
BronzeTargets.warehouseDbuDetail,
SilverTargets.warehousesSpecTarget
)),
append(GoldTargets.warehouseStateFactTarget)
)
}

private def processSparkEvents(): Unit = {

sparkExecutorModule.execute(appendSparkExecutorProcess)
@@ -400,6 +418,10 @@
notebookCommandsFactModule.execute(appendNotebookCommandsFactProcess)
GoldTargets.notebookCommandsFactViewTarget.publish(notebookCommandsFactViewColumnMapping)
}
case OverwatchScope.warehouseEvents => {
warehouseStateFactModule.execute(appendWarehouseStateFactProcess)
GoldTargets.warehouseStateFactViewTarget.publish(warehouseStateFactViewColumnMappings)
}
case _ =>
}
}