
Commit 8207448

Merge pull request #56 from DataKitchen/release/4.39.2
Release/4.39.2
2 parents 0caa623 + 151ab53 commit 8207448


53 files changed (+3556, -211 lines)

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dataops-testgen"
-version = "4.38.7"
+version = "4.39.2"
 description = "DataKitchen's Data Quality DataOps TestGen"
 authors = [
     { "name" = "DataKitchen, Inc.", "email" = "[email protected]" },

testgen/__main__.py

Lines changed: 12 additions & 4 deletions
@@ -44,8 +44,12 @@
 )
 from testgen.common.models import with_database_session
 from testgen.common.models.profiling_run import ProfilingRun
+from testgen.common.models.settings import PersistedSetting
 from testgen.common.models.test_run import TestRun
 from testgen.common.models.test_suite import TestSuite
+from testgen.common.notifications.base import smtp_configured
+from testgen.common.notifications.profiling_run import send_profiling_run_notifications
+from testgen.common.notifications.test_run import send_test_run_notifications
 from testgen.scheduler import register_scheduler_job, run_scheduler
 from testgen.utils import plugins
 
@@ -645,14 +649,18 @@ def run_ui():
     patch_streamlit.patch(force=True)
 
     @with_database_session
-    def cancel_all_running():
+    def init_ui():
         try:
-            ProfilingRun.cancel_all_running()
-            TestRun.cancel_all_running()
+            for profiling_run_id in ProfilingRun.cancel_all_running():
+                send_profiling_run_notifications(ProfilingRun.get(profiling_run_id))
+            for test_run_id in TestRun.cancel_all_running():
+                send_test_run_notifications(TestRun.get(test_run_id))
         except Exception:
             LOG.warning("Failed to cancel 'Running' profiling/test runs")
 
-    cancel_all_running()
+        PersistedSetting.set("SMTP_CONFIGURED", smtp_configured())
+
+    init_ui()
 
     app_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ui/app.py")
     process = subprocess.Popen(
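
The change above turns the old fire-and-forget cleanup into an init hook that also notifies about every run it cancels and records whether SMTP is configured. A minimal standalone sketch of that flow, using stand-in stubs rather than the TestGen models and notification helpers:

import logging

logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger("sketch")

# Stand-in for profiling runs left in "Running" state by a previous process.
_running_profiling_runs = {"run-1": "Running", "run-2": "Running"}


def cancel_all_running() -> list[str]:
    """Mark every 'Running' run as 'Cancelled' and return the cancelled ids."""
    cancelled = [rid for rid, status in _running_profiling_runs.items() if status == "Running"]
    for rid in cancelled:
        _running_profiling_runs[rid] = "Cancelled"
    return cancelled


def send_profiling_run_notification(run_id: str) -> None:
    # Placeholder for the real notification helper.
    LOG.info("Notifying subscribers that profiling run %s was cancelled", run_id)


def init_ui() -> None:
    try:
        for run_id in cancel_all_running():
            send_profiling_run_notification(run_id)
    except Exception:
        LOG.warning("Failed to cancel 'Running' profiling runs")


init_ui()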

testgen/commands/run_profiling.py

Lines changed: 6 additions & 1 deletion
@@ -27,6 +27,7 @@
 from testgen.common.models.profiling_run import ProfilingRun
 from testgen.common.models.table_group import TableGroup
 from testgen.common.models.test_suite import TestSuite
+from testgen.common.notifications.profiling_run import send_profiling_run_notifications
 from testgen.ui.session import session
 from testgen.utils import get_exception_message
 
@@ -53,7 +54,7 @@ def run_profiling_in_background(table_group_id: str | UUID) -> None:
 def run_profiling(table_group_id: str | UUID, username: str | None = None, run_date: datetime | None = None) -> str:
     if table_group_id is None:
         raise ValueError("Table Group ID was not specified")
-
+
     LOG.info(f"Starting profiling run for table group {table_group_id}")
     time_delta = (run_date - datetime.now(UTC)) if run_date else timedelta()
 
@@ -104,12 +105,16 @@ def run_profiling(table_group_id: str | UUID, username: str | None = None, run_d
         profiling_run.profiling_endtime = datetime.now(UTC) + time_delta
         profiling_run.status = "Error"
         profiling_run.save()
+
+        send_profiling_run_notifications(profiling_run)
     else:
         LOG.info("Setting profiling run status to Completed")
         profiling_run.profiling_endtime = datetime.now(UTC) + time_delta
         profiling_run.status = "Complete"
         profiling_run.save()
 
+        send_profiling_run_notifications(profiling_run)
+
         _rollup_profiling_scores(profiling_run, table_group)
 
         if bool(table_group.monitor_test_suite_id) and not table_group.last_complete_profile_run_id:
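
As the hunks above show, the notification is now sent on both the Error and the Complete path, in each case after the profiling run has been saved with its final status. An illustrative sketch of that pattern, with made-up Run and notify stand-ins rather than the committed code:

from dataclasses import dataclass, field


@dataclass
class Run:
    status: str = "Running"
    saved_statuses: list[str] = field(default_factory=list)

    def save(self) -> None:
        self.saved_statuses.append(self.status)


def notify(run: Run) -> None:
    print(f"notification sent for run with status {run.status}")


def finish_run(run: Run, error: Exception | None) -> None:
    if error is not None:
        run.status = "Error"
        run.save()
        notify(run)  # failed run: notify after the Error status is persisted
    else:
        run.status = "Complete"
        run.save()
        notify(run)  # successful run: notify after the Complete status is persisted


finish_run(Run(), None)                  # prints a notification for a Complete run
finish_run(Run(), RuntimeError("boom"))  # prints a notification for an Error run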

testgen/commands/run_refresh_score_cards_results.py

Lines changed: 8 additions & 2 deletions
@@ -11,6 +11,7 @@
     ScoreDefinitionResult,
     ScoreDefinitionResultHistoryEntry,
 )
+from testgen.common.notifications.score_drop import collect_score_notification_data, send_score_drop_notifications
 
 LOG = logging.getLogger("testgen")
 
@@ -26,16 +27,16 @@ def run_refresh_score_cards_results(
     _refresh_date = refresh_date or datetime.datetime.now(datetime.UTC)
 
     try:
-        definitions = []
         if not definition_id:
             definitions = ScoreDefinition.all(project_code=project_code)
         else:
-            definitions.append(ScoreDefinition.get(str(definition_id)))
+            definitions = [ScoreDefinition.get(str(definition_id))]
     except Exception:
         LOG.exception("Stopping scorecards results refresh after unexpected error")
         return
 
     db_session = get_current_session()
+    score_notification_data = []
 
     for definition in definitions:
         LOG.info(
@@ -46,6 +47,9 @@
 
         try:
             fresh_score_card = definition.as_score_card()
+
+            collect_score_notification_data(score_notification_data, definition, fresh_score_card)
+
             definition.clear_results()
             definition.results = _score_card_to_results(fresh_score_card)
             definition.breakdown = _score_definition_to_results_breakdown(definition)
@@ -89,6 +93,8 @@ def run_refresh_score_cards_results(
     end_time = time.time()
     LOG.info("Refreshing results for %s done after %s seconds", scope, round(end_time - start_time, 2))
 
+    send_score_drop_notifications(score_notification_data)
+
 
 def _score_card_to_results(score_card: ScoreCard) -> list[ScoreDefinitionResult]:
     return [
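
The refresh loop now collects score-drop data per definition while the score cards are being rebuilt and sends all drop notifications in a single batch at the end. A minimal sketch of that collect-then-send pattern; the helpers, data, and the "current score lower than previous" trigger condition below are assumptions for illustration, not the TestGen implementation in testgen.common.notifications.score_drop:

from dataclasses import dataclass


@dataclass
class ScoreDrop:
    definition_name: str
    previous_score: float
    current_score: float


def collect_score_notification_data(collected: list[ScoreDrop], name: str, previous: float, current: float) -> None:
    # Assumed trigger: record a drop only when the fresh score is lower than the stored one.
    if current < previous:
        collected.append(ScoreDrop(name, previous, current))


def send_score_drop_notifications(collected: list[ScoreDrop]) -> None:
    # Send one batch at the end instead of notifying inside the refresh loop.
    for drop in collected:
        print(f"{drop.definition_name}: score dropped from {drop.previous_score} to {drop.current_score}")


score_notification_data: list[ScoreDrop] = []
for name, previous, current in [("orders", 92.0, 88.5), ("customers", 97.0, 97.0)]:
    collect_score_notification_data(score_notification_data, name, previous, current)

send_score_drop_notifications(score_notification_data)  # only "orders" is reported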

testgen/commands/run_test_execution.py

Lines changed: 9 additions & 3 deletions
@@ -25,6 +25,7 @@
 from testgen.common.models.table_group import TableGroup
 from testgen.common.models.test_run import TestRun
 from testgen.common.models.test_suite import TestSuite
+from testgen.common.notifications.test_run import send_test_run_notifications
 from testgen.ui.session import session
 from testgen.utils import get_exception_message
 
@@ -81,6 +82,10 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
 
         sql_generator = TestExecutionSQL(connection, table_group, test_run)
 
+        # Update the thresholds before retrieving the test definitions in the next steps
+        LOG.info("Updating historic test thresholds")
+        execute_db_queries([sql_generator.update_historic_thresholds()])
+
         LOG.info("Retrieving active test definitions in test suite")
         test_defs = fetch_dict_from_db(*sql_generator.get_active_test_definitions())
         test_defs = [TestExecutionDef(**item) for item in test_defs]
@@ -99,9 +104,6 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
         )
 
         if valid_test_defs:
-            LOG.info("Updating historic test thresholds")
-            execute_db_queries([sql_generator.update_historic_thresholds()])
-
             column_types = {(col.schema_name, col.table_name, col.column_name): col.column_type for col in data_chars}
             for td in valid_test_defs:
                 td.column_type = column_types.get((td.schema_name, td.table_name, td.column_name))
@@ -128,13 +130,16 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
         execute_db_queries(sql_generator.update_test_results())
         # Refresh needed because previous query updates the test run too
         test_run.refresh()
+
     except Exception as e:
         LOG.exception("Test execution encountered an error.")
         LOG.info("Setting test run status to Error")
         test_run.log_message = get_exception_message(e)
         test_run.test_endtime = datetime.now(UTC) + time_delta
         test_run.status = "Error"
         test_run.save()
+
+        send_test_run_notifications(test_run)
     else:
         LOG.info("Setting test run status to Completed")
         test_run.test_endtime = datetime.now(UTC) + time_delta
@@ -145,6 +150,7 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
             test_suite.last_complete_test_run_id = test_run.id
             test_suite.save()
 
+        send_test_run_notifications(test_run)
         _rollup_test_scores(test_run, table_group)
     finally:
         MixpanelService().send_event(
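
The key reordering here is that the historic test thresholds are now updated before the active test definitions are retrieved, so the definitions loaded for the run already reflect the refreshed thresholds; previously the update only happened afterwards, inside the valid_test_defs branch. A simplified, hypothetical sketch of why the order matters; the dictionaries and values below are stand-ins, not TestGen structures:

thresholds = {"row_count_check": 100}


def update_historic_thresholds() -> None:
    # Stand-in for the SQL that recomputes thresholds from recent runs.
    thresholds["row_count_check"] = 120


def get_active_test_definitions() -> list[dict]:
    # Stand-in for fetching test definitions; each embeds the current threshold.
    return [{"id": test_id, "threshold": value} for test_id, value in thresholds.items()]


update_historic_thresholds()               # step 1: refresh thresholds first
test_defs = get_active_test_definitions()  # step 2: definitions now carry the new value
assert test_defs[0]["threshold"] == 120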
Lines changed: 145 additions & 0 deletions (new file)

import re
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Self
from uuid import UUID, uuid4

from sqlalchemy import Column, ForeignKey, String, and_, case, null, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import aliased, relationship
from sqlalchemy.sql.functions import func

from testgen.common.models import Base, get_current_session
from testgen.common.models.entity import Entity

PII_RISK_RE = re.compile(r"Risk: (MODERATE|HIGH),")


@dataclass
class IssueCount:
    total: int = 0
    inactive: int = 0

    @property
    def active(self):
        return self.total - self.inactive


class HygieneIssueType(Base):
    __tablename__ = "profile_anomaly_types"

    id: str = Column(String, primary_key=True)
    likelihood: str = Column("issue_likelihood", String)
    name: str = Column("anomaly_name", String)

    # Note: not all table columns are implemented by this entity


class HygieneIssue(Entity):
    __tablename__ = "profile_anomaly_results"

    id: UUID = Column(postgresql.UUID(as_uuid=True), primary_key=True, nullable=False, default=uuid4)

    project_code: str = Column(String, ForeignKey("projects.project_code"))
    table_groups_id: UUID = Column(postgresql.UUID(as_uuid=True), ForeignKey("table_groups.id"), nullable=False)
    profile_run_id: UUID = Column(postgresql.UUID(as_uuid=True), ForeignKey("profiling_runs.id"), nullable=False)

    type_id: str = Column("anomaly_id", String, ForeignKey("profile_anomaly_types.id"), nullable=False)
    type_ = relationship(HygieneIssueType)

    schema_name: str = Column(String, nullable=False)
    table_name: str = Column(String, nullable=False)
    column_name: str = Column(String, nullable=False)

    detail: str = Column(String, nullable=False)
    disposition: str = Column(String)

    # Note: not all table columns are implemented by this entity

    @hybrid_property
    def priority(self):
        if self.type_.likelihood != "Potential PII":
            return self.type_.likelihood
        elif self.detail and (match := PII_RISK_RE.search(self.detail)):
            return match.group(1).capitalize()
        else:
            return None

    @priority.expression
    def priority(cls):
        return case(
            (
                HygieneIssueType.likelihood != "Potential PII",
                HygieneIssueType.likelihood,
            ),
            else_=func.initcap(
                func.substring(cls.detail, PII_RISK_RE.pattern)
            ),
        )

    @classmethod
    def select_count_by_priority(cls, profiling_run_id: UUID) -> dict[str, IssueCount]:
        count_query = (
            select(
                cls.priority,
                func.count(),
                func.count(cls.disposition.in_(("Dismissed", "Inactive"))),
            )
            .select_from(cls)
            .join(HygieneIssueType)
            .where(cls.profile_run_id == profiling_run_id)
            .group_by(cls.priority)
        )
        result = {
            priority: IssueCount(total, inactive)
            for priority, total, inactive in get_current_session().execute(count_query)
        }
        for p in ("Definite", "Likely", "Possible", "High", "Moderate"):
            result.setdefault(p, IssueCount())
        return result

    @classmethod
    def select_with_diff(
        cls, profiling_run_id: UUID, other_profiling_run_id: UUID | None, *where_clauses, limit: int | None = None
    ) -> Iterable[tuple[Self, bool, str]]:
        other = aliased(cls)
        order_weight = case(
            (cls.priority == "Definite", 1),
            (cls.priority == "Likely", 2),
            (cls.priority == "Possible", 3),
            (cls.priority == "High", 4),
            (cls.priority == "Moderate", 5),
            else_=6,
        )
        is_new_col = (other.id.is_(None) if other_profiling_run_id else null()).label("is_new")
        query = (
            select(
                cls,
                is_new_col,
            )
            .outerjoin(
                other,
                and_(
                    other.table_groups_id == cls.table_groups_id,
                    other.schema_name == cls.schema_name,
                    other.table_name == cls.table_name,
                    other.column_name == cls.column_name,
                    other.type_id == cls.type_id,
                    other.profile_run_id == other_profiling_run_id,
                ),
            ).join(
                HygieneIssueType,
                HygieneIssueType.id == cls.type_id,
            ).where(
                cls.profile_run_id == profiling_run_id,
                *where_clauses
            ).order_by(
                is_new_col.desc(),
                order_weight,
            ).limit(
                limit,
            )
        )

        return get_current_session().execute(query)
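
The hybrid priority property maps non-PII issue types straight to their likelihood ("Definite", "Likely", "Possible"), while "Potential PII" issues derive "High" or "Moderate" from the "Risk: ..." marker embedded in the detail text. A standalone sketch of that resolution logic, mirroring only the Python side of the hybrid property (not the committed SQLAlchemy code):

import re

PII_RISK_RE = re.compile(r"Risk: (MODERATE|HIGH),")


def resolve_priority(likelihood: str, detail: str | None) -> str | None:
    # Non-PII issue types keep their likelihood as the priority.
    if likelihood != "Potential PII":
        return likelihood
    # Potential PII issues carry their risk level in the detail text.
    if detail and (match := PII_RISK_RE.search(detail)):
        return match.group(1).capitalize()
    return None


assert resolve_priority("Definite", None) == "Definite"
assert resolve_priority("Potential PII", "Risk: HIGH, column matches an SSN pattern") == "High"
assert resolve_priority("Potential PII", "no risk marker present") is None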
