Skip to content

Commit a9887b2

Browse files
authored
Merge pull request #4549 from opsmill/dga-20241006-convert-flow
Convert all operations to prefect flows
2 parents 74fd1b4 + 4fe6d53 commit a9887b2

39 files changed

+137
-8
lines changed

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Optional
22

33
import ujson
4+
from prefect import Flow
45

56
from infrahub.message_bus import RPCErrorResponse, messages
67
from infrahub.message_bus.operations import (
@@ -81,12 +82,17 @@
8182
}
8283

8384

84-
async def execute_message(routing_key: str, message_body: bytes, service: InfrahubServices) -> Optional[MessageTTL]:
85+
async def execute_message(
86+
routing_key: str, message_body: bytes, service: InfrahubServices, skip_flow: bool = False
87+
) -> Optional[MessageTTL]:
8588
message_data = ujson.loads(message_body)
8689
message = messages.MESSAGE_MAP[routing_key](**message_data)
8790
message.set_log_data(routing_key=routing_key)
8891
try:
89-
await COMMAND_MAP[routing_key](message=message, service=service)
92+
func = COMMAND_MAP[routing_key]
93+
if skip_flow and isinstance(func, Flow):
94+
func = func.fn
95+
await func(message=message, service=service)
9096
except Exception as exc: # pylint: disable=broad-except
9197
if message.reply_requested:
9298
response = RPCErrorResponse(errors=[str(exc)], initial_message=message.model_dump())

backend/infrahub/message_bus/operations/check/artifact.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Union
22

3+
from prefect import flow
4+
35
from infrahub.core.constants import InfrahubKind, ValidatorConclusion
46
from infrahub.core.timestamp import Timestamp
57
from infrahub.git.repository import InfrahubReadOnlyRepository, InfrahubRepository
@@ -12,6 +14,7 @@
1214
log = get_logger()
1315

1416

17+
@flow(name="git-repository-check-artifact-create")
1518
async def create(message: messages.CheckArtifactCreate, service: InfrahubServices) -> None:
1619
log.debug("Creating artifact", message=message)
1720
validator = await service.client.get(

backend/infrahub/message_bus/operations/check/generator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from infrahub_sdk import InfrahubNode
44
from infrahub_sdk.exceptions import ModuleImportError
55
from infrahub_sdk.schema import InfrahubGeneratorDefinitionConfig
6+
from prefect import flow
67

78
from infrahub import lock
89
from infrahub.core.constants import GeneratorInstanceStatus, InfrahubKind, ValidatorConclusion
@@ -16,6 +17,7 @@
1617
# pylint: disable=duplicate-code
1718

1819

20+
@flow(name="git-repository-check-generator-run")
1921
async def run(message: messages.CheckGeneratorRun, service: InfrahubServices) -> None:
2022
repository = await get_initialized_repo(
2123
repository_id=message.repository_id,

backend/infrahub/message_bus/operations/check/repository.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import List
22

33
from infrahub_sdk import UUIDT
4+
from prefect import flow
45

56
from infrahub import lock
67
from infrahub.core.constants import InfrahubKind
@@ -16,6 +17,7 @@
1617
log = get_logger()
1718

1819

20+
@flow(name="git-repository-check-definition")
1921
async def check_definition(message: messages.CheckRepositoryCheckDefinition, service: InfrahubServices) -> None:
2022
definition = await service.client.get(
2123
kind=InfrahubKind.CHECKDEFINITION, id=message.check_definition_id, branch=message.branch_name
@@ -141,6 +143,7 @@ async def check_definition(message: messages.CheckRepositoryCheckDefinition, ser
141143
await service.send(message=event)
142144

143145

146+
@flow(name="git-repository-check-merge-conflict")
144147
async def merge_conflicts(message: messages.CheckRepositoryMergeConflicts, service: InfrahubServices) -> None:
145148
"""Runs a check to see if there are merge conflicts between two branches."""
146149
log.info(
@@ -222,6 +225,7 @@ async def merge_conflicts(message: messages.CheckRepositoryMergeConflicts, servi
222225
)
223226

224227

228+
@flow(name="git-repository-user-check")
225229
async def user_check(message: messages.CheckRepositoryUserCheck, service: InfrahubServices) -> None:
226230
validator = await service.client.get(kind=InfrahubKind.USERVALIDATOR, id=message.validator_id)
227231
await validator.checks.fetch()

backend/infrahub/message_bus/operations/event/branch.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import List
22

3+
from prefect import flow
4+
35
from infrahub.core import registry
46
from infrahub.core.diff.model.path import BranchTrackingId
57
from infrahub.core.diff.repository.repository import DiffRepository
@@ -11,6 +13,7 @@
1113
log = get_logger()
1214

1315

16+
@flow(name="event-branch-create")
1417
async def create(message: messages.EventBranchCreate, service: InfrahubServices) -> None:
1518
log.info("run_message", branch=message.branch)
1619

@@ -23,6 +26,7 @@ async def create(message: messages.EventBranchCreate, service: InfrahubServices)
2326
await service.send(message=event)
2427

2528

29+
@flow(name="event-branch-delete")
2630
async def delete(message: messages.EventBranchDelete, service: InfrahubServices) -> None:
2731
log.info("Branch was deleted", branch=message.branch)
2832

@@ -36,6 +40,7 @@ async def delete(message: messages.EventBranchDelete, service: InfrahubServices)
3640
await service.send(message=event)
3741

3842

43+
@flow(name="branch-event-merge")
3944
async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -> None:
4045
log.info("Branch merged", source_branch=message.source_branch, target_branch=message.target_branch)
4146

@@ -64,6 +69,7 @@ async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -
6469
await service.send(message=event)
6570

6671

72+
@flow(name="event-branch-rebased")
6773
async def rebased(message: messages.EventBranchRebased, service: InfrahubServices) -> None:
6874
log.info("Branch rebased", branch=message.branch)
6975

backend/infrahub/message_bus/operations/event/node.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import List
22

3+
from prefect import flow
4+
35
from infrahub.core.constants import InfrahubKind
46
from infrahub.log import get_logger
57
from infrahub.message_bus import InfrahubMessage, messages
@@ -8,6 +10,7 @@
810
log = get_logger()
911

1012

13+
@flow(name="event-node-mutated")
1114
async def mutated(
1215
message: messages.EventNodeMutated,
1316
service: InfrahubServices,

backend/infrahub/message_bus/operations/event/schema.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
from prefect import flow
2+
13
from infrahub.log import get_logger
24
from infrahub.message_bus import messages
35
from infrahub.services import InfrahubServices
46

57
log = get_logger()
68

79

10+
@flow(name="event-schema-update")
811
async def update(message: messages.EventSchemaUpdate, service: InfrahubServices) -> None:
912
log.info("run_message", branch=message.branch)
1013

backend/infrahub/message_bus/operations/event/worker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
from prefect import flow
2+
13
from infrahub.message_bus import messages
24
from infrahub.services import InfrahubServices
35

46

7+
@flow(name="event-worker-newprimary-api")
58
async def new_primary_api(message: messages.EventWorkerNewPrimaryAPI, service: InfrahubServices) -> None:
69
service.log.info("api_worker promoted to primary", worker_id=message.worker_id)
710

backend/infrahub/message_bus/operations/finalize/validator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub import config
24
from infrahub.core.timestamp import Timestamp
35
from infrahub.log import get_logger
@@ -8,6 +10,7 @@
810
log = get_logger()
911

1012

13+
@flow(name="validator-finalize-execution")
1114
async def execution(message: messages.FinalizeValidatorExecution, service: InfrahubServices) -> None:
1215
"""Monitors the status of checks associated with a validator and finalizes the conclusion of the validator
1316

backend/infrahub/message_bus/operations/git/branch.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub import lock
24
from infrahub.git.repository import InfrahubRepository
35
from infrahub.log import get_logger
@@ -7,6 +9,7 @@
79
log = get_logger()
810

911

12+
@flow(name="git-repository-branch-create")
1013
async def create(message: messages.GitBranchCreate, service: InfrahubServices) -> None:
1114
log.info("creating branch in repository", branch=message.branch, repository=message.repository_name)
1215
repo = await InfrahubRepository.init(id=message.repository_id, name=message.repository_name, client=service.client)

0 commit comments

Comments
 (0)