Skip to content

Commit 9c13862

Browse files
committed
workers: create abstract Worker class, use asyncio (bug 1744327)
This is a work in progress, do not land.
1 parent 14ef83b commit 9c13862

File tree

5 files changed

+225
-191
lines changed

5 files changed

+225
-191
lines changed

landoapi/cli.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# This Source Code Form is subject to the terms of the Mozilla Public
22
# License, v. 2.0. If a copy of the MPL was not distributed with this
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
import asyncio
45
import logging
56
import os
67
import sys
@@ -81,10 +82,10 @@ def landing_worker():
8182
for system in get_subsystems(exclude=exclusions):
8283
system.ensure_ready()
8384

84-
from landoapi.landing_worker import LandingWorker
85+
from landoapi.workers.landing_worker import LandingWorker
8586

8687
worker = LandingWorker()
87-
worker.start()
88+
asyncio.run(worker.start())
8889

8990

9091
@cli.command(name="revision-worker")
@@ -95,10 +96,10 @@ def revision_worker():
9596
for system in get_subsystems(exclude=exclusions):
9697
system.ensure_ready()
9798

98-
from landoapi.revision_worker import RevisionWorker
99+
from landoapi.workers.revision_worker import RevisionWorker
99100

100101
worker = RevisionWorker()
101-
worker.start()
102+
asyncio.run(worker.start())
102103

103104

104105
@cli.command(name="run-pre-deploy-sequence")

landoapi/workers/__init__.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# This Source Code Form is subject to the terms of the Mozilla Public
2+
# License, v. 2.0. If a copy of the MPL was not distributed with this
3+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
"""This module contains the abstract repo worker implementation."""
5+
6+
import logging
7+
import os
8+
import subprocess
9+
import re
10+
import asyncio
11+
import signal
12+
from flask import current_app
13+
from landoapi.repos import repo_clone_subsystem
14+
from landoapi.treestatus import treestatus_subsystem
15+
from landoapi.models.configuration import ConfigurationVariable
16+
17+
18+
logger = logging.getLogger(__name__)
19+
20+
21+
class Worker:
22+
def __init__(self, pause_key, sleep_seconds=5, with_ssh=True, with_aws=True):
23+
SSH_PRIVATE_KEY_ENV_KEY = "SSH_PRIVATE_KEY"
24+
25+
# ConfigurationVariable key used to control the paused state of this worker.
26+
self.PAUSE_KEY = pause_key
27+
self.sleep_seconds = sleep_seconds
28+
29+
if with_aws:
30+
config_keys = ["AWS_SECRET_KEY", "AWS_ACCESS_KEY", "PATCH_BUCKET_NAME"]
31+
self.config = {k: current_app.config[k] for k in config_keys}
32+
33+
# The list of all repos that are enabled for this worker
34+
self.applicable_repos = (
35+
list(repo_clone_subsystem.repos)
36+
if hasattr(repo_clone_subsystem, "repos")
37+
else []
38+
)
39+
40+
# The list of all repos that have open trees; refreshed when needed via
41+
# `self.refresh_enabled_repos`.
42+
self.enabled_repos = []
43+
44+
# This is True when a worker active, and False when it is shut down
45+
self.running = False
46+
47+
# This is True when the worker is busy processing a job
48+
self.busy = False
49+
50+
if with_ssh:
51+
# Fetch ssh private key from the environment. Note that this key should be
52+
# stored in standard format including all new lines and new line at the end
53+
# of the file.
54+
self.ssh_private_key = os.environ.get(SSH_PRIVATE_KEY_ENV_KEY)
55+
if not self.ssh_private_key:
56+
logger.warning(f"No {SSH_PRIVATE_KEY_ENV_KEY} present in environment.")
57+
58+
@staticmethod
59+
def _setup_ssh(ssh_private_key):
60+
"""Add a given private ssh key to ssh agent.
61+
62+
SSH keys are needed in order to push to repositories that have an ssh
63+
push path.
64+
65+
The private key should be passed as it is in the key file, including all
66+
new line characters and the new line character at the end.
67+
68+
Args:
69+
ssh_private_key (str): A string representing the private SSH key file.
70+
"""
71+
# Set all the correct environment variables
72+
agent_process = subprocess.run(
73+
["ssh-agent", "-s"], capture_output=True, universal_newlines=True
74+
)
75+
76+
# This pattern will match keys and values, and ignore everything after the
77+
# semicolon. For example, the output of `agent_process` is of the form:
78+
# SSH_AUTH_SOCK=/tmp/ssh-c850kLXXOS5e/agent.120801; export SSH_AUTH_SOCK;
79+
# SSH_AGENT_PID=120802; export SSH_AGENT_PID;
80+
# echo Agent pid 120802;
81+
pattern = re.compile("(.+)=([^;]*)")
82+
for key, value in pattern.findall(agent_process.stdout):
83+
logger.info(f"_setup_ssh: setting {key} to {value}")
84+
os.environ[key] = value
85+
86+
# Add private SSH key to agent
87+
# NOTE: ssh-add seems to output everything to stderr, including upon exit 0.
88+
add_process = subprocess.run(
89+
["ssh-add", "-"],
90+
input=ssh_private_key,
91+
capture_output=True,
92+
universal_newlines=True,
93+
)
94+
if add_process.returncode != 0:
95+
raise Exception(add_process.stderr)
96+
logger.info("Added private SSH key from environment.")
97+
98+
@property
99+
def _paused(self):
100+
return ConfigurationVariable.get(self.PAUSE_KEY, False)
101+
102+
def _setup(self):
103+
loop = asyncio.get_event_loop()
104+
loop.add_signal_handler(
105+
signal.SIGINT,
106+
lambda *args, **kwargs: asyncio.create_task(
107+
self.exit_gracefully(*args, **kwargs)
108+
),
109+
)
110+
loop.add_signal_handler(
111+
signal.SIGTERM,
112+
lambda *args, **kwargs: asyncio.create_task(
113+
self.exit_gracefully(*args, **kwargs)
114+
),
115+
)
116+
117+
if hasattr(self, "ssh_private_key"):
118+
self._setup_ssh(self.ssh_private_key)
119+
120+
async def _start(self, *args, **kwargs):
121+
self.running = True
122+
while self.running:
123+
while self._paused:
124+
await asyncio.sleep(self.sleep_seconds)
125+
await self.loop(*args, **kwargs)
126+
127+
async def sleep(self, sleep_seconds):
128+
await asyncio.sleep(self.sleep_seconds)
129+
130+
def refresh_enabled_repos(self):
131+
self.enabled_repos = [
132+
r
133+
for r in self.applicable_repos
134+
if treestatus_subsystem.client.is_open(repo_clone_subsystem.repos[r].tree)
135+
]
136+
logger.info(f"{len(self.enabled_repos)} enabled repos: {self.enabled_repos}")
137+
138+
async def start(self):
139+
self._setup()
140+
await self._start()
141+
142+
async def loop(self, *args, **kwargs):
143+
raise NotImplementedError()
144+
145+
async def exit_gracefully(self, *args):
146+
logger.info(f"Worker exiting gracefully {args}")
147+
while self.busy:
148+
await asyncio.sleep(self.sleep_seconds)
149+
self.running = False

0 commit comments

Comments
 (0)