Skip to content

Commit ed1f8ca

Browse files
committed
Migrate RequestArtifactDefinitionCheck to prefect
1 parent 2553ba2 commit ed1f8ca

File tree

17 files changed

+311
-423
lines changed

17 files changed

+311
-423
lines changed

backend/infrahub/core/checks/__init__.py

Whitespace-only changes.

backend/infrahub/message_bus/messages/check_artifact_create.py renamed to backend/infrahub/core/checks/models.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,24 @@
11
from typing import Optional
22

3-
from pydantic import Field
3+
from pydantic import BaseModel, ConfigDict, Field
44

5-
from infrahub.message_bus import InfrahubMessage
5+
from infrahub.message_bus.types import ProposedChangeArtifactDefinition, ProposedChangeBranchDiff
66

77

8-
class CheckArtifactCreate(InfrahubMessage):
8+
class RequestArtifactDefinitionCheck(BaseModel):
9+
"""Sent to validate the generation of artifacts in relation to a proposed change."""
10+
11+
model_config = ConfigDict(arbitrary_types_allowed=True)
12+
13+
artifact_definition: ProposedChangeArtifactDefinition = Field(..., description="The Artifact Definition")
14+
branch_diff: ProposedChangeBranchDiff = Field(..., description="The calculated diff between the two branches")
15+
proposed_change: str = Field(..., description="The unique ID of the Proposed Change")
16+
source_branch: str = Field(..., description="The source branch")
17+
source_branch_sync_with_git: bool = Field(..., description="Indicates if the source branch should sync with git")
18+
destination_branch: str = Field(..., description="The target branch")
19+
20+
21+
class CheckArtifactCreate(BaseModel):
922
"""Runs a check to verify the creation of an artifact."""
1023

1124
artifact_name: str = Field(..., description="Name of the artifact")
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
from typing import Optional, Union
2+
3+
from prefect import flow
4+
from prefect.logging import get_run_logger
5+
6+
from infrahub.core.checks.models import CheckArtifactCreate, RequestArtifactDefinitionCheck
7+
from infrahub.core.constants import InfrahubKind, ValidatorConclusion, ValidatorState
8+
from infrahub.core.timestamp import Timestamp
9+
from infrahub.core.validators.checks_runner import run_checks_and_update_validator
10+
from infrahub.git import InfrahubReadOnlyRepository, InfrahubRepository
11+
from infrahub.services import InfrahubServices
12+
from infrahub.tasks.artifact import define_artifact
13+
from infrahub.workflows.catalogue import GIT_REPOSITORIES_CHECK_ARTIFACT_CREATE
14+
from infrahub.workflows.utils import add_tags
15+
16+
17+
@flow(
18+
name="artifact-definition-check",
19+
flow_run_name="Validating generation of artifacts for {model.artifact_definition.definition_name}",
20+
)
21+
async def check(model: RequestArtifactDefinitionCheck, service: InfrahubServices) -> None:
22+
await add_tags(branches=[model.source_branch], nodes=[model.proposed_change], db_change=True)
23+
24+
log = get_run_logger()
25+
artifact_definition = await service.client.get(
26+
kind=InfrahubKind.ARTIFACTDEFINITION,
27+
id=model.artifact_definition.definition_id,
28+
branch=model.source_branch,
29+
)
30+
proposed_change = await service.client.get(kind=InfrahubKind.PROPOSEDCHANGE, id=model.proposed_change)
31+
32+
validator_name = f"Artifact Validator: {model.artifact_definition.definition_name}"
33+
34+
await proposed_change.validations.fetch()
35+
36+
validator = None
37+
for relationship in proposed_change.validations.peers:
38+
existing_validator = relationship.peer
39+
if (
40+
existing_validator.typename == InfrahubKind.ARTIFACTVALIDATOR
41+
and existing_validator.definition.id == model.artifact_definition.definition_id
42+
):
43+
validator = existing_validator
44+
45+
if validator:
46+
validator.conclusion.value = ValidatorConclusion.UNKNOWN.value
47+
validator.state.value = ValidatorState.QUEUED.value
48+
validator.started_at.value = ""
49+
validator.completed_at.value = ""
50+
await validator.save()
51+
else:
52+
validator = await service.client.create(
53+
kind=InfrahubKind.ARTIFACTVALIDATOR,
54+
data={
55+
"label": validator_name,
56+
"proposed_change": model.proposed_change,
57+
"definition": model.artifact_definition.definition_id,
58+
},
59+
)
60+
await validator.save()
61+
62+
await artifact_definition.targets.fetch()
63+
group = artifact_definition.targets.peer
64+
await group.members.fetch()
65+
66+
existing_artifacts = await service.client.filters(
67+
kind=InfrahubKind.ARTIFACT,
68+
definition__ids=[model.artifact_definition.definition_id],
69+
include=["object"],
70+
branch=model.source_branch,
71+
)
72+
artifacts_by_member = {}
73+
for artifact in existing_artifacts:
74+
artifacts_by_member[artifact.object.peer.id] = artifact.id
75+
76+
repository = model.branch_diff.get_repository(repository_id=model.artifact_definition.repository_id)
77+
impacted_artifacts = model.branch_diff.get_subscribers_ids(kind=InfrahubKind.ARTIFACT)
78+
79+
checks = []
80+
81+
for relationship in group.members.peers:
82+
member = relationship.peer
83+
artifact_id = artifacts_by_member.get(member.id)
84+
if _render_artifact(
85+
artifact_id=artifact_id,
86+
managed_branch=model.source_branch_sync_with_git,
87+
impacted_artifacts=impacted_artifacts,
88+
):
89+
log.info(f"Trigger Artifact processing for {member.display_label}")
90+
91+
check_model = CheckArtifactCreate(
92+
artifact_name=model.artifact_definition.artifact_name,
93+
artifact_id=artifact_id,
94+
artifact_definition=model.artifact_definition.definition_id,
95+
commit=repository.source_commit,
96+
content_type=model.artifact_definition.content_type,
97+
transform_type=model.artifact_definition.transform_kind,
98+
transform_location=model.artifact_definition.transform_location,
99+
repository_id=repository.repository_id,
100+
repository_name=repository.repository_name,
101+
repository_kind=repository.kind,
102+
branch_name=model.source_branch,
103+
query=model.artifact_definition.query_name,
104+
variables=member.extract(params=artifact_definition.parameters.value),
105+
target_id=member.id,
106+
target_name=member.display_label,
107+
timeout=model.artifact_definition.timeout,
108+
validator_id=validator.id,
109+
)
110+
111+
checks.append(
112+
service.workflow.submit_workflow(
113+
workflow=GIT_REPOSITORIES_CHECK_ARTIFACT_CREATE, parameters={"model": check_model}
114+
)
115+
)
116+
117+
await run_checks_and_update_validator(checks, validator)
118+
119+
120+
def _render_artifact(artifact_id: Optional[str], managed_branch: bool, impacted_artifacts: list[str]) -> bool: # noqa: ARG001
121+
"""Returns a boolean to indicate if an artifact should be generated or not.
122+
Will return true if:
123+
* The artifact_id wasn't set which could be that it's a new object that doesn't have a previous artifact
124+
* The source brance is not data only which would indicate that it could contain updates in git to the transform
125+
* The artifact_id exists in the impacted_artifacts list
126+
Will return false if:
127+
* The source branch is a data only branch and the artifact_id exists and is not in the impacted list
128+
"""
129+
130+
# if not artifact_id or managed_branch:
131+
# return True
132+
# return artifact_id in impacted_artifacts
133+
# Temporary workaround tracked in https://github.com/opsmill/infrahub/issues/4991
134+
return True
135+
136+
137+
@flow(name="git-repository-check-artifact-create", flow_run_name="Check artifact creation")
138+
async def create(model: CheckArtifactCreate, service: InfrahubServices) -> str:
139+
await add_tags(branches=[model.branch_name], nodes=[model.target_id])
140+
validator = await service.client.get(kind=InfrahubKind.ARTIFACTVALIDATOR, id=model.validator_id, include=["checks"])
141+
142+
repo: InfrahubReadOnlyRepository | InfrahubRepository
143+
if InfrahubKind.READONLYREPOSITORY:
144+
repo = await InfrahubReadOnlyRepository.init(
145+
id=model.repository_id,
146+
name=model.repository_name,
147+
client=service.client,
148+
service=service,
149+
)
150+
else:
151+
repo = await InfrahubRepository.init(
152+
id=model.repository_id,
153+
name=model.repository_name,
154+
client=service.client,
155+
service=service,
156+
)
157+
158+
artifact = await define_artifact(model=model, service=service)
159+
160+
conclusion = ValidatorConclusion.SUCCESS.value
161+
severity = "info"
162+
artifact_result: dict[str, Union[str, bool, None]] = {
163+
"changed": None,
164+
"checksum": None,
165+
"artifact_id": None,
166+
"storage_id": None,
167+
}
168+
check_message = "Failed to render artifact"
169+
170+
try:
171+
result = await repo.render_artifact(artifact=artifact, message=model)
172+
artifact_result["changed"] = result.changed
173+
artifact_result["checksum"] = result.checksum
174+
artifact_result["artifact_id"] = result.artifact_id
175+
artifact_result["storage_id"] = result.storage_id
176+
check_message = "Artifact rendered successfully"
177+
178+
except Exception as exc:
179+
conclusion = ValidatorConclusion.FAILURE.value
180+
artifact.status.value = "Error"
181+
severity = "critical"
182+
check_message += f": {str(exc)}"
183+
await artifact.save()
184+
185+
check = None
186+
check_name = f"{model.artifact_name}: {model.target_name}"
187+
existing_check = await service.client.filters(
188+
kind=InfrahubKind.ARTIFACTCHECK, validator__ids=validator.id, name__value=check_name
189+
)
190+
if existing_check:
191+
check = existing_check[0]
192+
193+
if check:
194+
check.created_at.value = Timestamp().to_string()
195+
check.conclusion.value = conclusion
196+
check.severity.value = severity
197+
check.changed.value = artifact_result["changed"]
198+
check.checksum.value = artifact_result["checksum"]
199+
check.artifact_id.value = artifact_result["artifact_id"]
200+
check.storage_id.value = artifact_result["storage_id"]
201+
await check.save()
202+
else:
203+
check = await service.client.create(
204+
kind=InfrahubKind.ARTIFACTCHECK,
205+
data={
206+
"name": check_name,
207+
"origin": model.repository_id,
208+
"kind": "ArtifactDefinition",
209+
"validator": model.validator_id,
210+
"created_at": Timestamp().to_string(),
211+
"message": check_message,
212+
"conclusion": conclusion,
213+
"severity": severity,
214+
"changed": artifact_result["changed"],
215+
"checksum": artifact_result["checksum"],
216+
"artifact_id": artifact_result["artifact_id"],
217+
"storage_id": artifact_result["storage_id"],
218+
},
219+
)
220+
await check.save()
221+
222+
return conclusion
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
from typing import Coroutine
3+
4+
from infrahub_sdk.node import InfrahubNode
5+
6+
from infrahub.core.constants import ValidatorConclusion, ValidatorState
7+
from infrahub.core.timestamp import Timestamp
8+
9+
10+
async def run_checks_and_update_validator(checks: list[Coroutine], validator: InfrahubNode) -> None:
11+
"""
12+
Execute a list of checks coroutines, and set validator fields accordingly.
13+
Tasks are retrieved by completion order so as soon as we detect a failing check,
14+
we set validator conclusion to failure.
15+
"""
16+
17+
# First set validator to in progress, then wait for results
18+
validator.state.value = ValidatorState.IN_PROGRESS.value
19+
validator.started_at.value = Timestamp().to_string()
20+
validator.completed_at.value = ""
21+
await validator.save()
22+
23+
for earliest_task in asyncio.as_completed(checks):
24+
result = await earliest_task
25+
if result == ValidatorConclusion.FAILURE:
26+
validator.conclusion.value = ValidatorConclusion.FAILURE.value
27+
await validator.save()
28+
# Continue to iterate to wait for the end of all checks
29+
30+
validator.state.value = ValidatorState.COMPLETED.value
31+
validator.completed_at.value = Timestamp().to_string()
32+
if validator.conclusion.value != ValidatorConclusion.FAILURE.value:
33+
validator.conclusion.value = ValidatorConclusion.SUCCESS.value
34+
35+
await validator.save()

backend/infrahub/git/integrator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
from infrahub_sdk.schema.repository import InfrahubRepositoryArtifactDefinitionConfig
5555
from infrahub_sdk.transforms import InfrahubTransform
5656

57+
from infrahub.core.checks.models import CheckArtifactCreate
5758
from infrahub.git.models import RequestArtifactGenerate
58-
from infrahub.message_bus import messages
5959
from infrahub.services import InfrahubServices
6060

6161

@@ -1266,7 +1266,7 @@ async def artifact_generate(
12661266
return ArtifactGenerateResult(changed=True, checksum=checksum, storage_id=storage_id, artifact_id=artifact.id)
12671267

12681268
async def render_artifact(
1269-
self, artifact: CoreArtifact, message: Union[messages.CheckArtifactCreate, RequestArtifactGenerate]
1269+
self, artifact: CoreArtifact, message: Union[CheckArtifactCreate, RequestArtifactGenerate]
12701270
) -> ArtifactGenerateResult:
12711271
response = await self.sdk.query_gql_query(
12721272
name=message.query,

backend/infrahub/git/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ async def generate_artifact(model: RequestArtifactGenerate, service: InfrahubSer
257257
commit=model.commit,
258258
)
259259

260-
artifact = await define_artifact(message=model, service=service)
260+
artifact = await define_artifact(model=model, service=service)
261261

262262
try:
263263
result = await repo.render_artifact(artifact=artifact, message=model)

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from infrahub.message_bus import InfrahubMessage, InfrahubResponse
22

3-
from .check_artifact_create import CheckArtifactCreate
43
from .check_generator_run import CheckGeneratorRun
54
from .check_repository_checkdefinition import CheckRepositoryCheckDefinition
65
from .check_repository_mergeconflicts import CheckRepositoryMergeConflicts
@@ -19,15 +18,13 @@
1918
from .refresh_git_fetch import RefreshGitFetch
2019
from .refresh_registry_branches import RefreshRegistryBranches
2120
from .refresh_registry_rebasedbranch import RefreshRegistryRebasedBranch
22-
from .request_artifactdefinition_check import RequestArtifactDefinitionCheck
2321
from .request_generatordefinition_check import RequestGeneratorDefinitionCheck
2422
from .request_proposedchange_pipeline import RequestProposedChangePipeline
2523
from .request_repository_checks import RequestRepositoryChecks
2624
from .request_repository_userchecks import RequestRepositoryUserChecks
2725
from .send_echo_request import SendEchoRequest, SendEchoRequestResponse
2826

2927
MESSAGE_MAP: dict[str, type[InfrahubMessage]] = {
30-
"check.artifact.create": CheckArtifactCreate,
3128
"check.generator.run": CheckGeneratorRun,
3229
"check.repository.check_definition": CheckRepositoryCheckDefinition,
3330
"check.repository.merge_conflicts": CheckRepositoryMergeConflicts,
@@ -45,7 +42,6 @@
4542
"refresh.git.fetch": RefreshGitFetch,
4643
"refresh.registry.branches": RefreshRegistryBranches,
4744
"refresh.registry.rebased_branch": RefreshRegistryRebasedBranch,
48-
"request.artifact_definition.check": RequestArtifactDefinitionCheck,
4945
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
5046
"request.proposed_change.pipeline": RequestProposedChangePipeline,
5147
"request.proposed_change.refresh_artifacts": RequestProposedChangeRefreshArtifacts,

backend/infrahub/message_bus/messages/request_artifactdefinition_check.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from infrahub.tasks.check import set_check_status
1919

2020
COMMAND_MAP = {
21-
"check.artifact.create": check.artifact.create,
2221
"check.generator.run": check.generator.run,
2322
"check.repository.check_definition": check.repository.check_definition,
2423
"check.repository.merge_conflicts": check.repository.merge_conflicts,
@@ -34,7 +33,6 @@
3433
"refresh.registry.branches": refresh.registry.branches,
3534
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
3635
"request.generator_definition.check": requests.generator_definition.check,
37-
"request.artifact_definition.check": requests.artifact_definition.check,
3836
"request.proposed_change.pipeline": requests.proposed_change.pipeline,
3937
"request.proposed_change.refresh_artifacts": requests.proposed_change.refresh_artifacts,
4038
"request.repository.checks": requests.repository.checks,
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from . import artifact, generator, repository
1+
from . import generator, repository
22

3-
__all__ = ["artifact", "generator", "repository"]
3+
__all__ = ["generator", "repository"]

0 commit comments

Comments
 (0)