Skip to content

Commit 529a97e

Browse files
authored
Merge pull request #4885 from opsmill/lgu-migrate-req-prop-changed-data-integrity
Migrate RequestProposedChangeDataIntegrity to prefect
2 parents 82f8842 + 00888e0 commit 529a97e

File tree

9 files changed

+44
-89
lines changed

9 files changed

+44
-89
lines changed

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from .git_file_get import GitFileGet, GitFileGetResponse
1818
from .git_repository_connectivity import GitRepositoryConnectivity
1919
from .git_repository_importobjects import GitRepositoryImportObjects
20-
from .proposed_change.request_proposedchange_dataintegrity import RequestProposedChangeDataIntegrity
2120
from .proposed_change.request_proposedchange_refreshartifacts import RequestProposedChangeRefreshArtifacts
2221
from .proposed_change.request_proposedchange_repositorychecks import RequestProposedChangeRepositoryChecks
2322
from .proposed_change.request_proposedchange_rungenerators import RequestProposedChangeRunGenerators
@@ -61,7 +60,6 @@
6160
"refresh.webhook.configuration": RefreshWebhookConfiguration,
6261
"request.artifact_definition.check": RequestArtifactDefinitionCheck,
6362
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
64-
"request.proposed_change.data_integrity": RequestProposedChangeDataIntegrity,
6563
"request.proposed_change.pipeline": RequestProposedChangePipeline,
6664
"request.proposed_change.refresh_artifacts": RequestProposedChangeRefreshArtifacts,
6765
"request.proposed_change.repository_checks": RequestProposedChangeRepositoryChecks,

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
"refresh.webhook.configuration": refresh.webhook.configuration,
4242
"request.generator_definition.check": requests.generator_definition.check,
4343
"request.artifact_definition.check": requests.artifact_definition.check,
44-
"request.proposed_change.data_integrity": requests.proposed_change.data_integrity,
4544
"request.proposed_change.pipeline": requests.proposed_change.pipeline,
4645
"request.proposed_change.refresh_artifacts": requests.proposed_change.refresh_artifacts,
4746
"request.proposed_change.repository_checks": requests.proposed_change.repository_checks,

backend/infrahub/message_bus/operations/requests/proposed_change.py

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
ProposedChangeRepository,
3333
ProposedChangeSubscriber,
3434
)
35+
from infrahub.proposed_change.models import RequestProposedChangeDataIntegrity
3536
from infrahub.pytest_plugin import InfrahubBackendPlugin
3637
from infrahub.services import InfrahubServices # noqa: TCH001
38+
from infrahub.workflows.catalogue import REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY
3739

3840
if TYPE_CHECKING:
3941
from infrahub_sdk.node import InfrahubNode
@@ -71,23 +73,6 @@ def log_line(self) -> str:
7173
return "Doesn't require changes due to no relevant modified kinds or file changes in Git"
7274

7375

74-
@flow(name="proposed-changed-data-integrity")
75-
async def data_integrity(message: messages.RequestProposedChangeDataIntegrity, service: InfrahubServices) -> None:
76-
"""Triggers a data integrity validation check on the provided proposed change to start."""
77-
async with service.task_report(
78-
related_node=message.proposed_change,
79-
title="Data Integrity",
80-
):
81-
log.info(f"Got a request to process data integrity defined in proposed_change: {message.proposed_change}")
82-
83-
destination_branch = await registry.get_branch(db=service.database, branch=message.destination_branch)
84-
source_branch = await registry.get_branch(db=service.database, branch=message.source_branch)
85-
component_registry = get_component_registry()
86-
async with service.database.start_transaction() as dbt:
87-
diff_coordinator = await component_registry.get_component(DiffCoordinator, db=dbt, branch=source_branch)
88-
await diff_coordinator.update_branch_diff(base_branch=destination_branch, diff_branch=source_branch)
89-
90-
9176
@flow(name="proposed-changed-pipeline")
9277
async def pipeline(message: messages.RequestProposedChangePipeline, service: InfrahubServices) -> None:
9378
async with service.task_report(
@@ -164,14 +149,15 @@ async def pipeline(message: messages.RequestProposedChangePipeline, service: Inf
164149
branch=message.source_branch
165150
):
166151
await task_report.info("Adding Data Integrity job", proposed_change=message.proposed_change)
167-
events.append(
168-
messages.RequestProposedChangeDataIntegrity(
169-
proposed_change=message.proposed_change,
170-
source_branch=message.source_branch,
171-
source_branch_sync_with_git=message.source_branch_sync_with_git,
172-
destination_branch=message.destination_branch,
173-
branch_diff=branch_diff,
174-
)
152+
model = RequestProposedChangeDataIntegrity(
153+
proposed_change=message.proposed_change,
154+
source_branch=message.source_branch,
155+
source_branch_sync_with_git=message.source_branch_sync_with_git,
156+
destination_branch=message.destination_branch,
157+
branch_diff=branch_diff,
158+
)
159+
await service.workflow.submit_workflow(
160+
workflow=REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY, parameters={"model": model}
175161
)
176162

177163
if message.check_type in [CheckType.REPOSITORY, CheckType.USER]:
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .base_with_diff import BaseProposedChangeWithDiffMessage
1+
from infrahub.message_bus.messages.proposed_change.base_with_diff import BaseProposedChangeWithDiffMessage
22

33

44
class RequestProposedChangeDataIntegrity(BaseProposedChangeWithDiffMessage):

backend/infrahub/proposed_change/tasks.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44
from prefect import flow, task
55
from prefect.logging import get_run_logger
66

7+
from infrahub.core import registry
78
from infrahub.core.constants import ProposedChangeState
8-
from infrahub.services import (
9-
services,
9+
from infrahub.core.diff.coordinator import DiffCoordinator
10+
from infrahub.dependencies.registry import get_component_registry
11+
from infrahub.proposed_change.models import (
12+
RequestProposedChangeDataIntegrity, # noqa: TCH001. as symbol is required by prefect flow
1013
)
14+
from infrahub.services import services
1115

1216

1317
@flow(name="proposed-changes-cancel-branch", description="Cancel all Proposed change associated with a branch.")
@@ -39,3 +43,19 @@ async def cancel_proposed_change(proposed_change: CoreProposedChange) -> None:
3943
proposed_change = await service.client.get(kind=CoreProposedChange, id=proposed_change.id)
4044
proposed_change.state.value = ProposedChangeState.CANCELED.value
4145
await proposed_change.save()
46+
47+
48+
@flow(
49+
name="proposed-changed-data-integrity",
50+
flow_run_name="Triggers data integrity check",
51+
)
52+
async def run_proposed_change_data_integrity_check(model: RequestProposedChangeDataIntegrity) -> None:
53+
"""Triggers a data integrity validation check on the provided proposed change to start."""
54+
55+
service = services.service
56+
destination_branch = await registry.get_branch(db=service.database, branch=model.destination_branch)
57+
source_branch = await registry.get_branch(db=service.database, branch=model.source_branch)
58+
component_registry = get_component_registry()
59+
async with service.database.start_transaction() as dbt:
60+
diff_coordinator = await component_registry.get_component(DiffCoordinator, db=dbt, branch=source_branch)
61+
await diff_coordinator.update_branch_diff(base_branch=destination_branch, diff_branch=source_branch)

backend/infrahub/workflows/catalogue.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from infrahub.core.constants import BranchSupportType
2-
31
from .constants import WorkflowTag, WorkflowType
42
from .models import WorkerPoolDefinition, WorkflowDefinition
53

@@ -19,15 +17,13 @@
1917
type=WorkflowType.USER,
2018
module="infrahub.transformations.tasks",
2119
function="transform_render_jinja2_template",
22-
branch_support=BranchSupportType.AWARE,
2320
)
2421

2522
TRANSFORM_PYTHON_RENDER = WorkflowDefinition(
2623
name="transform_render_python",
2724
type=WorkflowType.USER,
2825
module="infrahub.transformations.tasks",
2926
function="transform_python",
30-
branch_support=BranchSupportType.AWARE,
3127
)
3228

3329
ANONYMOUS_TELEMETRY_SEND = WorkflowDefinition(
@@ -43,7 +39,6 @@
4339
type=WorkflowType.INTERNAL,
4440
module="infrahub.core.migrations.schema.tasks",
4541
function="schema_apply_migrations",
46-
branch_support=BranchSupportType.AWARE,
4742
tags=[WorkflowTag.DATABASE_CHANGE],
4843
)
4944

@@ -52,7 +47,6 @@
5247
type=WorkflowType.INTERNAL,
5348
module="infrahub.core.validators.tasks",
5449
function="schema_validate_migrations",
55-
branch_support=BranchSupportType.AWARE,
5650
)
5751

5852
TRIGGER_ARTIFACT_DEFINITION_GENERATE = WorkflowDefinition(
@@ -74,7 +68,6 @@
7468
type=WorkflowType.INTERNAL,
7569
module="infrahub.core.ipam.tasks",
7670
function="ipam_reconciliation",
77-
branch_support=BranchSupportType.AWARE,
7871
tags=[WorkflowTag.DATABASE_CHANGE],
7972
)
8073

@@ -90,7 +83,6 @@
9083
type=WorkflowType.INTERNAL,
9184
module="infrahub.generators.tasks",
9285
function="request_generator_definition_run",
93-
branch_support=BranchSupportType.AWARE,
9486
)
9587

9688
REQUEST_ARTIFACT_GENERATE = WorkflowDefinition(
@@ -134,7 +126,6 @@
134126
type=WorkflowType.INTERNAL,
135127
module="infrahub.git.tasks",
136128
function="create_branch",
137-
branch_support=BranchSupportType.AWARE,
138129
tags=[WorkflowTag.DATABASE_CHANGE],
139130
)
140131

@@ -143,7 +134,6 @@
143134
type=WorkflowType.INTERNAL,
144135
module="infrahub.git.tasks",
145136
function="add_git_repository",
146-
branch_support=BranchSupportType.AWARE,
147137
tags=[WorkflowTag.DATABASE_CHANGE],
148138
)
149139

@@ -152,7 +142,6 @@
152142
type=WorkflowType.INTERNAL,
153143
module="infrahub.git.tasks",
154144
function="add_git_repository_read_only",
155-
branch_support=BranchSupportType.AWARE,
156145
tags=[WorkflowTag.DATABASE_CHANGE],
157146
)
158147

@@ -168,7 +157,6 @@
168157
type=WorkflowType.INTERNAL,
169158
module="infrahub.git.tasks",
170159
function="merge_git_repository",
171-
branch_support=BranchSupportType.AWARE,
172160
tags=[WorkflowTag.DATABASE_CHANGE],
173161
)
174162

@@ -177,7 +165,6 @@
177165
type=WorkflowType.INTERNAL,
178166
module="infrahub.core.branch.tasks",
179167
function="rebase_branch",
180-
branch_support=BranchSupportType.AWARE,
181168
tags=[WorkflowTag.DATABASE_CHANGE],
182169
)
183170

@@ -186,7 +173,6 @@
186173
type=WorkflowType.INTERNAL,
187174
module="infrahub.core.branch.tasks",
188175
function="merge_branch",
189-
branch_support=BranchSupportType.AWARE,
190176
tags=[WorkflowTag.DATABASE_CHANGE],
191177
)
192178

@@ -195,15 +181,13 @@
195181
type=WorkflowType.INTERNAL,
196182
module="infrahub.core.branch.tasks",
197183
function="delete_branch",
198-
branch_support=BranchSupportType.AWARE,
199184
)
200185

201186
BRANCH_VALIDATE = WorkflowDefinition(
202187
name="branch-validate",
203188
type=WorkflowType.INTERNAL,
204189
module="infrahub.core.branch.tasks",
205190
function="validate_branch",
206-
branch_support=BranchSupportType.AWARE,
207191
)
208192

209193
BRANCH_CANCEL_PROPOSED_CHANGES = WorkflowDefinition(
@@ -218,7 +202,6 @@
218202
type=WorkflowType.INTERNAL,
219203
module="infrahub.groups.tasks",
220204
function="update_graphql_query_group",
221-
branch_support=BranchSupportType.AWARE,
222205
)
223206

224207
PROCESS_COMPUTED_MACRO = WorkflowDefinition(
@@ -242,6 +225,13 @@
242225
function="process_transform",
243226
)
244227

228+
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY = WorkflowDefinition(
229+
name="proposed-changed-data-integrity",
230+
type=WorkflowType.INTERNAL,
231+
module="infrahub.proposed_change.tasks",
232+
function="run_proposed_change_data_integrity_check",
233+
)
234+
245235

246236
worker_pools = [INFRAHUB_WORKER_POOL]
247237

@@ -276,4 +266,5 @@
276266
PROCESS_COMPUTED_MACRO,
277267
COMPUTED_ATTRIBUTE_SETUP,
278268
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
269+
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
279270
]

backend/infrahub/workflows/models.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing_extensions import Self
1111

1212
from infrahub import __version__
13-
from infrahub.core.constants import BranchSupportType
1413

1514
from .constants import TAG_NAMESPACE, WorkflowTag, WorkflowType
1615

@@ -40,7 +39,6 @@ class WorkflowDefinition(BaseModel):
4039
module: str
4140
function: str
4241
cron: str | None = None
43-
branch_support: BranchSupportType = BranchSupportType.AGNOSTIC
4442
tags: list[WorkflowTag] = Field(default_factory=list)
4543

4644
@property

backend/tests/integration/message_bus/operations/request/test_proposed_change.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from infrahub.message_bus.types import ProposedChangeBranchDiff
1414
from infrahub.server import app, app_initialization
1515
from infrahub.services import InfrahubServices, services
16+
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
1617
from tests.adapters.log import FakeLogger
1718
from tests.adapters.message_bus import BusRecorder
1819
from tests.helpers.file_repo import FileRepo
@@ -97,7 +98,7 @@ async def prepare_proposed_change(
9798
config = Config(api_token=admin_token, requester=test_client.async_request)
9899
client = InfrahubClient(config=config)
99100

100-
service = InfrahubServices(message_bus=bus, client=client)
101+
service = InfrahubServices(message_bus=bus, client=client, workflow=WorkflowLocalExecution())
101102
services.prepare(service=service)
102103

103104
repo = await InfrahubRepository.new(id=obj.id, name=file_repo.name, location=file_repo.path, client=client)
@@ -154,7 +155,6 @@ async def test_run_pipeline_validate_requested_jobs(
154155
]
155156

156157
assert sorted(bus_post_data_changes.seen_routing_keys) == [
157-
"request.proposed_change.data_integrity",
158158
"request.proposed_change.run_generators",
159159
"request.proposed_change.run_tests",
160160
"request.proposed_change.schema_integrity",

docs/docs/reference/message-bus-events.mdx

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -536,24 +536,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
536536
### Request Proposed Change
537537
<!-- vale on -->
538538

539-
<!-- vale off -->
540-
#### Event request.proposed_change.data_integrity
541-
<!-- vale on -->
542-
543-
**Description**: Sent trigger data integrity checks for a proposed change
544-
545-
**Priority**: 3
546-
547-
<!-- vale off -->
548-
| Key | Description | Type | Default Value |
549-
|-----|-------------|------|---------------|
550-
| **meta** | Meta properties for the message | N/A | None |
551-
| **proposed_change** | The unique ID of the Proposed Change | string | None |
552-
| **source_branch** | The source branch of the proposed change | string | None |
553-
| **source_branch_sync_with_git** | Indicates if the source branch should sync with git | boolean | None |
554-
| **destination_branch** | The destination branch of the proposed change | string | None |
555-
| **branch_diff** | The calculated diff between the two branches | N/A | None |
556-
<!-- vale on -->
557539
<!-- vale off -->
558540
#### Event request.proposed_change.pipeline
559541
<!-- vale on -->
@@ -1298,25 +1280,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
12981280
### Request Proposed Change
12991281
<!-- vale on -->
13001282

1301-
<!-- vale off -->
1302-
#### Event request.proposed_change.data_integrity
1303-
<!-- vale on -->
1304-
1305-
**Description**: Sent trigger data integrity checks for a proposed change
1306-
1307-
**Priority**: 3
1308-
1309-
1310-
<!-- vale off -->
1311-
| Key | Description | Type | Default Value |
1312-
|-----|-------------|------|---------------|
1313-
| **meta** | Meta properties for the message | N/A | None |
1314-
| **proposed_change** | The unique ID of the Proposed Change | string | None |
1315-
| **source_branch** | The source branch of the proposed change | string | None |
1316-
| **source_branch_sync_with_git** | Indicates if the source branch should sync with git | boolean | None |
1317-
| **destination_branch** | The destination branch of the proposed change | string | None |
1318-
| **branch_diff** | The calculated diff between the two branches | N/A | None |
1319-
<!-- vale on -->
13201283
<!-- vale off -->
13211284
#### Event request.proposed_change.pipeline
13221285
<!-- vale on -->

0 commit comments

Comments
 (0)