This repository was archived by the owner on Oct 16, 2024. It is now read-only.

Commit 09ccb2f

avoid node migration overlaps for a pool
1 parent: f7c987c


4 files changed: 70 additions, 42 deletions


clusterman/batch/node_migration.py

Lines changed: 40 additions & 14 deletions
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import time
+from collections import defaultdict
+from multiprocessing import Lock
 from typing import Callable
 from typing import Collection
 from typing import Dict
@@ -45,9 +47,6 @@
 from clusterman.util import setup_logging
 
 
-WorkerProcessLabel = Union[str, MigrationEvent]
-
-
 class NodeMigration(BatchDaemon, BatchLoggingMixin, BatchRunningSentinelMixin):
     notify_emails = ["compute-infra@yelp.com"]
 
@@ -56,6 +55,10 @@ class NodeMigration(BatchDaemon, BatchLoggingMixin, BatchRunningSentinelMixin):
     DEFAULT_MAX_WORKER_PROCESSES = 6
     DEFAULT_RUN_INTERVAL_SECONDS = 60
 
+    WORKER_LABEL_SEPARATOR = ":"
+    EVENT_WORKER_LABEL_PREFIX = "event"
+    UPTIME_WORKER_LABEL_PREFIX = "uptime"
+
     @batch_command_line_arguments
     def parse_args(self, parser):
         arg_group = parser.add_argument_group("NodeMigration batch options")
@@ -66,10 +69,11 @@ def parse_args(self, parser):
     def configure_initial(self):
         setup_config(self.options)
         self.logger = colorlog.getLogger(__name__)
-        self.migration_workers: Dict[WorkerProcessLabel, RestartableDaemonProcess] = {}
+        self.migration_workers: Dict[str, RestartableDaemonProcess] = {}
         self.migration_configs = {}
-        self.events_in_progress = set()
+        self.events_in_progress = {}
         self.pools_accepting_events = set()
+        self.worker_locks = defaultdict(Lock)
         self.cluster_connector = KubernetesClusterConnector(self.options.cluster, None, init_crd=True)
         self.run_interval = staticconf.read_int(
             "batches.node_migration.run_interval_seconds", self.DEFAULT_RUN_INTERVAL_SECONDS
@@ -101,7 +105,26 @@ def _get_worker_setup(self, pool: str) -> Optional[WorkerSetup]:
             self.logger.exception(f"Bad migration configuration for pool {pool}: {e}")
             return None
 
-    def _spawn_worker(self, label: WorkerProcessLabel, routine: Callable, *args, **kwargs) -> bool:
+    def _build_worker_label(self, pool: Optional[str] = None, event: Optional[MigrationEvent] = None) -> str:
+        """Composes label for worker process
+
+        :param Optional[str] pool: pool name in case of uptime worker label
+        :param Optional[MigrationEvent] event: event instance in case of event worker label
+        :return: label string
+        """
+        if event:
+            prefix, cluster, pool = self.EVENT_WORKER_LABEL_PREFIX, event.cluster, event.pool
+        elif pool:
+            prefix, cluster = self.UPTIME_WORKER_LABEL_PREFIX, self.options.cluster
+        else:
+            raise ValueError("Either 'pool' or 'event' must be provided as parameter")
+        return self.WORKER_LABEL_SEPARATOR.join((prefix, cluster, pool))
+
+    def _is_event_worker_label(self, label: str) -> bool:
+        """Check if process label is for an event worker"""
+        return label.startswith(self.EVENT_WORKER_LABEL_PREFIX)
+
+    def _spawn_worker(self, label: str, routine: Callable, *args, **kwargs) -> bool:
         """Start worker process
 
         :param Callable routine: worker method
@@ -113,10 +136,12 @@ def _spawn_worker(self, label: WorkerProcessLabel, routine: Callable, *args, **k
             self.logger.warning(f"Worker labelled {label} already running, skipping")
             return False
         running_workers = sum(proc.is_alive() for proc in self.migration_workers.values())
-        if isinstance(label, MigrationEvent) and running_workers >= self.available_worker_slots:
+        if self._is_event_worker_label(label) and running_workers >= self.available_worker_slots:
             # uptime workers are prioritized skipping this check
             self.logger.warning(f"Too many worker processes running already ({running_workers}), skipping")
             return False
+        lock_key = label.split(self.WORKER_LABEL_SEPARATOR, 1)[1]
+        kwargs["pool_lock"] = self.worker_locks[lock_key]
         proc = RestartableDaemonProcess(target=routine, args=args, kwargs=kwargs)
         self.migration_workers[label] = proc
         proc.start()
@@ -128,7 +153,7 @@ def fetch_event_crd(self) -> Collection[MigrationEvent]:
         events = self.cluster_connector.list_node_migration_resources(
             [MigrationStatus.PENDING, MigrationStatus.INPROGRESS]
         )
-        return set(events) - self.events_in_progress
+        return set(events) - set(self.events_in_progress.values())
 
     def mark_event(self, event: MigrationEvent, status: MigrationStatus = MigrationStatus.COMPLETED) -> None:
         """Set status for CRD event resource
@@ -153,14 +178,15 @@ def spawn_event_worker(self, event: MigrationEvent):
             self.mark_event(event, MigrationStatus.SKIPPED)
             return
         self.logger.info(f"Spawning migration worker for event: {event}")
+        worker_label = self._build_worker_label(event=event)
         if self._spawn_worker(
-            label=event,
+            label=worker_label,
             routine=event_migration_worker,
             migration_event=event,
             worker_setup=worker_setup,
         ):
            self.mark_event(event, MigrationStatus.INPROGRESS)
-            self.events_in_progress.add(event)
+            self.events_in_progress[worker_label] = event
 
     def spawn_uptime_worker(self, pool: str, uptime: Union[int, str]):
         """Start process monitoring pool node uptime, and recycling nodes accordingly
@@ -180,7 +206,7 @@ def spawn_uptime_worker(self, pool: str, uptime: Union[int, str]):
             return
         self.logger.info(f"Spawning uptime migration worker for {pool} pool")
         self._spawn_worker(
-            label=f"uptime-{self.options.cluster}-{pool}",
+            label=self._build_worker_label(pool=pool),
             routine=uptime_migration_worker,
             cluster=self.options.cluster,
             pool=pool,
@@ -198,9 +224,9 @@ def monitor_workers(self):
                 else:
                     torestart.append(label)
         for label in completed:
-            if isinstance(label, MigrationEvent):
-                self.mark_event(label, MigrationStatus.COMPLETED)
-                self.events_in_progress.discard(label)
+            if self._is_event_worker_label(label):
+                event = self.events_in_progress.pop(label)
+                self.mark_event(event, MigrationStatus.COMPLETED)
             del self.migration_workers[label]
         for label in torestart:
             self.migration_workers[label].restart()
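
The change above keys every worker process by a string label of the form "<prefix>:<cluster>:<pool>" and hands it a multiprocessing.Lock taken from worker_locks, keyed by the "<cluster>:<pool>" suffix, so an event worker and an uptime worker targeting the same pool end up sharing one lock. A minimal standalone sketch of that keying scheme (build_label and lock_for are illustrative helpers, not clusterman code):

    from collections import defaultdict
    from multiprocessing import Lock

    WORKER_LABEL_SEPARATOR = ":"

    # One lock per "<cluster>:<pool>" key; defaultdict creates each lock lazily on first access.
    worker_locks = defaultdict(Lock)

    def build_label(prefix: str, cluster: str, pool: str) -> str:
        # e.g. "event:mesos-test:bar" or "uptime:mesos-test:bar"
        return WORKER_LABEL_SEPARATOR.join((prefix, cluster, pool))

    def lock_for(label: str):
        # Drop the prefix so event and uptime labels for the same pool map to the same lock.
        lock_key = label.split(WORKER_LABEL_SEPARATOR, 1)[1]
        return worker_locks[lock_key]

    # Both worker kinds for cluster "mesos-test", pool "bar" share a single lock.
    assert lock_for(build_label("event", "mesos-test", "bar")) is lock_for(build_label("uptime", "mesos-test", "bar"))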

clusterman/migration/worker.py

Lines changed: 12 additions & 3 deletions
@@ -14,6 +14,7 @@
 import time
 from functools import partial
 from multiprocessing import Process
+from multiprocessing.synchronize import Lock as LockBase
 from statistics import mean
 from typing import Callable
 from typing import cast
@@ -122,7 +123,9 @@ def _drain_node_selection(
     return True
 
 
-def uptime_migration_worker(cluster: str, pool: str, uptime_seconds: int, worker_setup: WorkerSetup) -> None:
+def uptime_migration_worker(
+    cluster: str, pool: str, uptime_seconds: int, worker_setup: WorkerSetup, pool_lock: LockBase
+) -> None:
     """Worker monitoring and migrating nodes according to uptime
 
     :parma str cluster: cluster name
@@ -137,24 +140,28 @@ def uptime_migration_worker(cluster: str, pool: str, uptime_seconds: int, worker
         return
     while True:
         if manager.is_capacity_satisfied():
-            _drain_node_selection(manager, node_selector, worker_setup)
+            with pool_lock:
+                _drain_node_selection(manager, node_selector, worker_setup)
         else:
             logger.warning(f"Pool {cluster}:{pool} is currently underprovisioned, skipping uptime migration iteration")
         time.sleep(UPTIME_CHECK_INTERVAL_SECONDS)
         manager.reload_state()
 
 
-def event_migration_worker(migration_event: MigrationEvent, worker_setup: WorkerSetup) -> None:
+def event_migration_worker(migration_event: MigrationEvent, worker_setup: WorkerSetup, pool_lock: LockBase) -> None:
     """Worker migrating nodes according to event configuration
 
     :param MigrationEvent migration_event: event instance
     :param WorkerSetup worker_setup: migration setup
     """
+    pool_lock_acquired = False
     manager = PoolManager(migration_event.cluster, migration_event.pool, SUPPORTED_POOL_SCHEDULER, fetch_state=False)
     connector = cast(KubernetesClusterConnector, manager.cluster_connector)
     connector.set_label_selectors(migration_event.label_selectors, add_to_existing=True)
     manager.reload_state()
     try:
+        pool_lock.acquire(timeout=worker_setup.expected_duration)
+        pool_lock_acquired = True
         if worker_setup.disable_autoscaling:
             logger.info(f"Disabling autoscaling for {migration_event.cluster}:{migration_event.pool}")
             disable_autoscaling(
@@ -181,6 +188,8 @@ def event_migration_worker(migration_event: MigrationEvent, worker_setup: Worker
         logger.error(f"Issue while processing migration event {migration_event}: {e}")
         raise
     finally:
+        if pool_lock_acquired:
+            pool_lock.release()
         # we do not reset the pool target capacity in case of pre-scaling as we
         # trust the autoscaler to readjust that in a short time eventually
         if worker_setup.disable_autoscaling:
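
The two workers use the lock differently: the uptime worker simply wraps each drain iteration in "with pool_lock:", while the event worker acquires the lock with a timeout of worker_setup.expected_duration before starting and releases it in the finally block only if it was acquired. A small sketch of that acquire-with-timeout / release-in-finally pattern, assuming a plain multiprocessing lock (run_with_pool_lock and do_migration are hypothetical names, not part of this module):

    from multiprocessing import Lock
    from multiprocessing.synchronize import Lock as LockBase
    from typing import Callable

    def run_with_pool_lock(pool_lock: LockBase, timeout_seconds: float, do_migration: Callable[[], None]) -> None:
        lock_acquired = False
        try:
            # Wait up to timeout_seconds for another worker on the same pool to finish;
            # acquire() returns False on timeout rather than raising.
            lock_acquired = pool_lock.acquire(timeout=timeout_seconds)
            do_migration()
        finally:
            # Release only a lock that was actually acquired; releasing an unheld
            # multiprocessing lock raises ValueError.
            if lock_acquired:
                pool_lock.release()

    run_with_pool_lock(Lock(), timeout_seconds=5.0, do_migration=lambda: None)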

tests/batch/node_migration_test.py

Lines changed: 15 additions & 22 deletions
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from argparse import Namespace
+from collections import defaultdict
 from unittest.mock import MagicMock
 from unittest.mock import patch
 
@@ -45,7 +46,7 @@ def migration_batch():
 
 def test_fetch_event_crd(migration_batch: NodeMigration):
     migration_batch.events_in_progress = {
-        MigrationEvent(
+        "event:mesos-test:bar": MigrationEvent(
             resource_name="mesos-test-bar-220912-1",
             cluster="mesos-test",
             pool="bar",
@@ -113,26 +114,16 @@ def test_get_worker_setup(migration_batch):
     )
 
 
-@pytest.mark.parametrize(
-    "worker_label",
-    (
-        (
-            MigrationEvent(
-                resource_name="mesos-test-bar-220912-1",
-                cluster="mesos-test",
-                pool="bar",
-                label_selectors=[],
-                condition=MigrationCondition(ConditionTrait.KERNEL, ConditionOperator.GE, "3.2.1"),
-            ),
-        ),
-        ("foobar",),
-    ),
-)
 @patch("clusterman.batch.node_migration.RestartableDaemonProcess")
-def test_spawn_worker(mock_process, migration_batch, worker_label):
+def test_spawn_worker(mock_process, migration_batch):
+    mock_lock = MagicMock()
     mock_routine = MagicMock()
+    worker_label = "foobar:123:456"
+    migration_batch.worker_locks = defaultdict(mock_lock)
     assert migration_batch._spawn_worker(worker_label, mock_routine, 1, x=2) is True
-    mock_process.assert_called_once_with(target=mock_routine, args=(1,), kwargs={"x": 2})
+    mock_process.assert_called_once_with(
+        target=mock_routine, args=(1,), kwargs={"x": 2, "pool_lock": mock_lock.return_value}
+    )
     assert migration_batch.migration_workers == {worker_label: mock_process.return_value}
 
 
@@ -146,7 +137,7 @@ def test_spawn_worker_existing(mock_process, migration_batch):
 @patch("clusterman.batch.node_migration.RestartableDaemonProcess")
 def test_spawn_worker_over_capacity(mock_process, migration_batch):
     migration_batch.migration_workers = {f"foobar{i}": MagicMock(is_alive=lambda: True) for i in range(6)}
-    assert migration_batch._spawn_worker(MigrationEvent(None, None, None, None, None), MagicMock(), 1, x=2) is False
+    assert migration_batch._spawn_worker("event:foo:bar", MagicMock(), 1, x=2) is False
     mock_process.assert_not_called()
 
 
@@ -197,7 +188,7 @@ def test_spawn_event_worker(mock_worker_routine, migration_batch, event, worker_
     migration_batch.spawn_event_worker(event)
     if is_spawned:
         mock_spawn.assert_called_once_with(
-            label=event,
+            label=f"event:{event.cluster}:{event.pool}",
             routine=mock_worker_routine,
             migration_event=event,
             worker_setup=worker_setup,
@@ -225,7 +216,7 @@ def test_spawn_uptime_worker(mock_worker_routine, migration_batch, uptime, worke
     migration_batch.spawn_uptime_worker("bar", uptime)
     if expected_uptime_spawn:
         mock_spawn.assert_called_once_with(
-            label="uptime-mesos-test-bar",
+            label="uptime:mesos-test:bar",
             routine=mock_worker_routine,
             cluster="mesos-test",
             pool="bar",
@@ -244,10 +235,12 @@ def test_monitor_workers(migration_batch):
         "foobar": mock_ok_worker,
         "buzz": mock_to_restart,
         "some": MagicMock(is_alive=lambda: False, exitcode=0),
-        mock_event: MagicMock(is_alive=lambda: False, exitcode=0),
+        "event:123:456": MagicMock(is_alive=lambda: False, exitcode=0),
     }
+    migration_batch.events_in_progress = {"event:123:456": mock_event}
     with patch.object(migration_batch, "mark_event") as mock_mark:
         migration_batch.monitor_workers()
     mock_mark.assert_called_once_with(mock_event, MigrationStatus.COMPLETED)
     mock_to_restart.restart.assert_called_once_with()
     assert migration_batch.migration_workers == {"foobar": mock_ok_worker, "buzz": mock_to_restart}
+    assert not migration_batch.events_in_progress
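
One detail in test_spawn_worker worth noting: passing a MagicMock as the default_factory of a defaultdict means every missing key is produced by calling the mock, which always returns the same mock_lock.return_value, so the test can assert that exact object was injected as pool_lock. A quick standalone illustration of that mocking trick (not part of the test module):

    from collections import defaultdict
    from unittest.mock import MagicMock

    mock_lock = MagicMock()
    worker_locks = defaultdict(mock_lock)  # the mock itself acts as the default_factory

    # Any missing key triggers mock_lock() and stores its return_value,
    # which is what the kwargs={"pool_lock": mock_lock.return_value} assertion relies on.
    assert worker_locks["foobar:123:456"] is mock_lock.return_value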

tests/migration/migration_worker_test.py

Lines changed: 3 additions & 3 deletions
@@ -128,7 +128,7 @@ def test_uptime_migration_worker(mock_drain_selection, mock_manager_class, mock_
     mock_manager = mock_manager_class.return_value
     mock_manager.is_capacity_satisfied.side_effect = [True, False, True]
     with pytest.raises(StopIteration):  # using end of mock side-effect to get out of forever looop
-        uptime_migration_worker("mesos-test", "bar", 10000, mock_setup)
+        uptime_migration_worker("mesos-test", "bar", 10000, mock_setup, pool_lock=MagicMock())
     assert mock_drain_selection.call_count == 2
     selector = mock_drain_selection.call_args_list[0][0][1]
     assert selector(ClusterNodeMetadata(None, InstanceMetadata(None, None, uptime=timedelta(seconds=10001)))) is True
@@ -160,7 +160,7 @@ def test_event_migration_worker(
         for i in range(1, 6)
     ]
     mock_manager.target_capacity = 19
-    event_migration_worker(mock_migration_event, event_worker_setup)
+    event_migration_worker(mock_migration_event, event_worker_setup, pool_lock=MagicMock())
     mock_manager.modify_target_capacity.assert_called_once_with(23)
     mock_disable_scaling.assert_called_once_with("mesos-test", "bar", "kubernetes", 3)
     mock_enable_scaling.assert_called_once_with("mesos-test", "bar", "kubernetes")
@@ -192,7 +192,7 @@ def test_event_migration_worker_error(
     mock_manager = mock_manager_class.return_value
     mock_manager.get_node_metadatas.side_effect = Exception(123)
     with pytest.raises(Exception):
-        event_migration_worker(mock_migration_event, event_worker_setup)
+        event_migration_worker(mock_migration_event, event_worker_setup, pool_lock=MagicMock())
     mock_disable_scaling.assert_called_once_with("mesos-test", "bar", "kubernetes", 3)
     mock_enable_scaling.assert_called_once_with("mesos-test", "bar", "kubernetes")
     mock_manager.modify_target_capacity.assert_not_called()
