Commit d1b18a7

Record errors from crawl_tables step in $inventory.table_failures table and display counter on the dashboard (#300)
Fixes #231
1 parent 4fe6ed8 commit d1b18a7

2 files changed: +22 −7 lines changed

Lines changed: 4 additions & 0 deletions (new dashboard counter query)
@@ -0,0 +1,4 @@
+-- viz type=counter, name=Total Table Scan Failure Count, counter_label=Metastore Crawl Failures, value_column=count_failures
+-- widget col=2, row=0, size_x=1, size_y=3
+SELECT COUNT(*) AS count_failures
+FROM $inventory.table_failures
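
The query above backs the new dashboard counter: it simply counts the rows that the crawl step writes into $inventory.table_failures. As a rough cross-check from a notebook cell, the same number can be computed with the DataFrame API; the snippet below is only an illustration and assumes `spark` is the notebook session and `inventoryDatabase` holds the value of the "inventory_database" widget shown later in this diff (default "ucx").

// Illustrative cross-check only: mirrors the dashboard's COUNT(*) using the DataFrame API.
val countFailures = spark.table(s"$inventoryDatabase.table_failures").count()
println(s"Metastore crawl failures: $countFailures")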
Lines changed: 18 additions & 7 deletions (crawl_tables notebook)
@@ -1,3 +1,6 @@
+import java.util.concurrent.ConcurrentLinkedQueue
+import scala.collection.JavaConverters
+
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.DataFrame
@@ -6,7 +9,12 @@ import org.apache.spark.sql.DataFrame
 case class TableDetails(catalog: String, database: String, name: String, object_type: String,
                         table_format: String, location: String, view_text: String)

-def metadataForAllTables(databases: Seq[String]): DataFrame = {
+// recording error log in the database
+case class TableError(catalog: String, database: String, name: String, error: String)
+
+val failures = new ConcurrentLinkedQueue[TableError]()
+
+def metadataForAllTables(databases: Seq[String], queue: ConcurrentLinkedQueue[TableError]): DataFrame = {
   import spark.implicits._

   val externalCatalog = spark.sharedState.externalCatalog
@@ -15,25 +23,25 @@ def metadataForAllTables(databases: Seq[String]): DataFrame = {
       externalCatalog.listTables(databaseName)
     } catch {
       case err: NoSuchDatabaseException =>
-        println(s"[ERROR][${databaseName}] ignoring database because of ${err}")
+        failures.add(TableError("hive_metastore", databaseName, null, s"ignoring database because of ${err}"))
         null
     }
     if (tables == null) {
-      println(s"[WARN][${databaseName}] listTables returned null")
+      failures.add(TableError("hive_metastore", databaseName, null, s"listTables returned null"))
       Seq()
     } else {
       tables.par.map(tableName => try {
         val table = externalCatalog.getTable(databaseName, tableName)
         if (table == null) {
-          println(s"[WARN][${databaseName}.${tableName}] result is null")
+          failures.add(TableError("hive_metastore", databaseName, tableName, s"result is null"))
           None
         } else {
           Some(TableDetails("hive_metastore", databaseName, tableName, table.tableType.name, table.provider.orNull,
             table.storage.locationUri.map(_.toString).orNull, table.viewText.orNull))
         }
       } catch {
         case err: Throwable =>
-          println(s"[ERROR][${databaseName}.${tableName}] ignoring table because of ${err}")
+          failures.add(TableError("hive_metastore", databaseName, tableName, s"ignoring table because of ${err}"))
           None
       }).toList.collect {
         case Some(x) => x
@@ -45,5 +53,8 @@ def metadataForAllTables(databases: Seq[String]): DataFrame = {
 dbutils.widgets.text("inventory_database", "ucx")
 val inventoryDatabase = dbutils.widgets.get("inventory_database")

-val df = metadataForAllTables(spark.sharedState.externalCatalog.listDatabases())
-df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(s"$inventoryDatabase.tables")
+val df = metadataForAllTables(spark.sharedState.externalCatalog.listDatabases(), failures)
+df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(s"$inventoryDatabase.tables")
+
+JavaConverters.asScalaIteratorConverter(failures.iterator).asScala.toList.toDF
+  .write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(s"$inventoryDatabase.table_failures")
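
The core pattern in this change is to replace println logging inside the parallel crawl with a thread-safe accumulator: each worker appends a TableError to a java.util.concurrent.ConcurrentLinkedQueue, and once the crawl finishes the queue is drained into a DataFrame and persisted next to the rest of the inventory. A minimal, self-contained sketch of that pattern follows (plain Scala, no Spark; the names are illustrative, and calling .par on a standard collection assumes a Scala version where parallel collections ship with the standard library, e.g. 2.12 as on typical Databricks runtimes).

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters

// Illustrative stand-in for the notebook's TableError case class.
case class CrawlError(database: String, table: String, error: String)

val failures = new ConcurrentLinkedQueue[CrawlError]()

// Parallel traversal: failures are appended to the thread-safe queue instead of being printed.
Seq("sales", "broken_db", "hr").par.foreach { db =>
  try {
    if (db == "broken_db") throw new RuntimeException("no such database")
  } catch {
    case err: Throwable => failures.add(CrawlError(db, null, s"ignoring database because of ${err}"))
  }
}

// Drain the queue into an immutable Scala list; in the notebook this list becomes a DataFrame
// via .toDF and is written to $inventory.table_failures.
val collected = JavaConverters.asScalaIteratorConverter(failures.iterator).asScala.toList
collected.foreach(println)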
