
Commit 49dcb08

Deduplicate errors emitted by Spark Connect linter (#1824)
## Changes

This PR adds deduplication of errors emitted by the Spark Connect linter. It also adds more functional tests using the new framework.

### Functionality

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [ ] modified existing workflow: `...`
- [ ] added a new table
- [ ] modified existing table: `...`

### Tests

- [ ] manually tested
- [x] added unit tests
- [ ] added integration tests
- [ ] verified on staging environment (screenshot attached)
1 parent 893f01c commit 49dcb08
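
For context on why duplicates appeared: `ast.walk` visits every nested node, so a chained attribute access such as `spark._jspark._jvm` yields one `ast.Attribute` node per link in the chain, and a per-node matcher can report what a user sees as a single problem site more than once (the removed test expectations below, with identical start positions but different end columns, point to exactly this). A minimal illustrative sketch using only the standard library, not ucx code:

```python
import ast

# Parsing a chained attribute access yields one ast.Attribute node per link
# in the chain, so a matcher invoked once per walked node can fire repeatedly
# for what is, to the user, a single call site.
tree = ast.parse("spark._jspark._jvm.com.my.custom.Name()")
attribute_nodes = [n for n in ast.walk(tree) if isinstance(n, ast.Attribute)]
print(len(attribute_nodes))  # several nodes, all anchored in the same expression
```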

6 files changed: +55 −47 lines

src/databricks/labs/ucx/source_code/linters/spark_connect.py

Lines changed: 11 additions & 3 deletions
```diff
@@ -22,6 +22,15 @@ def _cluster_type_str(self) -> str:
     def lint(self, node: ast.AST) -> Iterator[Advice]:
         pass
 
+    def lint_tree(self, tree: ast.AST) -> Iterator[Advice]:
+        reported_locations = set()
+        for node in ast.walk(tree):
+            for advice in self.lint(node):
+                loc = (advice.start_line, advice.start_col)
+                if loc not in reported_locations:
+                    reported_locations.add(loc)
+                    yield advice
+
 
 class JvmAccessMatcher(SharedClusterMatcher):
     _FIELDS = [
@@ -195,6 +204,5 @@ def __init__(self, is_serverless: bool = False):
 
     def lint(self, code: str) -> Iterator[Advice]:
         tree = ast.parse(code)
-        for node in ast.walk(tree):
-            for matcher in self._matchers:
-                yield from matcher.lint(node)
+        for matcher in self._matchers:
+            yield from matcher.lint_tree(tree)
```
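
In effect, each matcher now walks the tree itself and keeps a per-matcher set of `(start_line, start_col)` keys, yielding only the first advice seen at each start position. A standalone sketch of the same idiom, where `FakeAdvice` and `dedupe_by_start` are hypothetical stand-ins for ucx's `Advice` and the new `lint_tree` logic:

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass(frozen=True)
class FakeAdvice:  # hypothetical stand-in for ucx's Advice
    code: str
    start_line: int
    start_col: int


def dedupe_by_start(advices: Iterable[FakeAdvice]) -> Iterator[FakeAdvice]:
    seen: set[tuple[int, int]] = set()
    for advice in advices:
        loc = (advice.start_line, advice.start_col)
        if loc not in seen:  # first advice at a position wins; later ones are dropped
            seen.add(loc)
            yield advice


duplicated = [FakeAdvice("jvm-access-in-shared-clusters", 3, 0)] * 2
assert len(list(dedupe_by_start(duplicated))) == 1
```

Because `reported_locations` is local to each `lint_tree` call, deduplication is per matcher: two different matchers can still report distinct problems at the same position, which the new functional samples below rely on.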

tests/unit/source_code/linters/test_spark_connect.py

Lines changed: 16 additions & 40 deletions
```diff
@@ -21,14 +21,6 @@ def test_jvm_access_match_shared():
             end_line=3,
             end_col=18,
         ),
-        Failure(
-            code="jvm-access-in-shared-clusters",
-            message='Cannot access Spark Driver JVM on UC Shared Clusters',
-            start_line=3,
-            start_col=0,
-            end_line=3,
-            end_col=13,
-        ),
     ] == list(linter.lint(code))
 
 
@@ -48,14 +40,6 @@ def test_jvm_access_match_serverless():
             end_line=3,
             end_col=18,
         ),
-        Failure(
-            code="jvm-access-in-shared-clusters",
-            message='Cannot access Spark Driver JVM on Serverless Compute',
-            start_line=3,
-            start_col=0,
-            end_line=3,
-            end_col=13,
-        ),
     ] == list(linter.lint(code))
 
 
@@ -74,14 +58,6 @@ def test_rdd_context_match_shared():
             end_line=2,
             end_col=32,
         ),
-        Failure(
-            code='legacy-context-in-shared-clusters',
-            message='sc is not supported on UC Shared Clusters. Rewrite it using spark',
-            start_line=2,
-            start_col=7,
-            end_line=2,
-            end_col=21,
-        ),
         Failure(
             code="rdd-in-shared-clusters",
             message='RDD APIs are not supported on UC Shared Clusters. Rewrite it using DataFrame API',
@@ -90,6 +66,14 @@ def test_rdd_context_match_shared():
             end_line=3,
             end_col=42,
         ),
+        Failure(
+            code='legacy-context-in-shared-clusters',
+            message='sc is not supported on UC Shared Clusters. Rewrite it using spark',
+            start_line=2,
+            start_col=7,
+            end_line=2,
+            end_col=21,
+        ),
         Failure(
             code="legacy-context-in-shared-clusters",
             message='sc is not supported on UC Shared Clusters. Rewrite it using spark',
@@ -116,14 +100,6 @@ def test_rdd_context_match_serverless():
             end_line=2,
             end_col=32,
         ),
-        Failure(
-            code='legacy-context-in-shared-clusters',
-            message='sc is not supported on Serverless Compute. Rewrite it using spark',
-            start_line=2,
-            start_col=7,
-            end_line=2,
-            end_col=21,
-        ),
         Failure(
             code="rdd-in-shared-clusters",
             message='RDD APIs are not supported on Serverless Compute. Rewrite it using DataFrame API',
@@ -132,6 +108,14 @@ def test_rdd_context_match_serverless():
             end_line=3,
             end_col=42,
         ),
+        Failure(
+            code='legacy-context-in-shared-clusters',
+            message='sc is not supported on Serverless Compute. Rewrite it using spark',
+            start_line=2,
+            start_col=7,
+            end_line=2,
+            end_col=21,
+        ),
         Failure(
             code="legacy-context-in-shared-clusters",
             message='sc is not supported on Serverless Compute. Rewrite it using spark',
@@ -158,14 +142,6 @@ def test_rdd_map_partitions():
             end_line=3,
             end_col=27,
         ),
-        Failure(
-            code="rdd-in-shared-clusters",
-            message='RDD APIs are not supported on UC Shared Clusters. Rewrite it using DataFrame API',
-            start_line=3,
-            start_col=0,
-            end_line=3,
-            end_col=6,
-        ),
     ] == list(linter.lint(code))
 
 
```

Lines changed: 0 additions & 3 deletions
```diff
@@ -1,9 +1,6 @@
 spark.range(10).collect()
-# TODO: looks like a bug in linter, because we are hitting the same issue twice
-# ucx[jvm-access-in-shared-clusters:+2:0:+2:18] Cannot access Spark Driver JVM on UC Shared Clusters
 # ucx[jvm-access-in-shared-clusters:+1:0:+1:18] Cannot access Spark Driver JVM on UC Shared Clusters
 spark._jspark._jvm.com.my.custom.Name()
 
-# ucx[jvm-access-in-shared-clusters:+2:0:+2:18] Cannot access Spark Driver JVM on UC Shared Clusters
 # ucx[jvm-access-in-shared-clusters:+1:0:+1:18] Cannot access Spark Driver JVM on UC Shared Clusters
 spark._jspark._jvm.com.my.custom.Name()
```
Lines changed: 12 additions & 0 deletions
```diff
@@ -0,0 +1,12 @@
+df = spark.createDataFrame([])
+# ucx[rdd-in-shared-clusters:+1:0:+1:27] RDD APIs are not supported on UC Shared Clusters. Use mapInArrow() or Pandas UDFs instead
+df.rdd.mapPartitions(myUdf)
+
+# ucx[rdd-in-shared-clusters:+1:7:+1:32] RDD APIs are not supported on UC Shared Clusters. Rewrite it using DataFrame API
+# ucx[legacy-context-in-shared-clusters:+1:7:+1:21] sc is not supported on UC Shared Clusters. Rewrite it using spark
+rdd1 = sc.parallelize([1, 2, 3])
+
+# ucx[rdd-in-shared-clusters:+1:29:+1:42] RDD APIs are not supported on UC Shared Clusters. Rewrite it using DataFrame API
+# ucx[legacy-context-in-shared-clusters:+1:29:+1:40] sc is not supported on UC Shared Clusters. Rewrite it using spark
+rdd2 = spark.createDataFrame(sc.emptyRDD(), schema)
+
```
Lines changed: 15 additions & 0 deletions
```diff
@@ -0,0 +1,15 @@
+# ucx[legacy-context-in-shared-clusters:+1:0:+1:14] sc is not supported on UC Shared Clusters. Rewrite it using spark
+# ucx[spark-logging-in-shared-clusters:+1:0:+1:22] Cannot set Spark log level directly from code on UC Shared Clusters. Remove the call and set the cluster spark conf 'spark.log.level' instead
+sc.setLogLevel("INFO")
+setLogLevel("WARN")
+
+# ucx[jvm-access-in-shared-clusters:+1:14:+1:21] Cannot access Spark Driver JVM on UC Shared Clusters
+# ucx[legacy-context-in-shared-clusters:+1:14:+1:21] sc is not supported on UC Shared Clusters. Rewrite it using spark
+# ucx[spark-logging-in-shared-clusters:+1:14:+1:38] Cannot access Spark Driver JVM logger on UC Shared Clusters. Use logging.getLogger() instead
+log4jLogger = sc._jvm.org.apache.log4j
+LOGGER = log4jLogger.LogManager.getLogger(__name__)
+
+# ucx[jvm-access-in-shared-clusters:+1:0:+1:7] Cannot access Spark Driver JVM on UC Shared Clusters
+# ucx[legacy-context-in-shared-clusters:+1:0:+1:7] sc is not supported on UC Shared Clusters. Rewrite it using spark
+# ucx[spark-logging-in-shared-clusters:+1:12:+1:24] Cannot access Spark Driver JVM logger on UC Shared Clusters. Use logging.getLogger() instead
+sc._jvm.org.apache.log4j.LogManager.getLogger(__name__).info("test")
```
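
The samples above encode expected problems as comments of the form `# ucx[<code>:+<lines>:<start-col>:+<lines>:<end-col>] <message>`, where `+1` means the problem is expected one line below the comment. A rough sketch of how such a comment can be decomposed; the real parser lives in `tests/unit/source_code/test_functional.py` and may differ in detail:

```python
import re

# Hypothetical regex matching the expectation-comment shape seen above.
EXPECTATION = re.compile(
    r"# ucx\[(?P<code>[\w-]+):"
    r"\+(?P<start_line>\d+):(?P<start_col>\d+):"
    r"\+(?P<end_line>\d+):(?P<end_col>\d+)\] "
    r"(?P<message>.*)"
)

match = EXPECTATION.match(
    "# ucx[jvm-access-in-shared-clusters:+1:0:+1:18] "
    "Cannot access Spark Driver JVM on UC Shared Clusters"
)
assert match is not None
assert match["code"] == "jvm-access-in-shared-clusters"
assert (match["start_col"], match["end_col"]) == ("0", "18")
```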

tests/unit/source_code/test_functional.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -47,7 +47,7 @@ def __init__(self, path: Path):
 
     def verify(self):
         expected_problems = list(self._expected_problems())
-        actual_problems = list(self._lint())
+        actual_problems = sorted(list(self._lint()), key=lambda a: (a.start_line, a.start_col))
         high_level_expected = [f'{p.code}:{p.message}' for p in expected_problems]
         high_level_actual = [f'{p.code}:{p.message}' for p in actual_problems]
         assert high_level_expected == high_level_actual
```
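
Sorting the actual problems by `(start_line, start_col)` lets the expectation comments be written in source order even though the linter now iterates per matcher, which groups findings by matcher rather than by position. With illustrative values only:

```python
# Per-matcher iteration can yield a line-3 finding before a line-2 one;
# sorting by position restores the top-to-bottom order of the sample files.
findings = [(3, 0, "rdd-in-shared-clusters"), (2, 7, "legacy-context-in-shared-clusters")]
assert sorted(findings)[0][:2] == (2, 7)
```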
