Skip to content

Commit a264ddf

Browse files
committed
Refactor computed_attribute_setup
1 parent 7efd57d commit a264ddf

File tree

5 files changed

+114
-11
lines changed

5 files changed

+114
-11
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
AUTOMATION_NAME_PREFIX = "Computed-attribute-process"
2+
3+
AUTOMATION_NAME = AUTOMATION_NAME_PREFIX + "::{identifier}::{scope}"
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from collections import defaultdict
2+
3+
from prefect.events.schemas.automations import Automation
4+
from pydantic import BaseModel, Field
5+
from typing_extensions import Self
6+
7+
from .constants import AUTOMATION_NAME_PREFIX
8+
9+
10+
class ComputedAttributeAutomations(BaseModel):
11+
data: dict[str, dict[str, Automation]] = Field(default_factory=lambda: defaultdict(dict))
12+
13+
@classmethod
14+
def from_prefect(cls, automations: list[Automation]) -> Self:
15+
obj = cls()
16+
for automation in automations:
17+
if not automation.name.startswith(AUTOMATION_NAME_PREFIX):
18+
continue
19+
20+
name_split = automation.name.split("::")
21+
if len(name_split) != 3:
22+
continue
23+
24+
identifier = name_split[1]
25+
scope = name_split[2]
26+
27+
obj.data[identifier][scope] = automation
28+
29+
return obj
30+
31+
def get(self, identifier: str, scope: str) -> Automation:
32+
if identifier in self.data and scope in self.data[identifier]:
33+
return self.data[identifier][scope]
34+
raise KeyError(f"Unable to find an automation for {identifier} {scope}")
35+
36+
def has(self, identifier: str, scope: str) -> bool:
37+
if identifier in self.data and scope in self.data[identifier]:
38+
return True
39+
return False

backend/infrahub/computed_attribute/tasks.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
from infrahub.git.repository import get_initialized_repo
1818
from infrahub.services import services
1919
from infrahub.support.macro import MacroDefinition
20-
from infrahub.workflows.catalogue import PROCESS_COMPUTED_MACRO
20+
from infrahub.workflows.catalogue import PROCESS_COMPUTED_MACRO, UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM
2121
from infrahub.workflows.utils import add_branch_tag
2222

23+
from .constants import AUTOMATION_NAME, AUTOMATION_NAME_PREFIX
24+
from .models import ComputedAttributeAutomations
25+
2326
if TYPE_CHECKING:
2427
from infrahub.core.schema.computed_attribute import ComputedAttribute
2528
from infrahub.core.schema.schema_branch import ComputedAttributeTarget
@@ -175,24 +178,33 @@ async def process_jinja2(
175178
print()
176179

177180

178-
@flow(name="computed-attribute-setup")
181+
@flow(name="computed-attribute-setup", flow_run_name="Setup computed attributes in task-manager")
179182
async def computed_attribute_setup() -> None:
180183
# service = services.service
181184
schema_branch = registry.schema.get_schema_branch(name=registry.default_branch)
182185
log = get_run_logger()
186+
183187
async with get_client(sync_client=False) as client:
184188
deployments = {
185189
item.name: item
186190
for item in await client.read_deployments(
187-
deployment_filter=DeploymentFilter(name=DeploymentFilterName(any_=[PROCESS_COMPUTED_MACRO.name]))
191+
deployment_filter=DeploymentFilter(
192+
name=DeploymentFilterName(
193+
any_=[PROCESS_COMPUTED_MACRO.name, UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name]
194+
)
195+
)
188196
)
189197
}
190198
if PROCESS_COMPUTED_MACRO.name not in deployments:
191199
raise ValueError("Unable to find the deployment for PROCESS_COMPUTED_MACRO")
192-
deployment_id = deployments[PROCESS_COMPUTED_MACRO.name].id
200+
if UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name not in deployments:
201+
raise ValueError("Unable to find the deployment for UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM")
193202

194-
# TODO need to pull the existing automation to see if we need to create or update each object
195-
# automations = await client.read_automations()
203+
deployment_id_jinja = deployments[PROCESS_COMPUTED_MACRO.name].id
204+
# deployment_id_python = deployments[UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name].id
205+
206+
automations = await client.read_automations()
207+
existing_computed_attr_automations = ComputedAttributeAutomations.from_prefect(automations=automations)
196208

197209
computed_attributes: dict[str, ComputedAttributeTarget] = {}
198210
for item in schema_branch._computed_jinja2_attribute_map.values():
@@ -205,9 +217,11 @@ async def computed_attribute_setup() -> None:
205217

206218
for identifier, computed_attribute in computed_attributes.items():
207219
log.info(f"processing {computed_attribute.key_name}")
220+
scope = "default"
221+
208222
automation = AutomationCore(
209-
name=f"computed-attribute-process-{identifier}",
210-
description=f"Process value of the computed attribute for {identifier}",
223+
name=AUTOMATION_NAME.format(prefix=AUTOMATION_NAME_PREFIX, identifier=identifier, scope=scope),
224+
description=f"Process value of the computed attribute for {identifier} [{scope}]",
211225
enabled=True,
212226
trigger=EventTrigger(
213227
posture=Posture.Reactive,
@@ -219,7 +233,7 @@ async def computed_attribute_setup() -> None:
219233
actions=[
220234
RunDeployment(
221235
source="selected",
222-
deployment_id=deployment_id,
236+
deployment_id=deployment_id_jinja,
223237
parameters={
224238
"branch_name": "{{ event.resource['infrahub.branch.name'] }}",
225239
"node_kind": "{{ event.resource['infrahub.node.kind'] }}",
@@ -230,5 +244,10 @@ async def computed_attribute_setup() -> None:
230244
],
231245
)
232246

233-
response = await client.create_automation(automation=automation)
234-
log.info(f"Processed: {computed_attribute.key_name} : {response}")
247+
if existing_computed_attr_automations.has(identifier=identifier, scope=scope):
248+
existing = existing_computed_attr_automations.get(identifier=identifier, scope=scope)
249+
await client.update_automation(automation_id=existing.id, automation=automation)
250+
log.info(f"{computed_attribute.key_name} Updated")
251+
else:
252+
await client.create_automation(automation=automation)
253+
log.info(f"{computed_attribute.key_name} Created")

backend/tests/unit/computed_attribute/__init__.py

Whitespace-only changes.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import uuid
2+
from datetime import timedelta
3+
4+
from prefect.events.schemas.automations import Automation, EventTrigger, Posture
5+
6+
from infrahub.computed_attribute.constants import AUTOMATION_NAME
7+
from infrahub.computed_attribute.models import ComputedAttributeAutomations
8+
9+
10+
def generate_automation(
11+
name: str, description: str = "", trigger: EventTrigger | None = None, actions: list | None = None
12+
) -> Automation:
13+
default_trigger = EventTrigger(
14+
posture=Posture.Reactive,
15+
expect={"infrahub.node.*"},
16+
within=timedelta(0),
17+
threshold=1,
18+
)
19+
20+
return Automation(
21+
id=uuid.uuid4(),
22+
name=name,
23+
description=description,
24+
enabled=True,
25+
trigger=trigger or default_trigger,
26+
actions=actions or [],
27+
)
28+
29+
30+
async def test_load_from_prefect():
31+
automations: list[Automation] = [
32+
generate_automation(name=AUTOMATION_NAME.format(identifier="AAAAA", scope="default")),
33+
generate_automation(name=AUTOMATION_NAME.format(identifier="AAAAA", scope="yyyy")),
34+
generate_automation(name=AUTOMATION_NAME.format(identifier="BBBBB", scope="default")),
35+
generate_automation(name="anothername"),
36+
]
37+
38+
obj = ComputedAttributeAutomations.from_prefect(automations=automations)
39+
40+
assert obj.has(identifier="AAAAA", scope="default")
41+
assert obj.has(identifier="AAAAA", scope="yyyy")
42+
assert obj.has(identifier="BBBBB", scope="default")

0 commit comments

Comments
 (0)