Skip to content

Commit 618bc69

Browse files
committed
Add flow decorator to all tasks
1 parent 5a220b3 commit 618bc69

File tree

12 files changed

+47
-2
lines changed

12 files changed

+47
-2
lines changed

backend/infrahub/message_bus/operations/event/branch.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import List
22

3+
from prefect import flow
4+
35
from infrahub.core import registry
46
from infrahub.core.diff.model.path import BranchTrackingId
57
from infrahub.core.diff.repository.repository import DiffRepository
@@ -11,6 +13,7 @@
1113
log = get_logger()
1214

1315

16+
@flow(name="event-branch-create")
1417
async def create(message: messages.EventBranchCreate, service: InfrahubServices) -> None:
1518
log.info("run_message", branch=message.branch)
1619

@@ -23,6 +26,7 @@ async def create(message: messages.EventBranchCreate, service: InfrahubServices)
2326
await service.send(message=event)
2427

2528

29+
@flow(name="event-branch-delete")
2630
async def delete(message: messages.EventBranchDelete, service: InfrahubServices) -> None:
2731
log.info("Branch was deleted", branch=message.branch)
2832

@@ -36,6 +40,7 @@ async def delete(message: messages.EventBranchDelete, service: InfrahubServices)
3640
await service.send(message=event)
3741

3842

43+
@flow(name="branch-event-merge")
3944
async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -> None:
4045
log.info("Branch merged", source_branch=message.source_branch, target_branch=message.target_branch)
4146

@@ -64,6 +69,7 @@ async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -
6469
await service.send(message=event)
6570

6671

72+
@flow(name="event-branch-rebased")
6773
async def rebased(message: messages.EventBranchRebased, service: InfrahubServices) -> None:
6874
log.info("Branch rebased", branch=message.branch)
6975

backend/infrahub/message_bus/operations/event/node.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import List
22

3+
from prefect import flow
4+
35
from infrahub.core.constants import InfrahubKind
46
from infrahub.log import get_logger
57
from infrahub.message_bus import InfrahubMessage, messages
@@ -8,6 +10,7 @@
810
log = get_logger()
911

1012

13+
@flow(name="event-node-mutated")
1114
async def mutated(
1215
message: messages.EventNodeMutated,
1316
service: InfrahubServices,

backend/infrahub/message_bus/operations/event/schema.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
from prefect import flow
2+
13
from infrahub.log import get_logger
24
from infrahub.message_bus import messages
35
from infrahub.services import InfrahubServices
46

57
log = get_logger()
68

79

10+
@flow(name="event-schema-update")
811
async def update(message: messages.EventSchemaUpdate, service: InfrahubServices) -> None:
912
log.info("run_message", branch=message.branch)
1013

backend/infrahub/message_bus/operations/event/worker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
from prefect import flow
2+
13
from infrahub.message_bus import messages
24
from infrahub.services import InfrahubServices
35

46

7+
@flow(name="event-worker-newprimary-api")
58
async def new_primary_api(message: messages.EventWorkerNewPrimaryAPI, service: InfrahubServices) -> None:
69
service.log.info("api_worker promoted to primary", worker_id=message.worker_id)
710

backend/infrahub/message_bus/operations/git/repository.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ async def import_objects(message: messages.GitRepositoryImportObjects, service:
104104
await repo.import_objects_from_files(infrahub_branch_name=message.infrahub_branch_name, commit=message.commit)
105105

106106

107+
@flow(name="git-repository-pull-read-only")
107108
async def pull_read_only(message: messages.GitRepositoryPullReadOnly, service: InfrahubServices) -> None:
108109
if not message.ref and not message.commit:
109110
log.warning(
@@ -154,6 +155,7 @@ async def pull_read_only(message: messages.GitRepositoryPullReadOnly, service: I
154155
await repo.sync_from_remote(commit=message.commit)
155156

156157

158+
@flow(name="git-repository-merge")
157159
async def merge(message: messages.GitRepositoryMerge, service: InfrahubServices) -> None:
158160
log.info(
159161
"Merging repository branch",

backend/infrahub/message_bus/operations/schema/migration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from prefect import task
1+
from prefect import flow, task
22
from prefect.logging import get_run_logger
33
from prefect.runtime import task_run
44

@@ -15,6 +15,7 @@
1515
log = get_logger()
1616

1717

18+
@flow(name="schema-migration-path")
1819
async def path(message: SchemaMigrationPath, service: InfrahubServices) -> None:
1920
async with service.database.start_session() as db:
2021
node_kind = None

backend/infrahub/message_bus/operations/schema/validator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from prefect import task
1+
from prefect import flow, task
22
from prefect.runtime import task_run
33

44
from infrahub.core.validators.aggregated_checker import AggregatedConstraintChecker
@@ -16,6 +16,7 @@
1616
log = get_logger()
1717

1818

19+
@flow(name="schema-validator-path")
1920
async def path(message: SchemaValidatorPath, service: InfrahubServices) -> None:
2021
async with service.database.start_session() as db:
2122
log.info(

backend/infrahub/message_bus/operations/send/echo.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
from prefect import flow
2+
13
from infrahub.message_bus import messages
24
from infrahub.message_bus.messages.send_echo_request import SendEchoRequestResponse, SendEchoRequestResponseData
35
from infrahub.services import InfrahubServices
46

57

8+
@flow(name="echo-request")
69
async def request(message: messages.SendEchoRequest, service: InfrahubServices) -> None:
710
service.log.info(f"Received message: {message.message}")
811
if message.reply_requested:

backend/infrahub/message_bus/operations/send/telemetry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ async def gather_anonymous_telemetry_data(service: InfrahubServices) -> dict:
9696
return data
9797

9898

99+
@flow(name="telemetry-push-legacy")
99100
async def push(
100101
message: messages.SendTelemetryPush, # pylint: disable=unused-argument
101102
service: InfrahubServices,

backend/infrahub/message_bus/operations/send/webhook.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from infrahub.webhook import CustomWebhook, StandardWebhook, TransformWebhook, Webhook
1212

1313

14+
@flow(name="event-send-webhook")
1415
async def event(message: messages.SendWebhookEvent, service: InfrahubServices) -> None:
1516
async with service.task_report(
1617
related_node=message.webhook_id,

0 commit comments

Comments
 (0)