@@ -389,22 +389,21 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
389389 return self ._map_async (func , iterable , starmapstar , chunksize ,
390390 callback , error_callback )
391391
392- def _guarded_task_generation (self , result_job , func , iterable ,
393- buffersize_sema = None ):
392+ def _guarded_task_generation (self , result_job , func , iterable , sema = None ):
394393 '''Provides a generator of tasks for imap and imap_unordered with
395394 appropriate handling for iterables which throw exceptions during
396395 iteration.'''
397396 try :
398397 i = - 1
398+ enumerated_iter = iter (enumerate (iterable ))
399399
400- if buffersize_sema is None :
401- for i , x in enumerate ( iterable ) :
400+ if sema is None :
401+ for i , x in enumerated_iter :
402402 yield (result_job , i , func , (x ,), {})
403403
404404 else :
405- enumerated_iter = iter (enumerate (iterable ))
406405 while True :
407- buffersize_sema .acquire ()
406+ sema .acquire ()
408407 try :
409408 i , x = next (enumerated_iter )
410409 except StopIteration :
@@ -419,13 +418,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
419418 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
420419 '''
421420 self ._check_running ()
422- if chunksize < 1 :
423- raise ValueError ("Chunksize must be 1+, not {0:n}" .format (chunksize ))
424- if buffersize is not None :
425- if not isinstance (buffersize , int ):
426- raise TypeError ("buffersize must be an integer or None" )
427- if buffersize < 1 :
428- raise ValueError ("buffersize must be None or > 0" )
421+ self ._check_chunksize (chunksize )
422+ self ._check_buffersize (buffersize )
429423
430424 result = IMapIterator (self , buffersize )
431425 if chunksize == 1 :
@@ -441,8 +435,12 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
441435 task_batches = Pool ._get_tasks (func , iterable , chunksize )
442436 self ._taskqueue .put (
443437 (
444- self ._guarded_task_generation (result ._job , mapstar , task_batches ,
445- result ._buffersize_sema ),
438+ self ._guarded_task_generation (
439+ result ._job ,
440+ mapstar ,
441+ task_batches ,
442+ result ._buffersize_sema ,
443+ ),
446444 result ._set_length ,
447445 )
448446 )
@@ -453,15 +451,8 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
453451 Like `imap()` method but ordering of results is arbitrary.
454452 '''
455453 self ._check_running ()
456- if chunksize < 1 :
457- raise ValueError (
458- "Chunksize must be 1+, not {0!r}" .format (chunksize )
459- )
460- if buffersize is not None :
461- if not isinstance (buffersize , int ):
462- raise TypeError ("buffersize must be an integer or None" )
463- if buffersize < 1 :
464- raise ValueError ("buffersize must be None or > 0" )
454+ self ._check_chunksize (chunksize )
455+ self ._check_buffersize (buffersize )
465456
466457 result = IMapUnorderedIterator (self , buffersize )
467458 if chunksize == 1 :
@@ -477,8 +468,12 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
477468 task_batches = Pool ._get_tasks (func , iterable , chunksize )
478469 self ._taskqueue .put (
479470 (
480- self ._guarded_task_generation (result ._job , mapstar , task_batches ,
481- result ._buffersize_sema ),
471+ self ._guarded_task_generation (
472+ result ._job ,
473+ mapstar ,
474+ task_batches ,
475+ result ._buffersize_sema ,
476+ ),
482477 result ._set_length ,
483478 )
484479 )
@@ -531,6 +526,22 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
531526 )
532527 return result
533528
529+ @staticmethod
530+ def _check_chunksize (chunksize ):
531+ if chunksize < 1 :
532+ raise ValueError (
533+ "Chunksize must be 1+, not {0:n}" .format (chunksize )
534+ )
535+
536+ @staticmethod
537+ def _check_buffersize (buffersize ):
538+ if buffersize is None :
539+ return
540+ if not isinstance (buffersize , int ):
541+ raise TypeError ("buffersize must be an integer or None" )
542+ if buffersize < 1 :
543+ raise ValueError ("buffersize must be None or > 0" )
544+
534545 @staticmethod
535546 def _wait_for_updates (sentinels , change_notifier , timeout = None ):
536547 wait (sentinels , timeout = timeout )
@@ -876,7 +887,8 @@ def _set(self, i, success_result):
876887#
877888
878889class IMapIterator (object ):
879- def __init__ (self , pool , buffersize ):
890+
891+ def __init__ (self , pool , buffersize = None ):
880892 self ._pool = pool
881893 self ._cond = threading .Condition (threading .Lock ())
882894 self ._job = next (job_counter )
0 commit comments