@@ -142,11 +142,31 @@ def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
142142 self .wakeup_timeout = None
143143
144144 self .threads = []
145- for i in range (self .nr_concurrent_jobs ):
146- self .threads .append (JobThread (self , volume_client , name = "{0}.{1}" .format (self .name_pfx , i )))
147- self .threads [- 1 ].start ()
145+ self .spawn_all_threads ()
148146 self .start ()
149147
148+ def spawn_new_thread (self , suffix ):
149+ t_name = f'{ self .name_pfx } .{ time .time ()} .{ suffix } '
150+ log .debug (f'spawning new thread with name { t_name } ' )
151+ t = JobThread (self , self .vc , name = t_name )
152+ t .start ()
153+
154+ self .threads .append (t )
155+
156+ def spawn_all_threads (self ):
157+ log .debug (f'spawning { self .nr_concurrent_jobs } to execute more jobs '
158+ 'concurrently' )
159+ for i in range (self .nr_concurrent_jobs ):
160+ self .spawn_new_thread (i )
161+
162+ def spawn_more_threads (self ):
163+ c = len (self .threads )
164+ diff = self .nr_concurrent_jobs - c
165+ log .debug (f'spawning { diff } threads to execute more jobs concurrently' )
166+
167+ for i in range (c , self .nr_concurrent_jobs ):
168+ self .spawn_new_thread (i )
169+
150170 def set_wakeup_timeout (self ):
151171 with self .lock :
152172 # not made configurable on purpose
@@ -169,10 +189,7 @@ def run(self):
169189 self .cv .notifyAll ()
170190 elif c < self .nr_concurrent_jobs :
171191 # Increase concurrency: create more threads.
172- log .debug ("creating new threads to job increase" )
173- for i in range (c , self .nr_concurrent_jobs ):
174- self .threads .append (JobThread (self , self .vc , name = "{0}.{1}.{2}" .format (self .name_pfx , time .time (), i )))
175- self .threads [- 1 ].start ()
192+ self .spawn_more_threads ()
176193 self .cv .wait (timeout = self .wakeup_timeout )
177194
178195 def shutdown (self ):
0 commit comments