Skip to content

Commit 876eec5

Browse files
committed
Migrate RequestProposedChangeDataIntegrity to prefect
1 parent 99d3266 commit 876eec5

File tree

6 files changed

+52
-31
lines changed

6 files changed

+52
-31
lines changed

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from .git_file_get import GitFileGet, GitFileGetResponse
1818
from .git_repository_connectivity import GitRepositoryConnectivity
1919
from .git_repository_importobjects import GitRepositoryImportObjects
20-
from .proposed_change.request_proposedchange_dataintegrity import RequestProposedChangeDataIntegrity
2120
from .proposed_change.request_proposedchange_refreshartifacts import RequestProposedChangeRefreshArtifacts
2221
from .proposed_change.request_proposedchange_repositorychecks import RequestProposedChangeRepositoryChecks
2322
from .proposed_change.request_proposedchange_rungenerators import RequestProposedChangeRunGenerators
@@ -53,6 +52,7 @@
5352
"git.diff.names_only": GitDiffNamesOnly,
5453
"git.file.get": GitFileGet,
5554
"git.repository.connectivity": GitRepositoryConnectivity,
55+
"git.repository.add_read_only": GitRepositoryAddReadOnly,
5656
"git.repository.import_objects": GitRepositoryImportObjects,
5757
"schema.migration.path": SchemaMigrationPath,
5858
"schema.validator.path": SchemaValidatorPath,

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
"refresh.webhook.configuration": refresh.webhook.configuration,
4242
"request.generator_definition.check": requests.generator_definition.check,
4343
"request.artifact_definition.check": requests.artifact_definition.check,
44-
"request.proposed_change.data_integrity": requests.proposed_change.data_integrity,
4544
"request.proposed_change.pipeline": requests.proposed_change.pipeline,
4645
"request.proposed_change.refresh_artifacts": requests.proposed_change.refresh_artifacts,
4746
"request.proposed_change.repository_checks": requests.proposed_change.repository_checks,

backend/infrahub/message_bus/operations/requests/proposed_change.py

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
ProposedChangeRepository,
3333
ProposedChangeSubscriber,
3434
)
35+
from infrahub.proposed_change.models import RequestProposedChangeDataIntegrity
3536
from infrahub.pytest_plugin import InfrahubBackendPlugin
3637
from infrahub.services import InfrahubServices # noqa: TCH001
38+
from infrahub.workflows.catalogue import REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY
3739

3840
if TYPE_CHECKING:
3941
from infrahub_sdk.node import InfrahubNode
@@ -71,23 +73,6 @@ def log_line(self) -> str:
7173
return "Doesn't require changes due to no relevant modified kinds or file changes in Git"
7274

7375

74-
@flow(name="proposed-changed-data-integrity")
75-
async def data_integrity(message: messages.RequestProposedChangeDataIntegrity, service: InfrahubServices) -> None:
76-
"""Triggers a data integrity validation check on the provided proposed change to start."""
77-
async with service.task_report(
78-
related_node=message.proposed_change,
79-
title="Data Integrity",
80-
):
81-
log.info(f"Got a request to process data integrity defined in proposed_change: {message.proposed_change}")
82-
83-
destination_branch = await registry.get_branch(db=service.database, branch=message.destination_branch)
84-
source_branch = await registry.get_branch(db=service.database, branch=message.source_branch)
85-
component_registry = get_component_registry()
86-
async with service.database.start_transaction() as dbt:
87-
diff_coordinator = await component_registry.get_component(DiffCoordinator, db=dbt, branch=source_branch)
88-
await diff_coordinator.update_branch_diff(base_branch=destination_branch, diff_branch=source_branch)
89-
90-
9176
@flow(name="proposed-changed-pipeline")
9277
async def pipeline(message: messages.RequestProposedChangePipeline, service: InfrahubServices) -> None:
9378
async with service.task_report(
@@ -164,14 +149,15 @@ async def pipeline(message: messages.RequestProposedChangePipeline, service: Inf
164149
branch=message.source_branch
165150
):
166151
await task_report.info("Adding Data Integrity job", proposed_change=message.proposed_change)
167-
events.append(
168-
messages.RequestProposedChangeDataIntegrity(
169-
proposed_change=message.proposed_change,
170-
source_branch=message.source_branch,
171-
source_branch_sync_with_git=message.source_branch_sync_with_git,
172-
destination_branch=message.destination_branch,
173-
branch_diff=branch_diff,
174-
)
152+
model = RequestProposedChangeDataIntegrity(
153+
proposed_change=message.proposed_change,
154+
source_branch=message.source_branch,
155+
source_branch_sync_with_git=message.source_branch_sync_with_git,
156+
destination_branch=message.destination_branch,
157+
branch_diff=branch_diff,
158+
)
159+
await service.workflow.submit_workflow(
160+
workflow=REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY, parameters={"model": model}
175161
)
176162

177163
if message.check_type in [CheckType.REPOSITORY, CheckType.USER]:
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .base_with_diff import BaseProposedChangeWithDiffMessage
1+
from infrahub.message_bus.messages.proposed_change.base_with_diff import BaseProposedChangeWithDiffMessage
22

33

44
class RequestProposedChangeDataIntegrity(BaseProposedChangeWithDiffMessage):

backend/infrahub/proposed_change/tasks.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
from __future__ import annotations
22

3+
from typing import TYPE_CHECKING
4+
35
from infrahub_sdk.protocols import CoreProposedChange
46
from prefect import flow, task
57
from prefect.logging import get_run_logger
68

9+
from infrahub.core import registry
710
from infrahub.core.constants import ProposedChangeState
8-
from infrahub.services import (
9-
services,
10-
)
11+
from infrahub.core.diff.coordinator import DiffCoordinator
12+
from infrahub.dependencies.registry import get_component_registry
13+
from infrahub.services import services
14+
15+
if TYPE_CHECKING:
16+
from infrahub.proposed_change.models import RequestProposedChangeDataIntegrity
1117

1218

1319
@flow(name="proposed-changes-cancel-branch", description="Cancel all Proposed change associated with a branch.")
@@ -39,3 +45,26 @@ async def cancel_proposed_change(proposed_change: CoreProposedChange) -> None:
3945
proposed_change = await service.client.get(kind=CoreProposedChange, id=proposed_change.id)
4046
proposed_change.state.value = ProposedChangeState.CANCELED.value
4147
await proposed_change.save()
48+
49+
50+
@flow(
51+
name="proposed-changed-data-integrity",
52+
flow_run_name="Triggers data integrity check on proposed change {model.proposed_change}",
53+
)
54+
async def run_proposed_change_data_integrity_check(model: RequestProposedChangeDataIntegrity) -> None:
55+
"""Triggers a data integrity validation check on the provided proposed change to start."""
56+
57+
service = services.service
58+
async with service.task_report(
59+
related_node=model.proposed_change,
60+
title="Data Integrity",
61+
):
62+
log = get_run_logger()
63+
log.info(f"Got a request to process data integrity defined in proposed_change: {model.proposed_change}")
64+
65+
destination_branch = await registry.get_branch(db=service.database, branch=model.destination_branch)
66+
source_branch = await registry.get_branch(db=service.database, branch=model.source_branch)
67+
component_registry = get_component_registry()
68+
async with service.database.start_transaction() as dbt:
69+
diff_coordinator = await component_registry.get_component(DiffCoordinator, db=dbt, branch=source_branch)
70+
await diff_coordinator.update_branch_diff(base_branch=destination_branch, diff_branch=source_branch)

backend/infrahub/workflows/catalogue.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,13 @@
242242
function="process_transform",
243243
)
244244

245+
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY = WorkflowDefinition(
246+
name="proposed-changed-data-integrity",
247+
type=WorkflowType.INTERNAL,
248+
module="infrahub.proposed_change.tasks",
249+
function="run_proposed_change_data_integrity_check",
250+
)
251+
245252

246253
worker_pools = [INFRAHUB_WORKER_POOL]
247254

0 commit comments

Comments
 (0)