|
2 | 2 |
|
3 | 3 | import pytest |
4 | 4 |
|
5 | | -from databricks.labs.ucx.source_code.base import Advisory, Deprecation, CurrentSessionState |
6 | | -from databricks.labs.ucx.source_code.linters.pyspark import SparkMatchers, SparkSql, AstHelper, TableNameMatcher |
| 5 | +from databricks.labs.ucx.source_code.base import Deprecation, CurrentSessionState |
| 6 | +from databricks.labs.ucx.source_code.linters.pyspark import SparkSql, AstHelper, TableNameMatcher |
7 | 7 | from databricks.labs.ucx.source_code.queries import FromTable |
8 | 8 |
|
9 | 9 |
|
@@ -87,214 +87,11 @@ def test_spark_sql_match_named(migration_index): |
87 | 87 | ] == list(sqf.lint(old_code)) |
88 | 88 |
|
89 | 89 |
|
90 | | -METHOD_NAMES = [ |
91 | | - "cacheTable", |
92 | | - "createTable", |
93 | | - "createExternalTable", |
94 | | - "getTable", |
95 | | - "isCached", |
96 | | - "listColumns", |
97 | | - "tableExists", |
98 | | - "recoverPartitions", |
99 | | - "refreshTable", |
100 | | - "uncacheTable", |
101 | | - "table", |
102 | | - "insertInto", |
103 | | - "saveAsTable", |
104 | | -] |
105 | | - |
106 | | - |
107 | | -@pytest.mark.parametrize("method_name", METHOD_NAMES) |
108 | | -def test_spark_table_match(migration_index, method_name): |
109 | | - spark_matchers = SparkMatchers() |
110 | | - ftf = FromTable(migration_index, CurrentSessionState()) |
111 | | - sqf = SparkSql(ftf, migration_index) |
112 | | - matcher = spark_matchers.matchers[method_name] |
113 | | - args_list = ["a"] * min(5, matcher.max_args) |
114 | | - args_list[matcher.table_arg_index] = '"old.things"' |
115 | | - args = ",".join(args_list) |
116 | | - old_code = f""" |
117 | | -spark.read.csv("s3://bucket/path") |
118 | | -for i in range(10): |
119 | | - df = spark.{method_name}({args}) |
120 | | - do_stuff_with_df(df) |
121 | | -""" |
122 | | - assert [ |
123 | | - Deprecation( |
124 | | - code='direct-filesystem-access', |
125 | | - message='The use of direct filesystem references is deprecated: ' 's3://bucket/path', |
126 | | - start_line=2, |
127 | | - start_col=0, |
128 | | - end_line=2, |
129 | | - end_col=34, |
130 | | - ), |
131 | | - Deprecation( |
132 | | - code='table-migrate', |
133 | | - message='Table old.things is migrated to brand.new.stuff in Unity Catalog', |
134 | | - start_line=4, |
135 | | - start_col=9, |
136 | | - end_line=4, |
137 | | - end_col=17 + len(method_name) + len(args), |
138 | | - ), |
139 | | - ] == list(sqf.lint(old_code)) |
140 | | - |
141 | | - |
142 | | -@pytest.mark.parametrize("method_name", METHOD_NAMES) |
143 | | -def test_spark_table_no_match(migration_index, method_name): |
144 | | - spark_matchers = SparkMatchers() |
145 | | - ftf = FromTable(migration_index, CurrentSessionState()) |
146 | | - sqf = SparkSql(ftf, migration_index) |
147 | | - matcher = spark_matchers.matchers[method_name] |
148 | | - args_list = ["a"] * min(5, matcher.max_args) |
149 | | - args_list[matcher.table_arg_index] = '"table.we.know.nothing.about"' |
150 | | - args = ",".join(args_list) |
151 | | - old_code = f""" |
152 | | -for i in range(10): |
153 | | - df = spark.{method_name}({args}) |
154 | | - do_stuff_with_df(df) |
155 | | -""" |
156 | | - assert not list(sqf.lint(old_code)) |
157 | | - |
158 | | - |
159 | | -@pytest.mark.parametrize("method_name", METHOD_NAMES) |
160 | | -def test_spark_table_too_many_args(migration_index, method_name): |
161 | | - spark_matchers = SparkMatchers() |
162 | | - ftf = FromTable(migration_index, CurrentSessionState()) |
163 | | - sqf = SparkSql(ftf, migration_index) |
164 | | - matcher = spark_matchers.matchers[method_name] |
165 | | - if matcher.max_args > 100: |
166 | | - return |
167 | | - args_list = ["a"] * (matcher.max_args + 1) |
168 | | - args_list[matcher.table_arg_index] = '"table.we.know.nothing.about"' |
169 | | - args = ",".join(args_list) |
170 | | - old_code = f""" |
171 | | -for i in range(10): |
172 | | - df = spark.{method_name}({args}) |
173 | | - do_stuff_with_df(df) |
174 | | -""" |
175 | | - assert not list(sqf.lint(old_code)) |
176 | | - |
177 | | - |
178 | | -def test_spark_table_named_args(migration_index): |
179 | | - ftf = FromTable(migration_index, CurrentSessionState()) |
180 | | - sqf = SparkSql(ftf, migration_index) |
181 | | - old_code = """ |
182 | | -spark.read.csv("s3://bucket/path") |
183 | | -for i in range(10): |
184 | | - df = spark.saveAsTable(format="xyz", name="old.things") |
185 | | - do_stuff_with_df(df) |
186 | | -""" |
187 | | - assert [ |
188 | | - Deprecation( |
189 | | - code='direct-filesystem-access', |
190 | | - message='The use of direct filesystem references is deprecated: ' 's3://bucket/path', |
191 | | - start_line=2, |
192 | | - start_col=0, |
193 | | - end_line=2, |
194 | | - end_col=34, |
195 | | - ), |
196 | | - Deprecation( |
197 | | - code='table-migrate', |
198 | | - message='Table old.things is migrated to brand.new.stuff in Unity Catalog', |
199 | | - start_line=4, |
200 | | - start_col=9, |
201 | | - end_line=4, |
202 | | - end_col=59, |
203 | | - ), |
204 | | - ] == list(sqf.lint(old_code)) |
205 | | - |
206 | | - |
207 | | -def test_spark_table_variable_arg(migration_index): |
208 | | - ftf = FromTable(migration_index, CurrentSessionState()) |
209 | | - sqf = SparkSql(ftf, migration_index) |
210 | | - old_code = """ |
211 | | -spark.read.csv("s3://bucket/path") |
212 | | -for i in range(10): |
213 | | - df = spark.saveAsTable(name) |
214 | | - do_stuff_with_df(df) |
215 | | -""" |
216 | | - assert [ |
217 | | - Deprecation( |
218 | | - code='direct-filesystem-access', |
219 | | - message='The use of direct filesystem references is deprecated: ' 's3://bucket/path', |
220 | | - start_line=2, |
221 | | - start_col=0, |
222 | | - end_line=2, |
223 | | - end_col=34, |
224 | | - ), |
225 | | - Advisory( |
226 | | - code='table-migrate', |
227 | | - message="Can't migrate 'saveAsTable' because its table name argument is not a constant", |
228 | | - start_line=4, |
229 | | - start_col=9, |
230 | | - end_line=4, |
231 | | - end_col=32, |
232 | | - ), |
233 | | - ] == list(sqf.lint(old_code)) |
234 | | - |
235 | | - |
236 | | -def test_spark_table_fstring_arg(migration_index): |
237 | | - ftf = FromTable(migration_index, CurrentSessionState()) |
238 | | - sqf = SparkSql(ftf, migration_index) |
239 | | - old_code = """ |
240 | | -spark.read.csv("s3://bucket/path") |
241 | | -for i in range(10): |
242 | | - df = spark.saveAsTable(f"boop{stuff}") |
243 | | - do_stuff_with_df(df) |
244 | | -""" |
245 | | - assert [ |
246 | | - Deprecation( |
247 | | - code='direct-filesystem-access', |
248 | | - message='The use of direct filesystem references is deprecated: ' 's3://bucket/path', |
249 | | - start_line=2, |
250 | | - start_col=0, |
251 | | - end_line=2, |
252 | | - end_col=34, |
253 | | - ), |
254 | | - Advisory( |
255 | | - code='table-migrate', |
256 | | - message="Can't migrate 'saveAsTable' because its table name argument is not a constant", |
257 | | - start_line=4, |
258 | | - start_col=9, |
259 | | - end_line=4, |
260 | | - end_col=42, |
261 | | - ), |
262 | | - ] == list(sqf.lint(old_code)) |
263 | | - |
264 | | - |
265 | | -def test_spark_table_return_value(migration_index): |
266 | | - ftf = FromTable(migration_index, CurrentSessionState()) |
267 | | - sqf = SparkSql(ftf, migration_index) |
268 | | - old_code = """ |
269 | | -spark.read.csv("s3://bucket/path") |
270 | | -for table in spark.listTables(): |
271 | | - do_stuff_with_table(table) |
272 | | -""" |
273 | | - assert [ |
274 | | - Deprecation( |
275 | | - code='direct-filesystem-access', |
276 | | - message='The use of direct filesystem references is deprecated: ' 's3://bucket/path', |
277 | | - start_line=2, |
278 | | - start_col=0, |
279 | | - end_line=2, |
280 | | - end_col=34, |
281 | | - ), |
282 | | - Advisory( |
283 | | - code='table-migrate', |
284 | | - message="Call to 'listTables' will return a list of <catalog>.<database>.<table> instead of <database>.<table>.", |
285 | | - start_line=3, |
286 | | - start_col=13, |
287 | | - end_line=3, |
288 | | - end_col=31, |
289 | | - ), |
290 | | - ] == list(sqf.lint(old_code)) |
291 | | - |
292 | | - |
293 | 90 | def test_spark_table_return_value_apply(migration_index): |
294 | 91 | ftf = FromTable(migration_index, CurrentSessionState()) |
295 | 92 | sqf = SparkSql(ftf, migration_index) |
296 | 93 | old_code = """spark.read.csv('s3://bucket/path') |
297 | | -for table in spark.listTables(): |
| 94 | +for table in spark.catalog.listTables(): |
298 | 95 | do_stuff_with_table(table)""" |
299 | 96 | fixed_code = sqf.apply(old_code) |
300 | 97 | # no transformations to apply, only lint messages |
|
0 commit comments