Skip to content

Commit 6db610e

Browse files
authored
fix: handle support for partial success introduce in dbt 1.9 with microbatching (#2041)
* add support for partial success * tweaks * fix unit tests
1 parent f15cb18 commit 6db610e

File tree

2 files changed

+71
-18
lines changed

2 files changed

+71
-18
lines changed

elementary/monitor/data_monitoring/schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class Status(str, Enum):
2121
SKIPPED = "skipped"
2222
ERROR = "error"
2323
RUNTIME_ERROR = "runtime error"
24+
PARTIAL_SUCCESS = "partial success"
2425

2526

2627
class ResourceType(str, Enum):

tests/unit/monitor/api/alerts/test_alert_filters.py

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,33 @@ def initial_alerts():
232232
resource_type=ResourceType.MODEL,
233233
),
234234
),
235+
PendingAlertSchema(
236+
id="model_alert_4",
237+
alert_class_id="elementary.model_id_3",
238+
type=AlertTypes.MODEL,
239+
detected_at=datetime(2022, 10, 10, 7, 0, 0),
240+
created_at=datetime(2022, 10, 10, 7, 0, 0),
241+
updated_at=datetime(2022, 10, 10, 7, 0, 0),
242+
status=AlertStatus.PENDING,
243+
data=ModelAlertDataSchema(
244+
id="4",
245+
alert_class_id="elementary.model_id_3",
246+
model_unique_id="elementary.model_id_3",
247+
alias="model3",
248+
path="my/path3",
249+
original_path="",
250+
materialization="incremental",
251+
message="",
252+
full_refresh=False,
253+
detected_at=datetime(2022, 10, 10, 7, 0, 0),
254+
tags=["microbatch"],
255+
model_meta=dict(owner='["alice"]'),
256+
status="partial success",
257+
database_name="test_db",
258+
schema_name="test_schema",
259+
resource_type=ResourceType.MODEL,
260+
),
261+
),
235262
]
236263
source_freshness_alerts = [
237264
PendingAlertSchema(
@@ -359,9 +386,10 @@ def test_filter_alerts_by_tags():
359386
assert len(filter_test_alerts) == 2
360387
assert filter_test_alerts[0].id == "test_alert_2"
361388
assert filter_test_alerts[1].id == "test_alert_4"
362-
assert len(filter_model_alerts) == 2
389+
assert len(filter_model_alerts) == 3
363390
assert filter_model_alerts[0].id == "model_alert_2"
364391
assert filter_model_alerts[1].id == "model_alert_3"
392+
assert filter_model_alerts[2].id == "model_alert_4"
365393

366394
filter = FiltersSchema(
367395
tags=[FilterSchema(values=["three"], type=FilterType.IS)], statuses=[]
@@ -383,8 +411,9 @@ def test_filter_alerts_by_tags():
383411
assert len(filter_test_alerts) == 2
384412
assert filter_test_alerts[0].id == "test_alert_1"
385413
assert filter_test_alerts[1].id == "test_alert_3"
386-
assert len(filter_model_alerts) == 1
414+
assert len(filter_model_alerts) == 2
387415
assert filter_model_alerts[0].id == "model_alert_1"
416+
assert filter_model_alerts[1].id == "model_alert_4"
388417

389418
filter = FiltersSchema(
390419
tags=[FilterSchema(values=["four"], type=FilterType.IS)], statuses=[]
@@ -407,10 +436,11 @@ def test_filter_alerts_by_tags():
407436
"test_alert_2",
408437
"test_alert_3",
409438
]
410-
assert len(filter_model_alerts) == 2
439+
assert len(filter_model_alerts) == 3
411440
assert sorted([alert.id for alert in filter_model_alerts]) == [
412441
"model_alert_1",
413442
"model_alert_2",
443+
"model_alert_4",
414444
]
415445

416446
filter = FiltersSchema(
@@ -441,10 +471,11 @@ def test_filter_alerts_by_tags():
441471
"test_alert_2",
442472
"test_alert_4",
443473
]
444-
assert len(filter_model_alerts) == 2
474+
assert len(filter_model_alerts) == 3
445475
assert sorted([alert.id for alert in filter_model_alerts]) == [
446476
"model_alert_2",
447477
"model_alert_3",
478+
"model_alert_4",
448479
]
449480

450481
filter = FiltersSchema(
@@ -470,8 +501,11 @@ def test_filter_alerts_by_tags():
470501
filter_model_alerts = filter_alerts(model_alerts, filter)
471502
assert len(filter_test_alerts) == 1
472503
assert filter_test_alerts[0].id == "test_alert_2"
473-
assert len(filter_model_alerts) == 1
474-
assert filter_model_alerts[0].id == "model_alert_2"
504+
assert len(filter_model_alerts) == 2
505+
assert sorted([alert.id for alert in filter_model_alerts]) == [
506+
"model_alert_2",
507+
"model_alert_4",
508+
]
475509

476510
filter = FiltersSchema(
477511
tags=[
@@ -503,8 +537,11 @@ def test_filter_alerts_by_tags():
503537
filter_model_alerts = filter_alerts(model_alerts, filter)
504538
assert len(filter_test_alerts) == 1
505539
assert filter_test_alerts[0].id == "test_alert_2"
506-
assert len(filter_model_alerts) == 1
507-
assert filter_model_alerts[0].id == "model_alert_2"
540+
assert len(filter_model_alerts) == 2
541+
assert sorted([alert.id for alert in filter_model_alerts]) == [
542+
"model_alert_2",
543+
"model_alert_4",
544+
]
508545

509546
filter = FiltersSchema(
510547
tags=[
@@ -564,8 +601,9 @@ def test_filter_alerts_by_owners():
564601
filter_model_alerts = filter_alerts(model_alerts, filter)
565602
assert len(filter_test_alerts) == 1
566603
assert filter_test_alerts[0].id == "test_alert_3"
567-
assert len(filter_model_alerts) == 1
604+
assert len(filter_model_alerts) == 2
568605
assert filter_model_alerts[0].id == "model_alert_2"
606+
assert filter_model_alerts[1].id == "model_alert_4"
569607

570608
filter = FiltersSchema(
571609
owners=[FilterSchema(values=["john"], type=FilterType.IS)], statuses=[]
@@ -587,8 +625,11 @@ def test_filter_alerts_by_owners():
587625
filter_model_alerts = filter_alerts(model_alerts, filter)
588626
assert len(filter_test_alerts) == 1
589627
assert filter_test_alerts[0].id == "test_alert_4"
590-
assert len(filter_model_alerts) == 1
591-
assert filter_model_alerts[0].id == "model_alert_3"
628+
assert len(filter_model_alerts) == 2
629+
assert sorted([alert.id for alert in filter_model_alerts]) == [
630+
"model_alert_3",
631+
"model_alert_4",
632+
]
592633

593634
filter = FiltersSchema(
594635
owners=[
@@ -652,8 +693,9 @@ def test_filter_alerts_by_model():
652693
assert len(filter_test_alerts) == 2
653694
assert filter_test_alerts[0].id == "test_alert_3"
654695
assert filter_test_alerts[1].id == "test_alert_4"
655-
assert len(filter_model_alerts) == 1
696+
assert len(filter_model_alerts) == 2
656697
assert filter_model_alerts[0].id == "model_alert_3"
698+
assert filter_model_alerts[1].id == "model_alert_4"
657699

658700
filter = FiltersSchema(
659701
models=[FilterSchema(values=["model_id_2"], type=FilterType.IS)], statuses=[]
@@ -675,9 +717,10 @@ def test_filter_alerts_by_model():
675717
assert len(filter_test_alerts) == 2
676718
assert filter_test_alerts[0].id == "test_alert_1"
677719
assert filter_test_alerts[1].id == "test_alert_2"
678-
assert len(filter_model_alerts) == 2
720+
assert len(filter_model_alerts) == 3
679721
assert filter_model_alerts[0].id == "model_alert_1"
680722
assert filter_model_alerts[1].id == "model_alert_2"
723+
assert filter_model_alerts[2].id == "model_alert_4"
681724

682725
filter = FiltersSchema(
683726
models=[FilterSchema(values=["model_id_1", "model_id_2"], type=FilterType.IS)],
@@ -731,7 +774,8 @@ def test_filter_alerts_by_node_names():
731774
filter_test_alerts = filter_alerts(test_alerts, filter)
732775
filter_model_alerts = filter_alerts(model_alerts, filter)
733776
assert len(filter_test_alerts) == 0
734-
assert len(filter_model_alerts) == 0
777+
assert len(filter_model_alerts) == 1
778+
assert filter_model_alerts[0].id == "model_alert_4"
735779

736780

737781
def test_filter_alerts_by_statuses():
@@ -790,7 +834,11 @@ def test_filter_alerts_by_statuses():
790834
filter_test_alerts = filter_alerts(test_alerts, filter)
791835
filter_model_alerts = filter_alerts(model_alerts, filter)
792836
assert len(filter_test_alerts) == 0
793-
assert len(filter_model_alerts) == 1
837+
assert len(filter_model_alerts) == 2
838+
assert sorted([alert.id for alert in filter_model_alerts]) == [
839+
"model_alert_3",
840+
"model_alert_4",
841+
]
794842

795843

796844
def test_filter_alerts_by_resource_types():
@@ -828,18 +876,22 @@ def test_filter_alerts_by_resource_types():
828876
def test_filter_alerts():
829877
test_alerts, model_alerts, _ = initial_alerts()
830878

831-
# Test that empty filter returns all the alerts except for skipped.
879+
# Test that empty filter returns all the alerts except for skipped and partial success.
832880
filter = FiltersSchema()
833881
filter_test_alerts = filter_alerts(test_alerts, filter)
834882
filter_model_alerts = filter_alerts(model_alerts, filter)
835883
assert len(filter_test_alerts) == len(test_alerts)
836-
assert len(filter_model_alerts) == len(model_alerts) - 1 # 1 skipped model alert
884+
assert (
885+
len(filter_model_alerts) == len(model_alerts) - 2
886+
) # 1 skipped + 1 partial success model alert
837887

838888
# Test that passing no filter returns all the alerts.
839889
filter_test_alerts = filter_alerts(test_alerts)
840890
filter_model_alerts = filter_alerts(model_alerts)
841891
assert len(filter_test_alerts) == len(test_alerts)
842-
assert len(filter_model_alerts) == len(model_alerts) - 1 # 1 skipped model alert
892+
assert (
893+
len(filter_model_alerts) == len(model_alerts) - 2
894+
) # 1 skipped + 1 partial success model alert
843895

844896
# Test that filter with unsupported selector returns no alert
845897
filter = FiltersSchema(last_invocation=True, selector="last_invocation")

0 commit comments

Comments
 (0)