Skip to content

Commit c9b9f82

Browse files
committed
Setup automation for Python based computed attributes
1 parent 9ff2ff8 commit c9b9f82

File tree

7 files changed

+194
-39
lines changed

7 files changed

+194
-39
lines changed

backend/infrahub/api/schema.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@
3535
from infrahub.services import services
3636
from infrahub.types import ATTRIBUTE_PYTHON_TYPES
3737
from infrahub.worker import WORKER_IDENTITY
38-
from infrahub.workflows.catalogue import COMPUTED_ATTRIBUTE_SETUP, SCHEMA_APPLY_MIGRATION, SCHEMA_VALIDATE_MIGRATION
38+
from infrahub.workflows.catalogue import (
39+
COMPUTED_ATTRIBUTE_SETUP,
40+
COMPUTED_ATTRIBUTE_SETUP_PYTHON,
41+
SCHEMA_APPLY_MIGRATION,
42+
SCHEMA_VALIDATE_MIGRATION,
43+
)
3944

4045
if TYPE_CHECKING:
4146
from typing_extensions import Self
@@ -350,6 +355,7 @@ async def load_schema(
350355

351356
# Temporary workaround, we need to emit a proper event
352357
await service.workflow.submit_workflow(workflow=COMPUTED_ATTRIBUTE_SETUP)
358+
await service.workflow.submit_workflow(workflow=COMPUTED_ATTRIBUTE_SETUP_PYTHON)
353359

354360
return SchemaUpdate(hash=updated_hash, previous_hash=original_hash, diff=result.diff)
355361

backend/infrahub/computed_attribute/models.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1+
from __future__ import annotations
2+
13
from collections import defaultdict
4+
from dataclasses import dataclass
5+
from typing import TYPE_CHECKING
26

3-
from prefect.events.schemas.automations import Automation
7+
from prefect.events.schemas.automations import Automation # noqa: TCH002
48
from pydantic import BaseModel, Field
59
from typing_extensions import Self
610

711
from .constants import AUTOMATION_NAME_PREFIX
812

13+
if TYPE_CHECKING:
14+
from infrahub.core.schema.schema_branch_computed import PythonDefinition
15+
916

1017
class ComputedAttributeAutomations(BaseModel):
1118
data: dict[str, dict[str, Automation]] = Field(default_factory=lambda: defaultdict(dict))
@@ -37,3 +44,14 @@ def has(self, identifier: str, scope: str) -> bool:
3744
if identifier in self.data and scope in self.data[identifier]:
3845
return True
3946
return False
47+
48+
49+
@dataclass
50+
class PythonTransformComputedAttribute:
51+
name: str
52+
repository_id: str
53+
repository_name: str
54+
repository_kind: str
55+
query_name: str
56+
query_models: list[str]
57+
computed_attribute: PythonDefinition

backend/infrahub/computed_attribute/tasks.py

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from infrahub.workflows.utils import add_branch_tag
2222

2323
from .constants import AUTOMATION_NAME, AUTOMATION_NAME_PREFIX
24-
from .models import ComputedAttributeAutomations
24+
from .models import ComputedAttributeAutomations, PythonTransformComputedAttribute
2525

2626
if TYPE_CHECKING:
2727
from infrahub.core.schema.computed_attribute import ComputedAttribute
@@ -251,3 +251,109 @@ async def computed_attribute_setup() -> None:
251251
else:
252252
await client.create_automation(automation=automation)
253253
log.info(f"{computed_attribute.key_name} Created")
254+
255+
256+
@flow(
257+
name="computed-attribute-setup-python",
258+
flow_run_name="Setup computed attributes for Python transforms in task-manager",
259+
)
260+
async def computed_attribute_setup_python() -> None:
261+
service = services.service
262+
schema_branch = registry.schema.get_schema_branch(name=registry.default_branch)
263+
log = get_run_logger()
264+
265+
transform_attributes = schema_branch.computed_attributes.python_attributes_by_transform
266+
267+
transform_names = list(transform_attributes.keys())
268+
269+
transforms = await service.client.filters(
270+
kind="CoreTransformPython",
271+
branch=registry.default_branch,
272+
prefetch_relationships=True,
273+
populate_store=True,
274+
name__values=transform_names,
275+
)
276+
277+
found_transforms_names = [transform.name.value for transform in transforms]
278+
for transform_name in transform_names:
279+
if transform_name not in found_transforms_names:
280+
log.warning(
281+
msg=f"The transform {transform_name} is assigned to a computed attribute but the transform could not be found in the database."
282+
)
283+
284+
computed_attributes: list[PythonTransformComputedAttribute] = []
285+
for transform in transforms:
286+
for attribute in transform_attributes[transform.name.value]:
287+
computed_attributes.append(
288+
PythonTransformComputedAttribute(
289+
name=transform.name.value,
290+
repository_id=transform.repository.peer.id,
291+
repository_name=transform.repository.peer.name.value,
292+
repository_kind=transform.repository.peer.typename,
293+
query_name=transform.query.peer.name.value,
294+
query_models=transform.query.peer.models.value,
295+
computed_attribute=attribute,
296+
)
297+
)
298+
299+
async with get_client(sync_client=False) as client:
300+
deployments = {
301+
item.name: item
302+
for item in await client.read_deployments(
303+
deployment_filter=DeploymentFilter(
304+
name=DeploymentFilterName(any_=[UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name])
305+
)
306+
)
307+
}
308+
if UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name not in deployments:
309+
raise ValueError("Unable to find the deployment for UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM")
310+
311+
deployment_id_python = deployments[UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name].id
312+
313+
automations = await client.read_automations()
314+
existing_computed_attr_automations = ComputedAttributeAutomations.from_prefect(automations=automations)
315+
316+
for computed_attribute in computed_attributes:
317+
log.info(f"processing {computed_attribute.computed_attribute.key_name}")
318+
scope = "default"
319+
320+
automation = AutomationCore(
321+
name=AUTOMATION_NAME.format(
322+
prefix=AUTOMATION_NAME_PREFIX,
323+
identifier=computed_attribute.computed_attribute.key_name,
324+
scope=scope,
325+
),
326+
description=f"Process value of the computed attribute for {computed_attribute.computed_attribute.key_name} [{scope}]",
327+
enabled=True,
328+
trigger=EventTrigger(
329+
posture=Posture.Reactive,
330+
expect={"infrahub.node.*"},
331+
within=timedelta(0),
332+
match=ResourceSpecification({"infrahub.node.kind": [computed_attribute.computed_attribute.kind]}),
333+
threshold=1,
334+
),
335+
actions=[
336+
RunDeployment(
337+
source="selected",
338+
deployment_id=deployment_id_python,
339+
parameters={
340+
"branch_name": "{{ event.resource['infrahub.branch.name'] }}",
341+
"node_kind": "{{ event.resource['infrahub.node.kind'] }}",
342+
"object_id": "{{ event.resource['infrahub.node.id'] }}",
343+
},
344+
job_variables={},
345+
)
346+
],
347+
)
348+
349+
if existing_computed_attr_automations.has(
350+
identifier=computed_attribute.computed_attribute.key_name, scope=scope
351+
):
352+
existing = existing_computed_attr_automations.get(
353+
identifier=computed_attribute.computed_attribute.key_name, scope=scope
354+
)
355+
await client.update_automation(automation_id=existing.id, automation=automation)
356+
log.info(f"{computed_attribute.computed_attribute.key_name} Updated")
357+
else:
358+
await client.create_automation(automation=automation)
359+
log.info(f"{computed_attribute.computed_attribute.key_name} Created")

backend/infrahub/core/schema/schema_branch.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from infrahub.visuals import select_color
5555

5656
from .constants import INTERNAL_SCHEMA_NODE_KINDS, SchemaNamespace
57+
from .schema_branch_computed import ComputedAttributes
5758

5859
log = get_logger()
5960

@@ -114,6 +115,7 @@ def __init__(self, cache: dict, name: str | None = None, data: dict[str, dict[st
114115
self.nodes: dict[str, str] = {}
115116
self.generics: dict[str, str] = {}
116117
self.profiles: dict[str, str] = {}
118+
self.computed_attributes = ComputedAttributes()
117119
self._computed_jinja2_attribute_map: dict[str, RegisteredNodeComputedAttribute] = {}
118120

119121
if data:
@@ -949,6 +951,7 @@ def validate_kinds(self) -> None:
949951
) from None
950952

951953
def validate_computed_attributes(self) -> None:
954+
self.computed_attributes = ComputedAttributes()
952955
self._computed_jinja2_attribute_map = {}
953956
for name in self.nodes.keys():
954957
node_schema = self.get_node(name=name, duplicate=False)
@@ -1016,11 +1019,14 @@ def _validate_computed_attribute(self, node: NodeSchema, attribute: AttributeSch
10161019

10171020
self._register_computed_attribute_target(node=node, attribute=attribute, schema_path=schema_path)
10181021

1019-
if attribute.computed_attribute.kind == ComputedAttributeKind.TRANSFORM_PYTHON and not attribute.optional:
1022+
elif attribute.computed_attribute.kind == ComputedAttributeKind.TRANSFORM_PYTHON and not attribute.optional:
10201023
raise ValueError(
10211024
f"{node.kind}: Attribute {attribute.name!r} is a computed transform, it can't be mandatory"
10221025
)
10231026

1027+
elif attribute.computed_attribute.kind == ComputedAttributeKind.TRANSFORM_PYTHON:
1028+
self.computed_attributes.add_python_attribute(node=node, attribute=attribute)
1029+
10241030
def get_impacted_macros(self, kind: str, updates: list[str] | None = None) -> list[ComputedAttributeTarget]:
10251031
if mapping := self._computed_jinja2_attribute_map.get(kind):
10261032
return mapping.get_targets(updates=updates)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from dataclasses import dataclass
2+
3+
from infrahub.core.schema import (
4+
AttributeSchema,
5+
NodeSchema,
6+
)
7+
8+
9+
@dataclass
10+
class PythonDefinition:
11+
kind: str
12+
attribute: AttributeSchema
13+
14+
@property
15+
def key_name(self) -> str:
16+
return f"{self.kind}_{self.attribute.name}"
17+
18+
19+
class ComputedAttributes:
20+
def __init__(self) -> None:
21+
self._computed_python_transform_attribute_map: dict[str, list[AttributeSchema]] = {}
22+
23+
def add_python_attribute(self, node: NodeSchema, attribute: AttributeSchema) -> None:
24+
if node.kind not in self._computed_python_transform_attribute_map:
25+
self._computed_python_transform_attribute_map[node.kind] = []
26+
self._computed_python_transform_attribute_map[node.kind].append(attribute)
27+
28+
def get_kinds_python_attributes(self) -> list[str]:
29+
"""Return kinds that have Python attributes defined"""
30+
return list(self._computed_python_transform_attribute_map.keys())
31+
32+
@property
33+
def python_attributes_by_transform(self) -> dict[str, list[PythonDefinition]]:
34+
computed_attributes: dict[str, list[PythonDefinition]] = {}
35+
for kind, attributes in self._computed_python_transform_attribute_map.items():
36+
for attribute in attributes:
37+
if attribute.computed_attribute and attribute.computed_attribute.transform:
38+
if attribute.computed_attribute.transform not in computed_attributes:
39+
computed_attributes[attribute.computed_attribute.transform] = []
40+
41+
computed_attributes[attribute.computed_attribute.transform].append(
42+
PythonDefinition(kind=kind, attribute=attribute)
43+
)
44+
45+
return computed_attributes

backend/infrahub/graphql/mutations/main.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from infrahub.exceptions import ValidationError
2828
from infrahub.log import get_log_data, get_logger
2929
from infrahub.worker import WORKER_IDENTITY
30-
from infrahub.workflows.catalogue import UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM
3130

3231
from .node_getter.by_default_filter import MutationNodeGetterByDefaultFilter
3332
from .node_getter.by_hfid import MutationNodeGetterByHfid
@@ -101,8 +100,6 @@ async def mutate(cls, root: dict, info: GraphQLResolveInfo, data: InputObjectTyp
101100
# Reset the time of the query to guarantee that all resolvers executed after this point will account for the changes
102101
context.at = Timestamp()
103102

104-
# Get relevant macros based on the current change
105-
# macros = schema_branch.get_impacted_macros(kind=obj.get_kind(), updates=updated_fields)
106103
if config.SETTINGS.broker.enable and context.background:
107104
log_data = get_log_data()
108105
request_id = log_data.get("request_id", "")
@@ -120,38 +117,6 @@ async def mutate(cls, root: dict, info: GraphQLResolveInfo, data: InputObjectTyp
120117

121118
context.background.add_task(context.service.event.send, event)
122119

123-
# Temporary workaround until there's a proper deployment for Transforms during schema load / repo sync
124-
schema_branch = registry.schema.get_schema_branch(name=context.branch.name)
125-
node_schema = schema_branch.get(obj._schema.kind)
126-
if isinstance(node_schema, NodeSchema):
127-
has_computed_attributes = [
128-
attribute for attribute in node_schema.attributes if attribute.computed_attribute
129-
]
130-
if has_computed_attributes:
131-
updated_fields = list(data.keys())
132-
133-
await context.service.workflow.submit_workflow(
134-
workflow=UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
135-
parameters={
136-
"branch_name": context.branch.name,
137-
"node_kind": obj.get_kind(),
138-
"object_id": obj.get_id(),
139-
"updated_fields": updated_fields,
140-
},
141-
)
142-
143-
# # Add event
144-
# if macros:
145-
# await context.service.workflow.submit_workflow(
146-
# workflow=PROCESS_COMPUTED_MACRO,
147-
# parameters={
148-
# "branch_name": context.branch.name,
149-
# "node_kind": obj.get_kind(),
150-
# "object_id": obj.get_id(),
151-
# "updated_fields": updated_fields,
152-
# },
153-
# )
154-
155120
return mutation
156121

157122
@classmethod

backend/infrahub/workflows/catalogue.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,14 @@
218218
function="computed_attribute_setup",
219219
)
220220

221+
COMPUTED_ATTRIBUTE_SETUP_PYTHON = WorkflowDefinition(
222+
name="computed-attribute-setup-python",
223+
type=WorkflowType.INTERNAL,
224+
module="infrahub.computed_attribute.tasks",
225+
function="computed_attribute_setup_python",
226+
)
227+
228+
221229
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM = WorkflowDefinition(
222230
name="process_computed_attribute_transform",
223231
type=WorkflowType.INTERNAL,
@@ -265,6 +273,7 @@
265273
GIT_REPOSITORY_ADD_READ_ONLY,
266274
PROCESS_COMPUTED_MACRO,
267275
COMPUTED_ATTRIBUTE_SETUP,
276+
COMPUTED_ATTRIBUTE_SETUP_PYTHON,
268277
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
269278
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
270279
]

0 commit comments

Comments
 (0)