Skip to content

Commit 96731c0

Browse files
authored
Merge pull request #7433 from opsmill/pog-refactor-prefect-setup
Refactor recurring setup jobs for Prefect automations
2 parents 12e420b + d3282f8 commit 96731c0

File tree

7 files changed

+23
-8
lines changed

7 files changed

+23
-8
lines changed

backend/infrahub/core/initialization.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ async def add_indexes(db: InfrahubDatabase) -> None:
137137
await index_manager.add()
138138

139139

140-
async def initialization(db: InfrahubDatabase, add_database_indexes: bool = False) -> None:
140+
async def initialization(db: InfrahubDatabase, add_database_indexes: bool = False) -> bool:
141+
"""Run initialization and setup, returns a boolean to indicate if it's the initial setup."""
141142
if config.SETTINGS.database.db_type == config.DatabaseType.MEMGRAPH:
142143
session = await db.session()
143144
await session.run(query="SET DATABASE SETTING 'log.level' TO 'INFO'")
@@ -148,6 +149,7 @@ async def initialization(db: InfrahubDatabase, add_database_indexes: bool = Fals
148149
# Initialize the database and Load the Root node
149150
# ---------------------------------------------------
150151
async with lock.registry.initialization():
152+
first_time_initialization = len(await Root.get_list(db=db)) == 0
151153
log.debug("Checking Root Node")
152154
await initialize_registry(db=db, initialize=True)
153155

@@ -210,6 +212,7 @@ async def initialization(db: InfrahubDatabase, add_database_indexes: bool = Fals
210212
ip_namespace = await get_default_ipnamespace(db=db)
211213
if ip_namespace:
212214
registry.default_ipnamespace = ip_namespace.id
215+
return first_time_initialization
213216

214217

215218
async def create_root_node(db: InfrahubDatabase) -> Root:

backend/infrahub/core/schema/schema_branch.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,8 +540,8 @@ def process_validate(self) -> None:
540540
self.validate_identifiers()
541541
self.sync_uniqueness_constraints_and_unique_attributes()
542542
self.validate_uniqueness_constraints()
543-
self.validate_display_label()
544543
self.validate_display_labels()
544+
self.validate_display_label()
545545
self.validate_order_by()
546546
self.validate_default_filters()
547547
self.validate_parent_component()
@@ -789,6 +789,7 @@ def validate_display_label(self) -> None:
789789
)
790790
self.set(name=name, schema=update_candidate)
791791

792+
node_schema = self.get(name=name, duplicate=False)
792793
if not node_schema.display_label:
793794
continue
794795

backend/infrahub/server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,13 @@ async def app_initialization(application: FastAPI, enable_scheduler: bool = True
8484
initialize_lock(service=service)
8585
# We must initialize DB after initialize lock and initialize lock depends on cache initialization
8686
async with application.state.db.start_session() as db:
87-
await initialization(db=db, add_database_indexes=True)
87+
is_initial_setup = await initialization(db=db, add_database_indexes=True)
8888

8989
async with database.start_session() as dbs:
9090
await validate_graph_version(db=dbs)
9191

9292
# Initialize the workflow after the registry has been setup
93-
await service.initialize_workflow()
93+
await service.initialize_workflow(is_initial_setup=is_initial_setup)
9494

9595
application.state.service = service
9696
application.state.response_delay = config.SETTINGS.miscellaneous.response_delay

backend/infrahub/services/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,15 @@ async def new(
112112

113113
return service
114114

115-
async def initialize_workflow(self) -> None:
115+
async def initialize_workflow(self, is_initial_setup: bool = False) -> None:
116116
if self.workflow is not None and isinstance(self.workflow, WorkflowWorkerExecution):
117117
assert self.component is not None
118118
# Ideally `WorkflowWorkerExecution.initialize` would be directly part of WorkflowWorkerExecution
119119
# constructor but this requires some redesign as it depends on InfrahubComponent which is instantiated
120120
# after workflow instantiation.
121121
await self.component.refresh_heartbeat()
122122
is_primary = await self.component.is_primary_gunicorn_worker()
123-
await self.workflow.initialize(component_is_primary_server=is_primary)
123+
await self.workflow.initialize(component_is_primary_server=is_primary, is_initial_setup=is_initial_setup)
124124

125125
@property
126126
def component(self) -> InfrahubComponent:

backend/infrahub/services/adapters/workflow/worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from prefect.deployments import run_deployment
77

88
from infrahub.workers.utils import inject_context_parameter
9-
from infrahub.workflows.initialization import setup_task_manager
9+
from infrahub.workflows.initialization import setup_task_manager, setup_task_manager_identifiers
1010
from infrahub.workflows.models import WorkflowInfo
1111

1212
from . import InfrahubWorkflow, Return
@@ -20,10 +20,13 @@
2020

2121
class WorkflowWorkerExecution(InfrahubWorkflow):
2222
@staticmethod
23-
async def initialize(component_is_primary_server: bool) -> None:
23+
async def initialize(component_is_primary_server: bool, is_initial_setup: bool = False) -> None:
2424
if component_is_primary_server:
2525
await setup_task_manager()
2626

27+
if is_initial_setup:
28+
await setup_task_manager_identifiers()
29+
2730
@overload
2831
async def execute_workflow(
2932
self,

backend/infrahub/trigger/tasks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
gather_trigger_computed_attribute_python,
88
)
99
from infrahub.display_labels.gather import gather_trigger_display_labels_jinja2
10+
from infrahub.hfid.gather import gather_trigger_hfid
1011
from infrahub.trigger.catalogue import builtin_triggers
1112
from infrahub.webhook.gather import gather_trigger_webhook
1213
from infrahub.workers.dependencies import get_database
@@ -20,6 +21,7 @@ async def trigger_configure_all() -> None:
2021
async with database.start_session() as db:
2122
webhook_trigger = await gather_trigger_webhook(db=db)
2223
display_label_triggers = await gather_trigger_display_labels_jinja2()
24+
human_friendly_id_triggers = await gather_trigger_hfid()
2325
computed_attribute_j2_triggers = await gather_trigger_computed_attribute_jinja2()
2426
(
2527
computed_attribute_python_triggers,
@@ -31,6 +33,7 @@ async def trigger_configure_all() -> None:
3133
+ computed_attribute_python_triggers
3234
+ computed_attribute_python_query_triggers
3335
+ display_label_triggers
36+
+ human_friendly_id_triggers
3437
+ builtin_triggers
3538
+ webhook_trigger
3639
+ action_rules

backend/infrahub/workflows/initialization.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ async def setup_task_manager() -> None:
7676
await setup_triggers(
7777
client=client, triggers=builtin_triggers, trigger_type=TriggerType.BUILTIN, force_update=True
7878
)
79+
80+
81+
@flow(name="task-manager-identifiers", flow_run_name="Setup Task Manager Display Labels and HFID")
82+
async def setup_task_manager_identifiers() -> None:
83+
async with get_client(sync_client=False) as client:
7984
display_label_triggers = await gather_trigger_display_labels_jinja2()
8085
await setup_triggers(
8186
client=client,

0 commit comments

Comments
 (0)