@@ -36,6 +36,13 @@ def run(self):
3636 while retries < JobThread .MAX_RETRIES_ON_EXCEPTION :
3737 vol_job = None
3838 try :
39+ if not self .async_job .run_event .is_set ():
40+ log .debug ('will wait for run_event to be set. postponing '
41+ 'getting jobs until then' )
42+ self .async_job .run_event .wait ()
43+ log .debug ('run_event has been set, waiting is complete. '
44+ 'proceeding to get jobs now' )
45+
3946 # fetch next job to execute
4047 with lock_timeout_log (self .async_job .lock ):
4148 while True :
@@ -139,6 +146,11 @@ def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
139146 self .stopping = threading .Event ()
140147
141148 self .cancel_cv = threading .Condition (self .lock )
149+
150+ self .run_event = threading .Event ()
151+ # let async threads run by default
152+ self .run_event .set ()
153+
142154 self .nr_concurrent_jobs = nr_concurrent_jobs
143155 self .name_pfx = name_pfx
144156 # each async job group uses its own libcephfs connection (pool)
@@ -196,6 +208,47 @@ def run(self):
196208 self .spawn_more_threads ()
197209 self .cv .wait (timeout = self .wakeup_timeout )
198210
211+ def pause (self ):
212+ self .run_event .clear ()
213+
214+ log .debug ('pause() cancelling ongoing jobs now and respective worker '
215+ 'threads...' )
216+ self .cancel_all_jobs (update_queue = False )
217+
218+ # XXX: cancel_all_jobs() sets jobthread.cancel_event causing all ongoing
219+ # jobs to cancel. But if there are no jobs (that is self.q is empty),
220+ # cancel_all_jobs() will return without doing anything and
221+ # jobthread.cancel_event won't be set. This results in future jobs to be
222+ # executed even when config option to pause is already set. Similarly,
223+ # when there's only 1 ongoing job, jobthread.cancel_event is set for it
224+ # but not for other threads causing rest of threads to pick new jobs
225+ # when they are queued.
226+ # Therefore, set jobthread.cancel_event explicitly.
227+ log .debug ('pause() pausing rest of worker threads' )
228+ for t in self .threads :
229+ # is_set(), although technically redundant, is called to emphasize
230+ # that cancel_event might be set on some threads but might not be
231+ # on others. this prevents removal of the call to set() below after
232+ # the incomplete observation that cancel_event is already set on
233+ # (some) threads.
234+ if not t .cancel_event .is_set ():
235+ t .cancel_event .set ()
236+ log .debug ('pause() all jobs cancelled and cancel_event has been set for '
237+ 'all threads, queue and threads have been paused' )
238+
239+ def resume (self ):
240+ if self .run_event .is_set ():
241+ log .debug ('resume() no need to resume, run_event is already set.' )
242+ return
243+
244+ log .debug ('resume() enabling worker threads' )
245+ for t in self .threads :
246+ t .cancel_event .clear ()
247+
248+ self .run_event .set ()
249+ log .debug ('resume() run_event has been set, queue and threads have been'
250+ ' resumed' )
251+
199252 def shutdown (self ):
200253 self .stopping .set ()
201254 self .cancel_all_jobs ()
@@ -240,8 +293,10 @@ def get_job(self):
240293 nr_vols -= 1
241294 for vol in to_remove :
242295 log .debug ("auto removing volume '{0}' from tracked volumes" .format (vol ))
243- self .q .remove (vol )
244- self .jobs .pop (vol )
296+ if vol in self .q :
297+ self .q .remove (vol )
298+ if vol in self .jobs :
299+ self .jobs .pop (vol )
245300 return next_job
246301
247302 def register_async_job (self , volname , job , thread_id ):
@@ -253,7 +308,10 @@ def unregister_async_job(self, volname, job, thread_id):
253308 self .jobs [volname ].remove ((job , thread_id ))
254309
255310 cancelled = thread_id .should_cancel ()
256- thread_id .reset_cancel ()
311+ # don't clear cancel_event flag if queuing and threads have been paused
312+ # (that is, run_event is not set).
313+ if self .run_event .is_set ():
314+ thread_id .reset_cancel ()
257315
258316 # wake up cancellation waiters if needed
259317 if cancelled :
@@ -271,7 +329,7 @@ def queue_job(self, volname):
271329 self .jobs [volname ] = []
272330 self .cv .notifyAll ()
273331
274- def _cancel_jobs (self , volname ):
332+ def _cancel_jobs (self , volname , update_queue = True ):
275333 """
276334 cancel all jobs for the volume. do nothing if no jobs are
277335 executing for the given volume. this would wait until all jobs
@@ -281,7 +339,10 @@ def _cancel_jobs(self, volname):
281339 try :
282340 if volname not in self .q and volname not in self .jobs :
283341 return
284- self .q .remove (volname )
342+
343+ if update_queue :
344+ self .q .remove (volname )
345+
285346 # cancel in-progress operation and wait until complete
286347 for j in self .jobs [volname ]:
287348 j [1 ].cancel_job ()
@@ -290,7 +351,9 @@ def _cancel_jobs(self, volname):
290351 log .debug ("waiting for {0} in-progress jobs for volume '{1}' to "
291352 "cancel" .format (len (self .jobs [volname ]), volname ))
292353 self .cancel_cv .wait ()
293- self .jobs .pop (volname )
354+
355+ if update_queue :
356+ self .jobs .pop (volname )
294357 except (KeyError , ValueError ):
295358 pass
296359
@@ -328,13 +391,13 @@ def cancel_jobs(self, volname):
328391 with lock_timeout_log (self .lock ):
329392 self ._cancel_jobs (volname )
330393
331- def cancel_all_jobs (self ):
394+ def cancel_all_jobs (self , update_queue = True ):
332395 """
333396 cancel all executing jobs for all volumes.
334397 """
335398 with lock_timeout_log (self .lock ):
336399 for volname in list (self .q ):
337- self ._cancel_jobs (volname )
400+ self ._cancel_jobs (volname , update_queue = update_queue )
338401
339402 def get_next_job (self , volname , running_jobs ):
340403 """
0 commit comments