Skip to content

Commit f12135d

Browse files
authored
Merge pull request #4918 from opsmill/pog-schema-updated-event-IFC-857
Migrate EventSchemaUpdate to InfrahubEvent
2 parents 380ec49 + dd87ca2 commit f12135d

File tree

10 files changed

+195
-40
lines changed

10 files changed

+195
-40
lines changed

backend/infrahub/api/schema.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import TYPE_CHECKING, Any, Optional, Union
44

5-
from fastapi import APIRouter, BackgroundTasks, Depends, Query, Request
5+
from fastapi import APIRouter, Depends, Query, Request
66
from pydantic import (
77
BaseModel,
88
Field,
@@ -12,7 +12,7 @@
1212
)
1313
from starlette.responses import JSONResponse
1414

15-
from infrahub import config, lock
15+
from infrahub import lock
1616
from infrahub.api.dependencies import get_branch_dep, get_current_user, get_db
1717
from infrahub.api.exceptions import SchemaNotValidError
1818
from infrahub.core import registry
@@ -29,13 +29,13 @@
2929
from infrahub.core.schema.constants import SchemaNamespace # noqa: TCH001
3030
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
3131
from infrahub.database import InfrahubDatabase # noqa: TCH001
32+
from infrahub.events import EventMeta
33+
from infrahub.events.schema_action import SchemaUpdatedEvent
3234
from infrahub.exceptions import MigrationError, PermissionDeniedError
33-
from infrahub.log import get_logger
34-
from infrahub.message_bus import Meta, messages
35-
from infrahub.services import services
35+
from infrahub.log import get_log_data, get_logger
3636
from infrahub.types import ATTRIBUTE_PYTHON_TYPES
3737
from infrahub.worker import WORKER_IDENTITY
38-
from infrahub.workflows.catalogue import COMPUTED_ATTRIBUTE_SETUP, SCHEMA_APPLY_MIGRATION, SCHEMA_VALIDATE_MIGRATION
38+
from infrahub.workflows.catalogue import SCHEMA_APPLY_MIGRATION, SCHEMA_VALIDATE_MIGRATION
3939

4040
if TYPE_CHECKING:
4141
from typing_extensions import Self
@@ -239,7 +239,6 @@ async def get_json_schema_by_kind(schema_kind: str, branch: Branch = Depends(get
239239
async def load_schema(
240240
request: Request,
241241
schemas: SchemasLoadAPI,
242-
background_tasks: BackgroundTasks,
243242
db: InfrahubDatabase = Depends(get_db),
244243
branch: Branch = Depends(get_branch_dep),
245244
account_session: AccountSession = Depends(get_current_user),
@@ -339,18 +338,17 @@ async def load_schema(
339338
if migration_error_msgs:
340339
raise MigrationError(message=",\n".join(migration_error_msgs))
341340

342-
if config.SETTINGS.broker.enable:
343-
message = messages.EventSchemaUpdate(
344-
branch=branch.name,
345-
meta=Meta(initiator_id=WORKER_IDENTITY),
346-
)
347-
background_tasks.add_task(services.send, message)
341+
log_data = get_log_data()
342+
request_id = log_data.get("request_id", "")
343+
event = SchemaUpdatedEvent(
344+
branch=branch.name,
345+
schema_hash=branch.active_schema_hash.main,
346+
meta=EventMeta(initiator_id=WORKER_IDENTITY, request_id=request_id, account_id=account_session.account_id),
347+
)
348+
await service.event.send(event=event)
348349

349350
await service.component.refresh_schema_hash(branches=[branch.name])
350351

351-
# Temporary workaround, we need to emit a proper event
352-
await service.workflow.submit_workflow(workflow=COMPUTED_ATTRIBUTE_SETUP)
353-
354352
return SchemaUpdate(hash=updated_hash, previous_hash=original_hash, diff=result.diff)
355353

356354

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from typing import Any
2+
3+
from pydantic import Field
4+
5+
from infrahub.message_bus import InfrahubMessage
6+
from infrahub.message_bus.messages.event_schema_update import EventSchemaUpdate
7+
8+
from .models import InfrahubBranchEvent
9+
10+
11+
class SchemaUpdatedEvent(InfrahubBranchEvent):
12+
"""Event generated when the schema within a branch has been updated."""
13+
14+
schema_hash: str = Field(..., description="Schema hash after the update")
15+
16+
def get_name(self) -> str:
17+
return f"{self.get_event_namespace()}.schema.update"
18+
19+
def get_resource(self) -> dict[str, str]:
20+
return {
21+
"prefect.resource.id": f"infrahub.schema_branch.{self.branch}",
22+
"infrahub.branch.name": self.branch,
23+
"infrahub.branch.schema_hash": self.schema_hash,
24+
}
25+
26+
def get_payload(self) -> dict[str, Any]:
27+
return {"branch": self.branch, "schema_hash": self.schema_hash}
28+
29+
def get_messages(self) -> list[InfrahubMessage]:
30+
return [
31+
EventSchemaUpdate(
32+
branch=self.branch,
33+
meta=self.get_message_meta(),
34+
)
35+
]

backend/infrahub/graphql/mutations/schema.py

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
1-
from typing import TYPE_CHECKING, Union
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Self, Union
24

35
from graphene import Boolean, Field, InputObjectType, Mutation, String
4-
from graphql import GraphQLResolveInfo
56

6-
from infrahub import config, lock
7+
from infrahub import lock
78
from infrahub.core import registry
8-
from infrahub.core.branch import Branch
99
from infrahub.core.constants import RESTRICTED_NAMESPACES
1010
from infrahub.core.manager import NodeManager
1111
from infrahub.core.schema import DropdownChoice, GenericSchema, NodeSchema
1212
from infrahub.database import InfrahubDatabase, retry_db_transaction
13+
from infrahub.events import EventMeta
14+
from infrahub.events.schema_action import SchemaUpdatedEvent
1315
from infrahub.exceptions import ValidationError
14-
from infrahub.log import get_logger
15-
from infrahub.message_bus import Meta, messages
16-
from infrahub.services import services
16+
from infrahub.log import get_log_data, get_logger
1717
from infrahub.worker import WORKER_IDENTITY
1818

1919
from ..types import DropdownFields
2020

2121
if TYPE_CHECKING:
22+
from graphql import GraphQLResolveInfo
23+
24+
from infrahub.core.branch import Branch
25+
from infrahub.services import InfrahubServices
26+
2227
from ..initialization import GraphqlContext
2328

2429
log = get_logger()
@@ -56,7 +61,7 @@ async def mutate(
5661
root: dict, # pylint: disable=unused-argument
5762
info: GraphQLResolveInfo,
5863
data: SchemaDropdownAddInput,
59-
):
64+
) -> Self:
6065
context: GraphqlContext = info.context
6166

6267
kind = context.db.schema.get(name=str(data.kind), branch=context.branch.name)
@@ -73,7 +78,13 @@ async def mutate(
7378
)
7479
attrib.choices.append(choice)
7580

76-
await update_registry(kind=kind, branch=context.branch, db=context.db)
81+
await update_registry(
82+
kind=kind,
83+
branch=context.branch,
84+
db=context.db,
85+
account_id=context.active_account_session.account_id,
86+
service=context.active_service,
87+
)
7788

7889
kind = context.db.schema.get(name=str(data.kind), branch=context.branch.name)
7990
attrib = kind.get_attribute(attribute)
@@ -130,7 +141,13 @@ async def mutate(
130141
raise ValidationError(f"Unable to remove the last dropdown on {kind.kind} in attribute {attribute}")
131142
attrib.choices = [entry for entry in attrib.choices if dropdown != entry.name]
132143

133-
await update_registry(kind=kind, branch=context.branch, db=context.db)
144+
await update_registry(
145+
kind=kind,
146+
branch=context.branch,
147+
db=context.db,
148+
account_id=context.active_account_session.account_id,
149+
service=context.active_service,
150+
)
134151

135152
return {"ok": True}
136153

@@ -165,7 +182,13 @@ async def mutate(
165182
)
166183
attrib.enum.append(enum)
167184

168-
await update_registry(kind=kind, branch=context.branch, db=context.db)
185+
await update_registry(
186+
kind=kind,
187+
branch=context.branch,
188+
db=context.db,
189+
account_id=context.active_account_session.account_id,
190+
service=context.active_service,
191+
)
169192

170193
return {"ok": True}
171194

@@ -206,7 +229,13 @@ async def mutate(
206229
raise ValidationError(f"Unable to remove the last enum on {kind.kind} in attribute {attribute}")
207230
attrib.enum = [entry for entry in attrib.enum if entry != enum]
208231

209-
await update_registry(kind=kind, branch=branch, db=db)
232+
await update_registry(
233+
kind=kind,
234+
branch=branch,
235+
db=db,
236+
account_id=context.active_account_session.account_id,
237+
service=context.active_service,
238+
)
210239

211240
return {"ok": True}
212241

@@ -237,7 +266,9 @@ def validate_kind(kind: Union[GenericSchema, NodeSchema], attribute: str) -> Non
237266
raise ValidationError(f"Attribute {attribute} on {kind.kind} is inherited and must be changed on the generic")
238267

239268

240-
async def update_registry(kind: NodeSchema, branch: Branch, db: InfrahubDatabase) -> None:
269+
async def update_registry(
270+
kind: NodeSchema, db: InfrahubDatabase, branch: Branch, account_id: str, service: InfrahubServices
271+
) -> None:
241272
async with lock.registry.global_schema_lock():
242273
branch_schema = registry.schema.get_schema_branch(name=branch.name)
243274

@@ -256,13 +287,20 @@ async def update_registry(kind: NodeSchema, branch: Branch, db: InfrahubDatabase
256287
schema=tmp_schema, db=dbt, branch=branch.name, limit=diff.all, update_db=True
257288
)
258289
branch.update_schema_hash()
259-
log.info("Schema has been updated", branch=branch.name, hash=branch.schema_hash.main)
290+
log.info("Schema has been updated", branch=branch.name, hash=branch.active_schema_hash.main)
260291
await branch.save(db=dbt)
261292

262-
if config.SETTINGS.broker.enable:
263-
message = messages.EventSchemaUpdate(
264-
branch=branch.name,
265-
meta=Meta(initiator_id=WORKER_IDENTITY),
266-
)
267-
await services.send(message)
268-
await services.service.component.refresh_schema_hash(branches=[branch.name])
293+
log_data = get_log_data()
294+
request_id = log_data.get("request_id", "")
295+
event = SchemaUpdatedEvent(
296+
branch=branch.name,
297+
schema_hash=branch.active_schema_hash.main,
298+
meta=EventMeta(
299+
initiator_id=WORKER_IDENTITY,
300+
request_id=request_id,
301+
account_id=account_id,
302+
),
303+
)
304+
await service.event.send(event=event)
305+
306+
await service.component.refresh_schema_hash(branches=[branch.name])

backend/infrahub/schema/__init__.py

Whitespace-only changes.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AUTOMATION_NAME = "Trigger-schema-update-event"

backend/infrahub/schema/tasks.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from __future__ import annotations
2+
3+
from datetime import timedelta
4+
5+
from prefect import flow
6+
from prefect.automations import AutomationCore
7+
from prefect.client.orchestration import get_client
8+
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName
9+
from prefect.events.actions import RunDeployment
10+
from prefect.events.schemas.automations import EventTrigger, Posture
11+
from prefect.logging import get_run_logger
12+
13+
from infrahub.workflows.catalogue import COMPUTED_ATTRIBUTE_SETUP
14+
15+
from .constants import AUTOMATION_NAME
16+
17+
18+
@flow(name="schema-updated-setup", flow_run_name="Setup schema updated event in task-manager")
19+
async def schema_updated_setup() -> None:
20+
log = get_run_logger()
21+
22+
async with get_client(sync_client=False) as client:
23+
deployments = {
24+
item.name: item
25+
for item in await client.read_deployments(
26+
deployment_filter=DeploymentFilter(
27+
name=DeploymentFilterName(
28+
any_=[
29+
COMPUTED_ATTRIBUTE_SETUP.name,
30+
]
31+
)
32+
)
33+
)
34+
}
35+
if COMPUTED_ATTRIBUTE_SETUP.name not in deployments:
36+
raise ValueError("Unable to find the deployment for PROCESS_COMPUTED_MACRO")
37+
38+
deployment_id_computed_attribute_setup = deployments[COMPUTED_ATTRIBUTE_SETUP.name].id
39+
40+
schema_update_automation = await client.find_automation(id_or_name=AUTOMATION_NAME)
41+
42+
automation = AutomationCore(
43+
name=AUTOMATION_NAME,
44+
description="Trigger actions on schema update event",
45+
enabled=True,
46+
trigger=EventTrigger(
47+
posture=Posture.Reactive,
48+
expect={"infrahub.schema.update"},
49+
within=timedelta(0),
50+
threshold=1,
51+
),
52+
actions=[
53+
RunDeployment(
54+
source="selected",
55+
deployment_id=deployment_id_computed_attribute_setup,
56+
parameters={},
57+
job_variables={},
58+
)
59+
],
60+
)
61+
62+
if schema_update_automation:
63+
await client.update_automation(automation_id=schema_update_automation.id, automation=automation)
64+
log.info(f"{AUTOMATION_NAME} Updated")
65+
else:
66+
await client.create_automation(automation=automation)
67+
log.info(f"{AUTOMATION_NAME} Created")

backend/infrahub/services/adapters/workflow/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import TYPE_CHECKING, Any, overload
44

5-
from prefect.client.schemas import StateType
5+
from prefect.client.schemas.objects import StateType
66
from prefect.deployments import run_deployment
77

88
from infrahub.workflows.initialization import setup_task_manager

backend/infrahub/workflows/catalogue.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,13 @@
247247
function="run_proposed_change_data_integrity_check",
248248
)
249249

250+
SCHEMA_UPDATED_SETUP = WorkflowDefinition(
251+
name="schema-updated-setup",
252+
type=WorkflowType.INTERNAL,
253+
module="infrahub.schema.tasks",
254+
function="schema_updated_setup",
255+
)
256+
250257

251258
worker_pools = [INFRAHUB_WORKER_POOL]
252259

@@ -284,4 +291,5 @@
284291
COMPUTED_ATTRIBUTE_SETUP,
285292
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
286293
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
294+
SCHEMA_UPDATED_SETUP,
287295
]

backend/infrahub/workflows/initialization.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from infrahub import config
99

10-
from .catalogue import worker_pools, workflows
10+
from .catalogue import SCHEMA_UPDATED_SETUP, worker_pools, workflows
1111
from .models import TASK_RESULT_STORAGE_NAME
1212

1313

@@ -37,6 +37,9 @@ async def setup_deployments(client: PrefectClient) -> None:
3737
await workflow.save(client=client, work_pool=work_pool)
3838
log.info(f"Flow {workflow.name}, created successfully ... ")
3939

40+
schema_update_setup = SCHEMA_UPDATED_SETUP.get_function()
41+
await schema_update_setup()
42+
4043

4144
@task(name="task-manager-setup-blocks", task_run_name="Setup Blocks")
4245
async def setup_blocks() -> None:

pyproject.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,12 @@ ignore_errors = true
330330

331331
[[tool.mypy.overrides]]
332332
module = "infrahub.graphql.mutations.schema"
333-
ignore_errors = true
333+
disable_error_code = [
334+
"arg-type",
335+
"operator",
336+
"union-attr",
337+
]
338+
334339

335340
[[tool.mypy.overrides]]
336341
module = "infrahub.graphql.resolver"

0 commit comments

Comments
 (0)