Skip to content

Commit 0fd2c40

Browse files
authored
Merge pull request #4842 from opsmill/lgu-migrate-req-gen-def-run
Migrate RequestGeneratorDefinitionRun to prefect
2 parents 6f92f6b + d909759 commit 0fd2c40

File tree

12 files changed

+102
-135
lines changed

12 files changed

+102
-135
lines changed

backend/infrahub/generators/models.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
from typing import Optional
1+
from __future__ import annotations
22

3-
from pydantic import BaseModel, Field
3+
from typing import Optional
44

5-
from infrahub.message_bus.types import ProposedChangeGeneratorDefinition
5+
from pydantic import BaseModel, ConfigDict, Field
66

77

88
class RequestGeneratorRun(BaseModel):
@@ -21,3 +21,25 @@ class RequestGeneratorRun(BaseModel):
2121
target_name: str = Field(..., description="Name of the generator target")
2222
query: str = Field(..., description="The name of the query to use when collecting data")
2323
variables: dict = Field(..., description="Input variables when running the generator")
24+
25+
26+
class RequestGeneratorDefinitionRun(BaseModel):
27+
"""Sent to trigger a Generator to run on a specific branch."""
28+
29+
model_config = ConfigDict(arbitrary_types_allowed=True)
30+
31+
generator_definition: ProposedChangeGeneratorDefinition = Field(..., description="The Generator Definition")
32+
branch: str = Field(..., description="The branch to target")
33+
34+
35+
class ProposedChangeGeneratorDefinition(BaseModel):
36+
definition_id: str
37+
definition_name: str
38+
query_name: str
39+
convert_query_response: bool
40+
query_models: list[str]
41+
repository_id: str
42+
class_name: str
43+
file_path: str
44+
parameters: dict
45+
group_id: str

backend/infrahub/generators/tasks.py

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88

99
from infrahub import lock
1010
from infrahub.core.constants import GeneratorInstanceStatus, InfrahubKind
11-
from infrahub.generators.models import RequestGeneratorRun
11+
from infrahub.generators.models import (
12+
ProposedChangeGeneratorDefinition,
13+
RequestGeneratorDefinitionRun,
14+
RequestGeneratorRun,
15+
)
1216
from infrahub.git.base import extract_repo_file_information
1317
from infrahub.git.repository import get_initialized_repo
14-
from infrahub.message_bus import messages
15-
from infrahub.message_bus.types import ProposedChangeGeneratorDefinition
1618
from infrahub.services import InfrahubServices, services
19+
from infrahub.workflows.catalogue import REQUEST_GENERATOR_DEFINITION_RUN, REQUEST_GENERATOR_RUN
1720

1821

1922
@flow(name="generator-run")
@@ -130,9 +133,61 @@ async def run_generator_definition(branch: str) -> None:
130133
for generator in generators
131134
]
132135

133-
events = [
134-
messages.RequestGeneratorDefinitionRun(branch=branch, generator_definition=generator_definition)
135-
for generator_definition in generator_definitions
136-
]
137-
for event in events:
138-
await service.send(message=event)
136+
for generator_definition in generator_definitions:
137+
model = RequestGeneratorDefinitionRun(branch=branch, generator_definition=generator_definition)
138+
await service.workflow.submit_workflow(workflow=REQUEST_GENERATOR_DEFINITION_RUN, parameters={"model": model})
139+
140+
141+
@flow(name="request_generator_definition_run")
142+
async def request_generator_definition_run(model: RequestGeneratorDefinitionRun) -> None:
143+
service = services.service
144+
async with service.task_report(
145+
title="Executing Generator",
146+
related_node=model.generator_definition.definition_id,
147+
) as task_report:
148+
group = await service.client.get(
149+
kind=InfrahubKind.GENERICGROUP,
150+
prefetch_relationships=True,
151+
populate_store=True,
152+
id=model.generator_definition.group_id,
153+
branch=model.branch,
154+
)
155+
await group.members.fetch()
156+
157+
existing_instances = await service.client.filters(
158+
kind=InfrahubKind.GENERATORINSTANCE,
159+
definition__ids=[model.generator_definition.definition_id],
160+
include=["object"],
161+
branch=model.branch,
162+
)
163+
instance_by_member = {}
164+
for instance in existing_instances:
165+
instance_by_member[instance.object.peer.id] = instance.id
166+
167+
repository = await service.client.get(
168+
kind=InfrahubKind.REPOSITORY, branch=model.branch, id=model.generator_definition.repository_id
169+
)
170+
171+
for relationship in group.members.peers:
172+
member = relationship.peer
173+
generator_instance = instance_by_member.get(member.id)
174+
request_generator_run_model = RequestGeneratorRun(
175+
generator_definition=model.generator_definition,
176+
commit=repository.commit.value,
177+
generator_instance=generator_instance,
178+
repository_id=repository.id,
179+
repository_name=repository.name.value,
180+
repository_kind=repository.typename,
181+
branch_name=model.branch,
182+
query=model.generator_definition.query_name,
183+
variables=member.extract(params=model.generator_definition.parameters),
184+
target_id=member.id,
185+
target_name=member.display_label,
186+
)
187+
await service.workflow.submit_workflow(
188+
workflow=REQUEST_GENERATOR_RUN, parameters={"model": request_generator_run_model}
189+
)
190+
191+
await task_report.info(
192+
event=f"Generator triggered for {len(group.members.peers)} members in {group.name.value}."
193+
)

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
from .refresh_webhook_configuration import RefreshWebhookConfiguration
3131
from .request_artifactdefinition_check import RequestArtifactDefinitionCheck
3232
from .request_generatordefinition_check import RequestGeneratorDefinitionCheck
33-
from .request_generatordefinition_run import RequestGeneratorDefinitionRun
3433
from .request_graphqlquerygroup_update import RequestGraphQLQueryGroupUpdate
3534
from .request_proposedchange_pipeline import RequestProposedChangePipeline
3635
from .request_repository_checks import RequestRepositoryChecks
@@ -67,7 +66,6 @@
6766
"refresh.webhook.configuration": RefreshWebhookConfiguration,
6867
"request.artifact_definition.check": RequestArtifactDefinitionCheck,
6968
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
70-
"request.generator_definition.run": RequestGeneratorDefinitionRun,
7169
"request.graphql_query_group.update": RequestGraphQLQueryGroupUpdate,
7270
"request.proposed_change.data_integrity": RequestProposedChangeDataIntegrity,
7371
"request.proposed_change.pipeline": RequestProposedChangePipeline,

backend/infrahub/message_bus/messages/check_generator_run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
from pydantic import Field
44

5+
from infrahub.generators.models import ProposedChangeGeneratorDefinition
56
from infrahub.message_bus import InfrahubMessage
6-
from infrahub.message_bus.types import ProposedChangeGeneratorDefinition
77

88

99
class CheckGeneratorRun(InfrahubMessage):

backend/infrahub/message_bus/messages/request_generatordefinition_check.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from pydantic import ConfigDict, Field
22

3+
from infrahub.generators.models import ProposedChangeGeneratorDefinition
34
from infrahub.message_bus import InfrahubMessage
4-
from infrahub.message_bus.types import ProposedChangeBranchDiff, ProposedChangeGeneratorDefinition
5+
from infrahub.message_bus.types import ProposedChangeBranchDiff
56

67

78
class RequestGeneratorDefinitionCheck(InfrahubMessage):

backend/infrahub/message_bus/messages/request_generatordefinition_run.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
4343
"refresh.webhook.configuration": refresh.webhook.configuration,
4444
"request.generator_definition.check": requests.generator_definition.check,
45-
"request.generator_definition.run": requests.generator_definition.run,
4645
"request.graphql_query_group.update": requests.graphql_query_group.update,
4746
"request.artifact_definition.check": requests.artifact_definition.check,
4847
"request.proposed_change.data_integrity": requests.proposed_change.data_integrity,

backend/infrahub/message_bus/operations/requests/generator_definition.py

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@
55

66
from infrahub.core.constants import InfrahubKind, ValidatorConclusion, ValidatorState
77
from infrahub.core.timestamp import Timestamp
8-
from infrahub.generators.models import RequestGeneratorRun
98
from infrahub.message_bus import InfrahubMessage, Meta, messages
109
from infrahub.message_bus.types import KVTTL
1110
from infrahub.services import InfrahubServices
12-
from infrahub.workflows.catalogue import REQUEST_GENERATOR_RUN
1311

1412

1513
@flow(name="generator-definition-check")
@@ -131,64 +129,6 @@ async def check(message: messages.RequestGeneratorDefinitionCheck, service: Infr
131129
await service.send(message=event)
132130

133131

134-
@flow(name="generator-definition-run")
135-
async def run(message: messages.RequestGeneratorDefinitionRun, service: InfrahubServices) -> None:
136-
async with service.task_report(
137-
title="Executing Generator",
138-
related_node=message.generator_definition.definition_id,
139-
) as task_report:
140-
service.log.info(
141-
"Received request to run generator",
142-
branch=message.branch,
143-
generator_definition=message.generator_definition.definition_id,
144-
)
145-
146-
group = await service.client.get(
147-
kind=InfrahubKind.GENERICGROUP,
148-
prefetch_relationships=True,
149-
populate_store=True,
150-
id=message.generator_definition.group_id,
151-
branch=message.branch,
152-
)
153-
await group.members.fetch()
154-
155-
existing_instances = await service.client.filters(
156-
kind=InfrahubKind.GENERATORINSTANCE,
157-
definition__ids=[message.generator_definition.definition_id],
158-
include=["object"],
159-
branch=message.branch,
160-
)
161-
instance_by_member = {}
162-
for instance in existing_instances:
163-
instance_by_member[instance.object.peer.id] = instance.id
164-
165-
repository = await service.client.get(
166-
kind=InfrahubKind.REPOSITORY, branch=message.branch, id=message.generator_definition.repository_id
167-
)
168-
169-
for relationship in group.members.peers:
170-
member = relationship.peer
171-
generator_instance = instance_by_member.get(member.id)
172-
model = RequestGeneratorRun(
173-
generator_definition=message.generator_definition,
174-
commit=repository.commit.value,
175-
generator_instance=generator_instance,
176-
repository_id=repository.id,
177-
repository_name=repository.name.value,
178-
repository_kind=repository.typename,
179-
branch_name=message.branch,
180-
query=message.generator_definition.query_name,
181-
variables=member.extract(params=message.generator_definition.parameters),
182-
target_id=member.id,
183-
target_name=member.display_label,
184-
)
185-
await service.workflow.submit_workflow(workflow=REQUEST_GENERATOR_RUN, parameters={"model": model})
186-
187-
await task_report.info(
188-
event=f"Generator triggered for {len(group.members.peers)} members in {group.name.value}."
189-
)
190-
191-
192132
def _run_generator(instance_id: Optional[str], managed_branch: bool, impacted_instances: list[str]) -> bool:
193133
"""Returns a boolean to indicate if a generator instance needs to be executed
194134
Will return true if:

backend/infrahub/message_bus/operations/requests/proposed_change.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
from infrahub.core.validators.checker import schema_validators_checker
2323
from infrahub.core.validators.determiner import ConstraintValidatorDeterminer
2424
from infrahub.dependencies.registry import get_component_registry
25+
from infrahub.generators.models import ProposedChangeGeneratorDefinition
2526
from infrahub.git.repository import InfrahubRepository, get_initialized_repo
2627
from infrahub.log import get_logger
2728
from infrahub.message_bus import InfrahubMessage, messages
2829
from infrahub.message_bus.types import (
2930
ProposedChangeArtifactDefinition,
3031
ProposedChangeBranchDiff,
31-
ProposedChangeGeneratorDefinition,
3232
ProposedChangeRepository,
3333
ProposedChangeSubscriber,
3434
)

backend/infrahub/message_bus/types.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,6 @@ def transform_location(self) -> str:
107107
raise ValueError("Invalid kind for Transform")
108108

109109

110-
class ProposedChangeGeneratorDefinition(BaseModel):
111-
definition_id: str
112-
definition_name: str
113-
query_name: str
114-
convert_query_response: bool
115-
query_models: list[str]
116-
repository_id: str
117-
class_name: str
118-
file_path: str
119-
parameters: dict
120-
group_id: str
121-
122-
123110
class ProposedChangeBranchDiff(BaseModel):
124111
diff_summary: list[NodeDiff] = Field(default_factory=list, description="The DiffSummary between two branches")
125112
repositories: list[ProposedChangeRepository] = Field(default_factory=list)

0 commit comments

Comments
 (0)