Skip to content

Commit 82d51f5

Browse files
authored
Merge pull request #6475 from opsmill/pog-global-trigger-locks
Add central locking options for updating triggers
2 parents 5c98124 + 3c9ec41 commit 82d51f5

File tree

6 files changed

+64
-51
lines changed

6 files changed

+64
-51
lines changed

backend/infrahub/actions/tasks.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22

33
from infrahub_sdk.graphql import Mutation
44
from prefect import flow
5-
from prefect.client.orchestration import get_client
65

7-
from infrahub import lock
86
from infrahub.context import InfrahubContext # noqa: TC001 needed for prefect flow
97
from infrahub.services import InfrahubServices # noqa: TC001 needed for prefect flow
108
from infrahub.trigger.models import TriggerType
11-
from infrahub.trigger.setup import setup_triggers
9+
from infrahub.trigger.setup import setup_triggers_specific
1210
from infrahub.workflows.utils import add_tags
1311

1412
from .gather import gather_trigger_action_rules
@@ -101,14 +99,9 @@ async def run_generator_group_event(
10199
async def configure_action_rules(
102100
service: InfrahubServices,
103101
) -> None:
104-
async with lock.registry.get(name="configure-action-rules", namespace="trigger-rules", local=False):
105-
triggers = await gather_trigger_action_rules(db=service.database)
106-
async with get_client(sync_client=False) as prefect_client:
107-
await setup_triggers(
108-
client=prefect_client,
109-
triggers=triggers,
110-
trigger_type=TriggerType.ACTION_TRIGGER_RULE,
111-
) # type: ignore[misc]
102+
await setup_triggers_specific(
103+
gatherer=gather_trigger_action_rules, trigger_type=TriggerType.ACTION_TRIGGER_RULE, db=service.database
104+
) # type: ignore[misc]
112105

113106

114107
async def _run_generator(

backend/infrahub/computed_attribute/gather.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ async def gather_python_transform_attributes(
100100
name="gather-trigger-computed-attribute-jinja2",
101101
cache_policy=NONE,
102102
)
103-
async def gather_trigger_computed_attribute_jinja2() -> list[ComputedAttrJinja2TriggerDefinition]:
103+
async def gather_trigger_computed_attribute_jinja2(
104+
db: InfrahubDatabase | None = None, # noqa: ARG001 Needed to have a common function signature for gathering functions
105+
) -> list[ComputedAttrJinja2TriggerDefinition]:
104106
log = get_run_logger()
105107

106108
# Build a list of all branches to process based on which branch is different from main

backend/infrahub/computed_attribute/tasks.py

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from infrahub.git.repository import get_initialized_repo
1616
from infrahub.services import InfrahubServices # noqa: TC001 needed for prefect flow
1717
from infrahub.trigger.models import TriggerSetupReport, TriggerType
18-
from infrahub.trigger.setup import setup_triggers
18+
from infrahub.trigger.setup import setup_triggers, setup_triggers_specific
1919
from infrahub.workflows.catalogue import (
2020
COMPUTED_ATTRIBUTE_PROCESS_JINJA2,
2121
COMPUTED_ATTRIBUTE_PROCESS_TRANSFORM,
@@ -304,36 +304,30 @@ async def computed_attribute_setup_jinja2(
304304
await add_tags(branches=[branch_name])
305305
await wait_for_schema_to_converge(branch_name=branch_name, component=service.component, db=db, log=log)
306306

307-
triggers = await gather_trigger_computed_attribute_jinja2()
308-
307+
report: TriggerSetupReport = await setup_triggers_specific(
308+
gatherer=gather_trigger_computed_attribute_jinja2, trigger_type=TriggerType.COMPUTED_ATTR_JINJA2
309+
) # type: ignore[misc]
309310
# Configure all ComputedAttrJinja2Trigger in Prefect
310-
async with get_client(sync_client=False) as prefect_client:
311-
report: TriggerSetupReport = await setup_triggers(
312-
client=prefect_client,
313-
triggers=triggers,
314-
trigger_type=TriggerType.COMPUTED_ATTR_JINJA2,
315-
force_update=False,
316-
) # type: ignore[misc]
317311

318-
# Since we can have multiple trigger per NodeKind
319-
# we need to extract the list of unique node that should be processed
320-
unique_nodes: set[tuple[str, str, str]] = {
321-
(trigger.branch, trigger.computed_attribute.kind, trigger.computed_attribute.attribute.name) # type: ignore[attr-defined]
322-
for trigger in report.updated + report.created
323-
}
324-
for branch, kind, attribute_name in unique_nodes:
325-
if event_name != BranchDeletedEvent.event_name and branch == branch_name:
326-
await service.workflow.submit_workflow(
327-
workflow=TRIGGER_UPDATE_JINJA_COMPUTED_ATTRIBUTES,
328-
context=context,
329-
parameters={
330-
"branch_name": branch,
331-
"computed_attribute_name": attribute_name,
332-
"computed_attribute_kind": kind,
333-
},
334-
)
335-
336-
log.info(f"{len(triggers)} Computed Attribute for Jinja2 automation configuration completed")
312+
# Since we can have multiple trigger per NodeKind
313+
# we need to extract the list of unique node that should be processed
314+
unique_nodes: set[tuple[str, str, str]] = {
315+
(trigger.branch, trigger.computed_attribute.kind, trigger.computed_attribute.attribute.name) # type: ignore[attr-defined]
316+
for trigger in report.updated + report.created
317+
}
318+
for branch, kind, attribute_name in unique_nodes:
319+
if event_name != BranchDeletedEvent.event_name and branch == branch_name:
320+
await service.workflow.submit_workflow(
321+
workflow=TRIGGER_UPDATE_JINJA_COMPUTED_ATTRIBUTES,
322+
context=context,
323+
parameters={
324+
"branch_name": branch,
325+
"computed_attribute_name": attribute_name,
326+
"computed_attribute_kind": kind,
327+
},
328+
)
329+
330+
log.info(f"{report.in_use_count} Computed Attribute for Jinja2 automation configuration completed")
337331

338332

339333
@flow(

backend/infrahub/trigger/models.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class TriggerSetupReport(BaseModel):
2828
deleted: list[Automation] = Field(default_factory=list)
2929
unchanged: list[TriggerDefinition] = Field(default_factory=list)
3030

31+
@property
32+
def in_use_count(self) -> int:
33+
return len(self.created + self.updated + self.unchanged)
34+
3135

3236
class TriggerType(str, Enum):
3337
ACTION_TRIGGER_RULE = "action_trigger_rule"

backend/infrahub/trigger/setup.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
from typing import TYPE_CHECKING
1+
from typing import TYPE_CHECKING, Awaitable, Callable
22

33
from prefect import get_run_logger, task
44
from prefect.automations import AutomationCore
55
from prefect.cache_policies import NONE
6-
from prefect.client.orchestration import PrefectClient
6+
from prefect.client.orchestration import PrefectClient, get_client
77
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName
88
from prefect.events.schemas.automations import Automation
99

10+
from infrahub import lock
11+
from infrahub.database import InfrahubDatabase
1012
from infrahub.trigger.models import TriggerDefinition
1113

1214
from .models import TriggerSetupReport, TriggerType
@@ -27,6 +29,28 @@ def compare_automations(target: AutomationCore, existing: Automation) -> bool:
2729
return target_dump == existing_dump
2830

2931

32+
@task(name="trigger-setup-specific", task_run_name="Setup triggers of a specific kind", cache_policy=NONE) # type: ignore[arg-type]
33+
async def setup_triggers_specific(
34+
gatherer: Callable[[InfrahubDatabase | None], Awaitable[list[TriggerDefinition]]],
35+
trigger_type: TriggerType,
36+
db: InfrahubDatabase | None = None,
37+
) -> TriggerSetupReport:
38+
async with lock.registry.get(
39+
name=f"configure-action-rules-{trigger_type.value}", namespace="trigger-rules", local=False
40+
):
41+
if db:
42+
async with db.start_session(read_only=True) as dbs:
43+
triggers = await gatherer(dbs)
44+
else:
45+
triggers = await gatherer(db)
46+
async with get_client(sync_client=False) as prefect_client:
47+
return await setup_triggers(
48+
client=prefect_client,
49+
triggers=triggers,
50+
trigger_type=trigger_type,
51+
) # type: ignore[misc]
52+
53+
3054
@task(name="trigger-setup", task_run_name="Setup triggers", cache_policy=NONE) # type: ignore[arg-type]
3155
async def setup_triggers(
3256
client: PrefectClient,

backend/infrahub/webhook/tasks.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from infrahub.message_bus.types import KVTTL
1515
from infrahub.services import InfrahubServices # noqa: TC001 needed for prefect flow
1616
from infrahub.trigger.models import TriggerType
17-
from infrahub.trigger.setup import setup_triggers
17+
from infrahub.trigger.setup import setup_triggers_specific
1818
from infrahub.workflows.utils import add_tags
1919

2020
from .gather import gather_trigger_webhook
@@ -114,14 +114,10 @@ async def configure_webhook_all(service: InfrahubServices) -> None:
114114
async with service.database.start_session(read_only=True) as db:
115115
triggers = await gather_trigger_webhook(db=db)
116116

117-
async with get_client(sync_client=False) as prefect_client:
118-
await setup_triggers(
119-
client=prefect_client,
120-
triggers=triggers,
121-
trigger_type=TriggerType.WEBHOOK,
122-
) # type: ignore[misc]
123-
124117
log.info(f"{len(triggers)} Webhooks automation configuration completed")
118+
await setup_triggers_specific(
119+
gatherer=gather_trigger_webhook, db=service.database, trigger_type=TriggerType.WEBHOOK
120+
) # type: ignore[misc]
125121

126122

127123
@flow(name="webhook-setup-automation-one", flow_run_name="Configurate webhook for {webhook_name}")

0 commit comments

Comments
 (0)