Commit e6c8923

viirya authored and dongjoon-hyun committed
[SPARK-54620][SQL] Add safety check in ObservationManager to avoid Observation blocking
### What changes were proposed in this pull request?

This patch adds a safety check to `ObservationManager.tryComplete` to avoid `Observation` blocking.

### Why are the changes needed?

We got reports that in some corner cases `Observation.get` blocks forever. After investigation, it is not a deadlock. If the `CollectMetricsExec` operator was optimized away, e.g., the executed plan was optimized by empty-relation propagation on top of the plan tree containing `CollectMetricsExec`, Spark never fulfills the promise in `Observation`, so `get` calls block forever.

### Does this PR introduce _any_ user-facing change?

Yes. Previously, in some corner cases an `Observation.get` call would block forever. After this change, `get` returns an empty map instead.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53358 from viirya/fix_observation_blocking.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
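The blocking scenario described above can be illustrated with a small, self-contained sketch (plain Scala, no Spark; `MiniObservation`, `completeEmpty`, and the timeout parameter are illustrative names for this sketch, not Spark's actual API — Spark backs `Observation` with a promise that the metrics listener fulfills):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Sketch of the failure mode: get() waits on a promise that only the
// metrics-collecting operator fulfills. If the optimizer removes that
// operator, nobody completes the promise and get() blocks forever.
// The safety check releases waiters with an empty result instead.
class MiniObservation(val name: String) {
  private val promise = Promise[Map[String, Any]]()

  // Blocks until the promise is fulfilled (a timeout is added here only
  // so the sketch cannot hang).
  def get(timeout: Duration): Map[String, Any] =
    Await.result(promise.future, timeout)

  // Normally called by the query-execution listener with collected metrics.
  def setMetricsAndNotify(metrics: Map[String, Any]): Unit =
    promise.trySuccess(metrics)

  // Safety check: complete with an empty map when the metrics operator
  // was optimized away, so callers of get() are not blocked forever.
  def completeEmpty(): Unit = promise.trySuccess(Map.empty)
}

object BlockingDemo {
  def main(args: Array[String]): Unit = {
    val obs = new MiniObservation("row_count")
    // Simulate the corner case: the plan was rewritten to an empty
    // relation, so setMetricsAndNotify is never called. Without the
    // next line, obs.get would wait until the timeout.
    obs.completeEmpty()
    println(obs.get(1.second)) // prints Map()
  }
}
```

In Spark itself `Observation.get` has no timeout, which is why an unfulfilled promise blocked callers indefinitely; the fix in this commit makes `ObservationManager.tryComplete` notify the observation with an empty row in that case.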
1 parent: e2722b8

File tree

3 files changed: +44 −4 lines


sql/api/src/main/scala/org/apache/spark/sql/Observation.scala

Lines changed: 11 additions & 1 deletion
```diff
@@ -70,6 +70,9 @@ class Observation(val name: String) {
    * first action. Only the result of the first action is available. Subsequent actions do not
    * modify the result.
    *
+   * Note that if no metrics were recorded, an empty map may be returned. This can happen when
+   * the operators used for observation are optimized away.
+   *
    * @return
    *   the observed metrics as a `Map[String, Any]`
    * @throws InterruptedException
@@ -78,14 +81,21 @@ class Observation(val name: String) {
   @throws[InterruptedException]
   def get: Map[String, Any] = {
     val row = getRow
-    row.getValuesMap(row.schema.map(_.name))
+    if (row == null || row.schema == null) {
+      Map.empty
+    } else {
+      row.getValuesMap(row.schema.map(_.name))
+    }
   }

   /**
    * (Java-specific) Get the observed metrics. This waits for the observed dataset to finish its
    * first action. Only the result of the first action is available. Subsequent actions do not
    * modify the result.
    *
+   * Note that if no metrics were recorded, an empty map may be returned. This can happen when
+   * the operators used for observation are optimized away.
+   *
    * @return
    *   the observed metrics as a `java.util.Map[String, Object]`
    * @throws InterruptedException
```

sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala

Lines changed: 15 additions & 3 deletions
```diff
@@ -18,7 +18,7 @@ package org.apache.spark.sql.classic

 import java.util.concurrent.ConcurrentHashMap

-import org.apache.spark.sql.Observation
+import org.apache.spark.sql.{Observation, Row}
 import org.apache.spark.sql.catalyst.plans.logical.CollectMetrics
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.QueryExecutionListener
@@ -56,10 +56,22 @@ private[sql] class ObservationManager(session: SparkSession) {
     val allMetrics = qe.observedMetrics
     qe.logical.foreach {
       case c: CollectMetrics =>
-        allMetrics.get(c.name).foreach { metrics =>
+        val keyExists = observations.containsKey((c.name, c.dataframeId))
+        val metrics = allMetrics.get(c.name)
+        if (keyExists && metrics.isEmpty) {
+          // If the key exists but no metrics were collected, it means for some reason the metrics
+          // could not be collected. This can happen e.g., if the CollectMetricsExec was optimized
+          // away.
           val observation = observations.remove((c.name, c.dataframeId))
           if (observation != null) {
-            observation.setMetricsAndNotify(metrics)
+            observation.setMetricsAndNotify(Row.empty)
+          }
+        } else {
+          metrics.foreach { metrics =>
+            val observation = observations.remove((c.name, c.dataframeId))
+            if (observation != null) {
+              observation.setMetricsAndNotify(metrics)
+            }
           }
         }
       case _ =>
```

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -2878,6 +2878,24 @@ class DatasetSuite extends QueryTest
     checkDataset(Seq(seqMutableSet).toDS(), seqMutableSet)
     checkDataset(Seq(mapMutableSet).toDS(), mapMutableSet)
   }
+
+  test("SPARK-54620: Observation should not block forever") {
+    val observation = Observation("row_count")
+
+    var df = Seq.empty[(Int, Int)].toDF("v1", "v2")
+    df = df.observe(observation,
+      functions.count(functions.lit(1)).alias("record_cnt"))
+    df = df.repartition($"v1")
+      .select($"v1" + 1 as "v1", $"v2" + 1 as "v2")
+      .join(
+        Seq((1, 2), (3, 4)).toDF("v1", "v2").repartition($"v2"),
+        Seq("v1"),
+        "inner")
+    df.collect()
+
+    val metrics = observation.get
+    assert(metrics.isEmpty)
+  }
 }

 /**
```
