Skip to content

Commit cac500a

Browse files
authored
Refine disabling of workers (#1839)
This commit's PR started small but grew quite big. There are two major changes: * Bugfix: don't give tasks to disabled assistants. Previously, assistants would keep getting tasks and never stop, even though a user had clicked "disable worker" in the WebUI. * Invent a worker state and pass it along in get_work and worker_list. This gives workers an opportunity to know whether the scheduler considers them disabled. It also allows the WebUI to show whether a worker is disabled, making the UI experience ever so much more responsive. The old "greyed out worker" will now mean "disabled". Previously it was equivalent to "has no pending tasks". But because we also associate "greyed out" with "can't click disable", the WebUI has confused at least me for a while.
1 parent aa30be8 commit cac500a

File tree

7 files changed

+218
-103
lines changed

7 files changed

+218
-103
lines changed

luigi/scheduler.py

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
DISABLED: UPSTREAM_DISABLED,
7070
}
7171

72+
WORKER_STATE_DISABLED = 'disabled'
73+
WORKER_STATE_ACTIVE = 'active'
74+
7275
TASK_FAMILY_RE = re.compile(r'([^(_]+)[(_]')
7376

7477
RPC_METHODS = {}
@@ -319,6 +322,17 @@ def is_trivial_worker(self, state):
319322
def assistant(self):
320323
return self.info.get('assistant', False)
321324

325+
@property
326+
def enabled(self):
327+
return not self.disabled
328+
329+
@property
330+
def state(self):
331+
if self.enabled:
332+
return WORKER_STATE_ACTIVE
333+
else:
334+
return WORKER_STATE_DISABLED
335+
322336
def __str__(self):
323337
return self.id
324338

@@ -527,7 +541,7 @@ def get_active_workers(self, last_active_lt=None, last_get_work_gt=None):
527541
for worker in six.itervalues(self._active_workers):
528542
if last_active_lt is not None and worker.last_active >= last_active_lt:
529543
continue
530-
last_get_work = getattr(worker, 'last_get_work', None)
544+
last_get_work = worker.last_get_work
531545
if last_get_work_gt is not None and (
532546
last_get_work is None or last_get_work <= last_get_work_gt):
533547
continue
@@ -554,10 +568,10 @@ def _remove_workers_from_tasks(self, workers, remove_stakeholders=True):
554568
task.stakeholders.difference_update(workers)
555569
task.workers.difference_update(workers)
556570

557-
def disable_workers(self, workers):
558-
self._remove_workers_from_tasks(workers, remove_stakeholders=False)
559-
for worker in workers:
560-
self.get_worker(worker).disabled = True
571+
def disable_workers(self, worker_ids):
572+
self._remove_workers_from_tasks(worker_ids, remove_stakeholders=False)
573+
for worker_id in worker_ids:
574+
self.get_worker(worker_id).disabled = True
561575

562576

563577
class Scheduler(object):
@@ -623,13 +637,12 @@ def _prune_tasks(self):
623637

624638
self._state.inactivate_tasks(remove_tasks)
625639

626-
def update(self, worker_id, worker_reference=None, get_work=False):
627-
"""
628-
Keep track of whenever the worker was last active.
629-
"""
640+
def _update_worker(self, worker_id, worker_reference=None, get_work=False):
641+
# Keep track of whenever the worker was last active.
642+
# For convenience also return the worker object.
630643
worker = self._state.get_worker(worker_id)
631644
worker.update(worker_reference, get_work=get_work)
632-
return not getattr(worker, 'disabled', False)
645+
return worker
633646

634647
def _update_priority(self, task, prio, worker):
635648
"""
@@ -663,10 +676,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
663676
"""
664677
assert worker is not None
665678
worker_id = worker
666-
worker_enabled = self.update(worker_id)
679+
worker = self._update_worker(worker_id)
667680
retry_policy = self._generate_retry_policy(retry_policy_dict)
668681

669-
if worker_enabled:
682+
if worker.enabled:
670683
_default_task = self._make_task(
671684
task_id=task_id, status=PENDING, deps=deps, resources=resources,
672685
priority=priority, family=family, module=module, params=params,
@@ -676,7 +689,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
676689

677690
task = self._state.get_task(task_id, setdefault=_default_task)
678691

679-
if task is None or (task.status != RUNNING and not worker_enabled):
692+
if task is None or (task.status != RUNNING and not worker.enabled):
680693
return
681694

682695
# for setting priority, we'll sometimes create tasks with unset family and params
@@ -728,7 +741,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
728741
if resources is not None:
729742
task.resources = resources
730743

731-
if worker_enabled and not assistant:
744+
if worker.enabled and not assistant:
732745
task.stakeholders.add(worker_id)
733746

734747
# Task dependencies might not exist yet. Let's create dummy tasks for them for now.
@@ -743,7 +756,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
743756
# before we know their retry_policy, we always set it here
744757
task.retry_policy = retry_policy
745758

746-
if runnable and status != FAILED and worker_enabled:
759+
if runnable and status != FAILED and worker.enabled:
747760
task.workers.add(worker_id)
748761
self._state.get_worker(worker_id).tasks.add(task)
749762
task.runnable = runnable
@@ -837,8 +850,19 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
837850

838851
assert worker is not None
839852
worker_id = worker
840-
# Return remaining tasks that have no FAILED descendants
841-
self.update(worker_id, {'host': host}, get_work=True)
853+
worker = self._update_worker(
854+
worker_id,
855+
worker_reference={'host': host},
856+
get_work=True)
857+
if not worker.enabled:
858+
reply = {'n_pending_tasks': 0,
859+
'running_tasks': [],
860+
'task_id': None,
861+
'n_unique_pending': 0,
862+
'worker_state': worker.state,
863+
}
864+
return reply
865+
842866
if assistant:
843867
self.add_worker(worker_id, [('assistant', assistant)])
844868

@@ -942,7 +966,9 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
942966
reply = {'n_pending_tasks': locally_pending_tasks,
943967
'running_tasks': running_tasks,
944968
'task_id': None,
945-
'n_unique_pending': n_unique_pending}
969+
'n_unique_pending': n_unique_pending,
970+
'worker_state': worker.state,
971+
}
946972

947973
if len(batched_tasks) > 1:
948974
batch_string = '|'.join(task.id for task in batched_tasks)
@@ -976,7 +1002,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
9761002
@rpc_method(attempts=1)
9771003
def ping(self, **kwargs):
9781004
worker_id = kwargs['worker']
979-
self.update(worker_id)
1005+
self._update_worker(worker_id)
9801006

9811007
def _upstream_status(self, task_id, upstream_status_table):
9821008
if task_id in upstream_status_table:
@@ -1142,7 +1168,7 @@ def filter_func(t):
11421168
return all(term in t.pretty_id for term in terms)
11431169
for task in filter(filter_func, self._state.get_active_tasks(status)):
11441170
if task.status != PENDING or not upstream_status or upstream_status == self._upstream_status(task.id, upstream_status_table):
1145-
serialized = self._serialize_task(task.id, False)
1171+
serialized = self._serialize_task(task.id, include_deps=False)
11461172
result[task.id] = serialized
11471173
if limit and len(result) > (max_shown_tasks or self._config.max_shown_tasks):
11481174
return {'num_tasks': len(result)}
@@ -1162,7 +1188,8 @@ def worker_list(self, include_running=True, **kwargs):
11621188
dict(
11631189
name=worker.id,
11641190
last_active=worker.last_active,
1165-
started=getattr(worker, 'started', None),
1191+
started=worker.started,
1192+
state=worker.state,
11661193
first_task_display_name=self._first_task_display_name(worker),
11671194
**worker.info
11681195
) for worker in self._state.get_active_workers()]
@@ -1173,7 +1200,7 @@ def worker_list(self, include_running=True, **kwargs):
11731200
num_uniques = collections.defaultdict(int)
11741201
for task in self._state.get_pending_tasks():
11751202
if task.status == RUNNING and task.worker_running:
1176-
running[task.worker_running][task.id] = self._serialize_task(task.id, False)
1203+
running[task.worker_running][task.id] = self._serialize_task(task.id, include_deps=False)
11771204
elif task.status == PENDING:
11781205
for worker in task.workers:
11791206
num_pending[worker] += 1
@@ -1204,7 +1231,7 @@ def resource_list(self):
12041231
for task in self._state.get_running_tasks():
12051232
if task.status == RUNNING and task.resources:
12061233
for resource, amount in six.iteritems(task.resources):
1207-
consumers[resource][task.id] = self._serialize_task(task.id, False)
1234+
consumers[resource][task.id] = self._serialize_task(task.id, include_deps=False)
12081235
for resource in resources:
12091236
tasks = consumers[resource['name']]
12101237
resource['num_consumer'] = len(tasks)
@@ -1235,7 +1262,7 @@ def task_search(self, task_str, **kwargs):
12351262
result = collections.defaultdict(dict)
12361263
for task in self._state.get_active_tasks():
12371264
if task.id.find(task_str) != -1:
1238-
serialized = self._serialize_task(task.id, False)
1265+
serialized = self._serialize_task(task.id, include_deps=False)
12391266
result[task.status][task.id] = serialized
12401267
return result
12411268

luigi/static/visualiser/index.html

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,9 @@ <h4 class="modal-title" id="disiableWorkerLabel">Disable worker?</h4>
274274
</div>
275275
<div class="modal-body">
276276
Are you sure you want to disable this worker?
277+
<p>
278+
A disabled worker will finish its existing tasks but not start new ones.
279+
</p>
277280
</div>
278281
<div class="modal-footer">
279282
<button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
@@ -283,22 +286,22 @@ <h4 class="modal-title" id="disiableWorkerLabel">Disable worker?</h4>
283286
</div>
284287
</div>
285288
{{#workers}}
286-
{{#num_pending}}
287-
<div class="box">
288-
{{/num_pending}}
289-
{{^num_pending}}
289+
{{#is_disabled}}
290290
<div class="box box-solid box-default">
291-
{{/num_pending}}
291+
{{/is_disabled}}
292+
{{^is_disabled}}
293+
<div class="box">
294+
{{/is_disabled}}
292295
<div class="box-header with-border">
293296
<h3 class="box-title">{{name}}</h3>
294297
<div class="box-tools pull-right">
295-
{{#num_pending}}
298+
{{^is_disabled}}
296299
<div class="button-tooltip" data-toggle="tooltip" title="Disable Worker">
297300
<button type="button" class="btn btn-danger btn-disable-worker" data-toggle="modal" data-target="#disableWorkerModal" data-worker="{{name}}">
298301
<i class="fa fa-fire-extinguisher"></i>
299302
</button>
300303
</div>
301-
{{/num_pending}}
304+
{{/is_disabled}}
302305
</div>
303306
</div>
304307
<div class="box-body">
@@ -308,6 +311,9 @@ <h3 class="box-title">{{name}}</h3>
308311
Running: {{num_running}}<br>
309312
Pending: {{num_pending}}<br>
310313
Unique Pending: {{num_uniques}}<br>
314+
{{#is_disabled}}
315+
This worker is <b>disabled</b>. It will not start new tasks.<br>
316+
{{/is_disabled}}
311317

312318
{{#num_running}}
313319
<hr>

luigi/static/visualiser/js/visualiserApp.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ function visualiserApp(luigi) {
240240
worker.tasks.sort(function(task1, task2) { return task1.timeRunning - task2.timeRunning; });
241241
worker.start_time = new Date(worker.started * 1000).toLocaleString();
242242
worker.active = new Date(worker.last_active * 1000).toLocaleString();
243+
worker.is_disabled = worker.state === 'disabled';
243244
return worker;
244245
}
245246

luigi/worker.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from luigi.event import Event
5555
from luigi.task_register import load_task
5656
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy
57+
from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED
5758
from luigi.target import Target
5859
from luigi.task import Task, flatten, getpaths, Config
5960
from luigi.task_register import TaskClassException
@@ -730,7 +731,7 @@ def _get_work_task_id(self, get_work_response):
730731

731732
def _get_work(self):
732733
if self._stop_requesting_work:
733-
return None, 0, 0, 0
734+
return None, 0, 0, 0, WORKER_STATE_DISABLED
734735
logger.debug("Asking scheduler for work...")
735736
r = self._scheduler.get_work(
736737
worker=self._id,
@@ -741,6 +742,9 @@ def _get_work(self):
741742
n_pending_tasks = r['n_pending_tasks']
742743
running_tasks = r['running_tasks']
743744
n_unique_pending = r['n_unique_pending']
745+
# TODO: For a tiny amount of time (a month?) we'll keep forwards compatibility
746+
# That is, you can use a newer client than server (Sep 2016)
747+
worker_state = r.get('worker_state', WORKER_STATE_ACTIVE) # state according to server!
744748
task_id = self._get_work_task_id(r)
745749

746750
self._get_work_response_history.append({
@@ -773,7 +777,7 @@ def _get_work(self):
773777
self._scheduled_tasks.get(batch_id) for batch_id in r['batch_task_ids']])
774778
self._batch_running_tasks[task_id] = batch_tasks
775779

776-
return task_id, running_tasks, n_pending_tasks, n_unique_pending
780+
return task_id, running_tasks, n_pending_tasks, n_unique_pending, worker_state
777781

778782
def _run_task(self, task_id):
779783
task = self._scheduled_tasks[task_id]
@@ -927,8 +931,15 @@ def handle_interrupt(self, signum, _):
927931
Stops the assistant from asking for more work on SIGUSR1
928932
"""
929933
if signum == signal.SIGUSR1:
930-
self._config.keep_alive = False
931-
self._stop_requesting_work = True
934+
self._start_phasing_out()
935+
936+
def _start_phasing_out(self):
937+
"""
938+
Go into a mode where we don't ask for more work and quit once existing
939+
tasks are done.
940+
"""
941+
self._config.keep_alive = False
942+
self._stop_requesting_work = True
932943

933944
def run(self):
934945
"""
@@ -946,7 +957,10 @@ def run(self):
946957
logger.debug('%d running tasks, waiting for next task to finish', len(self._running_tasks))
947958
self._handle_next_task()
948959

949-
task_id, running_tasks, n_pending_tasks, n_unique_pending = self._get_work()
960+
task_id, running_tasks, n_pending_tasks, n_unique_pending, worker_state = self._get_work()
961+
962+
if worker_state == WORKER_STATE_DISABLED:
963+
self._start_phasing_out()
950964

951965
if task_id is None:
952966
if not self._stop_requesting_work:

test/scheduler_api_test.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,15 @@ def test_disable_worker_can_fail_task(self):
13051305
def test_disable_worker_stays_disabled_on_new_deps(self):
13061306
self._test_disable_worker_helper(new_status='PENDING', new_deps=['B', 'C'])
13071307

1308+
def test_disable_worker_assistant_gets_no_task(self):
1309+
self.setTime(0)
1310+
self.sch.add_task(worker=WORKER, task_id='A')
1311+
self.sch.add_worker('assistant', [('assistant', True)])
1312+
self.sch.ping(worker='assistant')
1313+
self.sch.disable_worker('assistant')
1314+
self.assertIsNone(self.sch.get_work(worker='assistant', assistant=True)['task_id'])
1315+
self.assertIsNotNone(self.sch.get_work(worker=WORKER)['task_id'])
1316+
13081317
def test_prune_worker(self):
13091318
self.setTime(1)
13101319
self.sch.add_worker(worker=WORKER, info={})

test/scheduler_visualisation_test.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ def complete(self):
532532
self.assertEqual(0, worker['num_pending'])
533533
self.assertEqual(0, worker['num_uniques'])
534534
self.assertEqual(0, worker['num_running'])
535+
self.assertEqual('active', worker['state'])
535536
self.assertEqual(1, worker['workers'])
536537

537538
def test_worker_list_pending_uniques(self):
@@ -583,6 +584,17 @@ class X(luigi.Task):
583584
self.assertEqual(1, worker['num_pending'])
584585
self.assertEqual(1, worker['num_uniques'])
585586

587+
def test_worker_list_disabled_worker(self):
588+
class X(luigi.Task):
589+
pass
586590

587-
if __name__ == '__main__':
588-
unittest.main()
591+
with luigi.worker.Worker(worker_id='w', scheduler=self.scheduler) as w:
592+
w.add(X()) #
593+
workers = self._remote().worker_list()
594+
self.assertEqual(1, len(workers))
595+
self.assertEqual('active', workers[0]['state'])
596+
self.scheduler.disable_worker('w')
597+
workers = self._remote().worker_list()
598+
self.assertEqual(1, len(workers))
599+
self.assertEqual(1, len(workers))
600+
self.assertEqual('disabled', workers[0]['state'])

0 commit comments

Comments
 (0)