Skip to content

Commit 37f378c

Browse files
authored
IFC-873 Remove shared storage requirements for git workers (#4867)
1 parent c5e1456 commit 37f378c

File tree

16 files changed

+286
-54
lines changed

16 files changed

+286
-54
lines changed

backend/infrahub/git/base.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def log(self) -> InfrahubTaskReportLogger:
183183
raise InitializationError("The repository has not been initialized with a TaskReport")
184184

185185
@property
186-
def directory_root(self) -> str:
186+
def legacy_directory_root(self) -> str:
187187
"""Return the path to the root directory for this repository."""
188188
current_dir = os.getcwd()
189189
repositories_directory = config.SETTINGS.git.repositories_directory
@@ -192,6 +192,16 @@ def directory_root(self) -> str:
192192

193193
return os.path.join(repositories_directory, self.name)
194194

195+
@property
def directory_root(self) -> str:
    """Return the on-disk root of this repository, keyed by its ID.

    The configured ``repositories_directory`` is used as-is when it is
    absolute; otherwise it is resolved against the current working directory.
    The repository ID (converted to ``str``) is the final path component.
    """
    # Read the working directory up front, exactly as the name-based variant does.
    working_directory = os.getcwd()
    base_directory = config.SETTINGS.git.repositories_directory

    if os.path.isabs(base_directory):
        return os.path.join(base_directory, str(self.id))

    base_directory = os.path.join(working_directory, config.SETTINGS.git.repositories_directory)
    return os.path.join(base_directory, str(self.id))
204+
195205
@property
196206
def directory_default(self) -> str:
197207
"""Return the path to the directory of the main branch."""
@@ -242,6 +252,36 @@ def get_git_repo_worktree(self, identifier: str) -> Repo:
242252
message=f"Unable to find the worktree {identifier}.",
243253
)
244254

255+
def relocate_directory_root(self) -> None:
    """Migrate a name-based repository directory to the ID-based location.

    Outcomes, in the order they are checked:

    * nothing exists at the legacy path: do nothing;
    * the legacy path exists but is not a directory: log an error and leave it;
    * the ID-based directory is missing: rename the legacy directory to it;
    * the ID-based directory already exists: delete the legacy directory.
    """
    old_root = Path(self.legacy_directory_root)
    new_root = Path(self.directory_root)

    if not old_root.exists():
        return

    if not old_root.is_dir():
        log.error("A file named after the repository should not exist", repository=self.name)
        return

    if not new_root.is_dir():
        # No ID-based directory yet: the legacy one becomes it.
        log.warning(
            f"Found legacy directory at {self.legacy_directory_root}, moving it to {self.directory_root}",
            repository=self.name,
        )
        old_root.rename(self.directory_root)
        return

    # The ID-based directory is already there, so the legacy copy is dropped.
    log.warning(
        f"Found legacy directory at {self.legacy_directory_root} but {self.directory_root} exists, deleting legacy directory",
        repository=self.name,
    )
    shutil.rmtree(self.legacy_directory_root)
284+
245285
def validate_local_directories(self) -> bool:
246286
"""Check if the local directories structure to ensure that the repository has been properly initialized.
247287
@@ -563,6 +603,8 @@ async def fetch(self) -> bool:
563603

564604
log.debug("Fetching the latest updates from remote origin.", repository=self.name)
565605

606+
self.relocate_directory_root()
607+
566608
repo = self.get_git_repo_main()
567609
try:
568610
repo.remotes.origin.fetch()

backend/infrahub/git/repository.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ async def init(cls, service: Optional[InfrahubServices] = None, **kwargs: Any) -
3030
log.debug("Initiated the object on an existing directory.", repository=self.name)
3131
return self
3232

33+
@classmethod
async def new(cls, service: Optional[InfrahubServices] = None, **kwargs: Any) -> InfrahubRepository:
    """Instantiate the repository and create it locally.

    Args:
        service: Services object to attach; a default ``InfrahubServices()``
            is built when a falsy value is given.
        **kwargs: Forwarded unchanged to the class constructor.

    Returns:
        The new instance, after ``create_locally`` has completed.
    """
    if not service:
        service = InfrahubServices()

    repository = cls(service=service, **kwargs)
    await repository.create_locally(infrahub_branch_name=repository.infrahub_branch_name)
    log.info("Created new repository locally.", repository=repository.name)
    return repository
40+
3341
def get_commit_value(self, branch_name: str, remote: bool = False) -> str:
3442
branches = {}
3543
if remote:
@@ -230,14 +238,6 @@ async def rebase(self, branch_name: str, source_branch: str = "main", push_remot
230238

231239
return response
232240

233-
@classmethod
234-
async def new(cls, service: Optional[InfrahubServices] = None, **kwargs: Any) -> InfrahubRepository:
235-
service = service or InfrahubServices()
236-
self = cls(service=service, **kwargs)
237-
await self.create_locally(infrahub_branch_name=self.infrahub_branch_name)
238-
log.info("Created the new project locally.", repository=self.name)
239-
return self
240-
241241

242242
class InfrahubReadOnlyRepository(InfrahubRepositoryIntegrator):
243243
"""
@@ -264,7 +264,7 @@ async def new(cls, service: Optional[InfrahubServices] = None, **kwargs: Any) ->
264264

265265
self = cls(service=service, **kwargs)
266266
await self.create_locally(checkout_ref=self.ref, infrahub_branch_name=self.infrahub_branch_name)
267-
log.info("Created the new project locally.", repository=self.name)
267+
log.info("Created new repository locally.", repository=self.name)
268268
return self
269269

270270
def get_commit_value(self, branch_name: str, remote: bool = False) -> str:
@@ -309,3 +309,19 @@ async def get_initialized_repo(
309309
)
310310

311311
raise NotImplementedError(f"The repository kind {repository_kind} has not been implemented")
312+
313+
314+
async def initialize_repo(
    location: str, repository_id: str, name: str, service: InfrahubServices, repository_kind: str
) -> Union[InfrahubReadOnlyRepository, InfrahubRepository]:
    """Create a repository locally using the class that matches its kind.

    Args:
        location: Passed to ``new()`` as ``location``.
        repository_id: Passed to ``new()`` as ``id``.
        name: Passed to ``new()`` as ``name``.
        service: Services object; its ``_client`` is passed as ``client``.
        repository_kind: Selects ``InfrahubRepository`` or
            ``InfrahubReadOnlyRepository``.

    Raises:
        NotImplementedError: If ``repository_kind`` is neither supported kind.
    """
    if repository_kind == InfrahubKind.REPOSITORY:
        repository_class = InfrahubRepository
    elif repository_kind == InfrahubKind.READONLYREPOSITORY:
        repository_class = InfrahubReadOnlyRepository
    else:
        raise NotImplementedError(f"The repository kind {repository_kind} has not been implemented")

    return await repository_class.new(
        location=location, id=repository_id, name=name, client=service._client, service=service
    )

backend/infrahub/git/tasks.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
from infrahub.core.protocols import CoreRepository
77
from infrahub.core.registry import registry
88
from infrahub.exceptions import RepositoryError
9+
from infrahub.message_bus import Meta, messages
910
from infrahub.services import services
11+
from infrahub.worker import WORKER_IDENTITY
1012

11-
from ..log import get_logger
13+
from ..log import get_log_data, get_logger
1214
from ..tasks.artifact import define_artifact
1315
from ..workflows.catalogue import REQUEST_ARTIFACT_DEFINITION_GENERATE, REQUEST_ARTIFACT_GENERATE
1416
from ..workflows.utils import add_branch_tag
@@ -54,6 +56,17 @@ async def add_git_repository(model: GitRepositoryAdd) -> None:
5456
if model.internal_status == RepositoryInternalStatus.ACTIVE.value:
5557
await repo.sync()
5658

59+
# Notify other workers they need to clone the repository
60+
notification = messages.RefreshGitFetch(
61+
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=get_log_data().get("request_id", "")),
62+
location=model.location,
63+
repository_id=model.repository_id,
64+
repository_name=model.repository_name,
65+
repository_kind=InfrahubKind.REPOSITORY,
66+
infrahub_branch_name=model.infrahub_branch_name,
67+
)
68+
await service.send(message=notification)
69+
5770

5871
@flow(
5972
name="git-repository-add-read-only",
@@ -81,6 +94,17 @@ async def add_git_repository_read_only(model: GitRepositoryAddReadOnly) -> None:
8194
if model.internal_status == RepositoryInternalStatus.ACTIVE.value:
8295
await repo.sync_from_remote()
8396

97+
# Notify other workers they need to clone the repository
98+
notification = messages.RefreshGitFetch(
99+
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=get_log_data().get("request_id", "")),
100+
location=model.location,
101+
repository_id=model.repository_id,
102+
repository_name=model.repository_name,
103+
repository_kind=InfrahubKind.REPOSITORY,
104+
infrahub_branch_name=model.infrahub_branch_name,
105+
)
106+
await service.send(message=notification)
107+
84108

85109
@flow(name="git_repositories_create_branch")
86110
async def create_branch(branch: str, branch_id: str) -> None:
@@ -166,6 +190,16 @@ async def sync_remote_repositories() -> None:
166190

167191
try:
168192
await repo.sync(staging_branch=staging_branch)
193+
# Tell workers to fetch to stay in sync
194+
message = messages.RefreshGitFetch(
195+
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=get_log_data().get("request_id", "")),
196+
location=repository_data.repository.location.value,
197+
repository_id=repository_data.repository.id,
198+
repository_name=repository_data.repository.name.value,
199+
repository_kind=repository_data.repository.get_kind(),
200+
infrahub_branch_name=infrahub_branch,
201+
)
202+
await service.send(message=message)
169203
except RepositoryError as exc:
170204
error = exc
171205

@@ -178,9 +212,22 @@ async def sync_remote_repositories() -> None:
178212
async def git_branch_create(
179213
client: InfrahubClient, branch: str, branch_id: str, repository_id: str, repository_name: str
180214
) -> None:
"""Create a branch in the git repository and notify other workers.

The repository object is obtained with ``InfrahubRepository.init`` and the
branch is created while holding the registry lock named after the repository.
When the repository has a ``location``, a ``RefreshGitFetch`` message tagged
with this worker's identity is sent so other workers fetch the new branch.
"""
215+
service = services.service
216+
181217
repo = await InfrahubRepository.init(id=repository_id, name=repository_name, client=client)
182218
async with lock.registry.get(name=repository_name, namespace="repository"):
183219
await repo.create_branch_in_git(branch_name=branch, branch_id=branch_id)
220+
if repo.location:
221+
# New branch has been pushed remotely, tell workers to fetch it
222+
message = messages.RefreshGitFetch(
223+
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=get_log_data().get("request_id", "")),
224+
location=repo.location,
225+
repository_id=str(repo.id),
226+
repository_name=repo.name,
227+
repository_kind=InfrahubKind.REPOSITORY,
228+
infrahub_branch_name=branch,
229+
)
230+
await service.send(message=message)
184231

185232

186233
@flow(name="artifact-definition-generate")
@@ -340,6 +387,17 @@ async def pull_read_only(model: GitRepositoryPullReadOnly) -> None:
340387
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit)
341388
await repo.sync_from_remote(commit=model.commit)
342389

390+
# Tell workers to fetch to stay in sync
391+
message = messages.RefreshGitFetch(
392+
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=get_log_data().get("request_id", "")),
393+
location=model.location,
394+
repository_id=model.repository_id,
395+
repository_name=model.repository_name,
396+
repository_kind=InfrahubKind.READONLYREPOSITORY,
397+
infrahub_branch_name=model.infrahub_branch_name,
398+
)
399+
await service.send(message=message)
400+
343401

344402
@flow(name="git-repository-merge")
345403
async def merge_git_repository(model: GitRepositoryMerge) -> None:
@@ -371,7 +429,17 @@ async def merge_git_repository(model: GitRepositoryMerge) -> None:
371429
repo_main.commit.value = commit
372430

373431
await repo_main.save()
374-
375432
else:
376433
async with lock.registry.get(name=model.repository_name, namespace="repository"):
377434
await repo.merge(source_branch=model.source_branch, dest_branch=model.destination_branch)
435+
if repo.location:
436+
# Destination branch has changed and pushed remotely, tell workers to re-fetch
437+
message = messages.RefreshGitFetch(
438+
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=get_log_data().get("request_id", "")),
439+
location=repo.location,
440+
repository_id=str(repo.id),
441+
repository_name=repo.name,
442+
repository_kind=InfrahubKind.REPOSITORY,
443+
infrahub_branch_name=model.destination_branch,
444+
)
445+
await service.send(message=message)

backend/infrahub/message_bus/messages/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from .proposed_change.request_proposedchange_rungenerators import RequestProposedChangeRunGenerators
2323
from .proposed_change.request_proposedchange_runtests import RequestProposedChangeRunTests
2424
from .proposed_change.request_proposedchange_schemaintegrity import RequestProposedChangeSchemaIntegrity
25+
from .refresh_git_fetch import RefreshGitFetch
2526
from .refresh_registry_branches import RefreshRegistryBranches
2627
from .refresh_registry_rebasedbranch import RefreshRegistryRebasedBranch
2728
from .refresh_webhook_configuration import RefreshWebhookConfiguration
@@ -55,6 +56,7 @@
5556
"git.repository.import_objects": GitRepositoryImportObjects,
5657
"schema.migration.path": SchemaMigrationPath,
5758
"schema.validator.path": SchemaValidatorPath,
59+
"refresh.git.fetch": RefreshGitFetch,
5860
"refresh.registry.branches": RefreshRegistryBranches,
5961
"refresh.registry.rebased_branch": RefreshRegistryRebasedBranch,
6062
"refresh.webhook.configuration": RefreshWebhookConfiguration,
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from pydantic import Field
2+
3+
from infrahub.message_bus import InfrahubMessage
4+
5+
6+
class RefreshGitFetch(InfrahubMessage):
    """Ask workers to fetch the remote changes of a repository."""

    # All fields are required (``...`` default).
    location: str = Field(..., description="The external URL of the repository")
    repository_id: str = Field(..., description="The unique ID of the repository")
    repository_name: str = Field(..., description="The name of the repository")
    repository_kind: str = Field(..., description="The type of repository")
    infrahub_branch_name: str = Field(..., description="Infrahub branch on which to sync the remote repository")

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"git.file.get": git.file.get,
3737
"git.repository.connectivity": git.repository.connectivity,
3838
"git.repository.import_objects": git.repository.import_objects,
39+
"refresh.git.fetch": git.repository.fetch,
3940
"refresh.registry.branches": refresh.registry.branches,
4041
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
4142
"refresh.webhook.configuration": refresh.webhook.configuration,

backend/infrahub/message_bus/operations/git/repository.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
from prefect import flow
22

33
from infrahub.exceptions import RepositoryError
4-
from infrahub.git.repository import InfrahubRepository, get_initialized_repo
4+
from infrahub.git.repository import InfrahubRepository, get_initialized_repo, initialize_repo
55
from infrahub.log import get_logger
66
from infrahub.message_bus import messages
77
from infrahub.message_bus.messages.git_repository_connectivity import (
88
GitRepositoryConnectivityResponse,
99
GitRepositoryConnectivityResponseData,
1010
)
1111
from infrahub.services import InfrahubServices
12+
from infrahub.worker import WORKER_IDENTITY
1213

1314
log = get_logger()
1415

@@ -44,3 +45,29 @@ async def import_objects(message: messages.GitRepositoryImportObjects, service:
4445
)
4546
repo.task_report = git_report
4647
await repo.import_objects_from_files(infrahub_branch_name=message.infrahub_branch_name, commit=message.commit)
48+
49+
50+
@flow(name="refresh-git-fetch", flow_run_name="Fetch git repository {message.repository_name} on " + WORKER_IDENTITY)
async def fetch(message: messages.RefreshGitFetch, service: InfrahubServices) -> None:
    """Fetch and pull a repository in response to a ``RefreshGitFetch`` message.

    Messages whose initiator is this worker are ignored. Otherwise the
    repository is loaded with ``get_initialized_repo``; if that raises
    ``RepositoryError`` it is created with ``initialize_repo`` from the
    message's location. The repository is then fetched, and the branch named
    in the message is pulled.
    """
    meta = message.meta
    if meta and meta.initiator_id == WORKER_IDENTITY:
        log.info("Ignoring git fetch request originating from self", worker=WORKER_IDENTITY)
        return

    # Arguments shared by the "load existing" and "create" paths.
    repository_args = {
        "repository_id": message.repository_id,
        "name": message.repository_name,
        "service": service,
        "repository_kind": message.repository_kind,
    }

    try:
        repo = await get_initialized_repo(**repository_args)
    except RepositoryError:
        repo = await initialize_repo(location=message.location, **repository_args)

    await repo.fetch()
    await repo.pull(branch_name=message.infrahub_branch_name)

backend/infrahub/services/adapters/message_bus/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class InfrahubMessageBus:
2525
"trigger.*.*",
2626
]
2727
event_bindings: list[str] = ["refresh.registry.*"]
28+
broadcasted_event_bindings: list[str] = ["refresh.git.*"]
2829

2930
async def initialize(self, service: InfrahubServices) -> None:
3031
"""Initialize the Message bus"""

backend/infrahub/services/adapters/message_bus/nats.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ async def _initialize_api_server(self) -> None:
167167
self.message_enrichers.append(_add_request_id)
168168

169169
async def _initialize_git_worker(self) -> None:
170-
await self._subscribe_events(self.event_bindings, f"git-worker-{WORKER_IDENTITY}")
170+
bindings = self.event_bindings + self.broadcasted_event_bindings
171+
await self._subscribe_events(bindings, f"git-worker-{WORKER_IDENTITY}")
171172

172173
consumer_config = nats.js.api.ConsumerConfig(
173174
ack_policy=nats.js.api.AckPolicy.EXPLICIT,
@@ -177,7 +178,7 @@ async def _initialize_git_worker(self) -> None:
177178
# max_ack_pending=self.settings.maximum_concurrent_messages,
178179
# flow_control=True,
179180
# idle_heartbeat=5.0, # default value
180-
filter_subjects=self.worker_bindings,
181+
filter_subjects=bindings,
181182
durable_name="git-workers",
182183
deliver_group="git-workers",
183184
deliver_subject=self.connection.new_inbox(),
@@ -189,7 +190,7 @@ async def _initialize_git_worker(self) -> None:
189190
if exc.err_code != 10013: # consumer name already in use
190191
raise
191192

192-
for subject in self.worker_bindings:
193+
for subject in bindings:
193194
await self.jetstream.subscribe(
194195
subject=subject,
195196
queue="git-workers",

backend/infrahub/services/adapters/message_bus/rabbitmq.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,14 @@ async def _initialize_api_server(self) -> None:
173173
self.message_enrichers.append(_add_request_id)
174174

175175
async def _initialize_git_worker(self) -> None:
176+
bindings = self.event_bindings + self.broadcasted_event_bindings
176177
events_queue = await self.channel.declare_queue(name=f"worker-events-{WORKER_IDENTITY}", exclusive=True)
177178

178179
self.exchange = await self.channel.declare_exchange(
179180
f"{self.settings.namespace}.events", type="topic", durable=True
180181
)
181182

182-
for routing_key in self.event_bindings:
183+
for routing_key in bindings:
183184
await events_queue.bind(self.exchange, routing_key=routing_key)
184185
self.delayed_exchange = await self.channel.get_exchange(name=f"{self.settings.namespace}.delayed")
185186

0 commit comments

Comments
 (0)