Skip to content

Commit 40919a7

Browse files
Fixed appending to tables by adding filtering of None rows (#356)
This PR aims to fix #297 and #346. It adds a utility method that filters out rows containing a None value in a non-nullable column, which prevents crawlers from throwing an error when a column is None. It also checks whether the corresponding field in the dataclass is nullable: if the field is nullable and the value is None, that None is ignored and the row is kept.
1 parent 10abd0c commit 40919a7

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,32 @@ def _schema_for(cls, klass):
4141
fields.append(f"{f.name} {spark_type}{not_null}")
4242
return ", ".join(fields)
4343

44+
@classmethod
def _filter_none_rows(cls, rows, full_name):
    """Return *rows* with entries removed that are None or that carry a None
    value in a non-nullable column.

    A column is treated as nullable when its dataclass field declares a
    default of None (e.g. ``second: str = None``); None values in such
    columns are kept. Every dropped row triggers a warning naming the
    offending column and *full_name* (the target table's full name).
    """
    if not rows:
        return rows

    # Derive the column set from the first non-None row: rows[0] itself may
    # be None, and dataclasses.fields(None) would raise TypeError.
    sample = next((row for row in rows if row is not None), None)
    if sample is None:
        return []

    # Fields without a default have default=dataclasses.MISSING, so only
    # fields explicitly defaulted to None are considered nullable here.
    nullable_fields = {
        field.name for field in dataclasses.fields(sample) if field.default is None
    }

    results = []
    for row in rows:
        if row is None:
            continue
        row_contains_none = False
        for column, value in dataclasses.asdict(row).items():
            if value is None and column not in nullable_fields:
                logger.warning(f"[{full_name}] Field {column} is None, filtering row")
                row_contains_none = True
                break
        if not row_contains_none:
            results.append(row)
    return results
69+
4470

4571
class StatementExecutionBackend(SqlBackend):
4672
def __init__(self, ws: WorkspaceClient, warehouse_id, *, max_records_per_batch: int = 1000):
@@ -60,7 +86,7 @@ def save_table(self, full_name: str, rows: list[any], mode="append"):
6086
if mode == "overwrite":
6187
msg = "Overwrite mode is not yet supported"
6288
raise NotImplementedError(msg)
63-
89+
rows = self._filter_none_rows(rows, full_name)
6490
if len(rows) == 0:
6591
return
6692

@@ -115,6 +141,8 @@ def fetch(self, sql) -> Iterator[any]:
115141
return self._spark.sql(sql).collect()
116142

117143
def save_table(self, full_name: str, rows: list[any], mode: str = "append"):
144+
rows = self._filter_none_rows(rows, full_name)
145+
118146
if len(rows) == 0:
119147
return
120148
# pyspark deals well with lists of dataclass instances, as long as schema is provided

tests/unit/framework/test_crawlers.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ class Foo:
2020
second: bool
2121

2222

23+
@dataclass
class Baz:
    """Sample record whose second column is nullable (default None)."""

    first: str
    second: str = None
27+
28+
2329
@dataclass
2430
class Bar:
2531
first: str
@@ -204,3 +210,39 @@ def test_runtime_backend_save_table(mocker):
204210
"first STRING NOT NULL, second BOOLEAN NOT NULL",
205211
)
206212
rb._spark.createDataFrame().write.saveAsTable.assert_called_with("a.b.c", mode="append")
213+
214+
215+
def test_runtime_backend_save_table_with_row_containing_none(mocker):
    """A row whose non-nullable column is None is dropped before createDataFrame."""
    from unittest import mock

    with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}):
        # Stub out pyspark before RuntimeBackend tries to import it.
        spark_session_stub = mocker.Mock()
        sys.modules["pyspark.sql.session"] = spark_session_stub

        backend = RuntimeBackend()

        rows = [Foo("aaa", True), Foo("bbb", False), Foo("ccc", None)]
        backend.save_table("a.b.c", rows)

        # Only the two complete rows should reach Spark.
        backend._spark.createDataFrame.assert_called_with(
            [Foo(first="aaa", second=True), Foo(first="bbb", second=False)],
            "first STRING NOT NULL, second BOOLEAN NOT NULL",
        )
        backend._spark.createDataFrame().write.saveAsTable.assert_called_with("a.b.c", mode="append")
231+
232+
233+
def test_runtime_backend_save_table_with_row_containing_none_with_nullable_class(mocker):
    """A None value in a nullable column is kept, and the schema omits NOT NULL."""
    from unittest import mock

    with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}):
        # Stub out pyspark before RuntimeBackend tries to import it.
        spark_session_stub = mocker.Mock()
        sys.modules["pyspark.sql.session"] = spark_session_stub

        backend = RuntimeBackend()

        backend.save_table("a.b.c", [Baz("aaa", "ccc"), Baz("bbb", None)])

        # Both rows survive: Baz.second is nullable, so None is allowed.
        backend._spark.createDataFrame.assert_called_with(
            [Baz(first="aaa", second="ccc"), Baz(first="bbb", second=None)],
            "first STRING NOT NULL, second STRING",
        )
        backend._spark.createDataFrame().write.saveAsTable.assert_called_with("a.b.c", mode="append")

0 commit comments

Comments
 (0)