1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414import time
15+ from collections import defaultdict
16+ from multiprocessing import Lock
1517from typing import Callable
1618from typing import Collection
1719from typing import Dict
4547from clusterman .util import setup_logging
4648
4749
48- WorkerProcessLabel = Union [str , MigrationEvent ]
49-
50-
5150class NodeMigration (BatchDaemon , BatchLoggingMixin , BatchRunningSentinelMixin ):
5251 notify_emails = ["compute-infra@yelp.com" ]
5352
@@ -56,6 +55,10 @@ class NodeMigration(BatchDaemon, BatchLoggingMixin, BatchRunningSentinelMixin):
5655 DEFAULT_MAX_WORKER_PROCESSES = 6
5756 DEFAULT_RUN_INTERVAL_SECONDS = 60
5857
58+ WORKER_LABEL_SEPARATOR = ":"
59+ EVENT_WORKER_LABEL_PREFIX = "event"
60+ UPTIME_WORKER_LABEL_PREFIX = "uptime"
61+
5962 @batch_command_line_arguments
6063 def parse_args (self , parser ):
6164 arg_group = parser .add_argument_group ("NodeMigration batch options" )
@@ -66,10 +69,11 @@ def parse_args(self, parser):
6669 def configure_initial (self ):
6770 setup_config (self .options )
6871 self .logger = colorlog .getLogger (__name__ )
69- self .migration_workers : Dict [WorkerProcessLabel , RestartableDaemonProcess ] = {}
72+ self .migration_workers : Dict [str , RestartableDaemonProcess ] = {}
7073 self .migration_configs = {}
71- self .events_in_progress = set ()
74+ self .events_in_progress = {}
7275 self .pools_accepting_events = set ()
76+ self .worker_locks = defaultdict (Lock )
7377 self .cluster_connector = KubernetesClusterConnector (self .options .cluster , None , init_crd = True )
7478 self .run_interval = staticconf .read_int (
7579 "batches.node_migration.run_interval_seconds" , self .DEFAULT_RUN_INTERVAL_SECONDS
@@ -101,7 +105,26 @@ def _get_worker_setup(self, pool: str) -> Optional[WorkerSetup]:
101105 self .logger .exception (f"Bad migration configuration for pool { pool } : { e } " )
102106 return None
103107
104- def _spawn_worker (self , label : WorkerProcessLabel , routine : Callable , * args , ** kwargs ) -> bool :
108+ def _build_worker_label (self , pool : Optional [str ] = None , event : Optional [MigrationEvent ] = None ) -> str :
109+ """Composes label for worker process
110+
111+ :param Optional[str] pool: pool name in case of uptime worker label
112+ :param Optional[MigrationEvent] event: event instance in case of event worker label
113+ :return: label string
114+ """
115+ if event :
116+ prefix , cluster , pool = self .EVENT_WORKER_LABEL_PREFIX , event .cluster , event .pool
117+ elif pool :
118+ prefix , cluster = self .UPTIME_WORKER_LABEL_PREFIX , self .options .cluster
119+ else :
120+ raise ValueError ("Either 'pool' or 'event' must be provided as parameter" )
121+ return self .WORKER_LABEL_SEPARATOR .join ((prefix , cluster , pool ))
122+
123+ def _is_event_worker_label (self , label : str ) -> bool :
124+ """Check if process label is for an event worker"""
125+ return label .startswith (self .EVENT_WORKER_LABEL_PREFIX )
126+
127+ def _spawn_worker (self , label : str , routine : Callable , * args , ** kwargs ) -> bool :
105128 """Start worker process
106129
107130 :param Callable routine: worker method
@@ -113,10 +136,12 @@ def _spawn_worker(self, label: WorkerProcessLabel, routine: Callable, *args, **k
113136 self .logger .warning (f"Worker labelled { label } already running, skipping" )
114137 return False
115138 running_workers = sum (proc .is_alive () for proc in self .migration_workers .values ())
116- if isinstance (label , MigrationEvent ) and running_workers >= self .available_worker_slots :
139+ if self . _is_event_worker_label (label ) and running_workers >= self .available_worker_slots :
117140 # uptime workers are prioritized skipping this check
118141 self .logger .warning (f"Too many worker processes running already ({ running_workers } ), skipping" )
119142 return False
143+ lock_key = label .split (self .WORKER_LABEL_SEPARATOR , 1 )[1 ]
144+ kwargs ["pool_lock" ] = self .worker_locks [lock_key ]
120145 proc = RestartableDaemonProcess (target = routine , args = args , kwargs = kwargs )
121146 self .migration_workers [label ] = proc
122147 proc .start ()
@@ -128,7 +153,7 @@ def fetch_event_crd(self) -> Collection[MigrationEvent]:
128153 events = self .cluster_connector .list_node_migration_resources (
129154 [MigrationStatus .PENDING , MigrationStatus .INPROGRESS ]
130155 )
131- return set (events ) - self .events_in_progress
156+ return set (events ) - set ( self .events_in_progress . values ())
132157
133158 def mark_event (self , event : MigrationEvent , status : MigrationStatus = MigrationStatus .COMPLETED ) -> None :
134159 """Set status for CRD event resource
@@ -153,14 +178,15 @@ def spawn_event_worker(self, event: MigrationEvent):
153178 self .mark_event (event , MigrationStatus .SKIPPED )
154179 return
155180 self .logger .info (f"Spawning migration worker for event: { event } " )
181+ worker_label = self ._build_worker_label (event = event )
156182 if self ._spawn_worker (
157- label = event ,
183+ label = worker_label ,
158184 routine = event_migration_worker ,
159185 migration_event = event ,
160186 worker_setup = worker_setup ,
161187 ):
162188 self .mark_event (event , MigrationStatus .INPROGRESS )
163- self .events_in_progress . add ( event )
189+ self .events_in_progress [ worker_label ] = event
164190
165191 def spawn_uptime_worker (self , pool : str , uptime : Union [int , str ]):
166192 """Start process monitoring pool node uptime, and recycling nodes accordingly
@@ -180,7 +206,7 @@ def spawn_uptime_worker(self, pool: str, uptime: Union[int, str]):
180206 return
181207 self .logger .info (f"Spawning uptime migration worker for { pool } pool" )
182208 self ._spawn_worker (
183- label = f"uptime- { self .options . cluster } - { pool } " ,
209+ label = self ._build_worker_label ( pool = pool ) ,
184210 routine = uptime_migration_worker ,
185211 cluster = self .options .cluster ,
186212 pool = pool ,
@@ -198,9 +224,9 @@ def monitor_workers(self):
198224 else :
199225 torestart .append (label )
200226 for label in completed :
201- if isinstance (label , MigrationEvent ):
202- self .mark_event (label , MigrationStatus . COMPLETED )
203- self .events_in_progress . discard ( label )
227+ if self . _is_event_worker_label (label ):
228+ event = self .events_in_progress . pop (label )
229+ self .mark_event ( event , MigrationStatus . COMPLETED )
204230 del self .migration_workers [label ]
205231 for label in torestart :
206232 self .migration_workers [label ].restart ()
0 commit comments