Skip to content

Commit bbfad29

Browse files
committed
Convert all operations to prefect flow
1 parent 74fd1b4 commit bbfad29

25 files changed

+91
-0
lines changed

backend/infrahub/message_bus/operations/check/artifact.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Union
22

3+
from prefect import flow
4+
35
from infrahub.core.constants import InfrahubKind, ValidatorConclusion
46
from infrahub.core.timestamp import Timestamp
57
from infrahub.git.repository import InfrahubReadOnlyRepository, InfrahubRepository
@@ -12,6 +14,7 @@
1214
log = get_logger()
1315

1416

17+
@flow(name="git-repository-check-artifact-create")
1518
async def create(message: messages.CheckArtifactCreate, service: InfrahubServices) -> None:
1619
log.debug("Creating artifact", message=message)
1720
validator = await service.client.get(

backend/infrahub/message_bus/operations/check/generator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from infrahub_sdk import InfrahubNode
44
from infrahub_sdk.exceptions import ModuleImportError
55
from infrahub_sdk.schema import InfrahubGeneratorDefinitionConfig
6+
from prefect import flow
67

78
from infrahub import lock
89
from infrahub.core.constants import GeneratorInstanceStatus, InfrahubKind, ValidatorConclusion
@@ -16,6 +17,7 @@
1617
# pylint: disable=duplicate-code
1718

1819

20+
@flow(name="git-repository-check-generator-run")
1921
async def run(message: messages.CheckGeneratorRun, service: InfrahubServices) -> None:
2022
repository = await get_initialized_repo(
2123
repository_id=message.repository_id,

backend/infrahub/message_bus/operations/check/repository.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import List
22

33
from infrahub_sdk import UUIDT
4+
from prefect import flow
45

56
from infrahub import lock
67
from infrahub.core.constants import InfrahubKind
@@ -16,6 +17,7 @@
1617
log = get_logger()
1718

1819

20+
@flow(name="git-repository-check-definition")
1921
async def check_definition(message: messages.CheckRepositoryCheckDefinition, service: InfrahubServices) -> None:
2022
definition = await service.client.get(
2123
kind=InfrahubKind.CHECKDEFINITION, id=message.check_definition_id, branch=message.branch_name
@@ -141,6 +143,7 @@ async def check_definition(message: messages.CheckRepositoryCheckDefinition, ser
141143
await service.send(message=event)
142144

143145

146+
@flow(name="git-repository-check-merge-conflict")
144147
async def merge_conflicts(message: messages.CheckRepositoryMergeConflicts, service: InfrahubServices) -> None:
145148
"""Runs a check to see if there are merge conflicts between two branches."""
146149
log.info(
@@ -222,6 +225,7 @@ async def merge_conflicts(message: messages.CheckRepositoryMergeConflicts, servi
222225
)
223226

224227

228+
@flow(name="git-repository-user-check")
225229
async def user_check(message: messages.CheckRepositoryUserCheck, service: InfrahubServices) -> None:
226230
validator = await service.client.get(kind=InfrahubKind.USERVALIDATOR, id=message.validator_id)
227231
await validator.checks.fetch()

backend/infrahub/message_bus/operations/finalize/validator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub import config
24
from infrahub.core.timestamp import Timestamp
35
from infrahub.log import get_logger
@@ -8,6 +10,7 @@
810
log = get_logger()
911

1012

13+
@flow(name="validator-finalize-execution")
1114
async def execution(message: messages.FinalizeValidatorExecution, service: InfrahubServices) -> None:
1215
"""Monitors the status of checks associated with a validator and finalizes the conclusion of the validator
1316

backend/infrahub/message_bus/operations/git/branch.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub import lock
24
from infrahub.git.repository import InfrahubRepository
35
from infrahub.log import get_logger
@@ -7,6 +9,7 @@
79
log = get_logger()
810

911

12+
@flow(name="git-repository-branch-create")
1013
async def create(message: messages.GitBranchCreate, service: InfrahubServices) -> None:
1114
log.info("creating branch in repository", branch=message.branch, repository=message.repository_name)
1215
repo = await InfrahubRepository.init(id=message.repository_id, name=message.repository_name, client=service.client)

backend/infrahub/message_bus/operations/git/diff.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub.git.repository import get_initialized_repo
24
from infrahub.log import get_logger
35
from infrahub.message_bus import messages
@@ -7,6 +9,7 @@
79
log = get_logger()
810

911

12+
@flow(name="git-repository-diff-files-names-only")
1013
async def names_only(message: messages.GitDiffNamesOnly, service: InfrahubServices) -> None:
1114
log.info(
1215
"Collecting modifications between commits",

backend/infrahub/message_bus/operations/git/file.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub.git.repository import get_initialized_repo
24
from infrahub.log import get_logger
35
from infrahub.message_bus import messages
@@ -7,6 +9,7 @@
79
log = get_logger()
810

911

12+
@flow(name="git-repository-get-file")
1013
async def get(message: messages.GitFileGet, service: InfrahubServices) -> None:
1114
log.info("Collecting file from repository", repository=message.repository_name, file=message.file)
1215

backend/infrahub/message_bus/operations/git/repository.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub import lock
24
from infrahub.core.constants import InfrahubKind, RepositoryInternalStatus
35
from infrahub.exceptions import RepositoryError
@@ -13,6 +15,7 @@
1315
log = get_logger()
1416

1517

18+
@flow(name="git-repository-add-read-write")
1619
async def add(message: messages.GitRepositoryAdd, service: InfrahubServices) -> None:
1720
log.info(
1821
"Cloning and importing repository",
@@ -43,6 +46,7 @@ async def add(message: messages.GitRepositoryAdd, service: InfrahubServices) ->
4346
await repo.sync()
4447

4548

49+
@flow(name="git-repository-add-read-only")
4650
async def add_read_only(message: messages.GitRepositoryAddReadOnly, service: InfrahubServices) -> None:
4751
log.info(
4852
"Cloning and importing read-only repository", repository=message.repository_name, location=message.location
@@ -67,6 +71,7 @@ async def add_read_only(message: messages.GitRepositoryAddReadOnly, service: Inf
6771
await repo.sync_from_remote()
6872

6973

74+
@flow(name="git-repository-check-connectivity")
7075
async def connectivity(message: messages.GitRepositoryConnectivity, service: InfrahubServices) -> None:
7176
response_data = GitRepositoryConnectivityResponseData(message="Successfully accessed repository", success=True)
7277

@@ -83,6 +88,7 @@ async def connectivity(message: messages.GitRepositoryConnectivity, service: Inf
8388
await service.reply(message=response, initiator=message)
8489

8590

91+
@flow(name="git-repository-import-object")
8692
async def import_objects(message: messages.GitRepositoryImportObjects, service: InfrahubServices) -> None:
8793
async with service.git_report(
8894
related_node=message.repository_id,

backend/infrahub/message_bus/operations/refresh/registry.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from prefect import flow
2+
13
from infrahub import lock
24
from infrahub.core.registry import registry
35
from infrahub.message_bus import messages
@@ -6,6 +8,7 @@
68
from infrahub.worker import WORKER_IDENTITY
79

810

11+
@flow(name="registry-branch-refresh")
912
async def branches(message: messages.RefreshRegistryBranches, service: InfrahubServices) -> None:
1013
if message.meta and message.meta.initiator_id == WORKER_IDENTITY:
1114
service.log.info("Ignoring refresh registry refresh request originating from self", worker=WORKER_IDENTITY)
@@ -17,6 +20,7 @@ async def branches(message: messages.RefreshRegistryBranches, service: InfrahubS
1720
await service.component.refresh_schema_hash()
1821

1922

23+
@flow(name="registry-branch-rebase")
2024
async def rebased_branch(message: messages.RefreshRegistryRebasedBranch, service: InfrahubServices) -> None:
2125
if message.meta and message.meta.initiator_id == WORKER_IDENTITY:
2226
service.log.info(

backend/infrahub/message_bus/operations/refresh/webhook.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import ujson
2+
from prefect import flow
23

34
from infrahub.core.constants import InfrahubKind
45
from infrahub.message_bus import messages
56
from infrahub.services import InfrahubServices
67

78

9+
@flow(name="registry-webhook-config-refresh")
810
async def configuration(
911
message: messages.RefreshWebhookConfiguration, # pylint: disable=unused-argument
1012
service: InfrahubServices,

0 commit comments

Comments
 (0)