-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathapplication.py
More file actions
92 lines (80 loc) · 3.33 KB
/
application.py
File metadata and controls
92 lines (80 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import dataclasses
import json
import os
import signal
import sys
from collections.abc import Sequence
from types import FrameType
import sentry_sdk
from mozlog import get_proxy_logger
from git_hg_sync import PID_FILEPATH
from git_hg_sync.events import Event, Push
from git_hg_sync.mapping import Mapping, SyncOperation
from git_hg_sync.pulse_worker import PulseWorker
from git_hg_sync.repo_synchronizer import RepoSynchronizer
logger = get_proxy_logger(__name__)
class Application:
def __init__(
self,
worker: PulseWorker,
repo_synchronizers: dict[str, RepoSynchronizer],
mappings: Sequence[Mapping],
) -> None:
self._worker = worker
self._worker.event_handler = self._handle_event
self._repo_synchronizers = repo_synchronizers
self._mappings = mappings
def run(self) -> None:
def signal_handler(_sig: int, _frame: FrameType | None) -> None:
PID_FILEPATH.unlink(missing_ok=True)
if self._worker.should_stop:
logger.info("Process killed by user")
sys.exit(1)
self._worker.should_stop = True
logger.info("Process exiting gracefully")
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
PID_FILEPATH.write_text(f"{os.getpid()}\n")
self._worker.run()
def _handle_push_event(self, push_event: Push) -> None:
push_event_str = f"{push_event.push_id} for {push_event.repo_url}"
logger.debug(f"Handling push event: {push_event_str}")
synchronizer = self._repo_synchronizers[push_event.repo_url]
operations_by_destination: dict[str, list[SyncOperation]] = {}
for mapping in self._mappings:
if matches := mapping.match(push_event):
for match in matches:
operations_by_destination.setdefault(
match.destination_url, []
).append(match.operation)
if not operations_by_destination:
logger.warning(f"No operation for push event: {push_event_str}")
return
for destination, operations in operations_by_destination.items():
try:
synchronizer.sync(destination, operations, push_event.user)
except Exception as exc:
sentry_sdk.capture_exception(exc)
error_data = json.dumps(
{
"destination_url": destination,
"operations": [
dataclasses.asdict(operation) for operation in operations
],
}
)
logger.warning(
f"An error prevented completion of the following sync operations from push event {push_event_str}. {error_data}",
exc_info=True,
)
raise exc
logger.info(f"Successfully handled event: {push_event_str}")
def _handle_event(self, event: Event) -> None:
if event.repo_url not in self._repo_synchronizers:
logger.warning(f"Ignoring event for untracked repository: {event.repo_url}")
return
match event:
case Push():
self._handle_push_event(event)
case _:
raise NotImplementedError()