@@ -2953,64 +2953,6 @@ def test_imap(self):
29532953                    self .assertEqual (next (it ), i  *  i )
29542954                self .assertRaises (StopIteration , it .__next__ )
29552955
2956-     def  test_imap_fast_iterable_with_slow_task (self ):
2957-         if  self .TYPE  !=  "threads" :
2958-             self .skipTest ("test not appropriate for {}" .format (self .TYPE ))
2959- 
2960-         processes  =  4 
2961-         p  =  self .Pool (processes )
2962- 
2963-         tasks_started_later  =  2 
2964-         last_produced_task_arg  =  Value ("i" )
2965- 
2966-         def  produce_args ():
2967-             for  arg  in  range (1 , processes  +  tasks_started_later  +  1 ):
2968-                 last_produced_task_arg .value  =  arg 
2969-                 yield  arg 
2970- 
2971-         it  =  p .imap (functools .partial (sqr , wait = 0.2 ), produce_args ())
2972- 
2973-         next (it )
2974-         time .sleep (0.2 )
2975-         # `iterable` should've been advanced only up by `processes` times, 
2976-         # but in fact advances further (by `>=processes+1`). 
2977-         # In this case, it advances to the maximum value. 
2978-         self .assertGreater (last_produced_task_arg .value , processes  +  1 )
2979- 
2980-         p .terminate ()
2981-         p .join ()
2982- 
2983-     def  test_imap_fast_iterable_with_slow_task_and_buffersize (self ):
2984-         if  self .TYPE  !=  "threads" :
2985-             self .skipTest ("test not appropriate for {}" .format (self .TYPE ))
2986- 
2987-         processes  =  4 
2988-         p  =  self .Pool (processes )
2989- 
2990-         tasks_started_later  =  2 
2991-         last_produced_task_arg  =  Value ("i" )
2992- 
2993-         def  produce_args ():
2994-             for  arg  in  range (1 , processes  +  tasks_started_later  +  1 ):
2995-                 last_produced_task_arg .value  =  arg 
2996-                 yield  arg 
2997- 
2998-         it  =  p .imap (
2999-             functools .partial (sqr , wait = 0.2 ),
3000-             produce_args (),
3001-             buffersize = processes ,
3002-         )
3003- 
3004-         time .sleep (0.2 )
3005-         self .assertEqual (last_produced_task_arg .value , processes )
3006- 
3007-         next (it )
3008-         time .sleep (0.2 )
3009-         self .assertEqual (last_produced_task_arg .value , processes  +  1 )
3010- 
3011-         p .terminate ()
3012-         p .join ()
3013- 
30142956    def  test_imap_handle_iterable_exception (self ):
30152957        if  self .TYPE  ==  'manager' :
30162958            self .skipTest ('test not appropriate for {}' .format (self .TYPE ))
@@ -3101,6 +3043,128 @@ def test_imap_unordered_handle_iterable_exception(self):
31013043                self .assertIn (value , expected_values )
31023044                expected_values .remove (value )
31033045
3046+     def  test_imap_and_imap_unordered_buffersize_type_validation (self ):
3047+         for  method_name  in  ("imap" , "imap_unordered" ):
3048+             for  buffersize  in  ("foo" , 2.0 ):
3049+                 with  (
3050+                     self .subTest (method = method_name , buffersize = buffersize ),
3051+                     self .assertRaisesRegex (
3052+                         TypeError , "buffersize must be an integer or None" 
3053+                     ),
3054+                 ):
3055+                     method  =  getattr (self .pool , method_name )
3056+                     method (str , range (4 ), buffersize = buffersize )
3057+ 
3058+     def  test_imap_and_imap_unordered_buffersize_value_validation (self ):
3059+         for  method_name  in  ("imap" , "imap_unordered" ):
3060+             for  buffersize  in  (0 , - 1 ):
3061+                 with  (
3062+                     self .subTest (method = method_name , buffersize = buffersize ),
3063+                     self .assertRaisesRegex (
3064+                         ValueError , "buffersize must be None or > 0" 
3065+                     ),
3066+                 ):
3067+                     method  =  getattr (self .pool , method_name )
3068+                     method (str , range (4 ), buffersize = buffersize )
3069+ 
3070+     def  test_imap_and_imap_unordered_when_buffer_is_full (self ):
3071+         if  self .TYPE  !=  "threads" :
3072+             self .skipTest ("test not appropriate for {}" .format (self .TYPE ))
3073+ 
3074+         for  method_name  in  ("imap" , "imap_unordered" ):
3075+             with  self .subTest (method = method_name ):
3076+                 processes  =  4 
3077+                 p  =  self .Pool (processes )
3078+                 last_produced_task_arg  =  Value ("i" )
3079+ 
3080+                 def  produce_args ():
3081+                     for  arg  in  itertools .count (1 ):
3082+                         last_produced_task_arg .value  =  arg 
3083+                         yield  arg 
3084+ 
3085+                 method  =  getattr (p , method_name )
3086+                 it  =  method (functools .partial (sqr , wait = 0.2 ), produce_args ())
3087+ 
3088+                 time .sleep (0.2 )
3089+                 # `iterable` could've been advanced only `processes` times, 
3090+                 # but in fact it advances further (`> processes`) because of 
3091+                 # not waiting for workers or user code to catch up. 
3092+                 self .assertGreater (last_produced_task_arg .value , processes )
3093+ 
3094+                 next (it )
3095+                 time .sleep (0.2 )
3096+                 self .assertGreater (last_produced_task_arg .value , processes  +  1 )
3097+ 
3098+                 next (it )
3099+                 time .sleep (0.2 )
3100+                 self .assertGreater (last_produced_task_arg .value , processes  +  2 )
3101+ 
3102+                 p .terminate ()
3103+                 p .join ()
3104+ 
3105+     def  test_imap_and_imap_unordered_buffersize_when_buffer_is_full (self ):
3106+         if  self .TYPE  !=  "threads" :
3107+             self .skipTest ("test not appropriate for {}" .format (self .TYPE ))
3108+ 
3109+         for  method_name  in  ("imap" , "imap_unordered" ):
3110+             with  self .subTest (method = method_name ):
3111+                 processes  =  4 
3112+                 p  =  self .Pool (processes )
3113+                 last_produced_task_arg  =  Value ("i" )
3114+ 
3115+                 def  produce_args ():
3116+                     for  arg  in  itertools .count (1 ):
3117+                         last_produced_task_arg .value  =  arg 
3118+                         yield  arg 
3119+ 
3120+                 method  =  getattr (p , method_name )
3121+                 it  =  method (
3122+                     functools .partial (sqr , wait = 0.2 ),
3123+                     produce_args (),
3124+                     buffersize = processes ,
3125+                 )
3126+ 
3127+                 time .sleep (0.2 )
3128+                 self .assertEqual (last_produced_task_arg .value , processes )
3129+ 
3130+                 next (it )
3131+                 time .sleep (0.2 )
3132+                 self .assertEqual (last_produced_task_arg .value , processes  +  1 )
3133+ 
3134+                 next (it )
3135+                 time .sleep (0.2 )
3136+                 self .assertEqual (last_produced_task_arg .value , processes  +  2 )
3137+ 
3138+                 p .terminate ()
3139+                 p .join ()
3140+ 
3141+     def  test_imap_and_imap_unordered_buffersize_on_infinite_iterable (self ):
3142+         if  self .TYPE  !=  "threads" :
3143+             self .skipTest ("test not appropriate for {}" .format (self .TYPE ))
3144+ 
3145+         for  method_name  in  ("imap" , "imap_unordered" ):
3146+             with  self .subTest (method = method_name ):
3147+                 p  =  self .Pool (4 )
3148+                 method  =  getattr (p , method_name )
3149+ 
3150+                 res  =  method (str , itertools .count (), buffersize = 2 )
3151+ 
3152+                 self .assertEqual (next (res , None ), "0" )
3153+                 self .assertEqual (next (res , None ), "1" )
3154+                 self .assertEqual (next (res , None ), "2" )
3155+ 
3156+                 p .terminate ()
3157+                 p .join ()
3158+ 
3159+     def  test_imap_and_imap_unordered_buffersize_on_empty_iterable (self ):
3160+         for  method_name  in  ("imap" , "imap_unordered" ):
3161+             with  self .subTest (method = method_name ):
3162+                 method  =  getattr (self .pool , method_name )
3163+ 
3164+                 res  =  method (str , [], buffersize = 2 )
3165+ 
3166+                 self .assertIsNone (next (res , None ))
3167+ 
31043168    def  test_make_pool (self ):
31053169        expected_error  =  (RemoteError  if  self .TYPE  ==  'manager' 
31063170                          else  ValueError )
0 commit comments