|
21 | 21 | from infrahub.workflows.utils import add_branch_tag |
22 | 22 |
|
23 | 23 | from .constants import AUTOMATION_NAME, AUTOMATION_NAME_PREFIX |
24 | | -from .models import ComputedAttributeAutomations |
| 24 | +from .models import ComputedAttributeAutomations, PythonTransformComputedAttribute |
25 | 25 |
|
26 | 26 | if TYPE_CHECKING: |
27 | 27 | from infrahub.core.schema.computed_attribute import ComputedAttribute |
@@ -262,3 +262,109 @@ async def computed_attribute_setup() -> None: |
262 | 262 | else: |
263 | 263 | await client.create_automation(automation=automation) |
264 | 264 | log.info(f"{computed_attribute.key_name} Created") |
| 265 | + |
| 266 | + |
| 267 | +@flow( |
| 268 | + name="computed-attribute-setup-python", |
| 269 | + flow_run_name="Setup computed attributes for Python transforms in task-manager", |
| 270 | +) |
| 271 | +async def computed_attribute_setup_python() -> None: |
| 272 | + service = services.service |
| 273 | + schema_branch = registry.schema.get_schema_branch(name=registry.default_branch) |
| 274 | + log = get_run_logger() |
| 275 | + |
| 276 | + transform_attributes = schema_branch.computed_attributes.python_attributes_by_transform |
| 277 | + |
| 278 | + transform_names = list(transform_attributes.keys()) |
| 279 | + |
| 280 | + transforms = await service.client.filters( |
| 281 | + kind="CoreTransformPython", |
| 282 | + branch=registry.default_branch, |
| 283 | + prefetch_relationships=True, |
| 284 | + populate_store=True, |
| 285 | + name__values=transform_names, |
| 286 | + ) |
| 287 | + |
| 288 | + found_transforms_names = [transform.name.value for transform in transforms] |
| 289 | + for transform_name in transform_names: |
| 290 | + if transform_name not in found_transforms_names: |
| 291 | + log.warning( |
| 292 | + msg=f"The transform {transform_name} is assigned to a computed attribute but the transform could not be found in the database." |
| 293 | + ) |
| 294 | + |
| 295 | + computed_attributes: list[PythonTransformComputedAttribute] = [] |
| 296 | + for transform in transforms: |
| 297 | + for attribute in transform_attributes[transform.name.value]: |
| 298 | + computed_attributes.append( |
| 299 | + PythonTransformComputedAttribute( |
| 300 | + name=transform.name.value, |
| 301 | + repository_id=transform.repository.peer.id, |
| 302 | + repository_name=transform.repository.peer.name.value, |
| 303 | + repository_kind=transform.repository.peer.typename, |
| 304 | + query_name=transform.query.peer.name.value, |
| 305 | + query_models=transform.query.peer.models.value, |
| 306 | + computed_attribute=attribute, |
| 307 | + ) |
| 308 | + ) |
| 309 | + |
| 310 | + async with get_client(sync_client=False) as client: |
| 311 | + deployments = { |
| 312 | + item.name: item |
| 313 | + for item in await client.read_deployments( |
| 314 | + deployment_filter=DeploymentFilter( |
| 315 | + name=DeploymentFilterName(any_=[UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name]) |
| 316 | + ) |
| 317 | + ) |
| 318 | + } |
| 319 | + if UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name not in deployments: |
| 320 | + raise ValueError("Unable to find the deployment for UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM") |
| 321 | + |
| 322 | + deployment_id_python = deployments[UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM.name].id |
| 323 | + |
| 324 | + automations = await client.read_automations() |
| 325 | + existing_computed_attr_automations = ComputedAttributeAutomations.from_prefect(automations=automations) |
| 326 | + |
| 327 | + for computed_attribute in computed_attributes: |
| 328 | + log.info(f"processing {computed_attribute.computed_attribute.key_name}") |
| 329 | + scope = "default" |
| 330 | + |
| 331 | + automation = AutomationCore( |
| 332 | + name=AUTOMATION_NAME.format( |
| 333 | + prefix=AUTOMATION_NAME_PREFIX, |
| 334 | + identifier=computed_attribute.computed_attribute.key_name, |
| 335 | + scope=scope, |
| 336 | + ), |
| 337 | + description=f"Process value of the computed attribute for {computed_attribute.computed_attribute.key_name} [{scope}]", |
| 338 | + enabled=True, |
| 339 | + trigger=EventTrigger( |
| 340 | + posture=Posture.Reactive, |
| 341 | + expect={"infrahub.node.*"}, |
| 342 | + within=timedelta(0), |
| 343 | + match=ResourceSpecification({"infrahub.node.kind": [computed_attribute.computed_attribute.kind]}), |
| 344 | + threshold=1, |
| 345 | + ), |
| 346 | + actions=[ |
| 347 | + RunDeployment( |
| 348 | + source="selected", |
| 349 | + deployment_id=deployment_id_python, |
| 350 | + parameters={ |
| 351 | + "branch_name": "{{ event.resource['infrahub.branch.name'] }}", |
| 352 | + "node_kind": "{{ event.resource['infrahub.node.kind'] }}", |
| 353 | + "object_id": "{{ event.resource['infrahub.node.id'] }}", |
| 354 | + }, |
| 355 | + job_variables={}, |
| 356 | + ) |
| 357 | + ], |
| 358 | + ) |
| 359 | + |
| 360 | + if existing_computed_attr_automations.has( |
| 361 | + identifier=computed_attribute.computed_attribute.key_name, scope=scope |
| 362 | + ): |
| 363 | + existing = existing_computed_attr_automations.get( |
| 364 | + identifier=computed_attribute.computed_attribute.key_name, scope=scope |
| 365 | + ) |
| 366 | + await client.update_automation(automation_id=existing.id, automation=automation) |
| 367 | + log.info(f"{computed_attribute.computed_attribute.key_name} Updated") |
| 368 | + else: |
| 369 | + await client.create_automation(automation=automation) |
| 370 | + log.info(f"{computed_attribute.computed_attribute.key_name} Created") |
0 commit comments