@@ -474,7 +474,7 @@ def __exit__(self, *args):
474
474
475
475
def __init__ (self , taskmaster , num , stack_size ) -> None :
476
476
self .taskmaster = taskmaster
477
- self .num_workers = num
477
+ self .max_workers = num
478
478
self .stack_size = stack_size
479
479
self .interrupted = InterruptState ()
480
480
self .workers = []
@@ -484,7 +484,7 @@ def __init__(self, taskmaster, num, stack_size) -> None:
484
484
# also protects access to our state that gets updated
485
485
# concurrently. The `can_search_cv` is associated with
486
486
# this mutex.
487
- self .tm_lock = (threading .Lock if self .num_workers > 1 else NewParallel .FakeLock )()
487
+ self .tm_lock = (threading .Lock if self .max_workers > 1 else NewParallel .FakeLock )()
488
488
489
489
# Guarded under `tm_lock`.
490
490
self .jobs = 0
@@ -493,11 +493,11 @@ def __init__(self, taskmaster, num, stack_size) -> None:
493
493
# The `can_search_cv` is used to manage a leader /
494
494
# follower pattern for access to the taskmaster, and to
495
495
# awaken from stalls.
496
- self .can_search_cv = (threading .Condition if self .num_workers > 1 else NewParallel .FakeCondition )(self .tm_lock )
496
+ self .can_search_cv = (threading .Condition if self .max_workers > 1 else NewParallel .FakeCondition )(self .tm_lock )
497
497
498
498
# The queue of tasks that have completed execution. The
499
499
# next thread to obtain `tm_lock`` will retire them.
500
- self .results_queue_lock = (threading .Lock if self .num_workers > 1 else NewParallel .FakeLock )()
500
+ self .results_queue_lock = (threading .Lock if self .max_workers > 1 else NewParallel .FakeLock )()
501
501
self .results_queue = []
502
502
503
503
if self .taskmaster .trace :
@@ -516,22 +516,27 @@ def trace_message(self, message) -> None:
516
516
method_name = sys ._getframe (1 ).f_code .co_name + "():"
517
517
thread_id = threading .get_ident ()
518
518
self .trace .debug ('%s.%s [Thread:%s] %s' % (type (self ).__name__ , method_name , thread_id , message ))
519
- # print('%-15s %s' % (method_name, message))
520
519
521
520
def start (self ) -> None :
522
- if self .num_workers == 1 :
521
+ if self .max_workers == 1 :
523
522
self ._work ()
524
523
else :
525
- self ._start_workers ()
526
- for worker in self .workers :
527
- worker .join ()
528
- self .workers = []
524
+ self ._start_worker ()
525
+ while len ( self .workers ) > 0 :
526
+ self . workers [ 0 ] .join ()
527
+ self .workers . pop ( 0 )
529
528
self .taskmaster .cleanup ()
530
529
531
- def _start_workers (self ) -> None :
530
+ def _maybe_start_worker (self ) -> None :
531
+ if self .max_workers > 1 and len (self .workers ) < self .max_workers :
532
+ if self .jobs >= len (self .workers ):
533
+ self ._start_worker ()
534
+
535
+ def _start_worker (self ) -> None :
532
536
prev_size = self ._adjust_stack_size ()
533
- for _ in range (self .num_workers ):
534
- self .workers .append (NewParallel .Worker (self ))
537
+ if self .trace :
538
+ self .trace_message ("Starting new worker thread" )
539
+ self .workers .append (NewParallel .Worker (self ))
535
540
self ._restore_stack_size (prev_size )
536
541
537
542
def _adjust_stack_size (self ):
@@ -680,6 +685,11 @@ def _work(self):
680
685
self .trace_message ("Found task requiring execution" )
681
686
self .state = NewParallel .State .READY
682
687
self .can_search_cv .notify ()
688
+ # This thread will be busy taking care of
689
+ # `execute`ing this task. If we haven't
690
+ # reached the limit, spawn a new thread to
691
+ # turn the crank and find the next task.
692
+ self ._maybe_start_worker ()
683
693
684
694
else :
685
695
# We failed to find a task, so this thread
0 commit comments