Skip to content

Commit 0c015ce

Browse files
committed
Migrate RequestGeneratorDefinitionRun to prefect
1 parent 93c4695 commit 0c015ce

File tree

8 files changed

+84
-116
lines changed

8 files changed

+84
-116
lines changed

backend/infrahub/generators/models.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Optional
22

3-
from pydantic import BaseModel, Field
3+
from pydantic import BaseModel, ConfigDict, Field
44

55
from infrahub.message_bus.types import ProposedChangeGeneratorDefinition
66

@@ -21,3 +21,12 @@ class RequestGeneratorRun(BaseModel):
2121
target_name: str = Field(..., description="Name of the generator target")
2222
query: str = Field(..., description="The name of the query to use when collecting data")
2323
variables: dict = Field(..., description="Input variables when running the generator")
24+
25+
26+
class RequestGeneratorDefinitionRun(BaseModel):
27+
"""Sent to trigger a Generator to run on a specific branch."""
28+
29+
model_config = ConfigDict(arbitrary_types_allowed=True)
30+
31+
generator_definition: ProposedChangeGeneratorDefinition = Field(..., description="The Generator Definition")
32+
branch: str = Field(..., description="The branch to target")

backend/infrahub/generators/tasks.py

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88

99
from infrahub import lock
1010
from infrahub.core.constants import GeneratorInstanceStatus, InfrahubKind
11-
from infrahub.generators.models import RequestGeneratorRun
11+
from infrahub.generators.models import RequestGeneratorDefinitionRun, RequestGeneratorRun
1212
from infrahub.git.base import extract_repo_file_information
1313
from infrahub.git.repository import get_initialized_repo
14-
from infrahub.message_bus import messages
1514
from infrahub.message_bus.types import ProposedChangeGeneratorDefinition
1615
from infrahub.services import InfrahubServices, services
16+
from infrahub.workflows.catalogue import REQUEST_GENERATOR_DEFINITION_RUN, REQUEST_GENERATOR_RUN
1717

1818

1919
@flow(name="generator-run")
@@ -130,9 +130,66 @@ async def run_generator_definition(branch: str) -> None:
130130
for generator in generators
131131
]
132132

133-
events = [
134-
messages.RequestGeneratorDefinitionRun(branch=branch, generator_definition=generator_definition)
135-
for generator_definition in generator_definitions
136-
]
137-
for event in events:
138-
await service.send(message=event)
133+
for generator_definition in generator_definitions:
134+
model = RequestGeneratorDefinitionRun(branch=branch, generator_definition=generator_definition)
135+
await service.workflow.submit_workflow(workflow=REQUEST_GENERATOR_DEFINITION_RUN, parameters={"model": model})
136+
137+
138+
@flow(name="request_generator_definition_run")
139+
async def request_generator_definition_run(model: RequestGeneratorDefinitionRun, service: InfrahubServices) -> None:
140+
async with service.task_report(
141+
title="Executing Generator",
142+
related_node=model.generator_definition.definition_id,
143+
) as task_report:
144+
service.log.info(
145+
"Received request to run generator",
146+
branch=model.branch,
147+
generator_definition=model.generator_definition.definition_id,
148+
)
149+
150+
group = await service.client.get(
151+
kind=InfrahubKind.GENERICGROUP,
152+
prefetch_relationships=True,
153+
populate_store=True,
154+
id=model.generator_definition.group_id,
155+
branch=model.branch,
156+
)
157+
await group.members.fetch()
158+
159+
existing_instances = await service.client.filters(
160+
kind=InfrahubKind.GENERATORINSTANCE,
161+
definition__ids=[model.generator_definition.definition_id],
162+
include=["object"],
163+
branch=model.branch,
164+
)
165+
instance_by_member = {}
166+
for instance in existing_instances:
167+
instance_by_member[instance.object.peer.id] = instance.id
168+
169+
repository = await service.client.get(
170+
kind=InfrahubKind.REPOSITORY, branch=model.branch, id=model.generator_definition.repository_id
171+
)
172+
173+
for relationship in group.members.peers:
174+
member = relationship.peer
175+
generator_instance = instance_by_member.get(member.id)
176+
request_generator_run_model = RequestGeneratorRun(
177+
generator_definition=model.generator_definition,
178+
commit=repository.commit.value,
179+
generator_instance=generator_instance,
180+
repository_id=repository.id,
181+
repository_name=repository.name.value,
182+
repository_kind=repository.typename,
183+
branch_name=model.branch,
184+
query=model.generator_definition.query_name,
185+
variables=member.extract(params=model.generator_definition.parameters),
186+
target_id=member.id,
187+
target_name=member.display_label,
188+
)
189+
await service.workflow.submit_workflow(
190+
workflow=REQUEST_GENERATOR_RUN, parameters={"model": request_generator_run_model}
191+
)
192+
193+
await task_report.info(
194+
event=f"Generator triggered for {len(group.members.peers)} members in {group.name.value}."
195+
)

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
from .refresh_webhook_configuration import RefreshWebhookConfiguration
3131
from .request_artifactdefinition_check import RequestArtifactDefinitionCheck
3232
from .request_generatordefinition_check import RequestGeneratorDefinitionCheck
33-
from .request_generatordefinition_run import RequestGeneratorDefinitionRun
3433
from .request_graphqlquerygroup_update import RequestGraphQLQueryGroupUpdate
3534
from .request_proposedchange_pipeline import RequestProposedChangePipeline
3635
from .request_repository_checks import RequestRepositoryChecks
@@ -67,7 +66,6 @@
6766
"refresh.webhook.configuration": RefreshWebhookConfiguration,
6867
"request.artifact_definition.check": RequestArtifactDefinitionCheck,
6968
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
70-
"request.generator_definition.run": RequestGeneratorDefinitionRun,
7169
"request.graphql_query_group.update": RequestGraphQLQueryGroupUpdate,
7270
"request.proposed_change.data_integrity": RequestProposedChangeDataIntegrity,
7371
"request.proposed_change.pipeline": RequestProposedChangePipeline,

backend/infrahub/message_bus/messages/request_generatordefinition_run.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
4343
"refresh.webhook.configuration": refresh.webhook.configuration,
4444
"request.generator_definition.check": requests.generator_definition.check,
45-
"request.generator_definition.run": requests.generator_definition.run,
4645
"request.graphql_query_group.update": requests.graphql_query_group.update,
4746
"request.artifact_definition.check": requests.artifact_definition.check,
4847
"request.proposed_change.data_integrity": requests.proposed_change.data_integrity,

backend/infrahub/message_bus/operations/requests/generator_definition.py

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@
55

66
from infrahub.core.constants import InfrahubKind, ValidatorConclusion, ValidatorState
77
from infrahub.core.timestamp import Timestamp
8-
from infrahub.generators.models import RequestGeneratorRun
98
from infrahub.message_bus import InfrahubMessage, Meta, messages
109
from infrahub.message_bus.types import KVTTL
1110
from infrahub.services import InfrahubServices
12-
from infrahub.workflows.catalogue import REQUEST_GENERATOR_RUN
1311

1412

1513
@flow(name="generator-definition-check")
@@ -131,64 +129,6 @@ async def check(message: messages.RequestGeneratorDefinitionCheck, service: Infr
131129
await service.send(message=event)
132130

133131

134-
@flow(name="generator-definition-run")
135-
async def run(message: messages.RequestGeneratorDefinitionRun, service: InfrahubServices) -> None:
136-
async with service.task_report(
137-
title="Executing Generator",
138-
related_node=message.generator_definition.definition_id,
139-
) as task_report:
140-
service.log.info(
141-
"Received request to run generator",
142-
branch=message.branch,
143-
generator_definition=message.generator_definition.definition_id,
144-
)
145-
146-
group = await service.client.get(
147-
kind=InfrahubKind.GENERICGROUP,
148-
prefetch_relationships=True,
149-
populate_store=True,
150-
id=message.generator_definition.group_id,
151-
branch=message.branch,
152-
)
153-
await group.members.fetch()
154-
155-
existing_instances = await service.client.filters(
156-
kind=InfrahubKind.GENERATORINSTANCE,
157-
definition__ids=[message.generator_definition.definition_id],
158-
include=["object"],
159-
branch=message.branch,
160-
)
161-
instance_by_member = {}
162-
for instance in existing_instances:
163-
instance_by_member[instance.object.peer.id] = instance.id
164-
165-
repository = await service.client.get(
166-
kind=InfrahubKind.REPOSITORY, branch=message.branch, id=message.generator_definition.repository_id
167-
)
168-
169-
for relationship in group.members.peers:
170-
member = relationship.peer
171-
generator_instance = instance_by_member.get(member.id)
172-
model = RequestGeneratorRun(
173-
generator_definition=message.generator_definition,
174-
commit=repository.commit.value,
175-
generator_instance=generator_instance,
176-
repository_id=repository.id,
177-
repository_name=repository.name.value,
178-
repository_kind=repository.typename,
179-
branch_name=message.branch,
180-
query=message.generator_definition.query_name,
181-
variables=member.extract(params=message.generator_definition.parameters),
182-
target_id=member.id,
183-
target_name=member.display_label,
184-
)
185-
await service.workflow.submit_workflow(workflow=REQUEST_GENERATOR_RUN, parameters={"model": model})
186-
187-
await task_report.info(
188-
event=f"Generator triggered for {len(group.members.peers)} members in {group.name.value}."
189-
)
190-
191-
192132
def _run_generator(instance_id: Optional[str], managed_branch: bool, impacted_instances: list[str]) -> bool:
193133
"""Returns a boolean to indicate if a generator instance needs to be executed
194134
Will return true if:

backend/infrahub/workflows/catalogue.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@
8585
function="run_generator",
8686
)
8787

88+
REQUEST_GENERATOR_DEFINITION_RUN = WorkflowDefinition(
89+
name="request_generator_definition_run",
90+
type=WorkflowType.INTERNAL,
91+
module="infrahub.generators.tasks",
92+
function="request_generator_definition_run",
93+
branch_support=BranchSupportType.AWARE,
94+
)
95+
8896
REQUEST_ARTIFACT_GENERATE = WorkflowDefinition(
8997
name="artifact-generate",
9098
type=WorkflowType.INTERNAL,
@@ -204,4 +212,5 @@
204212
GIT_REPOSITORIES_MERGE,
205213
TRIGGER_GENERATOR_DEFINITION_RUN,
206214
BRANCH_CANCEL_PROPOSED_CHANGES,
215+
REQUEST_GENERATOR_DEFINITION_RUN,
207216
]

docs/docs/reference/message-bus-events.mdx

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -571,21 +571,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
571571
| **source_branch_sync_with_git** | Indicates if the source branch should sync with git | boolean | None |
572572
| **destination_branch** | The target branch | string | None |
573573
<!-- vale on -->
574-
<!-- vale off -->
575-
#### Event request.generator_definition.run
576-
<!-- vale on -->
577-
578-
**Description**: Sent to trigger a Generator to run on a specific branch.
579-
580-
**Priority**: 3
581-
582-
<!-- vale off -->
583-
| Key | Description | Type | Default Value |
584-
|-----|-------------|------|---------------|
585-
| **meta** | Meta properties for the message | N/A | None |
586-
| **generator_definition** | The Generator Definition | N/A | None |
587-
| **branch** | The branch to target | string | None |
588-
<!-- vale on -->
589574

590575
<!-- vale off -->
591576
### Request Graphql Query Group
@@ -1414,22 +1399,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
14141399
| **source_branch_sync_with_git** | Indicates if the source branch should sync with git | boolean | None |
14151400
| **destination_branch** | The target branch | string | None |
14161401
<!-- vale on -->
1417-
<!-- vale off -->
1418-
#### Event request.generator_definition.run
1419-
<!-- vale on -->
1420-
1421-
**Description**: Sent to trigger a Generator to run on a specific branch.
1422-
1423-
**Priority**: 3
1424-
1425-
1426-
<!-- vale off -->
1427-
| Key | Description | Type | Default Value |
1428-
|-----|-------------|------|---------------|
1429-
| **meta** | Meta properties for the message | N/A | None |
1430-
| **generator_definition** | The Generator Definition | N/A | None |
1431-
| **branch** | The branch to target | string | None |
1432-
<!-- vale on -->
14331402

14341403
<!-- vale off -->
14351404
### Request Graphql Query Group

0 commit comments

Comments
 (0)