Skip to content

Commit b0ec093

Browse files
committed
Migrate RequestGraphQLQueryGroupUpdate to prefect
1 parent ad47864 commit b0ec093

File tree

10 files changed

+100
-123
lines changed

10 files changed

+100
-123
lines changed

backend/infrahub/api/query.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
GRAPHQL_RESPONSE_SIZE_METRICS,
2424
GRAPHQL_TOP_LEVEL_QUERIES_METRICS,
2525
)
26+
from infrahub.graphql.models import RequestGraphQLQueryGroupUpdate
2627
from infrahub.graphql.utils import extract_data
2728
from infrahub.log import get_logger
28-
from infrahub.message_bus import messages
29+
from infrahub.workflows.catalogue import REQUEST_GRAPHQL_QUERY_GROUP_UPDATE
2930

3031
if TYPE_CHECKING:
3132
from infrahub.auth import AccountSession
@@ -106,16 +107,15 @@ async def execute_query(
106107

107108
if update_group:
108109
service: InfrahubServices = request.app.state.service
109-
await service.send(
110-
message=messages.RequestGraphQLQueryGroupUpdate(
111-
branch=branch_params.branch.name,
112-
query_id=gql_query.id,
113-
query_name=gql_query.name.value,
114-
related_node_ids=sorted(list(related_node_ids)),
115-
subscribers=sorted(subscribers),
116-
params=params,
117-
)
110+
model = RequestGraphQLQueryGroupUpdate(
111+
branch=branch_params.branch.name,
112+
query_id=gql_query.id,
113+
query_name=gql_query.name.value,
114+
related_node_ids=sorted(list(related_node_ids)),
115+
subscribers=sorted(subscribers),
116+
params=params,
118117
)
118+
await service.workflow.submit_workflow(workflow=REQUEST_GRAPHQL_QUERY_GROUP_UPDATE, parameters={"model": model})
119119

120120
return response_payload
121121

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
from pydantic import Field
1+
from pydantic import BaseModel, Field
22

3-
from infrahub.message_bus import InfrahubMessage
43

5-
6-
class RequestGraphQLQueryGroupUpdate(InfrahubMessage):
4+
class RequestGraphQLQueryGroupUpdate(BaseModel):
75
"""Sent to create or update a GraphQLQueryGroup associated with a given GraphQLQuery."""
86

97
branch: str = Field(..., description="The branch to target")
Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@
66
from prefect import flow
77

88
from infrahub.core.constants import InfrahubKind
9-
from infrahub.log import get_logger
10-
from infrahub.message_bus import messages
11-
from infrahub.services import InfrahubServices
9+
from infrahub.graphql.models import RequestGraphQLQueryGroupUpdate
10+
from infrahub.services import services
11+
from infrahub.workflows.utils import add_branch_tag
1212

13-
log = get_logger()
1413

15-
16-
async def group_add_subscriber(
14+
async def _group_add_subscriber(
1715
client: InfrahubClient, group: InfrahubNode, subscribers: List[str], branch: str
1816
) -> dict:
1917
subscribers_str = ["{ id: " + f'"{subscriber}"' + " }" for subscriber in subscribers]
@@ -37,26 +35,29 @@ async def group_add_subscriber(
3735
return await client.execute_graphql(query=query, branch_name=branch, tracker="mutation-relationshipadd")
3836

3937

40-
@flow(name="graphql-query-update")
41-
async def update(message: messages.RequestGraphQLQueryGroupUpdate, service: InfrahubServices) -> None:
38+
@flow(name="request_graphql_query_group_update")
39+
async def request_graphql_query_group_update(model: RequestGraphQLQueryGroupUpdate) -> None:
4240
"""Create or Update a GraphQLQueryGroup."""
4341

44-
params_hash = dict_hash(message.params)
45-
group_name = f"{message.query_name}__{params_hash}"
46-
group_label = f"Query {message.query_name} Hash({params_hash[:8]})"
42+
await add_branch_tag(branch_name=model.branch)
43+
service = services.service
44+
45+
params_hash = dict_hash(model.params)
46+
group_name = f"{model.query_name}__{params_hash}"
47+
group_label = f"Query {model.query_name} Hash({params_hash[:8]})"
4748
group = await service.client.create(
4849
kind=InfrahubKind.GRAPHQLQUERYGROUP,
49-
branch=message.branch,
50+
branch=model.branch,
5051
name=group_name,
5152
label=group_label,
5253
group_type="internal",
53-
query=message.query_id,
54-
parameters=message.params,
55-
members=message.related_node_ids,
54+
query=model.query_id,
55+
parameters=model.params,
56+
members=model.related_node_ids,
5657
)
5758
await group.save(allow_upsert=True)
5859

59-
if message.subscribers:
60-
await group_add_subscriber(
61-
client=service.client, group=group, subscribers=message.subscribers, branch=message.branch
60+
if model.subscribers:
61+
await _group_add_subscriber(
62+
client=service.client, group=group, subscribers=model.subscribers, branch=model.branch
6263
)

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
from .refresh_webhook_configuration import RefreshWebhookConfiguration
3131
from .request_artifactdefinition_check import RequestArtifactDefinitionCheck
3232
from .request_generatordefinition_check import RequestGeneratorDefinitionCheck
33-
from .request_graphqlquerygroup_update import RequestGraphQLQueryGroupUpdate
3433
from .request_proposedchange_pipeline import RequestProposedChangePipeline
3534
from .request_repository_checks import RequestRepositoryChecks
3635
from .request_repository_userchecks import RequestRepositoryUserChecks
@@ -66,7 +65,6 @@
6665
"refresh.webhook.configuration": RefreshWebhookConfiguration,
6766
"request.artifact_definition.check": RequestArtifactDefinitionCheck,
6867
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
69-
"request.graphql_query_group.update": RequestGraphQLQueryGroupUpdate,
7068
"request.proposed_change.data_integrity": RequestProposedChangeDataIntegrity,
7169
"request.proposed_change.pipeline": RequestProposedChangePipeline,
7270
"request.proposed_change.refresh_artifacts": RequestProposedChangeRefreshArtifacts,

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
4343
"refresh.webhook.configuration": refresh.webhook.configuration,
4444
"request.generator_definition.check": requests.generator_definition.check,
45-
"request.graphql_query_group.update": requests.graphql_query_group.update,
4645
"request.artifact_definition.check": requests.artifact_definition.check,
4746
"request.proposed_change.data_integrity": requests.proposed_change.data_integrity,
4847
"request.proposed_change.pipeline": requests.proposed_change.pipeline,
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
from . import (
22
artifact_definition,
33
generator_definition,
4-
graphql_query_group,
54
proposed_change,
65
repository,
76
)
87

98
__all__ = [
109
"artifact_definition",
1110
"generator_definition",
12-
"graphql_query_group",
1311
"proposed_change",
1412
"repository",
1513
]

backend/infrahub/workflows/catalogue.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,15 @@
187187
function="cancel_proposed_changes_branch",
188188
)
189189

190+
REQUEST_GRAPHQL_QUERY_GROUP_UPDATE = WorkflowDefinition(
191+
name="request_graphql_query_group_update",
192+
type=WorkflowType.INTERNAL,
193+
module="infrahub.graphql.tasks",
194+
function="request_graphql_query_group_update",
195+
branch_support=BranchSupportType.AWARE,
196+
tags=[WorkflowTag.DATABASE_CHANGE],
197+
)
198+
190199
worker_pools = [INFRAHUB_WORKER_POOL]
191200

192201
workflows = [
@@ -213,4 +222,5 @@
213222
TRIGGER_GENERATOR_DEFINITION_RUN,
214223
BRANCH_CANCEL_PROPOSED_CHANGES,
215224
REQUEST_GENERATOR_DEFINITION_RUN,
225+
REQUEST_GRAPHQL_QUERY_GROUP_UPDATE,
216226
]

backend/tests/unit/api/test_05_query_api.py

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
from __future__ import annotations
22

33
from typing import TYPE_CHECKING
4+
from unittest.mock import call, patch
45

56
import pytest
67

78
from infrahub.core.initialization import create_branch
8-
from infrahub.message_bus import messages
9+
from infrahub.graphql.models import RequestGraphQLQueryGroupUpdate
10+
from infrahub.workflows.catalogue import REQUEST_GRAPHQL_QUERY_GROUP_UPDATE
911

1012
if TYPE_CHECKING:
1113
from fastapi.testclient import TestClient
@@ -32,70 +34,90 @@ async def test_query_endpoint_group_no_params(
3234
patch_services,
3335
):
3436
# Must execute in a with block to execute the startup/shutdown events
35-
with client:
37+
with (
38+
client,
39+
patch(
40+
"infrahub.services.adapters.workflow.local.WorkflowLocalExecution.submit_workflow"
41+
) as mock_submit_workflow,
42+
):
3643
response = client.get(
3744
"/api/query/query01?update_group=true&subscribers=AAAAAA&subscribers=BBBBBB", headers=admin_headers
3845
)
3946

40-
assert "errors" not in response.json()
41-
assert response.status_code == 200
42-
assert response.json()["data"] is not None
43-
result = response.json()["data"]
47+
assert "errors" not in response.json()
48+
assert response.status_code == 200
49+
assert response.json()["data"] is not None
50+
result = response.json()["data"]
4451

45-
result_per_name = {result["node"]["name"]["value"]: result for result in result["TestPerson"]["edges"]}
46-
assert sorted(result_per_name.keys()) == ["Jane", "John"]
47-
assert len(result_per_name["John"]["node"]["cars"]["edges"]) == 2
48-
assert len(result_per_name["Jane"]["node"]["cars"]["edges"]) == 1
52+
result_per_name = {result["node"]["name"]["value"]: result for result in result["TestPerson"]["edges"]}
53+
assert sorted(result_per_name.keys()) == ["Jane", "John"]
54+
assert len(result_per_name["John"]["node"]["cars"]["edges"]) == 2
55+
assert len(result_per_name["Jane"]["node"]["cars"]["edges"]) == 1
4956

50-
q1 = car_person_data["q1"]
51-
p1 = car_person_data["p1"]
52-
p2 = car_person_data["p2"]
53-
c1 = car_person_data["c1"]
54-
c2 = car_person_data["c2"]
55-
c3 = car_person_data["c3"]
57+
q1 = car_person_data["q1"]
58+
p1 = car_person_data["p1"]
59+
p2 = car_person_data["p2"]
60+
c1 = car_person_data["c1"]
61+
c2 = car_person_data["c2"]
62+
c3 = car_person_data["c3"]
5663

57-
assert (
58-
messages.RequestGraphQLQueryGroupUpdate(
64+
model = RequestGraphQLQueryGroupUpdate(
5965
query_id=q1.id,
6066
query_name="query01",
6167
branch="main",
6268
related_node_ids=sorted([p1.id, p2.id, c1.id, c2.id, c3.id]),
6369
subscribers=sorted(["AAAAAA", "BBBBBB"]),
6470
params={},
6571
)
66-
in client.app.state.service.message_bus.messages
67-
)
72+
73+
expected_calls = [
74+
call(
75+
workflow=REQUEST_GRAPHQL_QUERY_GROUP_UPDATE,
76+
parameters={"model": model},
77+
),
78+
]
79+
mock_submit_workflow.assert_has_calls(expected_calls)
6880

6981

7082
async def test_query_endpoint_group_params(
7183
db: InfrahubDatabase, client: TestClient, admin_headers, default_branch, create_test_admin, car_person_data
7284
):
7385
# Must execute in a with block to execute the startup/shutdown events
74-
with client:
86+
with (
87+
client,
88+
patch(
89+
"infrahub.services.adapters.workflow.local.WorkflowLocalExecution.submit_workflow"
90+
) as mock_submit_workflow,
91+
):
7592
response = client.get("/api/query/query02?update_group=true&person=John", headers=admin_headers)
7693

77-
assert "errors" not in response.json()
78-
assert response.status_code == 200
79-
assert response.json()["data"] is not None
80-
result = response.json()["data"]
94+
assert "errors" not in response.json()
95+
assert response.status_code == 200
96+
assert response.json()["data"] is not None
97+
result = response.json()["data"]
8198

82-
result_per_name = {result["node"]["name"]["value"]: result for result in result["TestPerson"]["edges"]}
83-
assert sorted(result_per_name.keys()) == ["John"]
99+
result_per_name = {result["node"]["name"]["value"]: result for result in result["TestPerson"]["edges"]}
100+
assert sorted(result_per_name.keys()) == ["John"]
84101

85-
q2 = car_person_data["q2"]
86-
p1 = car_person_data["p1"]
102+
q2 = car_person_data["q2"]
103+
p1 = car_person_data["p1"]
87104

88-
assert (
89-
messages.RequestGraphQLQueryGroupUpdate(
105+
model = RequestGraphQLQueryGroupUpdate(
90106
query_id=q2.id,
91107
query_name="query02",
92108
branch="main",
93109
related_node_ids={p1.id},
94110
subscribers=[],
95111
params={"person": "John"},
96112
)
97-
in client.app.state.service.message_bus.messages
98-
)
113+
114+
expected_calls = [
115+
call(
116+
workflow=REQUEST_GRAPHQL_QUERY_GROUP_UPDATE,
117+
parameters={"model": model},
118+
),
119+
]
120+
mock_submit_workflow.assert_has_calls(expected_calls)
99121

100122

101123
async def test_query_endpoint_get_default_branch(

backend/tests/unit/message_bus/operations/requests/test_graphql_query_group.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
from pytest_httpx import HTTPXMock
99

1010
from infrahub.database import InfrahubDatabase
11-
from infrahub.message_bus import messages
12-
from infrahub.message_bus.operations.requests.graphql_query_group import update
11+
from infrahub.graphql.models import RequestGraphQLQueryGroupUpdate
12+
from infrahub.graphql.tasks import request_graphql_query_group_update
1313
from infrahub.services import InfrahubServices
1414

1515

@@ -32,7 +32,7 @@ async def test_graphql_group_update(db: InfrahubDatabase, httpx_mock: HTTPXMock,
3232
c3 = str(uuid.uuid4())
3333
r1 = str(uuid.uuid4())
3434

35-
message = messages.RequestGraphQLQueryGroupUpdate(
35+
model = RequestGraphQLQueryGroupUpdate(
3636
query_id=q1,
3737
query_name="query01",
3838
branch="main",
@@ -63,4 +63,4 @@ async def test_graphql_group_update(db: InfrahubDatabase, httpx_mock: HTTPXMock,
6363
match_headers={"X-Infrahub-Tracker": "mutation-relationshipadd"},
6464
)
6565

66-
await update.fn(message=message, service=service)
66+
await request_graphql_query_group_update.fn(message=model, service=service)

docs/docs/reference/message-bus-events.mdx

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -572,30 +572,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
572572
| **destination_branch** | The target branch | string | None |
573573
<!-- vale on -->
574574

575-
<!-- vale off -->
576-
### Request Graphql Query Group
577-
<!-- vale on -->
578-
579-
<!-- vale off -->
580-
#### Event request.graphql_query_group.update
581-
<!-- vale on -->
582-
583-
**Description**: Sent to create or update a GraphQLQueryGroup associated with a given GraphQLQuery.
584-
585-
**Priority**: 3
586-
587-
<!-- vale off -->
588-
| Key | Description | Type | Default Value |
589-
|-----|-------------|------|---------------|
590-
| **meta** | Meta properties for the message | N/A | None |
591-
| **branch** | The branch to target | string | None |
592-
| **query_name** | The name of the GraphQLQuery that should be associated with the group | string | None |
593-
| **query_id** | The ID of the GraphQLQuery that should be associated with the group | string | None |
594-
| **related_node_ids** | List of nodes related to the GraphQLQuery | array | None |
595-
| **subscribers** | List of subscribers to add to the group | array | None |
596-
| **params** | Params sent with the query | object | None |
597-
<!-- vale on -->
598-
599575
<!-- vale off -->
600576
### Request Proposed Change
601577
<!-- vale on -->
@@ -1400,31 +1376,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
14001376
| **destination_branch** | The target branch | string | None |
14011377
<!-- vale on -->
14021378

1403-
<!-- vale off -->
1404-
### Request Graphql Query Group
1405-
<!-- vale on -->
1406-
1407-
<!-- vale off -->
1408-
#### Event request.graphql_query_group.update
1409-
<!-- vale on -->
1410-
1411-
**Description**: Sent to create or update a GraphQLQueryGroup associated with a given GraphQLQuery.
1412-
1413-
**Priority**: 3
1414-
1415-
1416-
<!-- vale off -->
1417-
| Key | Description | Type | Default Value |
1418-
|-----|-------------|------|---------------|
1419-
| **meta** | Meta properties for the message | N/A | None |
1420-
| **branch** | The branch to target | string | None |
1421-
| **query_name** | The name of the GraphQLQuery that should be associated with the group | string | None |
1422-
| **query_id** | The ID of the GraphQLQuery that should be associated with the group | string | None |
1423-
| **related_node_ids** | List of nodes related to the GraphQLQuery | array | None |
1424-
| **subscribers** | List of subscribers to add to the group | array | None |
1425-
| **params** | Params sent with the query | object | None |
1426-
<!-- vale on -->
1427-
14281379
<!-- vale off -->
14291380
### Request Proposed Change
14301381
<!-- vale on -->

0 commit comments

Comments
 (0)