Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
52382e8
feat(notifications): Adding Test Run notifications
rboni-dk Nov 26, 2025
20453cb
Merge branch 'main' into 'enterprise'
aarthy-dk Dec 16, 2025
49f5dde
update test run email styles
aarthy-dk Dec 17, 2025
e1dc227
feat(notifications): Adding Score drop notifications
rboni-dk Dec 15, 2025
6e06547
Merge remote-tracking branch 'origin/aarthy/email-templates' into tg-…
rboni-dk Dec 18, 2025
7be4abc
feat(notifications): Miscellaneous fixes
rboni-dk Dec 22, 2025
43e71c9
fix(emails): update score drop template styles
aarthy-dk Dec 22, 2025
3de9098
fix(emails): update test run template to avoid flex styles
aarthy-dk Dec 22, 2025
57d697e
feat(notifications): Adding Profiling Run notifications
rboni-dk Dec 24, 2025
79036aa
Merge branch 'score-template' of gitlab.com:dkinternal/testgen/dataop…
rboni-dk Dec 26, 2025
c543bc1
feat(notifications): Miscellaneous fixes and unit tests
rboni-dk Dec 26, 2025
b6a92f5
Merge branch 'enterprise' of gitlab.com:dkinternal/testgen/dataops-te…
rboni-dk Dec 28, 2025
9a612fb
fix(emails): update profiling run template
aarthy-dk Dec 29, 2025
09c7214
Merge branch 'aarthy/profiling-email' into 'tg-965-email-notifications'
rboni-dk Dec 29, 2025
98ec5ba
misc: Code review feedback
rboni-dk Dec 30, 2025
47ea799
Merge branch 'tg-965-email-notifications' into 'enterprise'
Dec 30, 2025
c2c2577
fix(emails): normalize test definition id in results diff
aarthy-dk Jan 7, 2026
61bbf7c
fix(table freshness): bugs in calculating historical thresholds
aarthy-dk Jan 7, 2026
a98d0b5
fix: misc ui improvements
aarthy-dk Jan 7, 2026
05b238c
Merge branch 'aarthy/qa-fixes' into 'enterprise'
Jan 8, 2026
91ff18b
fix(emails): add empty state message when smtp server not configured
aarthy-dk Jan 8, 2026
7406c51
Merge branch 'aarthy/email-empty' into 'enterprise'
Jan 8, 2026
151ab53
release: 4.38.7 -> 4.39.2
aarthy-dk Jan 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dataops-testgen"
version = "4.38.7"
version = "4.39.2"
description = "DataKitchen's Data Quality DataOps TestGen"
authors = [
{ "name" = "DataKitchen, Inc.", "email" = "[email protected]" },
Expand Down
16 changes: 12 additions & 4 deletions testgen/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@
)
from testgen.common.models import with_database_session
from testgen.common.models.profiling_run import ProfilingRun
from testgen.common.models.settings import PersistedSetting
from testgen.common.models.test_run import TestRun
from testgen.common.models.test_suite import TestSuite
from testgen.common.notifications.base import smtp_configured
from testgen.common.notifications.profiling_run import send_profiling_run_notifications
from testgen.common.notifications.test_run import send_test_run_notifications
from testgen.scheduler import register_scheduler_job, run_scheduler
from testgen.utils import plugins

Expand Down Expand Up @@ -645,14 +649,18 @@ def run_ui():
patch_streamlit.patch(force=True)

@with_database_session
def cancel_all_running():
def init_ui():
try:
ProfilingRun.cancel_all_running()
TestRun.cancel_all_running()
for profiling_run_id in ProfilingRun.cancel_all_running():
send_profiling_run_notifications(ProfilingRun.get(profiling_run_id))
for test_run_id in TestRun.cancel_all_running():
send_test_run_notifications(TestRun.get(test_run_id))
except Exception:
LOG.warning("Failed to cancel 'Running' profiling/test runs")

cancel_all_running()
PersistedSetting.set("SMTP_CONFIGURED", smtp_configured())

init_ui()

app_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ui/app.py")
process= subprocess.Popen(
Expand Down
7 changes: 6 additions & 1 deletion testgen/commands/run_profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from testgen.common.models.profiling_run import ProfilingRun
from testgen.common.models.table_group import TableGroup
from testgen.common.models.test_suite import TestSuite
from testgen.common.notifications.profiling_run import send_profiling_run_notifications
from testgen.ui.session import session
from testgen.utils import get_exception_message

Expand All @@ -53,7 +54,7 @@ def run_profiling_in_background(table_group_id: str | UUID) -> None:
def run_profiling(table_group_id: str | UUID, username: str | None = None, run_date: datetime | None = None) -> str:
if table_group_id is None:
raise ValueError("Table Group ID was not specified")

LOG.info(f"Starting profiling run for table group {table_group_id}")
time_delta = (run_date - datetime.now(UTC)) if run_date else timedelta()

Expand Down Expand Up @@ -104,12 +105,16 @@ def run_profiling(table_group_id: str | UUID, username: str | None = None, run_d
profiling_run.profiling_endtime = datetime.now(UTC) + time_delta
profiling_run.status = "Error"
profiling_run.save()

send_profiling_run_notifications(profiling_run)
else:
LOG.info("Setting profiling run status to Completed")
profiling_run.profiling_endtime = datetime.now(UTC) + time_delta
profiling_run.status = "Complete"
profiling_run.save()

send_profiling_run_notifications(profiling_run)

_rollup_profiling_scores(profiling_run, table_group)

if bool(table_group.monitor_test_suite_id) and not table_group.last_complete_profile_run_id:
Expand Down
10 changes: 8 additions & 2 deletions testgen/commands/run_refresh_score_cards_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ScoreDefinitionResult,
ScoreDefinitionResultHistoryEntry,
)
from testgen.common.notifications.score_drop import collect_score_notification_data, send_score_drop_notifications

LOG = logging.getLogger("testgen")

Expand All @@ -26,16 +27,16 @@ def run_refresh_score_cards_results(
_refresh_date = refresh_date or datetime.datetime.now(datetime.UTC)

try:
definitions = []
if not definition_id:
definitions = ScoreDefinition.all(project_code=project_code)
else:
definitions.append(ScoreDefinition.get(str(definition_id)))
definitions = [ScoreDefinition.get(str(definition_id))]
except Exception:
LOG.exception("Stopping scorecards results refresh after unexpected error")
return

db_session = get_current_session()
score_notification_data = []

for definition in definitions:
LOG.info(
Expand All @@ -46,6 +47,9 @@ def run_refresh_score_cards_results(

try:
fresh_score_card = definition.as_score_card()

collect_score_notification_data(score_notification_data, definition, fresh_score_card)

definition.clear_results()
definition.results = _score_card_to_results(fresh_score_card)
definition.breakdown = _score_definition_to_results_breakdown(definition)
Expand Down Expand Up @@ -89,6 +93,8 @@ def run_refresh_score_cards_results(
end_time = time.time()
LOG.info("Refreshing results for %s done after %s seconds", scope, round(end_time - start_time, 2))

send_score_drop_notifications(score_notification_data)


def _score_card_to_results(score_card: ScoreCard) -> list[ScoreDefinitionResult]:
return [
Expand Down
12 changes: 9 additions & 3 deletions testgen/commands/run_test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from testgen.common.models.table_group import TableGroup
from testgen.common.models.test_run import TestRun
from testgen.common.models.test_suite import TestSuite
from testgen.common.notifications.test_run import send_test_run_notifications
from testgen.ui.session import session
from testgen.utils import get_exception_message

Expand Down Expand Up @@ -81,6 +82,10 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r

sql_generator = TestExecutionSQL(connection, table_group, test_run)

# Update the thresholds before retrieving the test definitions in the next steps
LOG.info("Updating historic test thresholds")
execute_db_queries([sql_generator.update_historic_thresholds()])

LOG.info("Retrieving active test definitions in test suite")
test_defs = fetch_dict_from_db(*sql_generator.get_active_test_definitions())
test_defs = [TestExecutionDef(**item) for item in test_defs]
Expand All @@ -99,9 +104,6 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
)

if valid_test_defs:
LOG.info("Updating historic test thresholds")
execute_db_queries([sql_generator.update_historic_thresholds()])

column_types = {(col.schema_name, col.table_name, col.column_name): col.column_type for col in data_chars}
for td in valid_test_defs:
td.column_type = column_types.get((td.schema_name, td.table_name, td.column_name))
Expand All @@ -128,13 +130,16 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
execute_db_queries(sql_generator.update_test_results())
# Refresh needed because previous query updates the test run too
test_run.refresh()

except Exception as e:
LOG.exception("Test execution encountered an error.")
LOG.info("Setting test run status to Error")
test_run.log_message = get_exception_message(e)
test_run.test_endtime = datetime.now(UTC) + time_delta
test_run.status = "Error"
test_run.save()

send_test_run_notifications(test_run)
else:
LOG.info("Setting test run status to Completed")
test_run.test_endtime = datetime.now(UTC) + time_delta
Expand All @@ -145,6 +150,7 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
test_suite.last_complete_test_run_id = test_run.id
test_suite.save()

send_test_run_notifications(test_run)
_rollup_test_scores(test_run, table_group)
finally:
MixpanelService().send_event(
Expand Down
145 changes: 145 additions & 0 deletions testgen/common/models/hygiene_issue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import re
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Self
from uuid import UUID, uuid4

from sqlalchemy import Column, ForeignKey, String, and_, case, null, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import aliased, relationship
from sqlalchemy.sql.functions import func

from testgen.common.models import Base, get_current_session
from testgen.common.models.entity import Entity

# Pulls the risk level out of a "Potential PII" issue's detail text,
# e.g. "... Risk: HIGH, ..." — group(1) captures "MODERATE" or "HIGH".
PII_RISK_RE = re.compile(r"Risk: (MODERATE|HIGH),")


@dataclass
class IssueCount:
    """Tally of hygiene issues sharing one priority.

    ``total`` is the overall number of issues; ``inactive`` is the subset
    that no longer needs attention.
    """

    total: int = 0
    inactive: int = 0

    @property
    def active(self) -> int:
        """Issues still open, i.e. total minus inactive."""
        remaining = self.total - self.inactive
        return remaining


class HygieneIssueType(Base):
    """Lookup entity for hygiene-issue (profile anomaly) types."""

    __tablename__ = "profile_anomaly_types"

    id: str = Column(String, primary_key=True)
    # Severity bucket for the type; values observed elsewhere in this module
    # include "Definite", "Likely", "Possible", and "Potential PII".
    likelihood: str = Column("issue_likelihood", String)
    name: str = Column("anomaly_name", String)

    # Note: not all table columns are implemented by this entity


class HygieneIssue(Entity):
    """A hygiene issue (profile anomaly) detected by a profiling run.

    Each row ties an issue type to a specific schema/table/column within a
    table group, for one profiling run.
    """

    __tablename__ = "profile_anomaly_results"

    id: UUID = Column(postgresql.UUID(as_uuid=True), primary_key=True, nullable=False, default=uuid4)

    project_code: str = Column(String, ForeignKey("projects.project_code"))
    table_groups_id: UUID = Column(postgresql.UUID(as_uuid=True), ForeignKey("table_groups.id"), nullable=False)
    profile_run_id: UUID = Column(postgresql.UUID(as_uuid=True), ForeignKey("profiling_runs.id"), nullable=False)

    type_id: str = Column("anomaly_id", String, ForeignKey("profile_anomaly_types.id"), nullable=False)
    type_ = relationship(HygieneIssueType)

    schema_name: str = Column(String, nullable=False)
    table_name: str = Column(String, nullable=False)
    column_name: str = Column(String, nullable=False)

    detail: str = Column(String, nullable=False)
    disposition: str = Column(String)

    # Note: not all table columns are implemented by this entity

    @hybrid_property
    def priority(self):
        """Effective priority of the issue.

        For most types this is the type's likelihood. For "Potential PII"
        types the risk level is parsed from the detail text instead,
        yielding "Moderate"/"High", or None when no risk marker is found.
        """
        if self.type_.likelihood != "Potential PII":
            return self.type_.likelihood
        elif self.detail and (match := PII_RISK_RE.search(self.detail)):
            return match.group(1).capitalize()
        else:
            return None

    @priority.expression
    def priority(cls):
        # SQL mirror of the Python property: PostgreSQL substring(text, pattern)
        # returns the first parenthesized regex group, and initcap() matches
        # str.capitalize() for the single-word MODERATE/HIGH values.
        return case(
            (
                HygieneIssueType.likelihood != "Potential PII",
                HygieneIssueType.likelihood,
            ),
            else_=func.initcap(
                func.substring(cls.detail, PII_RISK_RE.pattern)
            ),
        )

    @classmethod
    def select_count_by_priority(cls, profiling_run_id: UUID) -> dict[str, IssueCount]:
        """Count the run's issues grouped by effective priority.

        Returns a mapping of priority -> IssueCount. The five standard
        priorities are always present, zero-filled when absent from the run.
        """
        count_query = (
            select(
                cls.priority,
                func.count(),
                # FILTER restricts the aggregate to dismissed/inactive rows.
                # A bare count(<boolean expr>) would be wrong here: SQL COUNT
                # skips only NULLs, so FALSE rows would be counted too.
                func.count().filter(cls.disposition.in_(("Dismissed", "Inactive"))),
            )
            .select_from(cls)
            .join(HygieneIssueType)
            .where(cls.profile_run_id == profiling_run_id)
            .group_by(cls.priority)
        )
        result = {
            priority: IssueCount(total, inactive)
            for priority, total, inactive in get_current_session().execute(count_query)
        }
        for p in ("Definite", "Likely", "Possible", "High", "Moderate"):
            result.setdefault(p, IssueCount())
        return result

    @classmethod
    def select_with_diff(
        cls, profiling_run_id: UUID, other_profiling_run_id: UUID | None, *where_clauses, limit: int | None = None
    ) -> Iterable[tuple[Self, bool | None]]:
        """Select the run's issues, flagging which are new vs. another run.

        Each result row is (issue, is_new): ``is_new`` is True when no issue
        with the same table group/schema/table/column/type exists in
        ``other_profiling_run_id``, and NULL/None for every row when no
        comparison run is given. Rows are ordered new-first, then by
        descending priority severity; ``limit`` caps the result when set.
        """
        other = aliased(cls)
        # Lower weight sorts first; unknown priorities sink to the bottom.
        order_weight = case(
            (cls.priority == "Definite", 1),
            (cls.priority == "Likely", 2),
            (cls.priority == "Possible", 3),
            (cls.priority == "High", 4),
            (cls.priority == "Moderate", 5),
            else_=6,
        )
        is_new_col = (other.id.is_(None) if other_profiling_run_id else null()).label("is_new")
        query = (
            select(
                cls,
                is_new_col,
            )
            .outerjoin(
                other,
                and_(
                    other.table_groups_id == cls.table_groups_id,
                    other.schema_name == cls.schema_name,
                    other.table_name == cls.table_name,
                    other.column_name == cls.column_name,
                    other.type_id == cls.type_id,
                    other.profile_run_id == other_profiling_run_id,
                ),
            ).join(
                HygieneIssueType,
                HygieneIssueType.id == cls.type_id,
            ).where(
                cls.profile_run_id == profiling_run_id,
                *where_clauses
            ).order_by(
                is_new_col.desc(),
                order_weight,
            ).limit(
                limit,
            )
        )

        return get_current_session().execute(query)
Loading