Skip to content

Commit aee8aad

Browse files
committed
Setup automation for Python based computed attributes
1 parent f12135d commit aee8aad

File tree

20 files changed

+484
-411
lines changed

20 files changed

+484
-411
lines changed

backend/infrahub/computed_attribute/models.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1+
from __future__ import annotations
2+
13
from collections import defaultdict
4+
from dataclasses import dataclass
5+
from typing import TYPE_CHECKING
26

3-
from prefect.events.schemas.automations import Automation
7+
from prefect.events.schemas.automations import Automation # noqa: TCH002
48
from pydantic import BaseModel, Field
59
from typing_extensions import Self
610

711
from .constants import AUTOMATION_NAME_PREFIX
812

13+
if TYPE_CHECKING:
14+
from infrahub.core.schema.schema_branch_computed import PythonDefinition
15+
916

1017
class ComputedAttributeAutomations(BaseModel):
1118
data: dict[str, dict[str, Automation]] = Field(default_factory=lambda: defaultdict(dict))
@@ -37,3 +44,14 @@ def has(self, identifier: str, scope: str) -> bool:
3744
if identifier in self.data and scope in self.data[identifier]:
3845
return True
3946
return False
47+
48+
49+
@dataclass
50+
class PythonTransformComputedAttribute:
51+
name: str
52+
repository_id: str
53+
repository_name: str
54+
repository_kind: str
55+
query_name: str
56+
query_models: list[str]
57+
computed_attribute: PythonDefinition

backend/infrahub/computed_attribute/tasks.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
from infrahub.git.repository import get_initialized_repo
1818
from infrahub.services import services
1919
from infrahub.support.macro import MacroDefinition
20+
from infrahub.tasks.registry import refresh_branches
2021
from infrahub.workflows.catalogue import PROCESS_COMPUTED_MACRO, UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM
2122
from infrahub.workflows.utils import add_branch_tag
2223

2324
from .constants import AUTOMATION_NAME, AUTOMATION_NAME_PREFIX
24-
from .models import ComputedAttributeAutomations
25+
from .models import ComputedAttributeAutomations, PythonTransformComputedAttribute
2526

2627
if TYPE_CHECKING:
2728
from infrahub.core.schema.computed_attribute import ComputedAttribute
@@ -48,6 +49,8 @@ async def process_transform(
4849
branch_name: str,
4950
node_kind: str,
5051
object_id: str,
52+
computed_attribute_name: str, # pylint: disable=unused-argument
53+
computed_attribute_kind: str, # pylint: disable=unused-argument
5154
updated_fields: list[str] | None = None, # pylint: disable=unused-argument
5255
) -> None:
5356
"""Request to the creation of git branches in available repositories."""
@@ -262,3 +265,114 @@ async def computed_attribute_setup() -> None:
262265
else:
263266
await client.create_automation(automation=automation)
264267
log.info(f"{computed_attribute.key_name} Created")
268+
269+
270+
@flow(
271+
name="computed-attribute-setup-python",
272+
flow_run_name="Setup computed attributes for Python transforms in task-manager",
273+
)
274+
async def computed_attribute_setup_python() -> None:
275+
service = services.service
276+
async with service.database.start_session() as db:
277+
await refresh_branches(db=db)
278+
279+
schema_branch = registry.schema.get_schema_branch(name=registry.default_branch)
280+
log = get_run_logger()
281+
282+
transform_attributes = schema_branch.computed_attributes.python_attributes_by_transform
283+
284+
transform_names = list(transform_attributes.keys())
285+
286+
transforms = await service.client.filters(
287+
kind="CoreTransformPython",
288+
branch=registry.default_branch,
289+
prefetch_relationships=True,
290+
populate_store=True,
291+
name__values=transform_names,
292+
)
293+
294+
found_transforms_names = [transform.name.value for transform in transforms]
295+
for transform_name in transform_names:
296+
if transform_name not in found_transforms_names:
297+
log.warning(
298+
msg=f"The transform {transform_name} is assigned to a computed attribute but the transform could not be found in the database."
299+
)
300+
301+
computed_attributes: list[PythonTransformComputedAttribute] = []
302+
for transform in transforms:
303+
for attribute in transform_attributes[transform.name.value]:
304+
computed_attributes.append(
305+
PythonTransformComputedAttribute(
306+
name=transform.name.value,
307+
repository_id=transform.repository.peer.id,
308+
repository_name=transform.repository.peer.name.value,
309+
repository_kind=transform.repository.peer.typename,
310+
query_name=transform.query.peer.name.value,
311+
query_models=transform.query.peer.models.value,
312+
computed_attribute=attribute,
313+
)
314+
)
315+
316+
async with get_client(sync_client=False) as client:
317+
deployments = {
318+
item.name: item
319+
for item in await client.read_deployments(
320+
deployment_filter=DeploymentFilter(
321+
name=DeploymentFilterName(any_=[UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name])
322+
)
323+
)
324+
}
325+
if UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name not in deployments:
326+
raise ValueError("Unable to find the deployment for UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM")
327+
328+
deployment_id_python = deployments[UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name].id
329+
330+
automations = await client.read_automations()
331+
existing_computed_attr_automations = ComputedAttributeAutomations.from_prefect(automations=automations)
332+
333+
for computed_attribute in computed_attributes:
334+
log.info(f"processing {computed_attribute.computed_attribute.key_name}")
335+
scope = "default"
336+
337+
automation = AutomationCore(
338+
name=AUTOMATION_NAME.format(
339+
prefix=AUTOMATION_NAME_PREFIX,
340+
identifier=computed_attribute.computed_attribute.key_name,
341+
scope=scope,
342+
),
343+
description=f"Process value of the computed attribute for {computed_attribute.computed_attribute.key_name} [{scope}]",
344+
enabled=True,
345+
trigger=EventTrigger(
346+
posture=Posture.Reactive,
347+
expect={"infrahub.node.*"},
348+
within=timedelta(0),
349+
match=ResourceSpecification({"infrahub.node.kind": [computed_attribute.computed_attribute.kind]}),
350+
threshold=1,
351+
),
352+
actions=[
353+
RunDeployment(
354+
source="selected",
355+
deployment_id=deployment_id_python,
356+
parameters={
357+
"branch_name": "{{ event.resource['infrahub.branch.name'] }}",
358+
"node_kind": "{{ event.resource['infrahub.node.kind'] }}",
359+
"object_id": "{{ event.resource['infrahub.node.id'] }}",
360+
"computed_attribute_name": computed_attribute.computed_attribute.attribute.name,
361+
"computed_attribute_kind": computed_attribute.computed_attribute.kind,
362+
},
363+
job_variables={},
364+
)
365+
],
366+
)
367+
368+
if existing_computed_attr_automations.has(
369+
identifier=computed_attribute.computed_attribute.key_name, scope=scope
370+
):
371+
existing = existing_computed_attr_automations.get(
372+
identifier=computed_attribute.computed_attribute.key_name, scope=scope
373+
)
374+
await client.update_automation(automation_id=existing.id, automation=automation)
375+
log.info(f"{computed_attribute.computed_attribute.key_name} Updated")
376+
else:
377+
await client.create_automation(automation=automation)
378+
log.info(f"{computed_attribute.computed_attribute.key_name} Created")

backend/infrahub/core/schema/schema_branch.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from infrahub.visuals import select_color
5555

5656
from .constants import INTERNAL_SCHEMA_NODE_KINDS, SchemaNamespace
57+
from .schema_branch_computed import ComputedAttributes
5758

5859
log = get_logger()
5960

@@ -114,6 +115,7 @@ def __init__(self, cache: dict, name: str | None = None, data: dict[str, dict[st
114115
self.nodes: dict[str, str] = {}
115116
self.generics: dict[str, str] = {}
116117
self.profiles: dict[str, str] = {}
118+
self.computed_attributes = ComputedAttributes()
117119
self._computed_jinja2_attribute_map: dict[str, RegisteredNodeComputedAttribute] = {}
118120

119121
if data:
@@ -949,6 +951,7 @@ def validate_kinds(self) -> None:
949951
) from None
950952

951953
def validate_computed_attributes(self) -> None:
954+
self.computed_attributes = ComputedAttributes()
952955
self._computed_jinja2_attribute_map = {}
953956
for name in self.nodes.keys():
954957
node_schema = self.get_node(name=name, duplicate=False)
@@ -1016,11 +1019,14 @@ def _validate_computed_attribute(self, node: NodeSchema, attribute: AttributeSch
10161019

10171020
self._register_computed_attribute_target(node=node, attribute=attribute, schema_path=schema_path)
10181021

1019-
if attribute.computed_attribute.kind == ComputedAttributeKind.TRANSFORM_PYTHON and not attribute.optional:
1022+
elif attribute.computed_attribute.kind == ComputedAttributeKind.TRANSFORM_PYTHON and not attribute.optional:
10201023
raise ValueError(
10211024
f"{node.kind}: Attribute {attribute.name!r} is a computed transform, it can't be mandatory"
10221025
)
10231026

1027+
elif attribute.computed_attribute.kind == ComputedAttributeKind.TRANSFORM_PYTHON:
1028+
self.computed_attributes.add_python_attribute(node=node, attribute=attribute)
1029+
10241030
def get_impacted_macros(self, kind: str, updates: list[str] | None = None) -> list[ComputedAttributeTarget]:
10251031
if mapping := self._computed_jinja2_attribute_map.get(kind):
10261032
return mapping.get_targets(updates=updates)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from dataclasses import dataclass
2+
3+
from infrahub.core.schema import (
4+
AttributeSchema,
5+
NodeSchema,
6+
)
7+
8+
9+
@dataclass
10+
class PythonDefinition:
11+
kind: str
12+
attribute: AttributeSchema
13+
14+
@property
15+
def key_name(self) -> str:
16+
return f"{self.kind}_{self.attribute.name}"
17+
18+
19+
class ComputedAttributes:
20+
def __init__(self) -> None:
21+
self._computed_python_transform_attribute_map: dict[str, list[AttributeSchema]] = {}
22+
23+
def add_python_attribute(self, node: NodeSchema, attribute: AttributeSchema) -> None:
24+
if node.kind not in self._computed_python_transform_attribute_map:
25+
self._computed_python_transform_attribute_map[node.kind] = []
26+
self._computed_python_transform_attribute_map[node.kind].append(attribute)
27+
28+
def get_kinds_python_attributes(self) -> list[str]:
29+
"""Return kinds that have Python attributes defined"""
30+
return list(self._computed_python_transform_attribute_map.keys())
31+
32+
@property
33+
def python_attributes_by_transform(self) -> dict[str, list[PythonDefinition]]:
34+
computed_attributes: dict[str, list[PythonDefinition]] = {}
35+
for kind, attributes in self._computed_python_transform_attribute_map.items():
36+
for attribute in attributes:
37+
if attribute.computed_attribute and attribute.computed_attribute.transform:
38+
if attribute.computed_attribute.transform not in computed_attributes:
39+
computed_attributes[attribute.computed_attribute.transform] = []
40+
41+
computed_attributes[attribute.computed_attribute.transform].append(
42+
PythonDefinition(kind=kind, attribute=attribute)
43+
)
44+
45+
return computed_attributes

backend/infrahub/graphql/mutations/main.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from infrahub.exceptions import ValidationError
2828
from infrahub.log import get_log_data, get_logger
2929
from infrahub.worker import WORKER_IDENTITY
30-
from infrahub.workflows.catalogue import UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM
3130

3231
from .node_getter.by_default_filter import MutationNodeGetterByDefaultFilter
3332
from .node_getter.by_hfid import MutationNodeGetterByHfid
@@ -101,8 +100,6 @@ async def mutate(cls, root: dict, info: GraphQLResolveInfo, data: InputObjectTyp
101100
# Reset the time of the query to guarantee that all resolvers executed after this point will account for the changes
102101
context.at = Timestamp()
103102

104-
# Get relevant macros based on the current change
105-
# macros = schema_branch.get_impacted_macros(kind=obj.get_kind(), updates=updated_fields)
106103
if config.SETTINGS.broker.enable and context.background:
107104
log_data = get_log_data()
108105
request_id = log_data.get("request_id", "")
@@ -120,38 +117,6 @@ async def mutate(cls, root: dict, info: GraphQLResolveInfo, data: InputObjectTyp
120117

121118
context.background.add_task(context.service.event.send, event)
122119

123-
# Temporary workaround until there's a proper deployment for Transforms during schema load / repo sync
124-
schema_branch = registry.schema.get_schema_branch(name=context.branch.name)
125-
node_schema = schema_branch.get(obj._schema.kind)
126-
if isinstance(node_schema, NodeSchema):
127-
has_computed_attributes = [
128-
attribute for attribute in node_schema.attributes if attribute.computed_attribute
129-
]
130-
if has_computed_attributes:
131-
updated_fields = list(data.keys())
132-
133-
await context.service.workflow.submit_workflow(
134-
workflow=UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
135-
parameters={
136-
"branch_name": context.branch.name,
137-
"node_kind": obj.get_kind(),
138-
"object_id": obj.get_id(),
139-
"updated_fields": updated_fields,
140-
},
141-
)
142-
143-
# # Add event
144-
# if macros:
145-
# await context.service.workflow.submit_workflow(
146-
# workflow=PROCESS_COMPUTED_MACRO,
147-
# parameters={
148-
# "branch_name": context.branch.name,
149-
# "node_kind": obj.get_kind(),
150-
# "object_id": obj.get_id(),
151-
# "updated_fields": updated_fields,
152-
# },
153-
# )
154-
155120
return mutation
156121

157122
@classmethod

backend/infrahub/schema/tasks.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from prefect.events.schemas.automations import EventTrigger, Posture
1111
from prefect.logging import get_run_logger
1212

13-
from infrahub.workflows.catalogue import COMPUTED_ATTRIBUTE_SETUP
13+
from infrahub.workflows.catalogue import COMPUTED_ATTRIBUTE_SETUP, COMPUTED_ATTRIBUTE_SETUP_PYTHON
1414

1515
from .constants import AUTOMATION_NAME
1616

@@ -25,9 +25,7 @@ async def schema_updated_setup() -> None:
2525
for item in await client.read_deployments(
2626
deployment_filter=DeploymentFilter(
2727
name=DeploymentFilterName(
28-
any_=[
29-
COMPUTED_ATTRIBUTE_SETUP.name,
30-
]
28+
any_=[COMPUTED_ATTRIBUTE_SETUP.name, COMPUTED_ATTRIBUTE_SETUP_PYTHON.name]
3129
)
3230
)
3331
)
@@ -36,6 +34,7 @@ async def schema_updated_setup() -> None:
3634
raise ValueError("Unable to find the deployment for PROCESS_COMPUTED_MACRO")
3735

3836
deployment_id_computed_attribute_setup = deployments[COMPUTED_ATTRIBUTE_SETUP.name].id
37+
deployment_id_computed_attribute_setup_python = deployments[COMPUTED_ATTRIBUTE_SETUP_PYTHON.name].id
3938

4039
schema_update_automation = await client.find_automation(id_or_name=AUTOMATION_NAME)
4140

@@ -55,7 +54,13 @@ async def schema_updated_setup() -> None:
5554
deployment_id=deployment_id_computed_attribute_setup,
5655
parameters={},
5756
job_variables={},
58-
)
57+
),
58+
RunDeployment(
59+
source="selected",
60+
deployment_id=deployment_id_computed_attribute_setup_python,
61+
parameters={},
62+
job_variables={},
63+
),
5964
],
6065
)
6166

backend/infrahub/services/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def client(self) -> InfrahubClient:
5555

5656
return self._client
5757

58-
def set_client(self, client: InfrahubClient) -> None:
58+
def set_client(self, client: InfrahubClient | None) -> None:
5959
self._client = client
6060

6161
@property

backend/infrahub/workflows/catalogue.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,14 @@
233233
function="computed_attribute_setup",
234234
)
235235

236+
COMPUTED_ATTRIBUTE_SETUP_PYTHON = WorkflowDefinition(
237+
name="computed-attribute-setup-python",
238+
type=WorkflowType.INTERNAL,
239+
module="infrahub.computed_attribute.tasks",
240+
function="computed_attribute_setup_python",
241+
)
242+
243+
236244
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM = WorkflowDefinition(
237245
name="process_computed_attribute_transform",
238246
type=WorkflowType.INTERNAL,
@@ -289,6 +297,7 @@
289297
GIT_REPOSITORY_ADD_READ_ONLY,
290298
PROCESS_COMPUTED_MACRO,
291299
COMPUTED_ATTRIBUTE_SETUP,
300+
COMPUTED_ATTRIBUTE_SETUP_PYTHON,
292301
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
293302
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
294303
SCHEMA_UPDATED_SETUP,

0 commit comments

Comments
 (0)