Skip to content

Commit 6804636

Browse files
committed
Add validator events
1 parent d3c088a commit 6804636

File tree

17 files changed

+423
-26
lines changed

17 files changed

+423
-26
lines changed

backend/infrahub/core/constants/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ class EventType(InfrahubStringEnum):
6666
ARTIFACT_CREATED = f"{EVENT_NAMESPACE}.artifact.created"
6767
ARTIFACT_UPDATED = f"{EVENT_NAMESPACE}.artifact.updated"
6868

69+
VALIDATOR_STARTED = f"{EVENT_NAMESPACE}.validator.started"
70+
VALIDATOR_PASSED = f"{EVENT_NAMESPACE}.validator.passed"
71+
VALIDATOR_FAILED = f"{EVENT_NAMESPACE}.validator.failed"
72+
6973

7074
class PermissionLevel(enum.Flag):
7175
READ = 1

backend/infrahub/core/validators/checks_runner.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
import asyncio
22
from typing import Any, Coroutine
33

4-
from infrahub_sdk.node import InfrahubNode
4+
from infrahub_sdk.protocols import CoreValidator
55

6+
from infrahub.context import InfrahubContext
67
from infrahub.core.constants import ValidatorConclusion, ValidatorState
78
from infrahub.core.timestamp import Timestamp
9+
from infrahub.services import InfrahubServices
10+
from infrahub.validators.events import send_failed_validator, send_passed_validator
811

912

1013
async def run_checks_and_update_validator(
11-
checks: list[Coroutine[Any, None, ValidatorConclusion]], validator: InfrahubNode
14+
checks: list[Coroutine[Any, None, ValidatorConclusion]],
15+
validator: CoreValidator,
16+
context: InfrahubContext,
17+
service: InfrahubServices,
18+
proposed_change_id: str,
1219
) -> None:
1320
"""
1421
Execute a list of checks coroutines, and set validator fields accordingly.
@@ -22,11 +29,17 @@ async def run_checks_and_update_validator(
2229
validator.completed_at.value = ""
2330
await validator.save()
2431

32+
failed_early = False
33+
2534
for earliest_task in asyncio.as_completed(checks):
2635
result = await earliest_task
2736
if validator.conclusion.value != ValidatorConclusion.FAILURE.value and result == ValidatorConclusion.FAILURE:
2837
validator.conclusion.value = ValidatorConclusion.FAILURE.value
38+
failed_early = True
2939
await validator.save()
40+
await send_failed_validator(
41+
service=service, validator=validator, proposed_change_id=proposed_change_id, context=context
42+
)
3043
# Continue to iterate to wait for the end of all checks
3144

3245
validator.state.value = ValidatorState.COMPLETED.value
@@ -35,3 +48,13 @@ async def run_checks_and_update_validator(
3548
validator.conclusion.value = ValidatorConclusion.SUCCESS.value
3649

3750
await validator.save()
51+
52+
if not failed_early:
53+
if validator.conclusion.value == ValidatorConclusion.SUCCESS.value:
54+
await send_passed_validator(
55+
service=service, validator=validator, proposed_change_id=proposed_change_id, context=context
56+
)
57+
else:
58+
await send_failed_validator(
59+
service=service, validator=validator, proposed_change_id=proposed_change_id, context=context
60+
)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from pydantic import Field, computed_field
2+
3+
from infrahub.message_bus import InfrahubMessage
4+
5+
from .constants import EVENT_NAMESPACE
6+
from .models import InfrahubEvent
7+
8+
9+
class ValidatorEvent(InfrahubEvent):
10+
"""Event generated when an validator within a pipeline has started."""
11+
12+
node_id: str = Field(..., description="The ID of the validator")
13+
kind: str = Field(..., description="The kind of the validator")
14+
proposed_change_id: str = Field(..., description="The ID of the proposed change")
15+
16+
def get_resource(self) -> dict[str, str]:
17+
return {
18+
"prefect.resource.id": self.node_id,
19+
"infrahub.node.kind": self.kind,
20+
"infrahub.node.id": self.node_id,
21+
"infrahub.branch.name": self.meta.context.branch.name,
22+
}
23+
24+
def get_related(self) -> list[dict[str, str]]:
25+
related = super().get_related()
26+
related.append(
27+
{
28+
"prefect.resource.id": self.proposed_change_id,
29+
"prefect.resource.role": "infrahub.related.node",
30+
"infrahub.node.kind": "CoreProposedChange",
31+
}
32+
)
33+
34+
return related
35+
36+
def get_messages(self) -> list[InfrahubMessage]:
37+
return []
38+
39+
40+
class ValidatorStartedEvent(ValidatorEvent):
41+
"""Event generated when an validator within a pipeline has started."""
42+
43+
@computed_field
44+
def event_name(self) -> str:
45+
return f"{EVENT_NAMESPACE}.validator.started"
46+
47+
48+
class ValidatorPassedEvent(ValidatorEvent):
49+
"""Event generated when an validator within a pipeline has completed successfully."""
50+
51+
@computed_field
52+
def event_name(self) -> str:
53+
return f"{EVENT_NAMESPACE}.validator.passed"
54+
55+
56+
class ValidatorFailedEvent(ValidatorEvent):
57+
"""Event generated when an validator within a pipeline has completed successfully."""
58+
59+
@computed_field
60+
def event_name(self) -> str:
61+
return f"{EVENT_NAMESPACE}.validator.failed"

backend/infrahub/git/tasks.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
from infrahub_sdk import InfrahubClient
2-
from infrahub_sdk.protocols import CoreArtifact, CoreArtifactDefinition, CoreCheckDefinition, CoreRepository
2+
from infrahub_sdk.protocols import (
3+
CoreArtifact,
4+
CoreArtifactDefinition,
5+
CoreCheckDefinition,
6+
CoreRepository,
7+
CoreRepositoryValidator,
8+
CoreUserValidator,
9+
)
310
from infrahub_sdk.uuidt import UUIDT
411
from prefect import flow, task
512
from prefect.cache_policies import NONE
@@ -12,6 +19,7 @@
1219
from infrahub.exceptions import CheckError, RepositoryError
1320
from infrahub.message_bus import Meta, messages
1421
from infrahub.services import InfrahubServices
22+
from infrahub.validators.events import send_start_validator
1523
from infrahub.worker import WORKER_IDENTITY
1624

1725
from ..core.manager import NodeManager
@@ -509,7 +517,9 @@ async def git_repository_diff_names_only(
509517
name="git-repository-user-checks-definition-trigger",
510518
flow_run_name="Trigger user defined checks for repository {model.repository_name}",
511519
)
512-
async def trigger_repository_user_checks_definitions(model: UserCheckDefinitionData, service: InfrahubServices) -> None:
520+
async def trigger_repository_user_checks_definitions(
521+
model: UserCheckDefinitionData, context: InfrahubContext, service: InfrahubServices
522+
) -> None:
513523
await add_tags(branches=[model.branch_name], nodes=[model.proposed_change])
514524
log = get_run_logger()
515525

@@ -520,7 +530,7 @@ async def trigger_repository_user_checks_definitions(model: UserCheckDefinitionD
520530
validator_execution_id = str(UUIDT())
521531
check_execution_ids: list[str] = []
522532
await proposed_change.validations.fetch()
523-
validator = None
533+
validator: CoreUserValidator | None = None
524534

525535
for relationship in proposed_change.validations.peers:
526536
existing_validator = relationship.peer
@@ -541,7 +551,7 @@ async def trigger_repository_user_checks_definitions(model: UserCheckDefinitionD
541551
await validator.save()
542552
else:
543553
validator = await service.client.create(
544-
kind=InfrahubKind.USERVALIDATOR,
554+
kind=CoreUserValidator,
545555
data={
546556
"label": f"Check: {definition.name.value}",
547557
"proposed_change": model.proposed_change,
@@ -551,6 +561,10 @@ async def trigger_repository_user_checks_definitions(model: UserCheckDefinitionD
551561
)
552562
await validator.save()
553563

564+
await send_start_validator(
565+
service=service, validator=validator, proposed_change_id=model.proposed_change, context=context
566+
)
567+
554568
if definition.targets.id:
555569
# Check against a group of targets
556570
await definition.targets.fetch()
@@ -612,14 +626,22 @@ async def trigger_repository_user_checks_definitions(model: UserCheckDefinitionD
612626
for model in check_models
613627
]
614628

615-
await run_checks_and_update_validator(checks_coroutines, validator)
629+
await run_checks_and_update_validator(
630+
checks=checks_coroutines,
631+
validator=validator,
632+
context=context,
633+
service=service,
634+
proposed_change_id=model.proposed_change,
635+
)
616636

617637

618638
@flow(
619639
name="git-repository-trigger-user-checks",
620640
flow_run_name="Evaluating user-defined checks on repository {model.repository_name}",
621641
)
622-
async def trigger_user_checks(model: TriggerRepositoryUserChecks, service: InfrahubServices) -> None:
642+
async def trigger_user_checks(
643+
model: TriggerRepositoryUserChecks, service: InfrahubServices, context: InfrahubContext
644+
) -> None:
623645
"""Request to start validation checks on a specific repository for User-defined checks."""
624646

625647
await add_tags(branches=[model.source_branch], nodes=[model.proposed_change])
@@ -645,15 +667,19 @@ async def trigger_user_checks(model: TriggerRepositoryUserChecks, service: Infra
645667
branch_diff=model.branch_diff,
646668
)
647669
await service.workflow.submit_workflow(
648-
workflow=GIT_REPOSITORY_USER_CHECKS_DEFINITIONS_TRIGGER, parameters={"model": user_check_definition_model}
670+
workflow=GIT_REPOSITORY_USER_CHECKS_DEFINITIONS_TRIGGER,
671+
context=context,
672+
parameters={"model": user_check_definition_model},
649673
)
650674

651675

652676
@flow(
653677
name="git-repository-trigger-internal-checks",
654678
flow_run_name="Running repository checks for repository {model.repository}",
655679
)
656-
async def trigger_internal_checks(model: TriggerRepositoryInternalChecks, service: InfrahubServices) -> None:
680+
async def trigger_internal_checks(
681+
model: TriggerRepositoryInternalChecks, service: InfrahubServices, context: InfrahubContext
682+
) -> None:
657683
"""Request to start validation checks on a specific repository."""
658684
await add_tags(branches=[model.source_branch], nodes=[model.proposed_change])
659685
log = get_run_logger()
@@ -669,7 +695,7 @@ async def trigger_internal_checks(model: TriggerRepositoryInternalChecks, servic
669695
await repository.checks.fetch()
670696

671697
validator_name = f"Repository Validator: {repository.name.value}"
672-
validator = None
698+
validator: CoreRepositoryValidator | None = None
673699
for relationship in proposed_change.validations.peers:
674700
existing_validator = relationship.peer
675701

@@ -687,7 +713,7 @@ async def trigger_internal_checks(model: TriggerRepositoryInternalChecks, servic
687713
await validator.save()
688714
else:
689715
validator = await service.client.create(
690-
kind=InfrahubKind.REPOSITORYVALIDATOR,
716+
kind=CoreRepositoryValidator,
691717
data={
692718
"label": validator_name,
693719
"proposed_change": model.proposed_change,
@@ -696,6 +722,10 @@ async def trigger_internal_checks(model: TriggerRepositoryInternalChecks, servic
696722
)
697723
await validator.save()
698724

725+
await send_start_validator(
726+
service=service, validator=validator, proposed_change_id=model.proposed_change, context=context
727+
)
728+
699729
check_execution_id = str(UUIDT())
700730
check_execution_ids.append(check_execution_id)
701731
log.info("Adding check for merge conflict")
@@ -718,7 +748,13 @@ async def trigger_internal_checks(model: TriggerRepositoryInternalChecks, servic
718748
expected_return=ValidatorConclusion,
719749
)
720750

721-
await run_checks_and_update_validator(checks=[check_coroutine], validator=validator)
751+
await run_checks_and_update_validator(
752+
checks=[check_coroutine],
753+
validator=validator,
754+
context=context,
755+
service=service,
756+
proposed_change_id=model.proposed_change,
757+
)
722758

723759

724760
@flow(

backend/infrahub/message_bus/messages/finalize_validator_execution.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from pydantic import Field
22

3+
from infrahub.context import InfrahubContext
34
from infrahub.message_bus import InfrahubMessage
45

56

@@ -10,3 +11,5 @@ class FinalizeValidatorExecution(InfrahubMessage):
1011
validator_execution_id: str = Field(..., description="The id of current execution of the associated validator")
1112
start_time: str = Field(..., description="Start time when the message was first created")
1213
validator_type: str = Field(..., description="The type of validator to complete")
14+
context: InfrahubContext = Field(..., description="The Infrahub context")
15+
proposed_change: str = Field(..., description="The ID of the proposed change")

backend/infrahub/message_bus/messages/request_generatordefinition_check.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from pydantic import ConfigDict, Field
22

3+
from infrahub.context import InfrahubContext
34
from infrahub.generators.models import ProposedChangeGeneratorDefinition
45
from infrahub.message_bus import InfrahubMessage
56
from infrahub.message_bus.types import ProposedChangeBranchDiff
@@ -16,3 +17,4 @@ class RequestGeneratorDefinitionCheck(InfrahubMessage):
1617
source_branch: str = Field(..., description="The source branch")
1718
source_branch_sync_with_git: bool = Field(..., description="Indicates if the source branch should sync with git")
1819
destination_branch: str = Field(..., description="The target branch")
20+
context: InfrahubContext = Field(..., description="The Infrahub context")

backend/infrahub/message_bus/operations/finalize/validator.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
1+
from infrahub_sdk.protocols import (
2+
CoreArtifactValidator,
3+
CoreDataValidator,
4+
CoreGeneratorValidator,
5+
CoreRepositoryValidator,
6+
CoreSchemaValidator,
7+
CoreUserValidator,
8+
CoreValidator,
9+
)
110
from prefect import flow
211

312
from infrahub import config
13+
from infrahub.core.constants import InfrahubKind, ValidatorConclusion
414
from infrahub.core.timestamp import Timestamp
515
from infrahub.log import get_logger
616
from infrahub.message_bus import messages
717
from infrahub.message_bus.types import KVTTL, MessageTTL
818
from infrahub.services import InfrahubServices
19+
from infrahub.validators.events import send_failed_validator, send_passed_validator
920

1021
log = get_logger()
1122

@@ -20,7 +31,8 @@ async def execution(message: messages.FinalizeValidatorExecution, service: Infra
2031
2132
The message will get rescheduled until the timeout has exceeded or until all checks are accounted for.
2233
"""
23-
validator = await service.client.get(kind=message.validator_type, id=message.validator_id)
34+
validator_type = get_validator_type(validator_type=message.validator_type)
35+
validator = await service.client.get(kind=validator_type, id=message.validator_id)
2436
checks_key = f"validator_execution_id:{message.validator_execution_id}:checks"
2537
current_conclusion = validator.conclusion.value
2638
if validator.state.value != "in_progress":
@@ -81,3 +93,41 @@ async def execution(message: messages.FinalizeValidatorExecution, service: Infra
8193
validator.completed_at.value = Timestamp().to_string()
8294
validator.conclusion.value = conclusion
8395
await validator.save()
96+
if validator.conclusion.value == ValidatorConclusion.SUCCESS.value:
97+
await send_passed_validator(
98+
service=service, validator=validator, proposed_change_id=message.proposed_change, context=message.context
99+
)
100+
else:
101+
await send_failed_validator(
102+
service=service, validator=validator, proposed_change_id=message.proposed_change, context=message.context
103+
)
104+
105+
106+
def get_validator_type(
107+
validator_type: str,
108+
) -> (
109+
type[CoreArtifactValidator]
110+
| type[CoreDataValidator]
111+
| type[CoreGeneratorValidator]
112+
| type[CoreRepositoryValidator]
113+
| type[CoreSchemaValidator]
114+
| type[CoreUserValidator]
115+
| type[CoreValidator]
116+
):
117+
match validator_type:
118+
case InfrahubKind.USERVALIDATOR:
119+
validator_kind = CoreUserValidator
120+
case InfrahubKind.SCHEMAVALIDATOR:
121+
validator_kind = CoreSchemaValidator
122+
case InfrahubKind.GENERATORVALIDATOR:
123+
validator_kind = CoreGeneratorValidator
124+
case InfrahubKind.REPOSITORYVALIDATOR:
125+
validator_kind = CoreRepositoryValidator
126+
case InfrahubKind.DATAVALIDATOR:
127+
validator_kind = CoreDataValidator
128+
case InfrahubKind.ARTIFACTVALIDATOR:
129+
validator_kind = CoreArtifactValidator
130+
case _:
131+
validator_kind = CoreValidator
132+
133+
return validator_kind

0 commit comments

Comments
 (0)