Skip to content

Commit 5f20627

Browse files
authored
Merge pull request #4845 from opsmill/lgu-migrate-req-gql-group-update
Migrate RequestGraphQLQueryGroupUpdate to prefect
2 parents ad47864 + 8689304 commit 5f20627

File tree

15 files changed

+161
-189
lines changed

15 files changed

+161
-189
lines changed

backend/infrahub/api/query.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
GRAPHQL_TOP_LEVEL_QUERIES_METRICS,
2525
)
2626
from infrahub.graphql.utils import extract_data
27+
from infrahub.groups.models import RequestGraphQLQueryGroupUpdate
2728
from infrahub.log import get_logger
28-
from infrahub.message_bus import messages
29+
from infrahub.workflows.catalogue import UPDATE_GRAPHQL_QUERY_GROUP
2930

3031
if TYPE_CHECKING:
3132
from infrahub.auth import AccountSession
@@ -106,16 +107,15 @@ async def execute_query(
106107

107108
if update_group:
108109
service: InfrahubServices = request.app.state.service
109-
await service.send(
110-
message=messages.RequestGraphQLQueryGroupUpdate(
111-
branch=branch_params.branch.name,
112-
query_id=gql_query.id,
113-
query_name=gql_query.name.value,
114-
related_node_ids=sorted(list(related_node_ids)),
115-
subscribers=sorted(subscribers),
116-
params=params,
117-
)
110+
model = RequestGraphQLQueryGroupUpdate(
111+
branch=branch_params.branch.name,
112+
query_id=gql_query.id,
113+
query_name=gql_query.name.value,
114+
related_node_ids=sorted(list(related_node_ids)),
115+
subscribers=sorted(subscribers),
116+
params=params,
118117
)
118+
await service.workflow.submit_workflow(workflow=UPDATE_GRAPHQL_QUERY_GROUP, parameters={"model": model})
119119

120120
return response_payload
121121

backend/infrahub/groups/__init__.py

Whitespace-only changes.
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
from pydantic import Field
1+
from pydantic import BaseModel, Field
22

3-
from infrahub.message_bus import InfrahubMessage
43

5-
6-
class RequestGraphQLQueryGroupUpdate(InfrahubMessage):
4+
class RequestGraphQLQueryGroupUpdate(BaseModel):
75
"""Sent to create or update a GraphQLQueryGroup associated with a given GraphQLQuery."""
86

97
branch: str = Field(..., description="The branch to target")

backend/infrahub/groups/tasks.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from infrahub_sdk.groups import group_add_subscriber
2+
from infrahub_sdk.utils import dict_hash
3+
from prefect import flow
4+
5+
from infrahub.core.constants import InfrahubKind
6+
from infrahub.groups.models import RequestGraphQLQueryGroupUpdate
7+
from infrahub.services import services
8+
from infrahub.workflows.utils import add_branch_tag
9+
10+
11+
@flow(name="update_graphql_query_group")
12+
async def update_graphql_query_group(model: RequestGraphQLQueryGroupUpdate) -> None:
13+
"""Create or Update a GraphQLQueryGroup."""
14+
15+
await add_branch_tag(branch_name=model.branch)
16+
service = services.service
17+
18+
params_hash = dict_hash(model.params)
19+
group_name = f"{model.query_name}__{params_hash}"
20+
group_label = f"Query {model.query_name} Hash({params_hash[:8]})"
21+
group = await service.client.create(
22+
kind=InfrahubKind.GRAPHQLQUERYGROUP,
23+
branch=model.branch,
24+
name=group_name,
25+
label=group_label,
26+
group_type="internal",
27+
query=model.query_id,
28+
parameters=model.params,
29+
members=model.related_node_ids,
30+
)
31+
await group.save(allow_upsert=True)
32+
33+
if model.subscribers:
34+
await group_add_subscriber(
35+
client=service.client, group=group, subscribers=model.subscribers, branch=model.branch
36+
)

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
from .refresh_webhook_configuration import RefreshWebhookConfiguration
3131
from .request_artifactdefinition_check import RequestArtifactDefinitionCheck
3232
from .request_generatordefinition_check import RequestGeneratorDefinitionCheck
33-
from .request_graphqlquerygroup_update import RequestGraphQLQueryGroupUpdate
3433
from .request_proposedchange_pipeline import RequestProposedChangePipeline
3534
from .request_repository_checks import RequestRepositoryChecks
3635
from .request_repository_userchecks import RequestRepositoryUserChecks
@@ -66,7 +65,6 @@
6665
"refresh.webhook.configuration": RefreshWebhookConfiguration,
6766
"request.artifact_definition.check": RequestArtifactDefinitionCheck,
6867
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
69-
"request.graphql_query_group.update": RequestGraphQLQueryGroupUpdate,
7068
"request.proposed_change.data_integrity": RequestProposedChangeDataIntegrity,
7169
"request.proposed_change.pipeline": RequestProposedChangePipeline,
7270
"request.proposed_change.refresh_artifacts": RequestProposedChangeRefreshArtifacts,

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
4343
"refresh.webhook.configuration": refresh.webhook.configuration,
4444
"request.generator_definition.check": requests.generator_definition.check,
45-
"request.graphql_query_group.update": requests.graphql_query_group.update,
4645
"request.artifact_definition.check": requests.artifact_definition.check,
4746
"request.proposed_change.data_integrity": requests.proposed_change.data_integrity,
4847
"request.proposed_change.pipeline": requests.proposed_change.pipeline,
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
from . import (
22
artifact_definition,
33
generator_definition,
4-
graphql_query_group,
54
proposed_change,
65
repository,
76
)
87

98
__all__ = [
109
"artifact_definition",
1110
"generator_definition",
12-
"graphql_query_group",
1311
"proposed_change",
1412
"repository",
1513
]

backend/infrahub/message_bus/operations/requests/graphql_query_group.py

Lines changed: 0 additions & 62 deletions
This file was deleted.

backend/infrahub/workflows/catalogue.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,14 @@
187187
function="cancel_proposed_changes_branch",
188188
)
189189

190+
UPDATE_GRAPHQL_QUERY_GROUP = WorkflowDefinition(
191+
name="update_graphql_query_group",
192+
type=WorkflowType.INTERNAL,
193+
module="infrahub.groups.tasks",
194+
function="update_graphql_query_group",
195+
branch_support=BranchSupportType.AWARE,
196+
)
197+
190198
worker_pools = [INFRAHUB_WORKER_POOL]
191199

192200
workflows = [
@@ -213,4 +221,5 @@
213221
TRIGGER_GENERATOR_DEFINITION_RUN,
214222
BRANCH_CANCEL_PROPOSED_CHANGES,
215223
REQUEST_GENERATOR_DEFINITION_RUN,
224+
UPDATE_GRAPHQL_QUERY_GROUP,
216225
]

backend/tests/helpers/utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1+
from contextlib import contextmanager
2+
from typing import Generator
3+
4+
from infrahub_sdk import InfrahubClient
15
from testcontainers.core.container import DockerContainer
26
from testcontainers.core.waiting_utils import wait_for_logs
37

8+
from infrahub.services import InfrahubServices, services
9+
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
410
from tests.helpers.constants import PORT_BOLT_NEO4J, PORT_HTTP_NEO4J
511

612

@@ -29,3 +35,19 @@ def start_neo4j_container(neo4j_image: str) -> DockerContainer:
2935
container.start()
3036
wait_for_logs(container, "Started.") # wait_container_is_ready does not seem to be enough
3137
return container
38+
39+
40+
@contextmanager
41+
def init_service_with_client(client: InfrahubClient) -> Generator:
42+
"""
43+
This helper is needed for tests defining a specific client while `service` still needs to be accessed
44+
through a global variable within prefect tasks.
45+
"""
46+
47+
original = services.service
48+
service = InfrahubServices(client=client, workflow=WorkflowLocalExecution())
49+
services.service = service
50+
try:
51+
yield service
52+
finally:
53+
services.service = original

0 commit comments

Comments
 (0)